Compare commits

...

40 Commits

Author SHA1 Message Date
Erik Johnston
29bc21ed5d Remove commented out code 2015-06-25 16:15:32 +01:00
Erik Johnston
1788bda33f Comment and remove unused variable 2015-06-25 16:04:35 +01:00
Erik Johnston
fd39ff3ec6 Remove debug logging 2015-06-25 15:19:54 +01:00
Erik Johnston
48520505cd Dedupe auth_chain + state 2015-06-25 15:18:07 +01:00
Erik Johnston
9d6e735344 PEP8 2015-06-25 14:15:53 +01:00
Erik Johnston
584a1c0dca Parallelise key storage 2015-06-25 14:13:47 +01:00
Erik Johnston
3cf2203c5c Don't parallelise persist_events 2015-06-25 14:11:18 +01:00
Erik Johnston
9951f43764 Add unwrapFirstError 2015-06-25 11:57:54 +01:00
Erik Johnston
9e6daddaf5 Persist events in parrellel if possible 2015-06-25 11:56:10 +01:00
Erik Johnston
e2a241af08 Make _persist_event_txn call _persist_events_txn 2015-06-25 11:45:05 +01:00
Erik Johnston
4500529c73 Typo 2015-06-25 09:22:42 +01:00
Erik Johnston
ecae5437bb Remove unused line 2015-06-24 16:32:16 +01:00
Erik Johnston
0f2ac80305 Verify state and auth_chain in the same batch 2015-06-24 14:51:10 +01:00
Erik Johnston
74f7b44955 Add desc to store_server_keys_json _simple_upsert 2015-06-24 13:15:52 +01:00
Erik Johnston
307e858b94 Fix backfill 2015-06-24 13:11:58 +01:00
Erik Johnston
ec6ceadeeb Only fail individular deferred for invalid signature 2015-06-24 13:04:37 +01:00
Erik Johnston
f4964e666a Fix up process_v2_response 2015-06-24 12:53:26 +01:00
Erik Johnston
2dcddf83f3 Use correct arg 2015-06-24 11:41:39 +01:00
Erik Johnston
12d5914e83 Add missing items() 2015-06-24 11:33:57 +01:00
Erik Johnston
3ee7c551e9 Logging 2015-06-24 11:30:05 +01:00
Erik Johnston
44e3bed2a1 Make _check_sigs_and_hash_and_fetch use batch api for verify keys 2015-06-24 11:28:07 +01:00
Erik Johnston
a29319fefa Implement a batch API for verify_json_objects_for_server 2015-06-24 11:21:35 +01:00
Erik Johnston
f859e3ca37 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/persist_event_perf 2015-06-23 16:41:58 +01:00
Erik Johnston
0e1d2d0628 Remove debug logging 2015-06-22 18:09:16 +01:00
Erik Johnston
19f0497ac3 Batch outside transactions 2015-06-22 18:06:13 +01:00
Erik Johnston
be29e152d1 Don't update current_state_events for outliers 2015-06-22 17:41:42 +01:00
Erik Johnston
a4ad9b9556 Only persist a maximum of 100 events at a time 2015-06-22 17:17:25 +01:00
Erik Johnston
96ce61b2b1 Implement persist_event*s* 2015-06-22 17:03:34 +01:00
Erik Johnston
03a9e4436b Add a many version of _handle_prev_events 2015-06-22 14:09:39 +01:00
Erik Johnston
a76e217d8e Add a many version of store_state_groups_txn 2015-06-22 14:09:25 +01:00
Erik Johnston
96be533f1f Use new store.persist_events function in federation handler 2015-06-22 11:42:10 +01:00
Erik Johnston
b39b294d1f Properly cache get_server_verify_keys 2015-06-19 17:20:58 +01:00
Erik Johnston
92d3fe8deb Merge branch 'develop' of github.com:matrix-org/synapse into erikj/persist_event_perf 2015-06-19 16:25:04 +01:00
Erik Johnston
b2f7fc4527 Ignore your own domain 2015-06-19 16:16:35 +01:00
Erik Johnston
c461482e1a Remove unnecessary store.have_events call 2015-06-19 15:35:56 +01:00
Erik Johnston
83df0aef58 PEP8 2015-06-19 15:08:01 +01:00
Erik Johnston
efe2785cab Cache get_server_verify_key 2015-06-19 15:06:51 +01:00
Erik Johnston
280e7cd874 Merge branch 'erikj/fix_log_context' of github.com:matrix-org/synapse into erikj/persist_event_perf 2015-06-19 11:55:10 +01:00
Erik Johnston
63141e77e7 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/persist_event_perf 2015-06-19 11:50:34 +01:00
Erik Johnston
6df6354f1c Don't bother storing things we don't need 2015-06-18 17:47:06 +01:00
11 changed files with 984 additions and 508 deletions

View File

@@ -25,11 +25,11 @@ from syutil.base64util import decode_base64, encode_base64
from synapse.api.errors import SynapseError, Codes
from synapse.util.retryutils import get_retry_limiter
from synapse.util.async import ObservableDeferred
from synapse.util import unwrapFirstError
from OpenSSL import crypto
from collections import namedtuple
import urllib
import hashlib
import logging
@@ -38,6 +38,9 @@ import logging
logger = logging.getLogger(__name__)
KeyGroup = namedtuple("KeyGroup", ("server_name", "group_id", "key_ids"))
class Keyring(object):
def __init__(self, hs):
self.store = hs.get_datastore()
@@ -49,141 +52,273 @@ class Keyring(object):
self.key_downloads = {}
@defer.inlineCallbacks
def verify_json_for_server(self, server_name, json_object):
logger.debug("Verifying for %s", server_name)
key_ids = signature_ids(json_object, server_name)
if not key_ids:
raise SynapseError(
400,
"Not signed with a supported algorithm",
Codes.UNAUTHORIZED,
)
try:
verify_key = yield self.get_server_verify_key(server_name, key_ids)
except IOError as e:
logger.warn(
"Got IOError when downloading keys for %s: %s %s",
server_name, type(e).__name__, str(e.message),
)
raise SynapseError(
502,
"Error downloading keys for %s" % (server_name,),
Codes.UNAUTHORIZED,
)
except Exception as e:
logger.warn(
"Got Exception when downloading keys for %s: %s %s",
server_name, type(e).__name__, str(e.message),
)
raise SynapseError(
401,
"No key for %s with id %s" % (server_name, key_ids),
Codes.UNAUTHORIZED,
)
return self.verify_json_objects_for_server(
[(server_name, json_object)]
)[0]
try:
verify_signed_json(json_object, server_name, verify_key)
except:
raise SynapseError(
401,
"Invalid signature for server %s with key %s:%s" % (
server_name, verify_key.alg, verify_key.version
),
Codes.UNAUTHORIZED,
)
def verify_json_objects_for_server(self, server_and_json):
"""Bulk verfies signatures of json objects, bulk fetching keys as
necessary.
@defer.inlineCallbacks
def get_server_verify_key(self, server_name, key_ids):
"""Finds a verification key for the server with one of the key ids.
Trys to fetch the key from a trusted perspective server first.
Args:
server_name(str): The name of the server to fetch a key for.
keys_ids (list of str): The key_ids to check for.
server_and_json (list): List of pairs of (server_name, json_object)
Returns:
list of deferreds indicating success or failure to verify each
json object's signature for the given server_name.
"""
cached = yield self.store.get_server_verify_keys(server_name, key_ids)
group_id_to_json = {}
group_id_to_group = {}
group_ids = []
if cached:
defer.returnValue(cached[0])
return
next_group_id = 0
deferreds = {}
download = self.key_downloads.get(server_name)
for server_name, json_object in server_and_json:
logger.debug("Verifying for %s", server_name)
group_id = next_group_id
next_group_id += 1
group_ids.append(group_id)
if download is None:
download = self._get_server_verify_key_impl(server_name, key_ids)
download = ObservableDeferred(
download,
consumeErrors=True
key_ids = signature_ids(json_object, server_name)
if not key_ids:
deferreds[group_id] = defer.fail(SynapseError(
400,
"Not signed with a supported algorithm",
Codes.UNAUTHORIZED,
))
group = KeyGroup(server_name, group_id, key_ids)
group_id_to_group[group_id] = group
group_id_to_json[group_id] = json_object
@defer.inlineCallbacks
def handle_key_deferred(group, deferred):
server_name = group.server_name
try:
_, _, key_id, verify_key = yield deferred
except IOError as e:
logger.warn(
"Got IOError when downloading keys for %s: %s %s",
server_name, type(e).__name__, str(e.message),
)
raise SynapseError(
502,
"Error downloading keys for %s" % (server_name,),
Codes.UNAUTHORIZED,
)
except Exception as e:
logger.exception(
"Got Exception when downloading keys for %s: %s %s",
server_name, type(e).__name__, str(e.message),
)
raise SynapseError(
401,
"No key for %s with id %s" % (server_name, key_ids),
Codes.UNAUTHORIZED,
)
json_object = group_id_to_json[group.group_id]
try:
verify_signed_json(json_object, server_name, verify_key)
except:
raise SynapseError(
401,
"Invalid signature for server %s with key %s:%s" % (
server_name, verify_key.alg, verify_key.version
),
Codes.UNAUTHORIZED,
)
deferreds.update(self.get_server_verify_keys(
group_id_to_group
))
logger.info(
"Deferred count: %d vs. %d",
len(deferreds.items()),
len(server_and_json)
)
return [
handle_key_deferred(
group_id_to_group[g_id],
deferreds[g_id],
)
self.key_downloads[server_name] = download
for g_id in group_ids
]
@download.addBoth
def callback(ret):
del self.key_downloads[server_name]
return ret
def get_server_verify_keys(self, group_id_to_group):
merged_results = {}
r = yield download.observe()
defer.returnValue(r)
fns = (
self.get_keys_from_store, # First try the local store
self.get_keys_from_perspectives, # Then try via perspectives
self.get_keys_from_server, # Then try directly
)
group_deferreds = {
group_id: defer.Deferred()
for group_id in group_id_to_group
}
@defer.inlineCallbacks
def do_iterations():
missing_keys = {
group.server_name: key_id
for group in group_id_to_group.values()
for key_id in group.key_ids
}
for fn in fns:
results = yield fn(missing_keys.items())
merged_results.update(results)
missing_groups = {}
for group in group_id_to_group.values():
for key_id in group.key_ids:
if key_id in merged_results[group.server_name]:
group_deferreds.pop(group.group_id).callback((
group.group_id,
group.server_name,
key_id,
merged_results[group.server_name][key_id],
))
break
else:
missing_groups.setdefault(
group.server_name, []
).append(group)
if not missing_groups:
break
missing_keys = {
server_name: set(
key_id for group in groups for key_id in group.key_ids
)
for server_name, groups in missing_groups.items()
}
for group in missing_groups.values():
group_deferreds.pop(group.group_id).errback(SynapseError(
401,
"No key for %s with id %s" % (
group.server_name, group.key_ids,
),
Codes.UNAUTHORIZED,
))
def on_err(err):
for deferred in group_deferreds.values():
deferred.errback(err)
group_deferreds.clear()
do_iterations().addErrback(on_err)
return group_deferreds
@defer.inlineCallbacks
def _get_server_verify_key_impl(self, server_name, key_ids):
keys = None
def get_keys_from_store(self, server_name_and_key_ids):
res = yield defer.gatherResults(
[
self.store.get_server_verify_keys(server_name, key_ids)
for server_name, key_ids in server_name_and_key_ids
],
consumeErrors=True,
).addErrback(unwrapFirstError)
defer.returnValue(dict(zip(
[server_name for server_name, _ in server_name_and_key_ids],
res
)))
@defer.inlineCallbacks
def get_keys_from_perspectives(self, server_name_and_key_ids):
@defer.inlineCallbacks
def get_key(perspective_name, perspective_keys):
try:
result = yield self.get_server_verify_key_v2_indirect(
server_name, key_ids, perspective_name, perspective_keys
server_name_and_key_ids, perspective_name, perspective_keys
)
defer.returnValue(result)
except Exception as e:
logging.info(
"Unable to getting key %r for %r from %r: %s %s",
key_ids, server_name, perspective_name,
logger.exception(
"Unable to get key from %r: %s %s",
perspective_name,
type(e).__name__, str(e.message),
)
defer.returnValue({})
perspective_results = yield defer.gatherResults([
get_key(p_name, p_keys)
for p_name, p_keys in self.perspective_servers.items()
])
results = yield defer.gatherResults(
[
get_key(p_name, p_keys)
for p_name, p_keys in self.perspective_servers.items()
],
consumeErrors=True,
).addErrback(unwrapFirstError)
for results in perspective_results:
if results is not None:
keys = results
union_of_keys = {}
for result in results:
for server_name, keys in result.items():
union_of_keys.setdefault(server_name, {}).update(keys)
limiter = yield get_retry_limiter(
server_name,
self.clock,
self.store,
)
defer.returnValue(union_of_keys)
with limiter:
if not keys:
@defer.inlineCallbacks
def get_keys_from_server(self, server_name_and_key_ids):
@defer.inlineCallbacks
def get_key(server_name, key_ids):
limiter = yield get_retry_limiter(
server_name,
self.clock,
self.store,
)
with limiter:
keys = None
try:
keys = yield self.get_server_verify_key_v2_direct(
server_name, key_ids
)
except Exception as e:
logging.info(
logger.info(
"Unable to getting key %r for %r directly: %s %s",
key_ids, server_name,
type(e).__name__, str(e.message),
)
if not keys:
keys = yield self.get_server_verify_key_v1_direct(
server_name, key_ids
)
if not keys:
keys = yield self.get_server_verify_key_v1_direct(
server_name, key_ids
)
for key_id in key_ids:
if key_id in keys:
defer.returnValue(keys[key_id])
return
raise ValueError("No verification key found for given key ids")
keys = {server_name: keys}
defer.returnValue(keys)
results = yield defer.gatherResults(
[
get_key(server_name, key_ids)
for server_name, key_ids in server_name_and_key_ids
],
consumeErrors=True,
).addErrback(unwrapFirstError)
merged = {}
for result in results:
merged.update(result)
defer.returnValue({
server_name: keys
for server_name, keys in merged.items()
if keys
})
@defer.inlineCallbacks
def get_server_verify_key_v2_indirect(self, server_name, key_ids,
def get_server_verify_key_v2_indirect(self, server_names_and_key_ids,
perspective_name,
perspective_keys):
limiter = yield get_retry_limiter(
@@ -204,6 +339,7 @@ class Keyring(object):
u"minimum_valid_until_ts": 0
} for key_id in key_ids
}
for server_name, key_ids in server_names_and_key_ids
}
},
)
@@ -243,23 +379,24 @@ class Keyring(object):
" server %r" % (perspective_name,)
)
response_keys = yield self.process_v2_response(
server_name, perspective_name, response
processed_response = yield self.process_v2_response(
perspective_name, response
)
keys.update(response_keys)
for server_name, response_keys in processed_response.items():
keys.setdefault(server_name, {}).update(response_keys)
yield self.store_keys(
server_name=server_name,
from_server=perspective_name,
verify_keys=keys,
)
for server_name, response_keys in keys.items():
yield self.store_keys(
server_name=server_name,
from_server=perspective_name,
verify_keys=response_keys,
)
defer.returnValue(keys)
@defer.inlineCallbacks
def get_server_verify_key_v2_direct(self, server_name, key_ids):
keys = {}
for requested_key_id in key_ids:
@@ -295,25 +432,25 @@ class Keyring(object):
raise ValueError("TLS certificate not allowed by fingerprints")
response_keys = yield self.process_v2_response(
server_name=server_name,
from_server=server_name,
requested_id=requested_key_id,
requested_ids=[requested_key_id],
response_json=response,
)
keys.update(response_keys)
yield self.store_keys(
server_name=server_name,
from_server=server_name,
verify_keys=keys,
)
for server_name, verify_keys in keys.items():
yield self.store_keys(
server_name=server_name,
from_server=server_name,
verify_keys=verify_keys,
)
defer.returnValue(keys)
@defer.inlineCallbacks
def process_v2_response(self, server_name, from_server, response_json,
requested_id=None):
def process_v2_response(self, from_server, response_json,
requested_ids=[]):
time_now_ms = self.clock.time_msec()
response_keys = {}
verify_keys = {}
@@ -335,6 +472,8 @@ class Keyring(object):
verify_key.time_added = time_now_ms
old_verify_keys[key_id] = verify_key
results = {}
server_name = response_json["server_name"]
for key_id in response_json["signatures"].get(server_name, {}):
if key_id not in response_json["verify_keys"]:
raise ValueError(
@@ -357,28 +496,31 @@ class Keyring(object):
signed_key_json_bytes = encode_canonical_json(signed_key_json)
ts_valid_until_ms = signed_key_json[u"valid_until_ts"]
updated_key_ids = set()
if requested_id is not None:
updated_key_ids.add(requested_id)
updated_key_ids = set(requested_ids)
updated_key_ids.update(verify_keys)
updated_key_ids.update(old_verify_keys)
response_keys.update(verify_keys)
response_keys.update(old_verify_keys)
for key_id in updated_key_ids:
yield self.store.store_server_keys_json(
server_name=server_name,
key_id=key_id,
from_server=server_name,
ts_now_ms=time_now_ms,
ts_expires_ms=ts_valid_until_ms,
key_json_bytes=signed_key_json_bytes,
)
yield defer.gatherResults(
[
self.store.store_server_keys_json(
server_name=server_name,
key_id=key_id,
from_server=server_name,
ts_now_ms=time_now_ms,
ts_expires_ms=ts_valid_until_ms,
key_json_bytes=signed_key_json_bytes,
)
for key_id in updated_key_ids
],
consumeErrors=True,
).addErrback(unwrapFirstError)
defer.returnValue(response_keys)
results[server_name] = response_keys
raise ValueError("No verification key found for given key ids")
defer.returnValue(results)
@defer.inlineCallbacks
def get_server_verify_key_v1_direct(self, server_name, key_ids):
@@ -462,8 +604,13 @@ class Keyring(object):
Returns:
A deferred that completes when the keys are stored.
"""
for key_id, key in verify_keys.items():
# TODO(markjh): Store whether the keys have expired.
yield self.store.store_server_verify_key(
server_name, server_name, key.time_added, key
)
# TODO(markjh): Store whether the keys have expired.
yield defer.gatherResults(
[
self.store.store_server_verify_key(
server_name, server_name, key.time_added, key
)
for key_id, key in verify_keys.items()
],
consumeErrors=True,
).addErrback(unwrapFirstError)

View File

@@ -32,7 +32,8 @@ logger = logging.getLogger(__name__)
class FederationBase(object):
@defer.inlineCallbacks
def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False):
def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False,
include_none=False):
"""Takes a list of PDUs and checks the signatures and hashs of each
one. If a PDU fails its signature check then we check if we have it in
the database and if not then request if from the originating server of
@@ -50,84 +51,108 @@ class FederationBase(object):
Returns:
Deferred : A list of PDUs that have valid signatures and hashes.
"""
deferreds = self._check_sigs_and_hashes(pdus)
signed_pdus = []
def callback(pdu):
return pdu
@defer.inlineCallbacks
def do(pdu):
try:
new_pdu = yield self._check_sigs_and_hash(pdu)
signed_pdus.append(new_pdu)
except SynapseError:
# FIXME: We should handle signature failures more gracefully.
def errback(failure, pdu):
failure.trap(SynapseError)
return None
def try_local_db(res, pdu):
if not res:
# Check local db.
new_pdu = yield self.store.get_event(
return self.store.get_event(
pdu.event_id,
allow_rejected=True,
allow_none=True,
)
if new_pdu:
signed_pdus.append(new_pdu)
return
return res
# Check pdu.origin
if pdu.origin != origin:
try:
new_pdu = yield self.get_pdu(
destinations=[pdu.origin],
event_id=pdu.event_id,
outlier=outlier,
timeout=10000,
)
if new_pdu:
signed_pdus.append(new_pdu)
return
except:
pass
def try_remote(res, pdu):
if not res and pdu.origin != origin:
return self.get_pdu(
destinations=[pdu.origin],
event_id=pdu.event_id,
outlier=outlier,
timeout=10000,
).addErrback(lambda e: None)
return res
def warn(res, pdu):
if not res:
logger.warn(
"Failed to find copy of %s with valid signature",
pdu.event_id,
)
return res
yield defer.gatherResults(
[do(pdu) for pdu in pdus],
for pdu, deferred in zip(pdus, deferreds):
deferred.addCallbacks(
callback, errback, errbackArgs=[pdu]
).addCallback(
try_local_db, pdu
).addCallback(
try_remote, pdu
).addCallback(
warn, pdu
)
valid_pdus = yield defer.gatherResults(
deferreds,
consumeErrors=True
).addErrback(unwrapFirstError)
defer.returnValue(signed_pdus)
if include_none:
defer.returnValue(valid_pdus)
else:
defer.returnValue([p for p in valid_pdus if p])
@defer.inlineCallbacks
def _check_sigs_and_hash(self, pdu):
"""Throws a SynapseError if the PDU does not have the correct
return self._check_sigs_and_hashes([pdu])[0]
def _check_sigs_and_hashes(self, pdus):
"""Throws a SynapseError if a PDU does not have the correct
signatures.
Returns:
FrozenEvent: Either the given event or it redacted if it failed the
content hash check.
"""
# Check signatures are correct.
redacted_event = prune_event(pdu)
redacted_pdu_json = redacted_event.get_pdu_json()
try:
yield self.keyring.verify_json_for_server(
pdu.origin, redacted_pdu_json
)
except SynapseError:
redacted_pdus = [
prune_event(pdu)
for pdu in pdus
]
deferreds = self.keyring.verify_json_objects_for_server([
(p.origin, p.get_pdu_json())
for p in redacted_pdus
])
def callback(_, pdu, redacted):
if not check_event_content_hash(pdu):
logger.warn(
"Event content has been tampered, redacting %s: %s",
pdu.event_id, pdu.get_pdu_json()
)
return redacted
return pdu
def errback(failure, pdu):
failure.trap(SynapseError)
logger.warn(
"Signature check failed for %s",
pdu.event_id,
)
raise
return failure
if not check_event_content_hash(pdu):
logger.warn(
"Event content has been tampered, redacting.",
pdu.event_id,
for deferred, pdu, redacted in zip(deferreds, pdus, redacted_pdus):
deferred.addCallbacks(
callback, errback,
callbackArgs=[pdu, redacted],
errbackArgs=[pdu],
)
defer.returnValue(redacted_event)
defer.returnValue(pdu)
return deferreds

View File

@@ -30,6 +30,7 @@ import synapse.metrics
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
import copy
import itertools
import logging
import random
@@ -167,7 +168,7 @@ class FederationClient(FederationBase):
# FIXME: We should handle signature failures more gracefully.
pdus[:] = yield defer.gatherResults(
[self._check_sigs_and_hash(pdu) for pdu in pdus],
self._check_sigs_and_hashes(pdus),
consumeErrors=True,
).addErrback(unwrapFirstError)
@@ -230,7 +231,7 @@ class FederationClient(FederationBase):
pdu = pdu_list[0]
# Check signatures are correct.
pdu = yield self._check_sigs_and_hash(pdu)
pdu = yield self._check_sigs_and_hashes([pdu])[0]
break
@@ -327,6 +328,9 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
def make_join(self, destinations, room_id, user_id):
for destination in destinations:
if destination == self.server_name:
continue
try:
ret = yield self.transport_layer.make_join(
destination, room_id, user_id
@@ -353,6 +357,9 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
def send_join(self, destinations, pdu):
for destination in destinations:
if destination == self.server_name:
continue
try:
time_now = self._clock.time_msec()
_, content = yield self.transport_layer.send_join(
@@ -374,17 +381,39 @@ class FederationClient(FederationBase):
for p in content.get("auth_chain", [])
]
signed_state, signed_auth = yield defer.gatherResults(
[
self._check_sigs_and_hash_and_fetch(
destination, state, outlier=True
),
self._check_sigs_and_hash_and_fetch(
destination, auth_chain, outlier=True
)
],
consumeErrors=True
).addErrback(unwrapFirstError)
pdus = {
p.event_id: p
for p in itertools.chain(state, auth_chain)
}
valid_pdus = yield self._check_sigs_and_hash_and_fetch(
destination, pdus.values(),
outlier=True,
)
valid_pdus_map = {
p.event_id: p
for p in valid_pdus
}
# NB: We *need* to copy to ensure that we don't have multiple
# references being passed on, as that causes... issues.
signed_state = [
copy.copy(valid_pdus_map[p.event_id])
for p in state
if p.event_id in valid_pdus_map
]
signed_auth = [
valid_pdus_map[p.event_id]
for p in auth_chain
if p.event_id in valid_pdus_map
]
# NB: We *need* to copy to ensure that we don't have multiple
# references being passed on, as that causes... issues.
for s in signed_state:
s.internal_metadata = copy.deepcopy(s.internal_metadata)
auth_chain.sort(key=lambda e: e.depth)
@@ -396,7 +425,7 @@ class FederationClient(FederationBase):
except CodeMessageException:
raise
except Exception as e:
logger.warn(
logger.exception(
"Failed to send_join via %s: %s",
destination, e.message
)

View File

@@ -138,26 +138,29 @@ class FederationHandler(BaseHandler):
if state and auth_chain is not None:
# If we have any state or auth_chain given to us by the replication
# layer, then we should handle them (if we haven't before.)
event_infos = []
for e in itertools.chain(auth_chain, state):
if e.event_id in seen_ids:
continue
e.internal_metadata.outlier = True
try:
auth_ids = [e_id for e_id, _ in e.auth_events]
auth = {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
yield self._handle_new_event(
origin, e, auth_events=auth
)
seen_ids.add(e.event_id)
except:
logger.exception(
"Failed to handle state event %s",
e.event_id,
)
auth_ids = [e_id for e_id, _ in e.auth_events]
auth = {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
event_infos.append({
"event": e,
"auth_events": auth,
})
seen_ids.add(e.event_id)
yield self._handle_new_events(
origin,
event_infos,
outliers=True
)
try:
_, event_stream_id, max_stream_id = yield self._handle_new_event(
@@ -292,38 +295,29 @@ class FederationHandler(BaseHandler):
).addErrback(unwrapFirstError)
auth_events.update({a.event_id: a for a in results})
yield defer.gatherResults(
[
self._handle_new_event(
dest, a,
auth_events={
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
for a_id, _ in a.auth_events
},
)
for a in auth_events.values()
if a.event_id not in seen_events
],
consumeErrors=True,
).addErrback(unwrapFirstError)
ev_infos = []
for a in auth_events.values():
if a.event_id in seen_events:
continue
ev_infos.append({
"event": a,
"auth_events": {
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
for a_id, _ in a.auth_events
}
})
yield defer.gatherResults(
[
self._handle_new_event(
dest, event_map[e_id],
state=events_to_state[e_id],
backfilled=True,
auth_events={
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
for a_id, _ in event_map[e_id].auth_events
},
)
for e_id in events_to_state
],
consumeErrors=True
).addErrback(unwrapFirstError)
for e_id in events_to_state:
ev_infos.append({
"event": event_map[e_id],
"state": events_to_state[e_id],
"auth_events": {
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
for a_id, _ in event_map[e_id].auth_events
}
})
events.sort(key=lambda e: e.depth)
@@ -331,10 +325,14 @@ class FederationHandler(BaseHandler):
if event in events_to_state:
continue
yield self._handle_new_event(
dest, event,
backfilled=True,
)
ev_infos.append({
"event": event,
})
yield self._handle_new_events(
dest, ev_infos,
backfilled=True,
)
defer.returnValue(events)
@@ -600,32 +598,22 @@ class FederationHandler(BaseHandler):
# FIXME
pass
yield self._handle_auth_events(
origin, [e for e in auth_chain if e.event_id != event.event_id]
)
@defer.inlineCallbacks
def handle_state(e):
ev_infos = []
for e in itertools.chain(state, auth_chain):
if e.event_id == event.event_id:
return
continue
e.internal_metadata.outlier = True
try:
auth_ids = [e_id for e_id, _ in e.auth_events]
auth = {
auth_ids = [e_id for e_id, _ in e.auth_events]
ev_infos.append({
"event": e,
"auth_events": {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
yield self._handle_new_event(
origin, e, auth_events=auth
)
except:
logger.exception(
"Failed to handle state event %s",
e.event_id,
)
})
yield defer.DeferredList([handle_state(e) for e in state])
yield self._handle_new_events(origin, ev_infos, outliers=True)
auth_ids = [e_id for e_id, _ in event.auth_events]
auth_events = {
@@ -1005,6 +993,68 @@ class FederationHandler(BaseHandler):
defer.returnValue((context, event_stream_id, max_stream_id))
@defer.inlineCallbacks
def _handle_new_events(self, origin, event_infos, backfilled=False,
outliers=False):
contexts = yield defer.gatherResults(
[
self._prep_event(
origin,
ev_info["event"],
state=ev_info.get("state"),
backfilled=backfilled,
auth_events=ev_info.get("auth_events"),
)
for ev_info in event_infos
]
)
yield self.store.persist_events(
[
(ev_info["event"], context)
for ev_info, context in itertools.izip(event_infos, contexts)
],
backfilled=backfilled,
is_new_state=(not outliers and not backfilled),
)
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, backfilled=False,
current_state=None, auth_events=None):
outlier = event.internal_metadata.is_outlier()
context = yield self.state_handler.compute_event_context(
event, old_state=state, outlier=outlier,
)
if not auth_events:
auth_events = context.current_state
# This is a hack to fix some old rooms where the initial join event
# didn't reference the create event in its auth events.
if event.type == EventTypes.Member and not event.auth_events:
if len(event.prev_events) == 1 and event.depth < 5:
c = yield self.store.get_event(
event.prev_events[0][0],
allow_none=True,
)
if c and c.type == EventTypes.Create:
auth_events[(c.type, c.state_key)] = c
try:
yield self.do_auth(
origin, event, context, auth_events=auth_events
)
except AuthError as e:
logger.warn(
"Rejecting %s because %s",
event.event_id, e.msg
)
context.rejected = RejectedReason.AUTH_ERROR
defer.returnValue(context)
@defer.inlineCallbacks
def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
missing):
@@ -1066,14 +1116,24 @@ class FederationHandler(BaseHandler):
@log_function
def do_auth(self, origin, event, context, auth_events):
# Check if we have all the auth events.
have_events = yield self.store.have_events(
[e_id for e_id, _ in event.auth_events]
)
current_state = set(e.event_id for e in auth_events.values())
event_auth_events = set(e_id for e_id, _ in event.auth_events)
if event_auth_events - current_state:
have_events = yield self.store.have_events(
event_auth_events - current_state
)
else:
have_events = {}
have_events.update({
e.event_id: ""
for e in auth_events.values()
})
seen_events = set(have_events.keys())
missing_auth = event_auth_events - seen_events
missing_auth = event_auth_events - seen_events - current_state
if missing_auth:
logger.info("Missing auth: %s", missing_auth)

View File

@@ -359,6 +359,89 @@ class EventFederationStore(SQLBaseStore):
self.get_latest_event_ids_in_room.invalidate, room_id
)
def _handle_mult_prev_events(self, txn, events):
"""
For the given event, update the event edges table and forward and
backward extremities tables.
"""
self._simple_insert_many_txn(
txn,
table="event_edges",
values=[
{
"event_id": ev.event_id,
"prev_event_id": e_id,
"room_id": ev.room_id,
"is_state": False,
}
for ev in events
for e_id, _ in ev.prev_events
],
)
events_by_room = {}
for ev in events:
events_by_room.setdefault(ev.room_id, []).append(ev)
for room_id, room_events in events_by_room.items():
prevs = [
e_id for ev in room_events for e_id, _ in ev.prev_events
if not ev.internal_metadata.is_outlier()
]
if prevs:
txn.execute(
"DELETE FROM event_forward_extremities"
" WHERE room_id = ?"
" AND event_id in (%s)" % (
",".join(["?"] * len(prevs)),
),
[room_id] + prevs,
)
query = (
"INSERT INTO event_forward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
" SELECT 1 FROM event_edges WHERE prev_event_id = ?"
" )"
)
txn.executemany(
query,
[(ev.event_id, ev.room_id, ev.event_id) for ev in events]
)
query = (
"INSERT INTO event_backward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" )"
" AND NOT EXISTS ("
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
" AND outlier = ?"
" )"
)
txn.executemany(query, [
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
for ev in events for e_id, _ in ev.prev_events
if not ev.internal_metadata.is_outlier()
])
query = (
"DELETE FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
)
txn.executemany(
query,
[(ev.event_id, ev.room_id) for ev in events]
)
for room_id in events_by_room:
txn.call_after(
self.get_latest_event_ids_in_room.invalidate, room_id
)
def get_backfill_events(self, room_id, event_list, limit):
"""Get a list of Events for a given topic that occurred before (and
including) the events in event_list. Return a list of max size `limit`

View File

@@ -23,9 +23,7 @@ from synapse.events.utils import prune_event
from synapse.util.logcontext import preserve_context_over_deferred
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
from synapse.crypto.event_signing import compute_event_reference_hash
from syutil.base64util import decode_base64
from syutil.jsonutil import encode_json
from contextlib import contextmanager
@@ -46,6 +44,48 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events
class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False,
is_new_state=True):
if not events_and_contexts:
return
if backfilled:
if not self.min_token_deferred.called:
yield self.min_token_deferred
start = self.min_token - 1
self.min_token -= len(events_and_contexts) + 1
stream_orderings = range(start, self.min_token, -1)
@contextmanager
def stream_ordering_manager():
yield stream_orderings
stream_ordering_manager = stream_ordering_manager()
else:
stream_ordering_manager = yield self._stream_id_gen.get_next_mult(
self, len(events_and_contexts)
)
with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
chunks = [
events_and_contexts[x:x+100]
for x in xrange(0, len(events_and_contexts), 100)
]
for chunk in chunks:
# We can't easily parallelize these since different chunks
# might contain the same event. :(
yield self.runInteraction(
"persist_events",
self._persist_events_txn,
events_and_contexts=chunk,
backfilled=backfilled,
is_new_state=is_new_state,
)
@defer.inlineCallbacks
@log_function
def persist_event(self, event, context, backfilled=False,
@@ -67,13 +107,13 @@ class EventsStore(SQLBaseStore):
try:
with stream_ordering_manager as stream_ordering:
event.internal_metadata.stream_ordering = stream_ordering
yield self.runInteraction(
"persist_event",
self._persist_event_txn,
event=event,
context=context,
backfilled=backfilled,
stream_ordering=stream_ordering,
is_new_state=is_new_state,
current_state=current_state,
)
@@ -116,12 +156,7 @@ class EventsStore(SQLBaseStore):
@log_function
def _persist_event_txn(self, txn, event, context, backfilled,
stream_ordering=None, is_new_state=True,
current_state=None):
# Remove the any existing cache entries for the event_id
txn.call_after(self._invalidate_get_event_cache, event.event_id)
is_new_state=True, current_state=None):
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
@@ -149,37 +184,78 @@ class EventsStore(SQLBaseStore):
}
)
outlier = event.internal_metadata.is_outlier()
if not outlier:
self._update_min_depth_for_room_txn(
txn,
event.room_id,
event.depth
)
have_persisted = self._simple_select_one_txn(
return self._persist_events_txn(
txn,
table="events",
keyvalues={"event_id": event.event_id},
retcols=["event_id", "outlier"],
allow_none=True,
[(event, context)],
backfilled=backfilled,
is_new_state=is_new_state,
)
metadata_json = encode_json(
event.internal_metadata.get_dict(),
using_frozen_dicts=USE_FROZEN_DICTS
).decode("UTF-8")
@log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
is_new_state=True):
# If we have already persisted this event, we don't need to do any
# more processing.
# The processing above must be done on every call to persist event,
# since they might not have happened on previous calls. For example,
# if we are persisting an event that we had persisted as an outlier,
# but is no longer one.
if have_persisted:
if not outlier and have_persisted["outlier"]:
self._store_state_groups_txn(txn, event, context)
# Remove the any existing cache entries for the event_ids
for event, _ in events_and_contexts:
txn.call_after(self._invalidate_get_event_cache, event.event_id)
depth_updates = {}
for event, _ in events_and_contexts:
if event.internal_metadata.is_outlier():
continue
depth_updates[event.room_id] = max(
event.depth, depth_updates.get(event.room_id, event.depth)
)
for room_id, depth in depth_updates.items():
self._update_min_depth_for_room_txn(txn, room_id, depth)
txn.execute(
"SELECT event_id, outlier FROM events WHERE event_id in (%s)" % (
",".join(["?"] * len(events_and_contexts)),
),
[event.event_id for event, _ in events_and_contexts]
)
have_persisted = {
event_id: outlier
for event_id, outlier in txn.fetchall()
}
event_map = {}
to_remove = set()
for event, context in events_and_contexts:
# Handle the case of the list including the same event multiple
# times. The tricky thing here is when they differ by whether
# they are an outlier.
if event.event_id in event_map:
other = event_map[event.event_id]
if not other.internal_metadata.is_outlier():
to_remove.add(event)
continue
elif not event.internal_metadata.is_outlier():
to_remove.add(event)
continue
else:
to_remove.add(other)
event_map[event.event_id] = event
if event.event_id not in have_persisted:
continue
to_remove.add(event)
outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted:
self._store_state_groups_txn(
txn, event, context,
)
metadata_json = encode_json(
event.internal_metadata.get_dict(),
using_frozen_dicts=USE_FROZEN_DICTS
).decode("UTF-8")
sql = (
"UPDATE event_json SET internal_metadata = ?"
@@ -198,94 +274,91 @@ class EventsStore(SQLBaseStore):
sql,
(False, event.event_id,)
)
events_and_contexts = filter(
lambda ec: ec[0] not in to_remove,
events_and_contexts
)
if not events_and_contexts:
return
if not outlier:
self._store_state_groups_txn(txn, event, context)
self._store_mult_state_groups_txn(txn, [
(event, context)
for event, context in events_and_contexts
if not event.internal_metadata.is_outlier()
])
self._handle_prev_events(
self._handle_mult_prev_events(
txn,
outlier=outlier,
event_id=event.event_id,
prev_events=event.prev_events,
room_id=event.room_id,
events=[event for event, _ in events_and_contexts],
)
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
elif event.type == EventTypes.Name:
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
self._store_room_topic_txn(txn, event)
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
for event, _ in events_and_contexts:
if event.type == EventTypes.Name:
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
self._store_room_topic_txn(txn, event)
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
event_dict = {
k: v
for k, v in event.get_dict().items()
if k not in [
"redacted",
"redacted_because",
self._store_room_members_txn(
txn,
[
event
for event, _ in events_and_contexts
if event.type == EventTypes.Member
]
}
)
self._simple_insert_txn(
def event_dict(event):
return {
k: v
for k, v in event.get_dict().items()
if k not in [
"redacted",
"redacted_because",
]
}
self._simple_insert_many_txn(
txn,
table="event_json",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"internal_metadata": metadata_json,
"json": encode_json(
event_dict, using_frozen_dicts=USE_FROZEN_DICTS
).decode("UTF-8"),
},
values=[
{
"event_id": event.event_id,
"room_id": event.room_id,
"internal_metadata": encode_json(
event.internal_metadata.get_dict(),
using_frozen_dicts=USE_FROZEN_DICTS
).decode("UTF-8"),
"json": encode_json(
event_dict(event), using_frozen_dicts=USE_FROZEN_DICTS
).decode("UTF-8"),
}
for event, _ in events_and_contexts
],
)
content = encode_json(
event.content, using_frozen_dicts=USE_FROZEN_DICTS
).decode("UTF-8")
vals = {
"topological_ordering": event.depth,
"event_id": event.event_id,
"type": event.type,
"room_id": event.room_id,
"content": content,
"processed": True,
"outlier": outlier,
"depth": event.depth,
}
unrec = {
k: v
for k, v in event.get_dict().items()
if k not in vals.keys() and k not in [
"redacted",
"redacted_because",
"signatures",
"hashes",
"prev_events",
]
}
vals["unrecognized_keys"] = encode_json(
unrec, using_frozen_dicts=USE_FROZEN_DICTS
).decode("UTF-8")
sql = (
"INSERT INTO events"
" (stream_ordering, topological_ordering, event_id, type,"
" room_id, content, processed, outlier, depth)"
" VALUES (?,?,?,?,?,?,?,?,?)"
)
txn.execute(
sql,
(
stream_ordering, event.depth, event.event_id, event.type,
event.room_id, content, True, outlier, event.depth
)
self._simple_insert_many_txn(
txn,
table="events",
values=[
{
"stream_ordering": event.internal_metadata.stream_ordering,
"topological_ordering": event.depth,
"depth": event.depth,
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"processed": True,
"outlier": event.internal_metadata.is_outlier(),
"content": encode_json(
event.content, using_frozen_dicts=USE_FROZEN_DICTS
).decode("UTF-8"),
}
for event, _ in events_and_contexts
],
)
if context.rejected:
@@ -293,19 +366,19 @@ class EventsStore(SQLBaseStore):
txn, event.event_id, context.rejected
)
for hash_alg, hash_base64 in event.hashes.items():
hash_bytes = decode_base64(hash_base64)
self._store_event_content_hash_txn(
txn, event.event_id, hash_alg, hash_bytes,
)
# for hash_alg, hash_base64 in event.hashes.items():
# hash_bytes = decode_base64(hash_base64)
# self._store_event_content_hash_txn(
# txn, event.event_id, hash_alg, hash_bytes,
# )
for prev_event_id, prev_hashes in event.prev_events:
for alg, hash_base64 in prev_hashes.items():
hash_bytes = decode_base64(hash_base64)
self._store_prev_event_hash_txn(
txn, event.event_id, prev_event_id, alg,
hash_bytes
)
# for prev_event_id, prev_hashes in event.prev_events:
# for alg, hash_base64 in prev_hashes.items():
# hash_bytes = decode_base64(hash_base64)
# self._store_prev_event_hash_txn(
# txn, event.event_id, prev_event_id, alg,
# hash_bytes
# )
self._simple_insert_many_txn(
txn,
@@ -316,16 +389,22 @@ class EventsStore(SQLBaseStore):
"room_id": event.room_id,
"auth_id": auth_id,
}
for event, _ in events_and_contexts
for auth_id, _ in event.auth_events
],
)
(ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
self._store_event_reference_hash_txn(
txn, event.event_id, ref_alg, ref_hash_bytes
self._store_event_reference_hashes_txn(
txn, [event for event, _ in events_and_contexts]
)
if event.is_state():
state_events_and_contexts = filter(
lambda i: i[0].is_state(),
events_and_contexts,
)
state_values = []
for event, context in state_events_and_contexts:
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
@@ -337,51 +416,55 @@ class EventsStore(SQLBaseStore):
if hasattr(event, "replaces_state"):
vals["prev_state"] = event.replaces_state
self._simple_insert_txn(
txn,
"state_events",
vals,
)
state_values.append(vals)
self._simple_insert_many_txn(
txn,
table="event_edges",
values=[
{
"event_id": event.event_id,
"prev_event_id": e_id,
"room_id": event.room_id,
"is_state": True,
}
for e_id, h in event.prev_state
],
)
self._simple_insert_many_txn(
txn,
table="state_events",
values=state_values,
)
if is_new_state and not context.rejected:
txn.call_after(
self.get_current_state_for_key.invalidate,
event.room_id, event.type, event.state_key
)
self._simple_insert_many_txn(
txn,
table="event_edges",
values=[
{
"event_id": event.event_id,
"prev_event_id": prev_id,
"room_id": event.room_id,
"is_state": True,
}
for event, _ in state_events_and_contexts
for prev_id, _ in event.prev_state
],
)
if (event.type == EventTypes.Name
or event.type == EventTypes.Aliases):
if is_new_state:
for event, _ in state_events_and_contexts:
if not context.rejected:
txn.call_after(
self.get_room_name_and_aliases.invalidate,
event.room_id
)
self.get_current_state_for_key.invalidate,
event.room_id, event.type, event.state_key
)
self._simple_upsert_txn(
txn,
"current_state_events",
keyvalues={
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
values={
"event_id": event.event_id,
}
)
if event.type in [EventTypes.Name, EventTypes.Aliases]:
txn.call_after(
self.get_room_name_and_aliases.invalidate,
event.room_id
)
self._simple_upsert_txn(
txn,
"current_state_events",
keyvalues={
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
values={
"event_id": event.event_id,
}
)
return

View File

@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from _base import SQLBaseStore
from _base import SQLBaseStore, cached
from twisted.internet import defer
@@ -71,6 +71,25 @@ class KeyStore(SQLBaseStore):
desc="store_server_certificate",
)
@cached()
@defer.inlineCallbacks
def get_all_server_verify_keys(self, server_name):
rows = yield self._simple_select_list(
table="server_signature_keys",
keyvalues={
"server_name": server_name,
},
retcols=["key_id", "verify_key"],
desc="get_all_server_verify_keys",
)
defer.returnValue({
row["key_id"]: decode_verify_key_bytes(
row["key_id"], str(row["verify_key"])
)
for row in rows
})
@defer.inlineCallbacks
def get_server_verify_keys(self, server_name, key_ids):
"""Retrieve the NACL verification key for a given server for the given
@@ -81,24 +100,14 @@ class KeyStore(SQLBaseStore):
Returns:
(list of VerifyKey): The verification keys.
"""
sql = (
"SELECT key_id, verify_key FROM server_signature_keys"
" WHERE server_name = ?"
" AND key_id in (" + ",".join("?" for key_id in key_ids) + ")"
)
rows = yield self._execute_and_decode(
"get_server_verify_keys", sql, server_name, *key_ids
)
keys = []
for row in rows:
key_id = row["key_id"]
key_bytes = row["verify_key"]
key = decode_verify_key_bytes(key_id, str(key_bytes))
keys.append(key)
defer.returnValue(keys)
keys = yield self.get_all_server_verify_keys(server_name)
defer.returnValue({
k: keys[k]
for k in key_ids
if k in keys and keys[k]
})
@defer.inlineCallbacks
def store_server_verify_key(self, server_name, from_server, time_now_ms,
verify_key):
"""Stores a NACL verification key for the given server.
@@ -109,7 +118,7 @@ class KeyStore(SQLBaseStore):
ts_now_ms (int): The time now in milliseconds
verification_key (VerifyKey): The NACL verify key.
"""
return self._simple_upsert(
yield self._simple_upsert(
table="server_signature_keys",
keyvalues={
"server_name": server_name,
@@ -123,6 +132,8 @@ class KeyStore(SQLBaseStore):
desc="store_server_verify_key",
)
self.get_all_server_verify_keys.invalidate(server_name)
def store_server_keys_json(self, server_name, key_id, from_server,
ts_now_ms, ts_expires_ms, key_json_bytes):
"""Stores the JSON bytes for a set of keys from a server
@@ -152,6 +163,7 @@ class KeyStore(SQLBaseStore):
"ts_valid_until_ms": ts_expires_ms,
"key_json": buffer(key_json_bytes),
},
desc="store_server_keys_json",
)
def get_server_keys_json(self, server_keys):

View File

@@ -35,38 +35,28 @@ RoomsForUser = namedtuple(
class RoomMemberStore(SQLBaseStore):
def _store_room_member_txn(self, txn, event):
def _store_room_members_txn(self, txn, events):
"""Store a room member in the database.
"""
try:
target_user_id = event.state_key
except:
logger.exception(
"Failed to parse target_user_id=%s", target_user_id
)
raise
logger.debug(
"_store_room_member_txn: target_user_id=%s, membership=%s",
target_user_id,
event.membership,
)
self._simple_insert_txn(
self._simple_insert_many_txn(
txn,
"room_memberships",
{
"event_id": event.event_id,
"user_id": target_user_id,
"sender": event.user_id,
"room_id": event.room_id,
"membership": event.membership,
}
table="room_memberships",
values=[
{
"event_id": event.event_id,
"user_id": event.state_key,
"sender": event.user_id,
"room_id": event.room_id,
"membership": event.membership,
}
for event in events
]
)
txn.call_after(self.get_rooms_for_user.invalidate, target_user_id)
txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id)
txn.call_after(self.get_users_in_room.invalidate, event.room_id)
for event in events:
txn.call_after(self.get_rooms_for_user.invalidate, event.state_key)
txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id)
txn.call_after(self.get_users_in_room.invalidate, event.room_id)
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.

View File

@@ -18,6 +18,7 @@ from twisted.internet import defer
from _base import SQLBaseStore
from syutil.base64util import encode_base64
from synapse.crypto.event_signing import compute_event_reference_hash
class SignatureStore(SQLBaseStore):
@@ -101,23 +102,26 @@ class SignatureStore(SQLBaseStore):
txn.execute(query, (event_id, ))
return {k: v for k, v in txn.fetchall()}
def _store_event_reference_hash_txn(self, txn, event_id, algorithm,
hash_bytes):
def _store_event_reference_hashes_txn(self, txn, events):
"""Store a hash for a PDU
Args:
txn (cursor):
event_id (str): Id for the Event.
algorithm (str): Hashing algorithm.
hash_bytes (bytes): Hash function output bytes.
events (list): list of Events.
"""
self._simple_insert_txn(
vals = []
for event in events:
ref_alg, ref_hash_bytes = compute_event_reference_hash(event)
vals.append({
"event_id": event.event_id,
"algorithm": ref_alg,
"hash": buffer(ref_hash_bytes),
})
self._simple_insert_many_txn(
txn,
"event_reference_hashes",
{
"event_id": event_id,
"algorithm": algorithm,
"hash": buffer(hash_bytes),
},
table="event_reference_hashes",
values=vals,
)
def _get_event_signatures_txn(self, txn, event_id):

View File

@@ -100,16 +100,23 @@ class StateStore(SQLBaseStore):
)
def _store_state_groups_txn(self, txn, event, context):
if context.current_state is None:
return
return self._store_mult_state_groups_txn(txn, [(event, context)])
state_events = dict(context.current_state)
def _store_mult_state_groups_txn(self, txn, events_and_contexts):
state_groups = {}
for event, context in events_and_contexts:
if context.current_state is None:
continue
if event.is_state():
state_events[(event.type, event.state_key)] = event
if context.state_group is not None:
state_groups[event.event_id] = context.state_group
continue
state_events = dict(context.current_state)
if event.is_state():
state_events[(event.type, event.state_key)] = event
state_group = context.state_group
if not state_group:
state_group = self._state_groups_id_gen.get_next_txn(txn)
self._simple_insert_txn(
txn,
@@ -135,14 +142,19 @@ class StateStore(SQLBaseStore):
for state in state_events.values()
],
)
state_groups[event.event_id] = state_group
self._simple_insert_txn(
self._simple_insert_many_txn(
txn,
table="event_to_state_groups",
values={
"state_group": state_group,
"event_id": event.event_id,
},
values=[
{
"state_group": state_groups[event.event_id],
"event_id": event.event_id,
}
for event, context in events_and_contexts
if context.current_state is not None
],
)
@defer.inlineCallbacks

View File

@@ -107,6 +107,37 @@ class StreamIdGenerator(object):
defer.returnValue(manager())
@defer.inlineCallbacks
def get_next_mult(self, store, n):
"""
Usage:
with yield stream_id_gen.get_next(store, n) as stream_ids:
# ... persist events ...
"""
if not self._current_max:
yield store.runInteraction(
"_compute_current_max",
self._get_or_compute_current_max,
)
with self._lock:
next_ids = range(self._current_max + 1, self._current_max + n + 1)
self._current_max += n
for next_id in next_ids:
self._unfinished_ids.append(next_id)
@contextlib.contextmanager
def manager():
try:
yield next_ids
finally:
with self._lock:
for next_id in next_ids:
self._unfinished_ids.remove(next_id)
defer.returnValue(manager())
@defer.inlineCallbacks
def get_max_token(self, store):
"""Returns the maximum stream id such that all stream ids less than or