mirror of https://github.com/element-hq/synapse.git
synced 2025-12-07 01:20:16 +00:00

Compare commits: devon/rust...erikj/join
12 Commits
| SHA1 |
|---|
| 67cd168522 |
| 6ec29fa2a7 |
| 414c5683b9 |
| 5a117376e1 |
| e1750fd75d |
| 1a2113af55 |
| d381eae552 |
| 96a1eb77f8 |
| 717fca45b4 |
| d3e555f90d |
| 20a57f5354 |
| 0ee0d7b460 |
@@ -43,7 +43,11 @@ from synapse.api.errors import (
 from synapse.config.key import TrustedKeyServer
 from synapse.events import EventBase
 from synapse.events.utils import prune_event_dict
-from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.logging.context import (
+    defer_to_thread,
+    make_deferred_yieldable,
+    run_in_background,
+)
 from synapse.storage.keys import FetchKeyResult
 from synapse.types import JsonDict
 from synapse.util import unwrapFirstError
@@ -161,6 +165,7 @@ class Keyring:
         self, hs: "HomeServer", key_fetchers: "Optional[Iterable[KeyFetcher]]" = None
     ):
         self.clock = hs.get_clock()
+        self.reactor = hs.get_reactor()
 
         if key_fetchers is None:
             key_fetchers = (
@@ -288,7 +293,9 @@ class Keyring:
         verify_key = key_result.verify_key
         json_object = verify_request.get_json_object()
         try:
-            verify_signed_json(
+            await defer_to_thread(
+                self.reactor,
+                verify_signed_json,
                 json_object,
                 verify_request.server_name,
                 verify_key,
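
The `Keyring` hunks above take a reference to the reactor and route `verify_signed_json` through `defer_to_thread`, so the CPU-bound signature check runs in the reactor's thread pool instead of blocking the reactor itself. Below is a minimal sketch of the same pattern using plain Twisted's `deferToThread` and the `signedjson` library; the key version and server name are invented for illustration, and unlike Synapse's `defer_to_thread` wrapper this sketch does not preserve the logging context.

```python
# Minimal sketch: run CPU-bound signature verification in a thread pool so
# the Twisted reactor thread stays free. Illustrative only; Synapse's
# defer_to_thread wrapper additionally preserves the logging context.
from signedjson.key import generate_signing_key, get_verify_key
from signedjson.sign import sign_json, verify_signed_json
from twisted.internet import defer, task
from twisted.internet.threads import deferToThread


async def check_signature(json_object: dict, server_name: str, verify_key) -> None:
    # verify_signed_json raises if the signature is invalid; running it via
    # deferToThread moves the work onto the reactor's thread pool.
    await deferToThread(verify_signed_json, json_object, server_name, verify_key)
    print("signature ok")


def main(reactor):
    signing_key = generate_signing_key("v1")  # hypothetical key version
    signed = sign_json({"msg": "hello"}, "example.org", signing_key)
    return defer.ensureDeferred(
        check_signature(signed, "example.org", get_verify_key(signing_key))
    )


if __name__ == "__main__":
    task.react(main)
```
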
@@ -544,22 +551,18 @@ class BaseV2KeyFetcher(KeyFetcher):
 
         key_json_bytes = encode_canonical_json(response_json)
 
-        await make_deferred_yieldable(
-            defer.gatherResults(
-                [
-                    run_in_background(
-                        self.store.store_server_keys_json,
-                        server_name=server_name,
-                        key_id=key_id,
-                        from_server=from_server,
-                        ts_now_ms=time_added_ms,
-                        ts_expires_ms=ts_valid_until_ms,
-                        key_json_bytes=key_json_bytes,
-                    )
-                    for key_id in verify_keys
-                ],
-                consumeErrors=True,
-            ).addErrback(unwrapFirstError)
+        await self.store.store_server_keys_json_multi(
+            [
+                (
+                    server_name,
+                    key_id,
+                    from_server,
+                    time_added_ms,
+                    ts_valid_until_ms,
+                    key_json_bytes,
+                )
+                for key_id in verify_keys
+            ],
         )
 
         return verify_keys
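
This hunk collapses one `store_server_keys_json` call per key (fanned out with `run_in_background` and `defer.gatherResults`) into a single `store_server_keys_json_multi` call. A toy sketch of the difference, using a stand-in store that only counts database round trips; the class and method names here are invented for illustration.

```python
# Toy sketch: N per-row writes vs. one batched write. FakeStore is a
# stand-in, not Synapse's data store; it only counts round trips.
import asyncio
from typing import List, Tuple


class FakeStore:
    def __init__(self) -> None:
        self.round_trips = 0

    async def store_one(self, server_name: str, key_id: str, key_json: bytes) -> None:
        self.round_trips += 1  # one query per key

    async def store_many(self, entries: List[Tuple[str, str, bytes]]) -> None:
        self.round_trips += 1  # one query for the whole batch


async def main() -> None:
    keys = [("example.org", f"ed25519:k{i}", b"{}") for i in range(5)]

    per_row = FakeStore()
    await asyncio.gather(*(per_row.store_one(*k) for k in keys))

    batched = FakeStore()
    await batched.store_many(keys)

    print(per_row.round_trips, "round trips vs", batched.round_trips)


asyncio.run(main())
```
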
@@ -56,6 +56,7 @@ from synapse.api.room_versions import (
 from synapse.events import EventBase, builder
 from synapse.federation.federation_base import FederationBase, event_from_pdu_json
 from synapse.federation.transport.client import SendJoinResponse
+from synapse.logging.opentracing import start_active_span
 from synapse.logging.utils import log_function
 from synapse.types import JsonDict, get_domain_from_id
 from synapse.util.async_helpers import concurrently_execute
@@ -465,18 +466,18 @@ class FederationClient(FederationBase):
             pdu.event_id, allow_rejected=True, allow_none=True
         )
 
-        pdu_origin = get_domain_from_id(pdu.sender)
-        if not res and pdu_origin != origin:
-            try:
-                res = await self.get_pdu(
-                    destinations=[pdu_origin],
-                    event_id=pdu.event_id,
-                    room_version=room_version,
-                    outlier=outlier,
-                    timeout=10000,
-                )
-            except SynapseError:
-                pass
+        # pdu_origin = get_domain_from_id(pdu.sender)
+        # if not res and pdu_origin != origin:
+        #     try:
+        #         res = await self.get_pdu(
+        #             destinations=[pdu_origin],
+        #             event_id=pdu.event_id,
+        #             room_version=room_version,
+        #             outlier=outlier,
+        #             timeout=10000,
+        #         )
+        #     except SynapseError:
+        #         pass
 
         if not res:
             logger.warning(
@@ -754,7 +755,8 @@ class FederationClient(FederationBase):
         """
 
         async def send_request(destination) -> SendJoinResult:
-            response = await self._do_send_join(room_version, destination, pdu)
+            with start_active_span("_do_send_join"):
+                response = await self._do_send_join(room_version, destination, pdu)
 
             # If an event was returned (and expected to be returned):
             #
@@ -804,6 +806,34 @@ class FederationClient(FederationBase):
                     % (create_room_version,)
                 )
 
+            logger.info("Got from send_join %d events", len(state) + len(auth_chain))
+
+            with start_active_span("filter_auth_chain"):
+                event_map = {e.event_id: e for e in auth_chain}
+
+                state = [
+                    e
+                    for e in state
+                    if e.type != EventTypes.Member or e.membership != Membership.LEAVE
+                ]
+
+                roots = list(state)
+                new_auth_chain_ids = set()
+
+                while roots:
+                    e = roots.pop()
+
+                    for aid in e.auth_event_ids():
+                        if aid in new_auth_chain_ids:
+                            continue
+
+                        a = event_map.get(aid)
+                        if a:
+                            roots.append(a)
+                            new_auth_chain_ids.add(aid)
+
+                auth_chain = [event_map[aid] for aid in new_auth_chain_ids]
+
             logger.info(
                 "Processing from send_join %d events", len(state) + len(auth_chain)
             )
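
The new `filter_auth_chain` block drops leave membership events from the returned state and then walks `auth_event_ids()` outwards from the remaining state events, keeping only the part of the auth chain that is actually reachable from them. The same walk over plain dictionaries, with made-up event IDs:

```python
# Sketch of the auth-chain walk: starting from a set of "root" events, follow
# auth_events edges and collect every reachable auth event ID. Event IDs and
# the edge map here are invented for illustration.
from typing import Dict, List, Set


def reachable_auth_chain(
    roots: List[str], auth_edges: Dict[str, List[str]]
) -> Set[str]:
    """auth_edges maps an event ID to the IDs of its auth events."""
    to_visit = list(roots)
    seen: Set[str] = set()

    while to_visit:
        event_id = to_visit.pop()
        for aid in auth_edges.get(event_id, []):
            if aid in seen:
                continue
            seen.add(aid)
            to_visit.append(aid)  # also walk the auth event's own auth events

    return seen


auth_edges = {
    "$member": ["$join_rules", "$create"],
    "$join_rules": ["$power_levels", "$create"],
    "$power_levels": ["$create"],
    "$create": [],
    "$orphan": ["$create"],  # not reachable from the roots below, so dropped
}
print(reachable_auth_chain(["$member"], auth_edges))
# {'$join_rules', '$power_levels', '$create'}
```
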
@@ -814,19 +844,21 @@ class FederationClient(FederationBase):
             valid_pdus_map: Dict[str, EventBase] = {}
 
             async def _execute(pdu: EventBase) -> None:
-                valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
-                    pdu=pdu,
-                    origin=destination,
-                    outlier=True,
-                    room_version=room_version,
-                )
+                with start_active_span("_check_sigs_and_hash_and_fetch_one"):
+                    valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
+                        pdu=pdu,
+                        origin=destination,
+                        outlier=True,
+                        room_version=room_version,
+                    )
 
                 if valid_pdu:
                     valid_pdus_map[valid_pdu.event_id] = valid_pdu
 
-            await concurrently_execute(
-                _execute, itertools.chain(state, auth_chain), 10000
-            )
+            with start_active_span("check_sigs"):
+                await concurrently_execute(
+                    _execute, itertools.chain(state, auth_chain), 10000
+                )
 
             # NB: We *need* to copy to ensure that we don't have multiple
             # references being passed on, as that causes... issues.
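
These hunks wrap the per-event signature checks and the surrounding `concurrently_execute` call in `start_active_span`, so they show up as timed spans in a trace of the join. As I understand it, Synapse's `start_active_span` is a thin wrapper over the OpenTracing API that degrades to a no-op when tracing is not configured; here is a minimal sketch using the plain `opentracing` package, with arbitrary span and tag names.

```python
# Minimal sketch of wrapping a unit of work in an OpenTracing span. With no
# tracer installed, opentracing falls back to a no-op tracer, so this runs
# (and records nothing) without any Jaeger/agent setup.
import time

import opentracing

tracer = opentracing.global_tracer()


def check_one_signature(event_id: str) -> None:
    # Stand-in for a CPU-heavy check such as _check_sigs_and_hash_and_fetch_one.
    time.sleep(0.01)


with tracer.start_active_span("check_sigs") as scope:
    scope.span.set_tag("num_events", 3)
    for event_id in ("$a", "$b", "$c"):
        with tracer.start_active_span("check_one") as child:
            child.span.set_tag("event_id", event_id)
            check_one_signature(event_id)
```
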
@@ -52,6 +52,7 @@ from synapse.logging.context import (
     preserve_fn,
     run_in_background,
 )
+from synapse.logging.opentracing import start_active_span
 from synapse.logging.utils import log_function
 from synapse.replication.http.federation import (
     ReplicationCleanRoomRestServlet,
@@ -452,14 +453,15 @@ class FederationHandler(BaseHandler):
 
         logger.debug("Joining %s to %s", joinee, room_id)
 
-        origin, event, room_version_obj = await self._make_and_verify_event(
-            target_hosts,
-            room_id,
-            joinee,
-            "join",
-            content,
-            params={"ver": KNOWN_ROOM_VERSIONS},
-        )
+        with start_active_span("make_join"):
+            origin, event, room_version_obj = await self._make_and_verify_event(
+                target_hosts,
+                room_id,
+                joinee,
+                "join",
+                content,
+                params={"ver": KNOWN_ROOM_VERSIONS},
+            )
 
         # This shouldn't happen, because the RoomMemberHandler has a
         # linearizer lock which only allows one operation per user per room
@@ -480,9 +482,10 @@ class FederationHandler(BaseHandler):
         except ValueError:
             pass
 
-        ret = await self.federation_client.send_join(
-            host_list, event, room_version_obj
-        )
+        with start_active_span("send_join"):
+            ret = await self.federation_client.send_join(
+                host_list, event, room_version_obj
+            )
 
         event = ret.event
         origin = ret.origin
@@ -510,9 +513,10 @@ class FederationHandler(BaseHandler):
             auth_events=auth_chain,
         )
 
-        max_stream_id = await self._persist_auth_tree(
-            origin, room_id, auth_chain, state, event, room_version_obj
-        )
+        with start_active_span("_persist_auth_tree"):
+            max_stream_id = await self._persist_auth_tree(
+                origin, room_id, auth_chain, state, event, room_version_obj
+            )
 
         # We wait here until this instance has seen the events come down
         # replication (if we're using replication) as the below uses caches.
@@ -1139,51 +1143,54 @@ class FederationHandler(BaseHandler):
                 if e_id not in event_map:
                     missing_auth_events.add(e_id)
 
-        for e_id in missing_auth_events:
-            m_ev = await self.federation_client.get_pdu(
-                [origin],
-                e_id,
-                room_version=room_version,
-                outlier=True,
-                timeout=10000,
-            )
-            if m_ev and m_ev.event_id == e_id:
-                event_map[e_id] = m_ev
-            else:
-                logger.info("Failed to find auth event %r", e_id)
+        with start_active_span("fetching.missing_auth_events"):
+            for e_id in missing_auth_events:
+                m_ev = await self.federation_client.get_pdu(
+                    [origin],
+                    e_id,
+                    room_version=room_version,
+                    outlier=True,
+                    timeout=10000,
+                )
+                if m_ev and m_ev.event_id == e_id:
+                    event_map[e_id] = m_ev
+                else:
+                    logger.info("Failed to find auth event %r", e_id)
 
-        for e in itertools.chain(auth_events, state, [event]):
-            auth_for_e = {
-                (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
-                for e_id in e.auth_event_ids()
-                if e_id in event_map
-            }
-            if create_event:
-                auth_for_e[(EventTypes.Create, "")] = create_event
+        with start_active_span("authing_events"):
+            for e in itertools.chain(auth_events, state, [event]):
+                auth_for_e = {
+                    (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
+                    for e_id in e.auth_event_ids()
+                    if e_id in event_map
+                }
+                if create_event:
+                    auth_for_e[(EventTypes.Create, "")] = create_event
 
-            try:
-                event_auth.check(room_version, e, auth_events=auth_for_e)
-            except SynapseError as err:
-                # we may get SynapseErrors here as well as AuthErrors. For
-                # instance, there are a couple of (ancient) events in some
-                # rooms whose senders do not have the correct sigil; these
-                # cause SynapseErrors in auth.check. We don't want to give up
-                # the attempt to federate altogether in such cases.
+                try:
+                    event_auth.check(room_version, e, auth_events=auth_for_e)
+                except SynapseError as err:
+                    # we may get SynapseErrors here as well as AuthErrors. For
+                    # instance, there are a couple of (ancient) events in some
+                    # rooms whose senders do not have the correct sigil; these
+                    # cause SynapseErrors in auth.check. We don't want to give up
+                    # the attempt to federate altogether in such cases.
 
-                logger.warning("Rejecting %s because %s", e.event_id, err.msg)
+                    logger.warning("Rejecting %s because %s", e.event_id, err.msg)
 
-                if e == event:
-                    raise
-                events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
+                    if e == event:
+                        raise
+                    events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
 
         if auth_events or state:
-            await self._federation_event_handler.persist_events_and_notify(
-                room_id,
-                [
-                    (e, events_to_context[e.event_id])
-                    for e in itertools.chain(auth_events, state)
-                ],
-            )
+            with start_active_span("persist_events_and_notify.state"):
+                await self._federation_event_handler.persist_events_and_notify(
+                    room_id,
+                    [
+                        (e, events_to_context[e.event_id])
+                        for e in itertools.chain(auth_events, state)
+                    ],
+                )
 
         new_event_context = await self.state_handler.compute_event_context(
             event, old_state=state
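
All of the handler hunks follow the same recipe: wrap one stage of the remote join (`make_join`, `send_join`, `_persist_auth_tree`, fetching missing auth events, the auth-check loop, state persistence) in a named span so the trace shows where the time goes. When no tracing backend is at hand, a throwaway timer gives a cruder version of the same picture; this is a generic sketch, not part of the diff.

```python
# Rough-and-ready alternative to tracing spans: log how long each named
# stage of a multi-step operation takes. Purely illustrative.
import logging
import time
from contextlib import contextmanager

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("join_timing")


@contextmanager
def timed_stage(name: str):
    start = time.perf_counter()
    try:
        yield
    finally:
        logger.info("%s took %.1f ms", name, (time.perf_counter() - start) * 1000)


with timed_stage("make_join"):
    time.sleep(0.02)  # stand-in for the make_join round trip

with timed_stage("send_join"):
    time.sleep(0.05)  # stand-in for send_join + response parsing
```
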
@@ -33,10 +33,11 @@ from prometheus_client import Counter
 
 import synapse.metrics
 from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
-from synapse.api.room_versions import RoomVersions
+from synapse.api.room_versions import EventFormatVersions, RoomVersions
 from synapse.crypto.event_signing import compute_event_reference_hash
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.logging.opentracing import start_active_span
 from synapse.logging.utils import log_function
 from synapse.storage._base import db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool, LoggingTransaction
@@ -241,7 +242,7 @@ class PersistEventsStore:
             txn.execute(sql + clause, args)
             results.extend(r[0] for r in txn if not db_to_json(r[1]).get("soft_failed"))
 
-        for chunk in batch_iter(event_ids, 100):
+        for chunk in batch_iter(event_ids, 1000000000000000):
             await self.db_pool.runInteraction(
                 "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
             )
@@ -304,7 +305,7 @@ class PersistEventsStore:
                     to_recursively_check.append(prev_event_id)
                     existing_prevs.add(prev_event_id)
 
-        for chunk in batch_iter(event_ids, 100):
+        for chunk in batch_iter(event_ids, 100000000000000000):
            await self.db_pool.runInteraction(
                "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
            )
@@ -382,7 +383,8 @@ class PersistEventsStore:
         # Insert into event_to_state_groups.
         self._store_event_state_mappings_txn(txn, events_and_contexts)
 
-        self._persist_event_auth_chain_txn(txn, [e for e, _ in events_and_contexts])
+        with start_active_span("_persist_event_auth_chain_txn"):
+            self._persist_event_auth_chain_txn(txn, [e for e, _ in events_and_contexts])
 
         # _store_rejected_events_txn filters out any events which were
         # rejected, and returns the filtered list.
@@ -393,12 +395,13 @@ class PersistEventsStore:
         # From this point onwards the events are only ones that weren't
         # rejected.
 
-        self._update_metadata_tables_txn(
-            txn,
-            events_and_contexts=events_and_contexts,
-            all_events_and_contexts=all_events_and_contexts,
-            backfilled=backfilled,
-        )
+        with start_active_span("_update_metadata_tables_txn"):
+            self._update_metadata_tables_txn(
+                txn,
+                events_and_contexts=events_and_contexts,
+                all_events_and_contexts=all_events_and_contexts,
+                backfilled=backfilled,
+            )
 
         # We call this last as it assumes we've inserted the events into
         # room_memberships, where applicable.
@@ -1298,6 +1301,8 @@ class PersistEventsStore:
             },
         )
 
+        self._handle_mult_prev_events(txn, [event])
+
         sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
         txn.execute(sql, (False, event.event_id))
 
@@ -1446,7 +1451,11 @@ class PersistEventsStore:
         return [ec for ec in events_and_contexts if ec[0] not in to_remove]
 
     def _update_metadata_tables_txn(
-        self, txn, events_and_contexts, all_events_and_contexts, backfilled
+        self,
+        txn,
+        events_and_contexts: List[Tuple[EventBase, EventContext]],
+        all_events_and_contexts,
+        backfilled,
     ):
         """Update all the miscellaneous tables for new events
 
@@ -1537,7 +1546,12 @@ class PersistEventsStore:
 
         # Insert event_reference_hashes table.
         self._store_event_reference_hashes_txn(
-            txn, [event for event, _ in events_and_contexts]
+            txn,
+            [
+                event
+                for event, _ in events_and_contexts
+                if event.format_version == EventFormatVersions.V1
+            ],
         )
 
         # Prefill the event cache
@@ -2120,7 +2134,7 @@ class PersistEventsStore:
             values={"min_depth": depth},
        )
 
-    def _handle_mult_prev_events(self, txn, events):
+    def _handle_mult_prev_events(self, txn, events: Iterable[EventBase]):
        """
        For the given event, update the event edges table and forward and
        backward extremities tables.
@@ -2137,6 +2151,7 @@ class PersistEventsStore:
                 }
                 for ev in events
                 for e_id in ev.prev_event_ids()
+                if not ev.internal_metadata.is_outlier()
             ],
         )
 
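
Two of the hunks above bump the `batch_iter` chunk size from 100 to effectively unbounded values, trading many small `runInteraction` calls with short `IN (...)` parameter lists for a few very large ones. A chunking helper in the spirit of `batch_iter` can be sketched as follows; the helper name and the query shape in the comment are illustrative, not Synapse's.

```python
# Sketch of a batch_iter-style helper: split an iterable into chunks of at
# most `size` items.
from itertools import islice
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def chunked(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    it = iter(iterable)
    while True:
        chunk = tuple(islice(it, size))
        if not chunk:
            return
        yield chunk


event_ids = [f"$event{i}" for i in range(250)]
for chunk in chunked(event_ids, 100):
    # Each chunk would become one "WHERE event_id IN (?, ?, ...)" query;
    # a larger chunk size means fewer queries but longer parameter lists.
    print(len(chunk))
# prints 100, 100, 50
```
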
@@ -138,6 +138,19 @@ class KeyStore(SQLBaseStore):
         for i in invalidations:
             invalidate((i,))
 
+    async def store_server_keys_json_multi(
+        self,
+        entries: List[Tuple[str, str, str, int, int, bytes]],
+    ):
+        await self.db_pool.simple_upsert_many(
+            table="server_keys_json",
+            key_names=("server_name", "key_id", "from_server"),
+            key_values=[e[:3] for e in entries],
+            value_names=("ts_added_ms", "ts_valid_until_ms", "key_json"),
+            value_values=[(e[3], e[4], db_binary_type(e[5])) for e in entries],
+            desc="store_server_keys_json_multi",
+        )
+
     async def store_server_keys_json(
         self,
         server_name: str,
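
The new `store_server_keys_json_multi` leans on `simple_upsert_many`, upserting all rows keyed on (`server_name`, `key_id`, `from_server`) in one go. A self-contained sketch of the same shape using `sqlite3`; the schema is simplified, and `ON CONFLICT ... DO UPDATE` needs SQLite 3.24 or newer.

```python
# Sketch: batch upsert keyed on (server_name, key_id, from_server), similar
# in shape to simple_upsert_many(key_names=..., value_names=...).
# Simplified schema; requires SQLite >= 3.24 for ON CONFLICT ... DO UPDATE.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE server_keys_json (
        server_name TEXT, key_id TEXT, from_server TEXT,
        ts_added_ms INTEGER, ts_valid_until_ms INTEGER, key_json BLOB,
        PRIMARY KEY (server_name, key_id, from_server)
    )
    """
)

entries = [
    ("example.org", "ed25519:a1", "example.org", 1_000, 2_000, b"{}"),
    ("example.org", "ed25519:a2", "example.org", 1_000, 2_000, b"{}"),
]

conn.executemany(
    """
    INSERT INTO server_keys_json
        (server_name, key_id, from_server, ts_added_ms, ts_valid_until_ms, key_json)
    VALUES (?, ?, ?, ?, ?, ?)
    ON CONFLICT (server_name, key_id, from_server) DO UPDATE SET
        ts_added_ms = excluded.ts_added_ms,
        ts_valid_until_ms = excluded.ts_valid_until_ms,
        key_json = excluded.key_json
    """,
    entries,
)
conn.commit()
print(conn.execute("SELECT count(*) FROM server_keys_json").fetchone())
```
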
@@ -412,8 +412,8 @@ class EventsPersistenceStorage:
             return replaced_events
 
         chunks = [
-            events_and_contexts[x : x + 100]
-            for x in range(0, len(events_and_contexts), 100)
+            events_and_contexts[x : x + 100000000]
+            for x in range(0, len(events_and_contexts), 10000000)
         ]
 
         for chunk in chunks:
@@ -445,7 +445,9 @@ class EventsPersistenceStorage:
         potentially_left_users: Set[str] = set()
 
         if not backfilled:
-            with Measure(self._clock, "_calculate_state_and_extrem"):
+            with Measure(
+                self._clock, "_calculate_state_and_extrem"
+            ), opentracing.start_active_span("_calculate_state_and_extrem"):
                 # Work out the new "current state" for each room.
                 # We do this by working out what the new extremities are and then
                 # calculating the state from that.
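
The last hunk stacks `opentracing.start_active_span` onto the existing `Measure` block by listing both context managers in a single `with` statement. A generic sketch of that pattern, using a stub timer in place of Synapse's `Measure` and the no-op OpenTracing tracer:

```python
# Sketch: combine two context managers (a timer and a tracing span) in one
# `with` statement, as the persist_events change does with Measure + span.
# The timer below is a stub, not Synapse's Measure.
import time
from contextlib import contextmanager

import opentracing


@contextmanager
def timer(name: str):
    start = time.perf_counter()
    try:
        yield
    finally:
        print(f"{name}: {(time.perf_counter() - start) * 1000:.1f} ms")


tracer = opentracing.global_tracer()  # no-op tracer unless one is installed

with timer("_calculate_state_and_extrem"), tracer.start_active_span(
    "_calculate_state_and_extrem"
):
    time.sleep(0.01)  # stand-in for the state/extremity calculation
```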