Compare commits

...

83 Commits

Author SHA1 Message Date
Erik Johnston
15844040c2 Bump version and changelog 2016-03-23 16:57:41 +00:00
Erik Johnston
7a3815b372 Merge branch 'develop' of github.com:matrix-org/synapse into release-v0.14.0 2016-03-23 16:55:29 +00:00
Erik Johnston
647b041d1a Merge pull request #666 from matrix-org/erikj/intern
Intern lots of strings
2016-03-23 16:54:59 +00:00
Erik Johnston
8122ad7bab Simplify intern_dict 2016-03-23 16:41:54 +00:00
Erik Johnston
2f0180b09e Don't bother interning keys that are already interned 2016-03-23 16:29:46 +00:00
Erik Johnston
acdfef7b14 Intern all the things 2016-03-23 16:25:54 +00:00
Erik Johnston
f96526ffc2 Intern sender, event_id and room_id in events 2016-03-23 15:04:11 +00:00
Erik Johnston
fe9794706a Intern type and state_key on events 2016-03-23 14:58:08 +00:00
Erik Johnston
75daede92f String intern 2016-03-23 14:53:53 +00:00
Erik Johnston
fbdeb1778d Merge pull request #664 from matrix-org/erikj/public_room_list
Don't require alias in public room list.
2016-03-23 14:42:01 +00:00
Erik Johnston
b275765545 Comment about weird SQL 2016-03-23 14:15:32 +00:00
Erik Johnston
0c1a27b787 SQLite and postgres doesn't share a true literal 2016-03-23 14:10:49 +00:00
Erik Johnston
84afeb41f3 Ensure all old public rooms have aliases 2016-03-23 13:59:34 +00:00
Erik Johnston
b2802a1351 Ensure published rooms have public join rules 2016-03-23 13:59:31 +00:00
Erik Johnston
0677fc1c4e Comment 2016-03-23 13:25:22 +00:00
Erik Johnston
2749da542c Merge pull request #663 from matrix-org/erikj/invite_for_user
Make get_invites return RoomsForUser
2016-03-23 13:19:26 +00:00
Erik Johnston
e14baa7a3b Merge pull request #665 from matrix-org/erikj/dont_cache_events
Only cache events in the event cache
2016-03-23 13:19:12 +00:00
Erik Johnston
0e7363e0b3 Merge pull request #662 from matrix-org/erikj/state_cache
Make StateHandler._state_cache only store event_ids.
2016-03-23 11:43:03 +00:00
Erik Johnston
d87a846ebc Don't cache events in get_recent_events_for_room 2016-03-23 11:42:50 +00:00
Erik Johnston
8b0dfc9fc4 Don't cache events in get_current_state_for_key 2016-03-23 11:42:17 +00:00
Erik Johnston
34473a9c7f Don't require alias in public room list.
Rooms now no longer require an alias to be published.

Also, changes the way we pull out state of each room to not require
fetching all state events.
2016-03-23 10:42:19 +00:00
Erik Johnston
b6507869cd Make get_invites return RoomsForUser 2016-03-23 10:32:10 +00:00
Erik Johnston
9e2e994395 Reduce cache size 2016-03-23 09:28:07 +00:00
Erik Johnston
d531ebcb57 Key StateHandler._state_cache off of state groups 2016-03-22 18:02:36 +00:00
Erik Johnston
c4a8cbd15a Make LruCache use a dedicated _Node class 2016-03-22 16:06:21 +00:00
Erik Johnston
99f929f36b Make StateHandler._state_cache only store event_ids. 2016-03-22 16:06:04 +00:00
Erik Johnston
d787e41b20 Measure StateHandler._resolve_events 2016-03-22 14:44:48 +00:00
Erik Johnston
6cf0ba1466 Bump get_unread_event_push_actions_by_room_for_user cache 2016-03-22 14:18:21 +00:00
Erik Johnston
76d18a5776 Bump get_aliases_for_room cache 2016-03-22 14:08:13 +00:00
Mark Haines
cd9ba1ed89 Merge pull request #661 from matrix-org/markjh/member_count
Fix membership count for BulkPushRuleEvaluator
2016-03-22 13:59:56 +00:00
Mark Haines
5defb25ac6 Use get_users_in_room to count the number of room members rather than using read_receipts 2016-03-22 13:52:45 +00:00
Erik Johnston
fa2f96c2e3 Merge pull request #660 from matrix-org/erikj/state_cache
Don't cache events in _state_group_cache
2016-03-22 13:12:06 +00:00
Erik Johnston
f93304e77f Merge pull request #659 from matrix-org/erikj/state_cache_factor
Make stateGroupCache honour CACHE_SIZE_FACTOR
2016-03-22 13:12:01 +00:00
Erik Johnston
2c86187a1b Don't cache events in _state_group_cache
Instead, simply cache the event ids, relying on the event cache to cache
the actual events.

The problem was that while the state groups cache was limited in the
number of groups it could hold, each individual group could consist of
thousands of events.
2016-03-22 12:00:09 +00:00
Erik Johnston
d6ac752538 Merge pull request #657 from matrix-org/erikj/roomlist
Add published room list edit API
2016-03-22 11:57:39 +00:00
Erik Johnston
97785bfc0f Doc string 2016-03-22 10:41:44 +00:00
Erik Johnston
b591277620 Make stateGroupCache honour CACHE_SIZE_FACTOR 2016-03-22 10:32:50 +00:00
Mark Haines
63137bb901 Merge pull request #658 from matrix-org/markjh/cleanup
Remove some unused parameters from persist_event
2016-03-22 10:18:39 +00:00
Matthew Hodgson
d3654694d0 an invalide is something else... 2016-03-22 00:52:31 +00:00
Mark Haines
5244c0b48e Remove unused backfilled parameter from persist_event 2016-03-21 18:06:08 +00:00
Erik Johnston
3e7fac0d56 Add published room list edit API 2016-03-21 15:06:07 +00:00
Mark Haines
58f8226c7f remove unused current_state variable from on_receive_pdu 2016-03-21 14:20:34 +00:00
Erik Johnston
e4054abfdc Merge pull request #655 from matrix-org/erikj/edu_yield
Yield on EDU handling
2016-03-18 15:23:20 +00:00
Erik Johnston
9adf0e92bc Catch exceptions from EDU handling 2016-03-18 15:12:50 +00:00
Erik Johnston
1660145a08 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/edu_yield 2016-03-18 15:02:43 +00:00
Erik Johnston
7f79a6405b Merge pull request #656 from matrix-org/erikj/get_event
Dedupe requested event list in _get_events
2016-03-18 14:53:13 +00:00
Erik Johnston
8595ff7842 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/get_event 2016-03-18 14:47:19 +00:00
Erik Johnston
58e207cd77 Don't assume existence of event_id in __str__ 2016-03-18 14:31:44 +00:00
Erik Johnston
67ed8065db Dedupe requested event list in _get_events 2016-03-18 14:31:31 +00:00
Erik Johnston
916227b4df Merge pull request #652 from matrix-org/erikj/delete_alias
Update aliases event after deletion
2016-03-18 14:02:46 +00:00
Erik Johnston
3c5f25507b Yield on EDU handling 2016-03-18 13:55:16 +00:00
Erik Johnston
56aa4e7a9a Check canonical alias event exists 2016-03-17 15:24:19 +00:00
David Baker
384ee6eafb Merge pull request #650 from matrix-org/dbkr/register_idempotent_with_username
Make registration idempotent, part 2
2016-03-17 14:34:08 +00:00
Mark Haines
2ec3460967 Merge pull request #651 from matrix-org/markjh/unused_II
Remove dead code left over from presence changes
2016-03-17 13:37:53 +00:00
Mark Haines
7a38612620 Remove another unused function from presence 2016-03-17 11:54:19 +00:00
Erik Johnston
2cd9260500 Update aliases event after deletion
Attempt to update the appropriate `m.room.aliases` event after deleting
an alias. This may fail due to the deleter not being in the room.

Will also check if the canonical alias of the event is set to the
deleted alias, and if so will attempt to delete it.
2016-03-17 11:42:00 +00:00
Mark Haines
673c96ce97 Remove dead code left over from presence changes 2016-03-17 11:03:47 +00:00
Mark Haines
4ebb688f4f Add option to definitions.py to search for functions a function refers to 2016-03-17 10:59:12 +00:00
David Baker
5670205e2a remove debug logging 2016-03-16 19:49:42 +00:00
David Baker
f984decd66 Unused import 2016-03-16 19:40:48 +00:00
David Baker
a7daa5ae13 Make registration idempotent, part 2: be idempotent if the client specifies a username. 2016-03-16 19:36:57 +00:00
David Baker
48b2e853a8 Merge pull request #649 from matrix-org/dbkr/idempotent_registration
Make registration idempotent
2016-03-16 16:35:45 +00:00
David Baker
b58d10a875 pep8 2016-03-16 16:22:20 +00:00
David Baker
3ee7d7dc7f time_msec() 2016-03-16 16:18:52 +00:00
David Baker
3176aebf9d string with symbols is a bit too symboly. 2016-03-16 15:55:49 +00:00
David Baker
9671e6750c Replace other time.time(). 2016-03-16 15:51:28 +00:00
David Baker
742b6c6d15 Use hs get_clock instead of time.time() 2016-03-16 15:42:35 +00:00
David Baker
f5e90422f5 take extra return val from check_auth in account too 2016-03-16 14:33:19 +00:00
David Baker
ff7d3dc3a0 Fix tests 2016-03-16 14:25:14 +00:00
David Baker
99797947aa pep8 & remove debug logging 2016-03-16 12:51:34 +00:00
David Baker
c12b9d719a Make registration idempotent: if you specify the same session, make it give you an access token for the user that was registered on previous uses of that session. Tweak the UI auth layer to not delete sessions when their auth has completed and hence expire themn so they don't hang around until server restart. Allow server-side data to be associated with UI auth sessions. 2016-03-16 11:56:24 +00:00
Mark Haines
add89a03a6 Merge pull request #647 from matrix-org/markjh/pushers_stream
Add replication stream for pushers
2016-03-16 11:26:22 +00:00
Richard van der Hoff
467c1599c9 Merge pull request #648 from matrix-org/rav/password_reset
Password reset docs and script
2016-03-16 10:47:33 +00:00
Richard van der Hoff
660ae8e0f3 Clarify that we do have reset functionality via the IS 2016-03-16 10:40:38 +00:00
Mark Haines
ba660ecde2 Add a comment to offer a hint to an explanation for why we have a unique constraint on (app_id, pushkey, user_id) 2016-03-16 10:35:00 +00:00
Richard van der Hoff
a877209c8b Password reset docs and script
Replace the bash/perl gen_password script with a python one, and write a note
on how to use it.
2016-03-16 09:45:37 +00:00
Mark Haines
ee32d622ce Fix a couple of errors when deleting pushers 2016-03-15 17:47:36 +00:00
Mark Haines
6df1c79c22 Merge branch 'develop' into markjh/pushers_stream 2016-03-15 17:44:39 +00:00
Mark Haines
12904932c4 Hook up adding a pusher to the notifier for replication. 2016-03-15 17:42:03 +00:00
Mark Haines
b6e8420aee Add replication stream for pushers 2016-03-15 17:33:10 +00:00
Erik Johnston
91779b49c4 Merge pull request #646 from matrix-org/erikj/reject_invite_federation
Persist rejection of invites over federation
2016-03-15 15:03:00 +00:00
Erik Johnston
e5f0e58931 Remove needless PreserveLoggingContext 2016-03-15 13:48:40 +00:00
Erik Johnston
9e982750ee Persist rejection of invites over federation 2016-03-15 13:24:31 +00:00
47 changed files with 943 additions and 473 deletions

View File

@@ -1,3 +1,24 @@
Changes in synapse v0.14.0-rc2 (2016-03-23)
===========================================
Features:
* Add published room list API (PR #657)
Changes:
* Change various caches to consume less memory (PR #656, #658, #660, #662,
#663, #665)
* Allow rooms to be published without requiring an alias (PR #664)
* Intern common strings in caches to reduce memory footprint (#666)
Bug fixes:
* Fix reject invites over federation (PR #646)
* Fix bug where registration was not idempotent (PR #649)
* Update aliases event after deleting aliases (PR #652)
* Fix unread notification count, which was sometimes wrong (PR #661)
Changes in synapse v0.14.0-rc1 (2016-03-14)
===========================================

View File

@@ -525,7 +525,6 @@ Logging In To An Existing Account
Just enter the ``@localpart:my.domain.here`` Matrix user ID and password into
the form and click the Login button.
Identity Servers
================
@@ -545,6 +544,26 @@ as the primary means of identity and E2E encryption is not complete. As such,
we are running a single identity server (https://matrix.org) at the current
time.
Password reset
==============
If a user has registered an email address to their account using an identity
server, they can request a password-reset token via clients such as Vector.
A manual password reset can be done via direct database access as follows.
First calculate the hash of the new password:
$ source ~/.synapse/bin/activate
$ ./scripts/hash_password
Password:
Confirm password:
$2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
Then update the `users` table in the database:
UPDATE users SET password_hash='$2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
WHERE name='@test:test.com';
Where's the spec?!
==================

View File

@@ -86,9 +86,12 @@ def used_names(prefix, item, defs, names):
for name, funcs in defs.get('class', {}).items():
used_names(prefix + name + ".", name, funcs, names)
path = prefix.rstrip('.')
for used in defs.get('uses', ()):
if used in names:
names[used].setdefault('used', {}).setdefault(item, []).append(prefix.rstrip('.'))
if item:
names[item].setdefault('uses', []).append(used)
names[used].setdefault('used', {}).setdefault(item, []).append(path)
if __name__ == '__main__':
@@ -113,6 +116,10 @@ if __name__ == '__main__':
"--referrers", default=0, type=int,
help="Include referrers up to the given depth"
)
parser.add_argument(
"--referred", default=0, type=int,
help="Include referred down to the given depth"
)
parser.add_argument(
"--format", default="yaml",
help="Output format, one of 'yaml' or 'dot'"
@@ -161,6 +168,20 @@ if __name__ == '__main__':
continue
result[name] = definition
referred_depth = args.referred
referred = set()
while referred_depth:
referred_depth -= 1
for entry in result.values():
for uses in entry.get("uses", ()):
referred.add(uses)
for name, definition in names.items():
if not name in referred:
continue
if ignore and any(pattern.match(name) for pattern in ignore):
continue
result[name] = definition
if args.format == 'yaml':
yaml.dump(result, sys.stdout, default_flow_style=False)
elif args.format == 'dot':

View File

@@ -1 +0,0 @@
perl -MCrypt::Random -MCrypt::Eksblowfish::Bcrypt -e 'print Crypt::Eksblowfish::Bcrypt::bcrypt("secret", "\$2\$12\$" . Crypt::Eksblowfish::Bcrypt::en_base64(Crypt::Random::makerandom_octet(Length=>16)))."\n"'

39
scripts/hash_password Executable file
View File

@@ -0,0 +1,39 @@
#!/usr/bin/env python
import argparse
import bcrypt
import getpass
bcrypt_rounds=12
def prompt_for_pass():
password = getpass.getpass("Password: ")
if not password:
raise Exception("Password cannot be blank.")
confirm_password = getpass.getpass("Confirm password: ")
if password != confirm_password:
raise Exception("Passwords do not match.")
return password
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Calculate the hash of a new password, so that passwords"
" can be reset")
parser.add_argument(
"-p", "--password",
default=None,
help="New password for user. Will prompt if omitted.",
)
args = parser.parse_args()
password = args.password
if not password:
password = prompt_for_pass()
print bcrypt.hashpw(password, bcrypt.gensalt(bcrypt_rounds))

View File

@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.14.0-rc1"
__version__ = "0.14.0-rc2"

View File

@@ -814,17 +814,16 @@ class Auth(object):
return auth_ids
@log_function
def _can_send_event(self, event, auth_events):
def _get_send_level(self, etype, state_key, auth_events):
key = (EventTypes.PowerLevels, "", )
send_level_event = auth_events.get(key)
send_level = None
if send_level_event:
send_level = send_level_event.content.get("events", {}).get(
event.type
etype
)
if send_level is None:
if hasattr(event, "state_key"):
if state_key is not None:
send_level = send_level_event.content.get(
"state_default", 50
)
@@ -838,6 +837,13 @@ class Auth(object):
else:
send_level = 0
return send_level
@log_function
def _can_send_event(self, event, auth_events):
send_level = self._get_send_level(
event.type, event.get("state_key", None), auth_events
)
user_level = self._get_user_power_level(event.user_id, auth_events)
if user_level < send_level:
@@ -982,3 +988,43 @@ class Auth(object):
"You don't have permission to add ops level greater "
"than your own"
)
@defer.inlineCallbacks
def check_can_change_room_list(self, room_id, user):
"""Check if the user is allowed to edit the room's entry in the
published room list.
Args:
room_id (str)
user (UserID)
"""
is_admin = yield self.is_server_admin(user)
if is_admin:
defer.returnValue(True)
user_id = user.to_string()
yield self.check_joined_room(room_id, user_id)
# We currently require the user is a "moderator" in the room. We do this
# by checking if they would (theoretically) be able to change the
# m.room.aliases events
power_level_event = yield self.state.get_current_state(
room_id, EventTypes.PowerLevels, ""
)
auth_events = {}
if power_level_event:
auth_events[(EventTypes.PowerLevels, "")] = power_level_event
send_level = self._get_send_level(
EventTypes.Aliases, "", auth_events
)
user_level = self._get_user_power_level(user_id, auth_events)
if user_level < send_level:
raise AuthError(
403,
"This server requires you to be a moderator in the room to"
" edit its room list entry"
)

View File

@@ -14,6 +14,7 @@
# limitations under the License.
from synapse.util.frozenutils import freeze
from synapse.util.caches import intern_dict
# Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents
@@ -140,6 +141,10 @@ class FrozenEvent(EventBase):
unsigned = dict(event_dict.pop("unsigned", {}))
# We intern these strings because they turn up a lot (especially when
# caching).
event_dict = intern_dict(event_dict)
if USE_FROZEN_DICTS:
frozen_dict = freeze(event_dict)
else:
@@ -168,5 +173,7 @@ class FrozenEvent(EventBase):
def __repr__(self):
return "<FrozenEvent event_id='%s', type='%s', state_key='%s'>" % (
self.event_id, self.type, self.get("state_key", None),
self.get("event_id", None),
self.get("type", None),
self.get("state_key", None),
)

View File

@@ -418,6 +418,7 @@ class FederationClient(FederationBase):
"Failed to make_%s via %s: %s",
membership, destination, e.message
)
raise
raise RuntimeError("Failed to send to any server.")

View File

@@ -137,8 +137,8 @@ class FederationServer(FederationBase):
logger.exception("Failed to handle PDU")
if hasattr(transaction, "edus"):
for edu in [Edu(**x) for x in transaction.edus]:
self.received_edu(
for edu in (Edu(**x) for x in transaction.edus):
yield self.received_edu(
transaction.origin,
edu.edu_type,
edu.content
@@ -161,11 +161,17 @@ class FederationServer(FederationBase):
)
defer.returnValue((200, response))
@defer.inlineCallbacks
def received_edu(self, origin, edu_type, content):
received_edus_counter.inc()
if edu_type in self.edu_handlers:
self.edu_handlers[edu_type](origin, content)
try:
yield self.edu_handlers[edu_type](origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception as e:
logger.exception("Failed to handle edu %r", edu_type, e)
else:
logger.warn("Received EDU of type %s with no handler", edu_type)
@@ -525,7 +531,6 @@ class FederationServer(FederationBase):
yield self.handler.on_receive_pdu(
origin,
pdu,
backfilled=False,
state=state,
auth_chain=auth_chain,
)

View File

@@ -175,7 +175,7 @@ class BaseFederationServlet(object):
class FederationSendServlet(BaseFederationServlet):
PATH = "/send/([^/]*)/"
PATH = "/send/(?P<transaction_id>[^/]*)/"
def __init__(self, handler, server_name, **kwargs):
super(FederationSendServlet, self).__init__(
@@ -250,7 +250,7 @@ class FederationPullServlet(BaseFederationServlet):
class FederationEventServlet(BaseFederationServlet):
PATH = "/event/([^/]*)/"
PATH = "/event/(?P<event_id>[^/]*)/"
# This is when someone asks for a data item for a given server data_id pair.
def on_GET(self, origin, content, query, event_id):
@@ -258,7 +258,7 @@ class FederationEventServlet(BaseFederationServlet):
class FederationStateServlet(BaseFederationServlet):
PATH = "/state/([^/]*)/"
PATH = "/state/(?P<context>[^/]*)/"
# This is when someone asks for all data for a given context.
def on_GET(self, origin, content, query, context):
@@ -270,7 +270,7 @@ class FederationStateServlet(BaseFederationServlet):
class FederationBackfillServlet(BaseFederationServlet):
PATH = "/backfill/([^/]*)/"
PATH = "/backfill/(?P<context>[^/]*)/"
def on_GET(self, origin, content, query, context):
versions = query["v"]
@@ -285,7 +285,7 @@ class FederationBackfillServlet(BaseFederationServlet):
class FederationQueryServlet(BaseFederationServlet):
PATH = "/query/([^/]*)"
PATH = "/query/(?P<query_type>[^/]*)"
# This is when we receive a server-server Query
def on_GET(self, origin, content, query, query_type):
@@ -296,7 +296,7 @@ class FederationQueryServlet(BaseFederationServlet):
class FederationMakeJoinServlet(BaseFederationServlet):
PATH = "/make_join/([^/]*)/([^/]*)"
PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
@defer.inlineCallbacks
def on_GET(self, origin, content, query, context, user_id):
@@ -305,7 +305,7 @@ class FederationMakeJoinServlet(BaseFederationServlet):
class FederationMakeLeaveServlet(BaseFederationServlet):
PATH = "/make_leave/([^/]*)/([^/]*)"
PATH = "/make_leave/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
@defer.inlineCallbacks
def on_GET(self, origin, content, query, context, user_id):
@@ -314,7 +314,7 @@ class FederationMakeLeaveServlet(BaseFederationServlet):
class FederationSendLeaveServlet(BaseFederationServlet):
PATH = "/send_leave/([^/]*)/([^/]*)"
PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<txid>[^/]*)"
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, room_id, txid):
@@ -323,14 +323,14 @@ class FederationSendLeaveServlet(BaseFederationServlet):
class FederationEventAuthServlet(BaseFederationServlet):
PATH = "/event_auth/([^/]*)/([^/]*)"
PATH = "/event_auth(?P<context>[^/]*)/(?P<event_id>[^/]*)"
def on_GET(self, origin, content, query, context, event_id):
return self.handler.on_event_auth(origin, context, event_id)
class FederationSendJoinServlet(BaseFederationServlet):
PATH = "/send_join/([^/]*)/([^/]*)"
PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, context, event_id):
@@ -341,7 +341,7 @@ class FederationSendJoinServlet(BaseFederationServlet):
class FederationInviteServlet(BaseFederationServlet):
PATH = "/invite/([^/]*)/([^/]*)"
PATH = "/invite/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, context, event_id):
@@ -352,7 +352,7 @@ class FederationInviteServlet(BaseFederationServlet):
class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
PATH = "/exchange_third_party_invite/([^/]*)"
PATH = "/exchange_third_party_invite/(?P<room_id>[^/]*)"
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, room_id):
@@ -381,7 +381,7 @@ class FederationClientKeysClaimServlet(BaseFederationServlet):
class FederationQueryAuthServlet(BaseFederationServlet):
PATH = "/query_auth/([^/]*)/([^/]*)"
PATH = "/query_auth/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
@defer.inlineCallbacks
def on_POST(self, origin, content, query, context, event_id):
@@ -394,7 +394,7 @@ class FederationQueryAuthServlet(BaseFederationServlet):
class FederationGetMissingEventsServlet(BaseFederationServlet):
# TODO(paul): Why does this path alone end with "/?" optional?
PATH = "/get_missing_events/([^/]*)/?"
PATH = "/get_missing_events/(?P<room_id>[^/]*)/?"
@defer.inlineCallbacks
def on_POST(self, origin, content, query, room_id):

View File

@@ -35,6 +35,7 @@ logger = logging.getLogger(__name__)
class AuthHandler(BaseHandler):
SESSION_EXPIRE_MS = 48 * 60 * 60 * 1000
def __init__(self, hs):
super(AuthHandler, self).__init__(hs)
@@ -66,15 +67,18 @@ class AuthHandler(BaseHandler):
'auth' key: this method prompts for auth if none is sent.
clientip (str): The IP address of the client.
Returns:
A tuple of (authed, dict, dict) where authed is true if the client
has successfully completed an auth flow. If it is true, the first
dict contains the authenticated credentials of each stage.
A tuple of (authed, dict, dict, session_id) where authed is true if
the client has successfully completed an auth flow. If it is true
the first dict contains the authenticated credentials of each stage.
If authed is false, the first dictionary is the server response to
the login request and should be passed back to the client.
In either case, the second dict contains the parameters for this
request (which may have been given only in a previous call).
session_id is the ID of this session, either passed in by the client
or assigned by the call to check_auth
"""
authdict = None
@@ -103,7 +107,10 @@ class AuthHandler(BaseHandler):
if not authdict:
defer.returnValue(
(False, self._auth_dict_for_flows(flows, session), clientdict)
(
False, self._auth_dict_for_flows(flows, session),
clientdict, session['id']
)
)
if 'creds' not in session:
@@ -122,12 +129,11 @@ class AuthHandler(BaseHandler):
for f in flows:
if len(set(f) - set(creds.keys())) == 0:
logger.info("Auth completed with creds: %r", creds)
self._remove_session(session)
defer.returnValue((True, creds, clientdict))
defer.returnValue((True, creds, clientdict, session['id']))
ret = self._auth_dict_for_flows(flows, session)
ret['completed'] = creds.keys()
defer.returnValue((False, ret, clientdict))
defer.returnValue((False, ret, clientdict, session['id']))
@defer.inlineCallbacks
def add_oob_auth(self, stagetype, authdict, clientip):
@@ -154,6 +160,43 @@ class AuthHandler(BaseHandler):
defer.returnValue(True)
defer.returnValue(False)
def get_session_id(self, clientdict):
"""
Gets the session ID for a client given the client dictionary
:param clientdict: The dictionary sent by the client in the request
:return: The string session ID the client sent. If the client did not
send a session ID, returns None.
"""
sid = None
if clientdict and 'auth' in clientdict:
authdict = clientdict['auth']
if 'session' in authdict:
sid = authdict['session']
return sid
def set_session_data(self, session_id, key, value):
"""
Store a key-value pair into the sessions data associated with this
request. This data is stored server-side and cannot be modified by
the client.
:param session_id: (string) The ID of this session as returned from check_auth
:param key: (string) The key to store the data under
:param value: (any) The data to store
"""
sess = self._get_session_info(session_id)
sess.setdefault('serverdict', {})[key] = value
self._save_session(sess)
def get_session_data(self, session_id, key, default=None):
"""
Retrieve data stored with set_session_data
:param session_id: (string) The ID of this session as returned from check_auth
:param key: (string) The key to store the data under
:param default: (any) Value to return if the key has not been set
"""
sess = self._get_session_info(session_id)
return sess.setdefault('serverdict', {}).get(key, default)
@defer.inlineCallbacks
def _check_password_auth(self, authdict, _):
if "user" not in authdict or "password" not in authdict:
@@ -455,11 +498,18 @@ class AuthHandler(BaseHandler):
def _save_session(self, session):
# TODO: Persistent storage
logger.debug("Saving session %s", session)
session["last_used"] = self.hs.get_clock().time_msec()
self.sessions[session["id"]] = session
self._prune_sessions()
def _remove_session(self, session):
logger.debug("Removing session %s", session)
del self.sessions[session["id"]]
def _prune_sessions(self):
for sid, sess in self.sessions.items():
last_used = 0
if 'last_used' in sess:
last_used = sess['last_used']
now = self.hs.get_clock().time_msec()
if last_used < now - AuthHandler.SESSION_EXPIRE_MS:
del self.sessions[sid]
def hash(self, password):
"""Computes a secure hash of password.

View File

@@ -32,6 +32,8 @@ class DirectoryHandler(BaseHandler):
def __init__(self, hs):
super(DirectoryHandler, self).__init__(hs)
self.state = hs.get_state_handler()
self.federation = hs.get_replication_layer()
self.federation.register_query_handler(
"directory", self.on_directory_query
@@ -93,7 +95,7 @@ class DirectoryHandler(BaseHandler):
yield self._create_association(room_alias, room_id, servers)
@defer.inlineCallbacks
def delete_association(self, user_id, room_alias):
def delete_association(self, requester, user_id, room_alias):
# association deletion for human users
can_delete = yield self._user_can_delete_alias(room_alias, user_id)
@@ -112,7 +114,25 @@ class DirectoryHandler(BaseHandler):
errcode=Codes.EXCLUSIVE
)
yield self._delete_association(room_alias)
room_id = yield self._delete_association(room_alias)
try:
yield self.send_room_alias_update_event(
requester,
requester.user.to_string(),
room_id
)
yield self._update_canonical_alias(
requester,
requester.user.to_string(),
room_id,
room_alias,
)
except AuthError as e:
logger.info("Failed to update alias events: %s", e)
defer.returnValue(room_id)
@defer.inlineCallbacks
def delete_appservice_association(self, service, room_alias):
@@ -129,11 +149,9 @@ class DirectoryHandler(BaseHandler):
if not self.hs.is_mine(room_alias):
raise SynapseError(400, "Room alias must be local")
yield self.store.delete_room_alias(room_alias)
room_id = yield self.store.delete_room_alias(room_alias)
# TODO - Looks like _update_room_alias_event has never been implemented
# if room_id:
# yield self._update_room_alias_events(user_id, room_id)
defer.returnValue(room_id)
@defer.inlineCallbacks
def get_association(self, room_alias):
@@ -233,6 +251,29 @@ class DirectoryHandler(BaseHandler):
ratelimit=False
)
@defer.inlineCallbacks
def _update_canonical_alias(self, requester, user_id, room_id, room_alias):
alias_event = yield self.state.get_current_state(
room_id, EventTypes.CanonicalAlias, ""
)
alias_str = room_alias.to_string()
if not alias_event or alias_event.content.get("alias", "") != alias_str:
return
msg_handler = self.hs.get_handlers().message_handler
yield msg_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.CanonicalAlias,
"state_key": "",
"room_id": room_id,
"sender": user_id,
"content": {},
},
ratelimit=False
)
@defer.inlineCallbacks
def get_association_from_room_alias(self, room_alias):
result = yield self.store.get_association_from_room_alias(
@@ -276,3 +317,25 @@ class DirectoryHandler(BaseHandler):
is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))
defer.returnValue(is_admin)
@defer.inlineCallbacks
def edit_published_room_list(self, requester, room_id, visibility):
"""Edit the entry of the room in the published room list.
requester
room_id (str)
visibility (str): "public" or "private"
"""
if requester.is_guest:
raise AuthError(403, "Guests cannot edit the published room list")
if visibility not in ["public", "private"]:
raise SynapseError(400, "Invalid visibility setting")
room = yield self.store.get_room(room_id)
if room is None:
raise SynapseError(400, "Unknown room")
yield self.auth.check_can_change_room_list(room_id, requester.user)
yield self.store.set_room_is_public(room_id, visibility == "public")

View File

@@ -18,7 +18,6 @@ from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.types import UserID
from synapse.events.utils import serialize_event
from synapse.util.logcontext import preserve_context_over_fn
from synapse.api.constants import Membership, EventTypes
from synapse.events import EventBase
@@ -31,20 +30,6 @@ import random
logger = logging.getLogger(__name__)
def started_user_eventstream(distributor, user):
return preserve_context_over_fn(
distributor.fire,
"started_user_eventstream", user
)
def stopped_user_eventstream(distributor, user):
return preserve_context_over_fn(
distributor.fire,
"stopped_user_eventstream", user
)
class EventStreamHandler(BaseHandler):
def __init__(self, hs):
@@ -63,61 +48,6 @@ class EventStreamHandler(BaseHandler):
self.notifier = hs.get_notifier()
@defer.inlineCallbacks
def started_stream(self, user):
"""Tells the presence handler that we have started an eventstream for
the user:
Args:
user (User): The user who started a stream.
Returns:
A deferred that completes once their presence has been updated.
"""
if user not in self._streams_per_user:
# Make sure we set the streams per user to 1 here rather than
# setting it to zero and incrementing the value below.
# Otherwise this may race with stopped_stream causing the
# user to be erased from the map before we have a chance
# to increment it.
self._streams_per_user[user] = 1
if user in self._stop_timer_per_user:
try:
self.clock.cancel_call_later(
self._stop_timer_per_user.pop(user)
)
except:
logger.exception("Failed to cancel event timer")
else:
yield started_user_eventstream(self.distributor, user)
else:
self._streams_per_user[user] += 1
def stopped_stream(self, user):
"""If there are no streams for a user this starts a timer that will
notify the presence handler that we haven't got an event stream for
the user unless the user starts a new stream in 30 seconds.
Args:
user (User): The user who stopped a stream.
"""
self._streams_per_user[user] -= 1
if not self._streams_per_user[user]:
del self._streams_per_user[user]
# 30 seconds of grace to allow the client to reconnect again
# before we think they're gone
def _later():
logger.debug("_later stopped_user_eventstream %s", user)
self._stop_timer_per_user.pop(user, None)
return stopped_user_eventstream(self.distributor, user)
logger.debug("Scheduling _later: for %s", user)
self._stop_timer_per_user[user] = (
self.clock.call_later(30, _later)
)
@defer.inlineCallbacks
@log_function
def get_stream(self, auth_user_id, pagin_config, timeout=0,

View File

@@ -102,7 +102,7 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
def on_receive_pdu(self, origin, pdu, backfilled, state=None,
def on_receive_pdu(self, origin, pdu, state=None,
auth_chain=None):
""" Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it through the StateHandler.
@@ -123,7 +123,6 @@ class FederationHandler(BaseHandler):
# FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work
current_state = None
is_in_room = yield self.auth.check_host_in_room(
event.room_id,
self.server_name
@@ -186,8 +185,6 @@ class FederationHandler(BaseHandler):
origin,
event,
state=state,
backfilled=backfilled,
current_state=current_state,
)
except AuthError as e:
raise FederationError(
@@ -216,18 +213,17 @@ class FederationHandler(BaseHandler):
except StoreError:
logger.exception("Failed to store room.")
if not backfilled:
extra_users = []
if event.type == EventTypes.Member:
target_user_id = event.state_key
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
extra_users = []
if event.type == EventTypes.Member:
target_user_id = event.state_key
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
with PreserveLoggingContext():
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
with PreserveLoggingContext():
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
@@ -647,7 +643,7 @@ class FederationHandler(BaseHandler):
continue
try:
self.on_receive_pdu(origin, p, backfilled=False)
self.on_receive_pdu(origin, p)
except:
logger.exception("Couldn't handle pdu")
@@ -779,7 +775,6 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
backfilled=False,
)
target_user = UserID.from_string(event.state_key)
@@ -813,7 +808,21 @@ class FederationHandler(BaseHandler):
target_hosts,
signed_event
)
defer.returnValue(None)
context = yield self.state_handler.compute_event_context(event)
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
)
target_user = UserID.from_string(event.state_key)
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=[target_user],
)
defer.returnValue(event)
@defer.inlineCallbacks
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
@@ -1059,8 +1068,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
def _handle_new_event(self, origin, event, state=None, backfilled=False,
current_state=None, auth_events=None):
def _handle_new_event(self, origin, event, state=None, auth_events=None):
outlier = event.internal_metadata.is_outlier()
@@ -1070,7 +1078,7 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
if not backfilled and not event.internal_metadata.is_outlier():
if not event.internal_metadata.is_outlier():
action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event(
event, context, self
@@ -1079,9 +1087,7 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
backfilled=backfilled,
is_new_state=(not outlier and not backfilled),
current_state=current_state,
is_new_state=not outlier,
)
defer.returnValue((context, event_stream_id, max_stream_id))
@@ -1179,7 +1185,6 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event(
event, new_event_context,
backfilled=False,
is_new_state=True,
current_state=state,
)

View File

@@ -73,14 +73,6 @@ FEDERATION_PING_INTERVAL = 25 * 60 * 1000
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
def user_presence_changed(distributor, user, statuscache):
return distributor.fire("user_presence_changed", user, statuscache)
def collect_presencelike_data(distributor, user, content):
return distributor.fire("collect_presencelike_data", user, content)
class PresenceHandler(BaseHandler):
def __init__(self, hs):

View File

@@ -47,7 +47,8 @@ class RegistrationHandler(BaseHandler):
self._next_generated_user_id = None
@defer.inlineCallbacks
def check_username(self, localpart, guest_access_token=None):
def check_username(self, localpart, guest_access_token=None,
assigned_user_id=None):
yield run_on_reactor()
if urllib.quote(localpart.encode('utf-8')) != localpart:
@@ -60,6 +61,15 @@ class RegistrationHandler(BaseHandler):
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
if assigned_user_id:
if user_id == assigned_user_id:
return
else:
raise SynapseError(
400,
"A different user ID has already been registered for this session",
)
yield self.check_user_id_not_appservice_exclusive(user_id)
users = yield self.store.get_users_by_id_case_insensitive(user_id)

View File

@@ -119,7 +119,8 @@ class RoomCreationHandler(BaseHandler):
invite_3pid_list = config.get("invite_3pid", [])
is_public = config.get("visibility", None) == "public"
visibility = config.get("visibility", None)
is_public = visibility == "public"
# autogen room IDs and try to create it. We may clash, so just
# try a few times till one goes through, giving up eventually.
@@ -155,9 +156,9 @@ class RoomCreationHandler(BaseHandler):
preset_config = config.get(
"preset",
RoomCreationPreset.PUBLIC_CHAT
if is_public
else RoomCreationPreset.PRIVATE_CHAT
RoomCreationPreset.PRIVATE_CHAT
if visibility == "private"
else RoomCreationPreset.PUBLIC_CHAT
)
raw_initial_state = config.get("initial_state", [])
@@ -946,53 +947,62 @@ class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
def handle_room(room_id):
aliases = yield self.store.get_aliases_for_room(room_id)
if not aliases:
defer.returnValue(None)
state = yield self.state_handler.get_current_state(room_id)
# We pull each bit of state out indvidually to avoid pulling the
# full state into memory. Due to how the caching works this should
# be fairly quick, even if not originally in the cache.
def get_state(etype, state_key):
return self.state_handler.get_current_state(room_id, etype, state_key)
result = {"aliases": aliases, "room_id": room_id}
# Double check that this is actually a public room.
join_rules_event = yield get_state(EventTypes.JoinRules, "")
if join_rules_event:
join_rule = join_rules_event.content.get("join_rule", None)
if join_rule and join_rule != JoinRules.PUBLIC:
defer.returnValue(None)
name_event = state.get((EventTypes.Name, ""), None)
result = {"room_id": room_id}
if aliases:
result["aliases"] = aliases
name_event = yield get_state(EventTypes.Name, "")
if name_event:
name = name_event.content.get("name", None)
if name:
result["name"] = name
topic_event = state.get((EventTypes.Topic, ""), None)
topic_event = yield get_state(EventTypes.Topic, "")
if topic_event:
topic = topic_event.content.get("topic", None)
if topic:
result["topic"] = topic
canonical_event = state.get((EventTypes.CanonicalAlias, ""), None)
canonical_event = yield get_state(EventTypes.CanonicalAlias, "")
if canonical_event:
canonical_alias = canonical_event.content.get("alias", None)
if canonical_alias:
result["canonical_alias"] = canonical_alias
visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
visibility_event = yield get_state(EventTypes.RoomHistoryVisibility, "")
visibility = None
if visibility_event:
visibility = visibility_event.content.get("history_visibility", None)
result["world_readable"] = visibility == "world_readable"
guest_event = state.get((EventTypes.GuestAccess, ""), None)
guest_event = yield get_state(EventTypes.GuestAccess, "")
guest = None
if guest_event:
guest = guest_event.content.get("guest_access", None)
result["guest_can_join"] = guest == "can_join"
avatar_event = state.get(("m.room.avatar", ""), None)
avatar_event = yield get_state("m.room.avatar", "")
if avatar_event:
avatar_url = avatar_event.content.get("url", None)
if avatar_url:
result["avatar_url"] = avatar_url
result["num_joined_members"] = sum(
1 for (event_type, _), ev in state.items()
if event_type == EventTypes.Member and ev.membership == Membership.JOIN
)
joined_users = yield self.store.get_users_in_room(room_id)
result["num_joined_members"] = len(joined_users)
defer.returnValue(result)

View File

@@ -18,6 +18,7 @@ from synapse.api.errors import (
cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes
)
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches import intern_dict
import synapse.metrics
import synapse.events
@@ -229,11 +230,12 @@ class JsonResource(HttpServer, resource.Resource):
else:
servlet_classname = "%r" % callback
args = [
urllib.unquote(u).decode("UTF-8") if u else u for u in m.groups()
]
kwargs = intern_dict({
name: urllib.unquote(value).decode("UTF-8") if value else value
for name, value in m.groupdict().items()
})
callback_return = yield callback(request, *args)
callback_return = yield callback(request, **kwargs)
if callback_return is not None:
code, response = callback_return
self._send_response(request, code, response)

View File

@@ -282,6 +282,12 @@ class Notifier(object):
self.notify_replication()
def on_new_replication_data(self):
"""Used to inform replication listeners that something has happend
without waking up any of the normal user event streams"""
with PreserveLoggingContext():
self.notify_replication()
@defer.inlineCallbacks
def wait_for_events(self, user_id, timeout, callback, room_ids=None,
from_token=StreamToken.START):

View File

@@ -317,7 +317,7 @@ class Pusher(object):
@defer.inlineCallbacks
def _get_badge_count(self):
invites, joins = yield defer.gatherResults([
self.store.get_invites_for_user(self.user_id),
self.store.get_invited_rooms_for_user(self.user_id),
self.store.get_rooms_for_user(self.user_id),
], consumeErrors=True)

View File

@@ -107,7 +107,9 @@ class BulkPushRuleEvaluator:
users_dict.items(), [event], {event.event_id: current_state}
)
evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room))
room_members = yield self.store.get_users_in_room(self.room_id)
evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
condition_cache = {}

View File

@@ -37,6 +37,7 @@ STREAM_NAMES = (
("user_account_data", "room_account_data", "tag_account_data",),
("backfill",),
("push_rules",),
("pushers",),
)
@@ -65,6 +66,7 @@ class ReplicationResource(Resource):
* "tag_account_data": Per room per user tags.
* "backfill": Old events that have been backfilled from other servers.
* "push_rules": Per user changes to push rules.
* "pushers": Per user changes to their pushers.
The API takes two additional query parameters:
@@ -120,6 +122,7 @@ class ReplicationResource(Resource):
stream_token = yield self.sources.get_current_token()
backfill_token = yield self.store.get_current_backfill_token()
push_rules_token, room_stream_token = self.store.get_push_rules_stream_token()
pushers_token = self.store.get_pushers_stream_token()
defer.returnValue(_ReplicationToken(
room_stream_token,
@@ -129,6 +132,7 @@ class ReplicationResource(Resource):
int(stream_token.account_data_key),
backfill_token,
push_rules_token,
pushers_token,
))
@request_handler
@@ -151,6 +155,7 @@ class ReplicationResource(Resource):
yield self.typing(writer, current_token) # TODO: implement limit
yield self.receipts(writer, current_token, limit)
yield self.push_rules(writer, current_token, limit)
yield self.pushers(writer, current_token, limit)
self.streams(writer, current_token)
logger.info("Replicated %d rows", writer.total)
@@ -297,6 +302,24 @@ class ReplicationResource(Resource):
"priority_class", "priority", "conditions", "actions"
))
@defer.inlineCallbacks
def pushers(self, writer, current_token, limit):
current_position = current_token.pushers
pushers = parse_integer(writer.request, "pushers")
if pushers is not None:
updated, deleted = yield self.store.get_all_updated_pushers(
pushers, current_position, limit
)
writer.write_header_and_rows("pushers", updated, (
"position", "user_id", "access_token", "profile_tag", "kind",
"app_id", "app_display_name", "device_display_name", "pushkey",
"ts", "lang", "data"
))
writer.write_header_and_rows("deleted", deleted, (
"position", "user_id", "app_id", "pushkey"
))
class _Writer(object):
"""Writes the streams as a JSON object as the response to the request"""
@@ -327,7 +350,7 @@ class _Writer(object):
class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
"events", "presence", "typing", "receipts", "account_data", "backfill",
"push_rules"
"push_rules", "pushers"
))):
__slots__ = []

View File

@@ -30,6 +30,7 @@ logger = logging.getLogger(__name__)
def register_servlets(hs, http_server):
ClientDirectoryServer(hs).register(http_server)
ClientDirectoryListServer(hs).register(http_server)
class ClientDirectoryServer(ClientV1RestServlet):
@@ -127,8 +128,9 @@ class ClientDirectoryServer(ClientV1RestServlet):
room_alias = RoomAlias.from_string(room_alias)
yield dir_handler.delete_association(
user.to_string(), room_alias
requester, user.to_string(), room_alias
)
logger.info(
"User %s deleted alias %s",
user.to_string(),
@@ -136,3 +138,44 @@ class ClientDirectoryServer(ClientV1RestServlet):
)
defer.returnValue((200, {}))
class ClientDirectoryListServer(ClientV1RestServlet):
PATTERNS = client_path_patterns("/directory/list/room/(?P<room_id>[^/]*)$")
def __init__(self, hs):
super(ClientDirectoryListServer, self).__init__(hs)
self.store = hs.get_datastore()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
room = yield self.store.get_room(room_id)
if room is None:
raise SynapseError(400, "Unknown room")
defer.returnValue((200, {
"visibility": "public" if room["is_public"] else "private"
}))
@defer.inlineCallbacks
def on_PUT(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
content = parse_json_object_from_request(request)
visibility = content.get("visibility", "public")
yield self.handlers.directory_handler.edit_published_room_list(
requester, room_id, visibility,
)
defer.returnValue((200, {}))
@defer.inlineCallbacks
def on_DELETE(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
yield self.handlers.directory_handler.edit_published_room_list(
requester, room_id, "private",
)
defer.returnValue((200, {}))

View File

@@ -29,6 +29,10 @@ logger = logging.getLogger(__name__)
class PusherRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/pushers/set$")
def __init__(self, hs):
super(PusherRestServlet, self).__init__(hs)
self.notifier = hs.get_notifier()
@defer.inlineCallbacks
def on_POST(self, request):
requester = yield self.auth.get_user_by_req(request)
@@ -87,6 +91,8 @@ class PusherRestServlet(ClientV1RestServlet):
raise SynapseError(400, "Config Error: " + pce.message,
errcode=Codes.MISSING_PARAM)
self.notifier.on_new_replication_data()
defer.returnValue((200, {}))
def on_OPTIONS(self, _):

View File

@@ -43,7 +43,7 @@ class PasswordRestServlet(RestServlet):
body = parse_json_object_from_request(request)
authed, result, params = yield self.auth_handler.check_auth([
authed, result, params, _ = yield self.auth_handler.check_auth([
[LoginType.PASSWORD],
[LoginType.EMAIL_IDENTITY]
], body, self.hs.get_ip_from_request(request))

View File

@@ -122,10 +122,22 @@ class RegisterRestServlet(RestServlet):
guest_access_token = body.get("guest_access_token", None)
session_id = self.auth_handler.get_session_id(body)
registered_user_id = None
if session_id:
# if we get a registered user id out of here, it means we previously
# registered a user for this session, so we could just return the
# user here. We carry on and go through the auth checks though,
# for paranoia.
registered_user_id = self.auth_handler.get_session_data(
session_id, "registered_user_id", None
)
if desired_username is not None:
yield self.registration_handler.check_username(
desired_username,
guest_access_token=guest_access_token
guest_access_token=guest_access_token,
assigned_user_id=registered_user_id,
)
if self.hs.config.enable_registration_captcha:
@@ -139,7 +151,7 @@ class RegisterRestServlet(RestServlet):
[LoginType.EMAIL_IDENTITY]
]
authed, result, params = yield self.auth_handler.check_auth(
authed, result, params, session_id = yield self.auth_handler.check_auth(
flows, body, self.hs.get_ip_from_request(request)
)
@@ -147,6 +159,22 @@ class RegisterRestServlet(RestServlet):
defer.returnValue((401, result))
return
if registered_user_id is not None:
logger.info(
"Already registered user ID %r for this session",
registered_user_id
)
access_token = yield self.auth_handler.issue_access_token(registered_user_id)
refresh_token = yield self.auth_handler.issue_refresh_token(
registered_user_id
)
defer.returnValue((200, {
"user_id": registered_user_id,
"access_token": access_token,
"home_server": self.hs.hostname,
"refresh_token": refresh_token,
}))
# NB: This may be from the auth handler and NOT from the POST
if 'password' not in params:
raise SynapseError(400, "Missing password.", Codes.MISSING_PARAM)
@@ -161,6 +189,12 @@ class RegisterRestServlet(RestServlet):
guest_access_token=guest_access_token,
)
# remember that we've now registered that user account, and with what
# user ID (since the user may not have specified)
self.auth_handler.set_session_data(
session_id, "registered_user_id", user_id
)
if result and LoginType.EMAIL_IDENTITY in result:
threepid = result[LoginType.EMAIL_IDENTITY]

View File

@@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
from synapse.api.auth import AuthEventTypes
@@ -27,6 +28,7 @@ from collections import namedtuple
import logging
import hashlib
import os
logger = logging.getLogger(__name__)
@@ -34,8 +36,11 @@ logger = logging.getLogger(__name__)
KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key"))
SIZE_OF_CACHE = 1000
EVICTION_TIMEOUT_SECONDS = 20
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
SIZE_OF_CACHE = int(1000 * CACHE_SIZE_FACTOR)
EVICTION_TIMEOUT_SECONDS = 60 * 60
class _StateCacheEntry(object):
@@ -85,16 +90,8 @@ class StateHandler(object):
"""
event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
cache = None
if self._state_cache is not None:
cache = self._state_cache.get(frozenset(event_ids), None)
if cache:
cache.ts = self.clock.time_msec()
state = cache.state
else:
res = yield self.resolve_state_groups(room_id, event_ids)
state = res[1]
res = yield self.resolve_state_groups(room_id, event_ids)
state = res[1]
if event_type:
defer.returnValue(state.get((event_type, state_key)))
@@ -186,20 +183,6 @@ class StateHandler(object):
"""
logger.debug("resolve_state_groups event_ids %s", event_ids)
if self._state_cache is not None:
cache = self._state_cache.get(frozenset(event_ids), None)
if cache and cache.state_group:
cache.ts = self.clock.time_msec()
prev_state = cache.state.get((event_type, state_key), None)
if prev_state:
prev_state = prev_state.event_id
prev_states = [prev_state]
else:
prev_states = []
defer.returnValue(
(cache.state_group, cache.state, prev_states)
)
state_groups = yield self.store.get_state_groups(
room_id, event_ids
)
@@ -209,7 +192,7 @@ class StateHandler(object):
state_groups.keys()
)
group_names = set(state_groups.keys())
group_names = frozenset(state_groups.keys())
if len(group_names) == 1:
name, state_list = state_groups.items().pop()
state = {
@@ -223,29 +206,38 @@ class StateHandler(object):
else:
prev_states = []
if self._state_cache is not None:
cache = _StateCacheEntry(
state=state,
state_group=name,
ts=self.clock.time_msec()
)
self._state_cache[frozenset(event_ids)] = cache
defer.returnValue((name, state, prev_states))
if self._state_cache is not None:
cache = self._state_cache.get(group_names, None)
if cache and cache.state_group:
cache.ts = self.clock.time_msec()
event_dict = yield self.store.get_events(cache.state.values())
state = {(e.type, e.state_key): e for e in event_dict.values()}
prev_state = state.get((event_type, state_key), None)
if prev_state:
prev_state = prev_state.event_id
prev_states = [prev_state]
else:
prev_states = []
defer.returnValue(
(cache.state_group, state, prev_states)
)
new_state, prev_states = self._resolve_events(
state_groups.values(), event_type, state_key
)
if self._state_cache is not None:
cache = _StateCacheEntry(
state=new_state,
state={key: event.event_id for key, event in new_state.items()},
state_group=None,
ts=self.clock.time_msec()
)
self._state_cache[frozenset(event_ids)] = cache
self._state_cache[group_names] = cache
defer.returnValue((None, new_state, prev_states))
@@ -263,48 +255,49 @@ class StateHandler(object):
from (type, state_key) to event. prev_states is a list of event_ids.
:rtype: (dict[(str, str), synapse.events.FrozenEvent], list[str])
"""
state = {}
for st in state_sets:
for e in st:
state.setdefault(
(e.type, e.state_key),
{}
)[e.event_id] = e
with Measure(self.clock, "state._resolve_events"):
state = {}
for st in state_sets:
for e in st:
state.setdefault(
(e.type, e.state_key),
{}
)[e.event_id] = e
unconflicted_state = {
k: v.values()[0] for k, v in state.items()
if len(v.values()) == 1
}
unconflicted_state = {
k: v.values()[0] for k, v in state.items()
if len(v.values()) == 1
}
conflicted_state = {
k: v.values()
for k, v in state.items()
if len(v.values()) > 1
}
conflicted_state = {
k: v.values()
for k, v in state.items()
if len(v.values()) > 1
}
if event_type:
prev_states_events = conflicted_state.get(
(event_type, state_key), []
)
prev_states = [s.event_id for s in prev_states_events]
else:
prev_states = []
if event_type:
prev_states_events = conflicted_state.get(
(event_type, state_key), []
)
prev_states = [s.event_id for s in prev_states_events]
else:
prev_states = []
auth_events = {
k: e for k, e in unconflicted_state.items()
if k[0] in AuthEventTypes
}
auth_events = {
k: e for k, e in unconflicted_state.items()
if k[0] in AuthEventTypes
}
try:
resolved_state = self._resolve_state_events(
conflicted_state, auth_events
)
except:
logger.exception("Failed to resolve state")
raise
try:
resolved_state = self._resolve_state_events(
conflicted_state, auth_events
)
except:
logger.exception("Failed to resolve state")
raise
new_state = unconflicted_state
new_state.update(resolved_state)
new_state = unconflicted_state
new_state.update(resolved_state)
return new_state, prev_states

View File

@@ -119,12 +119,15 @@ class DataStore(RoomMemberStore, RoomStore,
self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
self._pushers_id_gen = IdGenerator(db_conn, "pushers", "id")
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
self._push_rules_stream_id_gen = ChainedIdGenerator(
self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
)
self._pushers_id_gen = StreamIdGenerator(
db_conn, "pushers", "id",
extra_tables=[("deleted_pushers", "stream_id")],
)
events_max = self._stream_id_gen.get_max_token()
event_cache_prefill, min_event_val = self._get_cache_dict(

View File

@@ -18,6 +18,7 @@ from synapse.api.errors import StoreError
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.caches.descriptors import Cache
from synapse.util.caches import intern_dict
import synapse.metrics
@@ -26,6 +27,10 @@ from twisted.internet import defer
import sys
import time
import threading
import os
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
logger = logging.getLogger(__name__)
@@ -163,7 +168,9 @@ class SQLBaseStore(object):
self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
max_entries=hs.config.event_cache_size)
self._state_group_cache = DictionaryCache("*stateGroupCache*", 2000)
self._state_group_cache = DictionaryCache(
"*stateGroupCache*", 2000 * CACHE_SIZE_FACTOR
)
self._event_fetch_lock = threading.Condition()
self._event_fetch_list = []
@@ -344,7 +351,7 @@ class SQLBaseStore(object):
"""
col_headers = list(column[0] for column in cursor.description)
results = list(
dict(zip(col_headers, row)) for row in cursor.fetchall()
intern_dict(dict(zip(col_headers, row))) for row in cursor.fetchall()
)
return results

View File

@@ -155,7 +155,7 @@ class DirectoryStore(SQLBaseStore):
return room_id
@cached()
@cached(max_entries=5000)
def get_aliases_for_room(self, room_id):
return self._simple_select_onecol(
"room_aliases",

View File

@@ -49,7 +49,7 @@ class EventPushActionsStore(SQLBaseStore):
)
self._simple_insert_many_txn(txn, "event_push_actions", values)
@cachedInlineCallbacks(num_args=3, lru=True, tree=True)
@cachedInlineCallbacks(num_args=3, lru=True, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id
):

View File

@@ -101,30 +101,16 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
@log_function
def persist_event(self, event, context, backfilled=False,
def persist_event(self, event, context,
is_new_state=True, current_state=None):
stream_ordering = None
if backfilled:
self.min_stream_token -= 1
stream_ordering = self.min_stream_token
if stream_ordering is None:
stream_ordering_manager = self._stream_id_gen.get_next()
else:
@contextmanager
def stream_ordering_manager():
yield stream_ordering
stream_ordering_manager = stream_ordering_manager()
try:
with stream_ordering_manager as stream_ordering:
with self._stream_id_gen.get_next() as stream_ordering:
event.internal_metadata.stream_ordering = stream_ordering
yield self.runInteraction(
"persist_event",
self._persist_event_txn,
event=event,
context=context,
backfilled=backfilled,
is_new_state=is_new_state,
current_state=current_state,
)
@@ -165,13 +151,38 @@ class EventsStore(SQLBaseStore):
defer.returnValue(events[0] if events else None)
@defer.inlineCallbacks
def get_events(self, event_ids, check_redacted=True,
get_prev_content=False, allow_rejected=False):
"""Get events from the database
Args:
event_ids (list): The event_ids of the events to fetch
check_redacted (bool): If True, check if event has been redacted
and redact it.
get_prev_content (bool): If True and event is a state event,
include the previous states content in the unsigned field.
allow_rejected (bool): If True return rejected events.
Returns:
Deferred : Dict from event_id to event.
"""
events = yield self._get_events(
event_ids,
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
defer.returnValue({e.event_id: e for e in events})
@log_function
def _persist_event_txn(self, txn, event, context, backfilled,
def _persist_event_txn(self, txn, event, context,
is_new_state=True, current_state=None):
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
txn.call_after(self.get_current_state_for_key.invalidate_all)
txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
@@ -198,7 +209,7 @@ class EventsStore(SQLBaseStore):
return self._persist_events_txn(
txn,
[(event, context)],
backfilled=backfilled,
backfilled=False,
is_new_state=is_new_state,
)
@@ -455,7 +466,7 @@ class EventsStore(SQLBaseStore):
for event, _ in state_events_and_contexts:
if not context.rejected:
txn.call_after(
self.get_current_state_for_key.invalidate,
self._get_current_state_for_key.invalidate,
(event.room_id, event.type, event.state_key,)
)
@@ -526,6 +537,9 @@ class EventsStore(SQLBaseStore):
if not event_ids:
defer.returnValue([])
event_id_list = event_ids
event_ids = set(event_ids)
event_map = self._get_events_from_cache(
event_ids,
check_redacted=check_redacted,
@@ -535,23 +549,18 @@ class EventsStore(SQLBaseStore):
missing_events_ids = [e for e in event_ids if e not in event_map]
if not missing_events_ids:
defer.returnValue([
event_map[e_id] for e_id in event_ids
if e_id in event_map and event_map[e_id]
])
if missing_events_ids:
missing_events = yield self._enqueue_events(
missing_events_ids,
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
missing_events = yield self._enqueue_events(
missing_events_ids,
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
event_map.update(missing_events)
event_map.update(missing_events)
defer.returnValue([
event_map[e_id] for e_id in event_ids
event_map[e_id] for e_id in event_id_list
if e_id in event_map and event_map[e_id]
])

View File

@@ -16,8 +16,6 @@
from ._base import SQLBaseStore
from twisted.internet import defer
from synapse.api.errors import StoreError
from canonicaljson import encode_canonical_json
import logging
@@ -79,12 +77,41 @@ class PusherStore(SQLBaseStore):
rows = yield self.runInteraction("get_all_pushers", get_pushers)
defer.returnValue(rows)
def get_pushers_stream_token(self):
return self._pushers_id_gen.get_max_token()
def get_all_updated_pushers(self, last_id, current_id, limit):
def get_all_updated_pushers_txn(txn):
sql = (
"SELECT id, user_name, access_token, profile_tag, kind,"
" app_id, app_display_name, device_display_name, pushkey, ts,"
" lang, data"
" FROM pushers"
" WHERE ? < id AND id <= ?"
" ORDER BY id ASC LIMIT ?"
)
txn.execute(sql, (last_id, current_id, limit))
updated = txn.fetchall()
sql = (
"SELECT stream_id, user_id, app_id, pushkey"
" FROM deleted_pushers"
" WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC LIMIT ?"
)
txn.execute(sql, (last_id, current_id, limit))
deleted = txn.fetchall()
return (updated, deleted)
return self.runInteraction(
"get_all_updated_pushers", get_all_updated_pushers_txn
)
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
app_display_name, device_display_name,
pushkey, pushkey_ts, lang, data, profile_tag=""):
try:
next_id = self._pushers_id_gen.get_next()
with self._pushers_id_gen.get_next() as stream_id:
yield self._simple_upsert(
"pushers",
dict(
@@ -101,23 +128,29 @@ class PusherStore(SQLBaseStore):
lang=lang,
data=encode_canonical_json(data),
profile_tag=profile_tag,
),
insertion_values=dict(
id=next_id,
id=stream_id,
),
desc="add_pusher",
)
except Exception as e:
logger.error("create_pusher with failed: %s", e)
raise StoreError(500, "Problem creating pusher.")
@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
yield self._simple_delete_one(
"pushers",
{"app_id": app_id, "pushkey": pushkey, 'user_name': user_id},
desc="delete_pusher_by_app_id_pushkey_user_id",
)
def delete_pusher_txn(txn, stream_id):
self._simple_delete_one_txn(
txn,
"pushers",
{"app_id": app_id, "pushkey": pushkey, "user_name": user_id}
)
self._simple_upsert_txn(
txn,
"deleted_pushers",
{"app_id": app_id, "pushkey": pushkey, "user_id": user_id},
{"stream_id": stream_id},
)
with self._pushers_id_gen.get_next() as stream_id:
yield self.runInteraction(
"delete_pusher", delete_pusher_txn, stream_id
)
@defer.inlineCallbacks
def update_pusher_last_token(self, app_id, pushkey, user_id, last_token):

View File

@@ -62,18 +62,17 @@ class ReceiptsStore(SQLBaseStore):
@cachedInlineCallbacks(num_args=2)
def get_receipts_for_user(self, user_id, receipt_type):
def f(txn):
sql = (
"SELECT room_id,event_id "
"FROM receipts_linearized "
"WHERE user_id = ? AND receipt_type = ? "
)
txn.execute(sql, (user_id, receipt_type))
return txn.fetchall()
rows = yield self._simple_select_list(
table="receipts_linearized",
keyvalues={
"user_id": user_id,
"receipt_type": receipt_type,
},
retcols=("room_id", "event_id"),
desc="get_receipts_for_user",
)
defer.returnValue(dict(
(yield self.runInteraction("get_receipts_for_user", f))
))
defer.returnValue({row["room_id"]: row["event_id"] for row in rows})
@defer.inlineCallbacks
def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):

View File

@@ -77,6 +77,14 @@ class RoomStore(SQLBaseStore):
allow_none=True,
)
def set_room_is_public(self, room_id, is_public):
return self._simple_update_one(
table="rooms",
keyvalues={"room_id": room_id},
updatevalues={"is_public": is_public},
desc="set_room_is_public",
)
def get_public_room_ids(self):
return self._simple_select_onecol(
table="rooms",

View File

@@ -115,19 +115,17 @@ class RoomMemberStore(SQLBaseStore):
).addCallback(self._get_events)
@cached()
def get_invites_for_user(self, user_id):
""" Get all the invite events for a user
def get_invited_rooms_for_user(self, user_id):
""" Get all the rooms the user is invited to
Args:
user_id (str): The user ID.
Returns:
A deferred list of event objects.
A deferred list of RoomsForUser.
"""
return self.get_rooms_for_user_where_membership_is(
user_id, [Membership.INVITE]
).addCallback(lambda invites: self._get_events([
invite.event_id for invite in invites
]))
)
def get_leave_and_ban_events_for_user(self, user_id):
""" Get all the leave events for a user
@@ -251,30 +249,6 @@ class RoomMemberStore(SQLBaseStore):
user_id, membership_list=[Membership.JOIN],
)
@defer.inlineCallbacks
def user_rooms_intersect(self, user_id_list):
""" Checks whether all the users whose IDs are given in a list share a
room.
This is a "hot path" function that's called a lot, e.g. by presence for
generating the event stream. As such, it is implemented locally by
wrapping logic around heavily-cached database queries.
"""
if len(user_id_list) < 2:
defer.returnValue(True)
deferreds = [self.get_rooms_for_user(u) for u in user_id_list]
results = yield defer.DeferredList(deferreds, consumeErrors=True)
# A list of sets of strings giving room IDs for each user
room_id_lists = [set([r.room_id for r in result[1]]) for result in results]
# There isn't a setintersection(*list_of_sets)
ret = len(room_id_lists.pop(0).intersection(*room_id_lists)) > 0
defer.returnValue(ret)
@defer.inlineCallbacks
def forget(self, user_id, room_id):
"""Indicate that user_id wishes to discard history for room_id."""

View File

@@ -0,0 +1,25 @@
/* Copyright 2016 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE TABLE IF NOT EXISTS deleted_pushers(
stream_id BIGINT NOT NULL,
app_id TEXT NOT NULL,
pushkey TEXT NOT NULL,
user_id TEXT NOT NULL,
/* We only track the most recent delete for each app_id, pushkey and user_id. */
UNIQUE (app_id, pushkey, user_id)
);
CREATE INDEX deleted_pushers_stream_id ON deleted_pushers (stream_id);

View File

@@ -0,0 +1,23 @@
/* Copyright 2016 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* This release removes the restriction that published rooms must have an alias,
* so we go back and ensure the only 'public' rooms are ones with an alias.
* We use (1 = 0) and (1 = 1) so that it works in both postgres and sqlite
*/
UPDATE rooms SET is_public = (1 = 0) WHERE is_public = (1 = 1) AND room_id not in (
SELECT room_id FROM room_aliases
);

View File

@@ -14,9 +14,8 @@
# limitations under the License.
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import (
cached, cachedInlineCallbacks, cachedList
)
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches import intern_string
from twisted.internet import defer
@@ -155,8 +154,14 @@ class StateStore(SQLBaseStore):
events = yield self._get_events(event_ids, get_prev_content=False)
defer.returnValue(events)
@cachedInlineCallbacks(num_args=3)
@defer.inlineCallbacks
def get_current_state_for_key(self, room_id, event_type, state_key):
event_ids = yield self._get_current_state_for_key(room_id, event_type, state_key)
events = yield self._get_events(event_ids, get_prev_content=False)
defer.returnValue(events)
@cached(num_args=3)
def _get_current_state_for_key(self, room_id, event_type, state_key):
def f(txn):
sql = (
"SELECT event_id FROM current_state_events"
@@ -167,12 +172,10 @@ class StateStore(SQLBaseStore):
txn.execute(sql, args)
results = txn.fetchall()
return [r[0] for r in results]
event_ids = yield self.runInteraction("get_current_state_for_key", f)
events = yield self._get_events(event_ids, get_prev_content=False)
defer.returnValue(events)
return self.runInteraction("get_current_state_for_key", f)
def _get_state_groups_from_groups(self, groups, types):
"""Returns dictionary state_group -> state event ids
"""Returns dictionary state_group -> (dict of (type, state_key) -> event id)
"""
def f(txn, groups):
if types is not None:
@@ -183,7 +186,8 @@ class StateStore(SQLBaseStore):
where_clause = ""
sql = (
"SELECT state_group, event_id FROM state_groups_state WHERE"
"SELECT state_group, event_id, type, state_key"
" FROM state_groups_state WHERE"
" state_group IN (%s) %s" % (
",".join("?" for _ in groups),
where_clause,
@@ -199,7 +203,8 @@ class StateStore(SQLBaseStore):
results = {}
for row in rows:
results.setdefault(row["state_group"], []).append(row["event_id"])
key = (row["type"], row["state_key"])
results.setdefault(row["state_group"], {})[key] = row["event_id"]
return results
chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
@@ -296,7 +301,7 @@ class StateStore(SQLBaseStore):
where a `state_key` of `None` matches all state_keys for the
`type`.
"""
is_all, state_dict = self._state_group_cache.get(group)
is_all, state_dict_ids = self._state_group_cache.get(group)
type_to_key = {}
missing_types = set()
@@ -308,7 +313,7 @@ class StateStore(SQLBaseStore):
if type_to_key.get(typ, object()) is not None:
type_to_key.setdefault(typ, set()).add(state_key)
if (typ, state_key) not in state_dict:
if (typ, state_key) not in state_dict_ids:
missing_types.add((typ, state_key))
sentinel = object()
@@ -326,7 +331,7 @@ class StateStore(SQLBaseStore):
got_all = not (missing_types or types is None)
return {
k: v for k, v in state_dict.items()
k: v for k, v in state_dict_ids.items()
if include(k[0], k[1])
}, missing_types, got_all
@@ -340,8 +345,9 @@ class StateStore(SQLBaseStore):
Args:
group: The state group to lookup
"""
is_all, state_dict = self._state_group_cache.get(group)
return state_dict, is_all
is_all, state_dict_ids = self._state_group_cache.get(group)
return state_dict_ids, is_all
@defer.inlineCallbacks
def _get_state_for_groups(self, groups, types=None):
@@ -354,84 +360,72 @@ class StateStore(SQLBaseStore):
missing_groups = []
if types is not None:
for group in set(groups):
state_dict, missing_types, got_all = self._get_some_state_from_cache(
state_dict_ids, missing_types, got_all = self._get_some_state_from_cache(
group, types
)
results[group] = state_dict
results[group] = state_dict_ids
if not got_all:
missing_groups.append(group)
else:
for group in set(groups):
state_dict, got_all = self._get_all_state_from_cache(
state_dict_ids, got_all = self._get_all_state_from_cache(
group
)
results[group] = state_dict
results[group] = state_dict_ids
if not got_all:
missing_groups.append(group)
if not missing_groups:
defer.returnValue({
group: {
type_tuple: event
for type_tuple, event in state.items()
if event
}
for group, state in results.items()
})
if missing_groups:
# Okay, so we have some missing_types, lets fetch them.
cache_seq_num = self._state_group_cache.sequence
# Okay, so we have some missing_types, lets fetch them.
cache_seq_num = self._state_group_cache.sequence
group_to_state_dict = yield self._get_state_groups_from_groups(
missing_groups, types
)
group_state_dict = yield self._get_state_groups_from_groups(
missing_groups, types
)
# Now we want to update the cache with all the things we fetched
# from the database.
for group, group_state_dict in group_to_state_dict.items():
if types:
# We delibrately put key -> None mappings into the cache to
# cache absence of the key, on the assumption that if we've
# explicitly asked for some types then we will probably ask
# for them again.
state_dict = {
(intern_string(etype), intern_string(state_key)): None
for (etype, state_key) in types
}
state_dict.update(results[group])
results[group] = state_dict
else:
state_dict = results[group]
state_dict.update(group_state_dict)
self._state_group_cache.update(
cache_seq_num,
key=group,
value=state_dict,
full=(types is None),
)
state_events = yield self._get_events(
[e_id for l in group_state_dict.values() for e_id in l],
[ev_id for sd in results.values() for ev_id in sd.values()],
get_prev_content=False
)
state_events = {e.event_id: e for e in state_events}
# Now we want to update the cache with all the things we fetched
# from the database.
for group, state_ids in group_state_dict.items():
if types:
# We delibrately put key -> None mappings into the cache to
# cache absence of the key, on the assumption that if we've
# explicitly asked for some types then we will probably ask
# for them again.
state_dict = {key: None for key in types}
state_dict.update(results[group])
results[group] = state_dict
else:
state_dict = results[group]
for event_id in state_ids:
try:
state_event = state_events[event_id]
state_dict[(state_event.type, state_event.state_key)] = state_event
except KeyError:
# Hmm. So we do don't have that state event? Interesting.
logger.warn(
"Can't find state event %r for state group %r",
event_id, group,
)
self._state_group_cache.update(
cache_seq_num,
key=group,
value=state_dict,
full=(types is None),
)
# Remove all the entries with None values. The None values were just
# used for bookkeeping in the cache.
for group, state_dict in results.items():
results[group] = {
key: event for key, event in state_dict.items() if event
key: state_events[event_id]
for key, event_id in state_dict.items()
if event_id and event_id in state_events
}
defer.returnValue(results)

View File

@@ -36,7 +36,7 @@ what sort order was used:
from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.descriptors import cached
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
from synapse.util.logcontext import preserve_fn
@@ -465,9 +465,25 @@ class StreamStore(SQLBaseStore):
defer.returnValue((events, token))
@cachedInlineCallbacks(num_args=4)
@defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
rows, token = yield self.get_recent_event_ids_for_room(
room_id, limit, end_token, from_token
)
logger.debug("stream before")
events = yield self._get_events(
[r["event_id"] for r in rows],
get_prev_content=True
)
logger.debug("stream after")
self._set_before_and_after(events, rows)
defer.returnValue((events, token))
@cached(num_args=4)
def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None):
end_token = RoomStreamToken.parse_stream_token(end_token)
if from_token is None:
@@ -517,21 +533,10 @@ class StreamStore(SQLBaseStore):
return rows, token
rows, token = yield self.runInteraction(
return self.runInteraction(
"get_recent_events_for_room", get_recent_events_for_room_txn
)
logger.debug("stream before")
events = yield self._get_events(
[r["event_id"] for r in rows],
get_prev_content=True
)
logger.debug("stream after")
self._set_before_and_after(events, rows)
defer.returnValue((events, token))
@defer.inlineCallbacks
def get_room_events_max_id(self, direction='f'):
token = yield self._stream_id_gen.get_max_token()

View File

@@ -49,9 +49,14 @@ class StreamIdGenerator(object):
with stream_id_gen.get_next() as stream_id:
# ... persist event ...
"""
def __init__(self, db_conn, table, column):
def __init__(self, db_conn, table, column, extra_tables=[]):
self._lock = threading.Lock()
self._current_max = _load_max_id(db_conn, table, column)
for table, column in extra_tables:
self._current_max = max(
self._current_max,
_load_max_id(db_conn, table, column)
)
self._unfinished_ids = deque()
def get_next(self):

View File

@@ -14,6 +14,10 @@
# limitations under the License.
import synapse.metrics
from lrucache import LruCache
import os
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
DEBUG_CACHES = False
@@ -25,3 +29,56 @@ cache_counter = metrics.register_cache(
lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
labels=["name"],
)
_string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR))
caches_by_name["string_cache"] = _string_cache
KNOWN_KEYS = {
key: key for key in
(
"auth_events",
"content",
"depth",
"event_id",
"hashes",
"origin",
"origin_server_ts",
"prev_events",
"room_id",
"sender",
"signatures",
"state_key",
"type",
"unsigned",
"user_id",
)
}
def intern_string(string):
"""Takes a (potentially) unicode string and interns using custom cache
"""
return _string_cache.setdefault(string, string)
def intern_dict(dictionary):
"""Takes a dictionary and interns well known keys and their values
"""
return {
KNOWN_KEYS.get(key, key): _intern_known_values(key, value)
for key, value in dictionary.items()
}
def _intern_known_values(key, value):
intern_str_keys = ("event_id", "room_id")
intern_unicode_keys = ("sender", "user_id", "type", "state_key")
if key in intern_str_keys:
return intern(value.encode('ascii'))
if key in intern_unicode_keys:
return intern_string(value)
return value

View File

@@ -29,6 +29,16 @@ def enumerate_leaves(node, depth):
yield m
class _Node(object):
__slots__ = ["prev_node", "next_node", "key", "value"]
def __init__(self, prev_node, next_node, key, value):
self.prev_node = prev_node
self.next_node = next_node
self.key = key
self.value = value
class LruCache(object):
"""
Least-recently-used cache.
@@ -38,10 +48,9 @@ class LruCache(object):
def __init__(self, max_size, keylen=1, cache_type=dict):
cache = cache_type()
self.cache = cache # Used for introspection.
list_root = []
list_root[:] = [list_root, list_root, None, None]
PREV, NEXT, KEY, VALUE = 0, 1, 2, 3
list_root = _Node(None, None, None, None)
list_root.next_node = list_root
list_root.prev_node = list_root
lock = threading.Lock()
@@ -55,36 +64,36 @@ class LruCache(object):
def add_node(key, value):
prev_node = list_root
next_node = prev_node[NEXT]
node = [prev_node, next_node, key, value]
prev_node[NEXT] = node
next_node[PREV] = node
next_node = prev_node.next_node
node = _Node(prev_node, next_node, key, value)
prev_node.next_node = node
next_node.prev_node = node
cache[key] = node
def move_node_to_front(node):
prev_node = node[PREV]
next_node = node[NEXT]
prev_node[NEXT] = next_node
next_node[PREV] = prev_node
prev_node = node.prev_node
next_node = node.next_node
prev_node.next_node = next_node
next_node.prev_node = prev_node
prev_node = list_root
next_node = prev_node[NEXT]
node[PREV] = prev_node
node[NEXT] = next_node
prev_node[NEXT] = node
next_node[PREV] = node
next_node = prev_node.next_node
node.prev_node = prev_node
node.next_node = next_node
prev_node.next_node = node
next_node.prev_node = node
def delete_node(node):
prev_node = node[PREV]
next_node = node[NEXT]
prev_node[NEXT] = next_node
next_node[PREV] = prev_node
prev_node = node.prev_node
next_node = node.next_node
prev_node.next_node = next_node
next_node.prev_node = prev_node
@synchronized
def cache_get(key, default=None):
node = cache.get(key, None)
if node is not None:
move_node_to_front(node)
return node[VALUE]
return node.value
else:
return default
@@ -93,25 +102,25 @@ class LruCache(object):
node = cache.get(key, None)
if node is not None:
move_node_to_front(node)
node[VALUE] = value
node.value = value
else:
add_node(key, value)
if len(cache) > max_size:
todelete = list_root[PREV]
todelete = list_root.prev_node
delete_node(todelete)
cache.pop(todelete[KEY], None)
cache.pop(todelete.key, None)
@synchronized
def cache_set_default(key, value):
node = cache.get(key, None)
if node is not None:
return node[VALUE]
return node.value
else:
add_node(key, value)
if len(cache) > max_size:
todelete = list_root[PREV]
todelete = list_root.prev_node
delete_node(todelete)
cache.pop(todelete[KEY], None)
cache.pop(todelete.key, None)
return value
@synchronized
@@ -119,8 +128,8 @@ class LruCache(object):
node = cache.get(key, None)
if node:
delete_node(node)
cache.pop(node[KEY], None)
return node[VALUE]
cache.pop(node.key, None)
return node.value
else:
return default
@@ -137,8 +146,8 @@ class LruCache(object):
@synchronized
def cache_clear():
list_root[NEXT] = list_root
list_root[PREV] = list_root
list_root.next_node = list_root
list_root.prev_node = list_root
cache.clear()
@synchronized

View File

@@ -131,6 +131,7 @@ class ReplicationResourceCase(unittest.TestCase):
test_timeout_tag_account_data = _test_timeout("tag_account_data")
test_timeout_backfill = _test_timeout("backfill")
test_timeout_push_rules = _test_timeout("push_rules")
test_timeout_pushers = _test_timeout("pushers")
@defer.inlineCallbacks
def send_text_message(self, room_id, message):

View File

@@ -22,9 +22,10 @@ class RegisterRestServletTestCase(unittest.TestCase):
side_effect=lambda x: defer.succeed(self.appservice))
)
self.auth_result = (False, None, None)
self.auth_result = (False, None, None, None)
self.auth_handler = Mock(
check_auth=Mock(side_effect=lambda x, y, z: self.auth_result)
check_auth=Mock(side_effect=lambda x, y, z: self.auth_result),
get_session_data=Mock(return_value=None)
)
self.registration_handler = Mock()
self.identity_handler = Mock()
@@ -112,7 +113,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
self.auth_result = (True, None, {
"username": "kermit",
"password": "monkey"
})
}, None)
self.registration_handler.register = Mock(return_value=(user_id, token))
(code, result) = yield self.servlet.on_POST(self.request)
@@ -135,7 +136,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
self.auth_result = (True, None, {
"username": "kermit",
"password": "monkey"
})
}, None)
self.registration_handler.register = Mock(return_value=("@user:id", "t"))
d = self.servlet.on_POST(self.request)
return self.assertFailure(d, SynapseError)

View File

@@ -91,11 +91,6 @@ class RoomMemberStoreTestCase(unittest.TestCase):
)
)]
)
self.assertFalse(
(yield self.store.user_rooms_intersect(
[self.u_alice.to_string(), self.u_bob.to_string()]
))
)
@defer.inlineCallbacks
def test_two_members(self):
@@ -108,11 +103,6 @@ class RoomMemberStoreTestCase(unittest.TestCase):
yield self.store.get_room_members(self.room.to_string())
)}
)
self.assertTrue((
yield self.store.user_rooms_intersect([
self.u_alice.to_string(), self.u_bob.to_string()
])
))
@defer.inlineCallbacks
def test_room_hosts(self):