mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-07 01:20:16 +00:00
Compare commits
23 Commits
erikj/dock
...
babolivier
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
64805e6811 | ||
|
|
115b79948f | ||
|
|
202fc2cc00 | ||
|
|
94342ce65d | ||
|
|
4ec2df6dda | ||
|
|
d85851bcca | ||
|
|
422da6c52c | ||
|
|
a0c8c1fc49 | ||
|
|
eca7ece93f | ||
|
|
8a090731c9 | ||
|
|
f8b891c5e7 | ||
|
|
0d7ec185cb | ||
|
|
dbf7545ccd | ||
|
|
7f9a087947 | ||
|
|
b1db74b39a | ||
|
|
3cba0dccb3 | ||
|
|
256333718b | ||
|
|
1167fe1ad4 | ||
|
|
7c7b084d50 | ||
|
|
8d316f0060 | ||
|
|
b8deaa077e | ||
|
|
7fa5156292 | ||
|
|
68105fca36 |
@@ -46,7 +46,7 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
|
||||
if not hasattr(event, "room_id"):
|
||||
raise AuthError(500, "Event has no room_id: %s" % event)
|
||||
|
||||
if do_sig_check:
|
||||
if False and do_sig_check: # Disable all sig checks for meshsim
|
||||
sender_domain = get_domain_from_id(event.sender)
|
||||
event_id_domain = get_domain_from_id(event.event_id)
|
||||
|
||||
|
||||
@@ -14,9 +14,9 @@
|
||||
# limitations under the License.
|
||||
|
||||
import copy
|
||||
import string
|
||||
|
||||
from synapse.types import EventID
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
from . import EventBase, FrozenEvent, _event_dict_property
|
||||
|
||||
@@ -49,10 +49,10 @@ class EventBuilderFactory(object):
|
||||
self.event_id_count = 0
|
||||
|
||||
def create_event_id(self):
|
||||
i = str(self.event_id_count)
|
||||
i = self.event_id_count
|
||||
self.event_id_count += 1
|
||||
|
||||
local_part = str(int(self.clock.time())) + i + random_string(5)
|
||||
local_part = _encode_id(i)
|
||||
|
||||
e_id = EventID(local_part, self.hostname)
|
||||
|
||||
@@ -73,3 +73,19 @@ class EventBuilderFactory(object):
|
||||
key_values["signatures"] = {}
|
||||
|
||||
return EventBuilder(key_values=key_values,)
|
||||
|
||||
|
||||
def _numberToBase(n, b):
|
||||
if n == 0:
|
||||
return [0]
|
||||
digits = []
|
||||
while n:
|
||||
digits.append(int(n % b))
|
||||
n //= b
|
||||
return digits[::-1]
|
||||
|
||||
|
||||
def _encode_id(i):
|
||||
digits = string.digits + string.ascii_letters
|
||||
val_slice = _numberToBase(i, len(digits))
|
||||
return "".join(digits[x] for x in val_slice)
|
||||
|
||||
@@ -26,7 +26,7 @@ from synapse.crypto.event_signing import check_event_content_hash
|
||||
from synapse.events import FrozenEvent
|
||||
from synapse.events.utils import prune_event
|
||||
from synapse.http.servlet import assert_params_in_dict
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.types import get_domain_from_id, EventID
|
||||
from synapse.util import logcontext, unwrapFirstError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -136,6 +136,7 @@ class FederationBase(object):
|
||||
* throws a SynapseError if the signature check failed.
|
||||
The deferreds run their callbacks in the sentinel logcontext.
|
||||
"""
|
||||
return [defer.succeed(p) for p in pdus]
|
||||
deferreds = _check_sigs_on_pdus(self.keyring, pdus)
|
||||
|
||||
ctx = logcontext.LoggingContext.current_context()
|
||||
@@ -317,7 +318,7 @@ def event_from_pdu_json(pdu_json, outlier=False):
|
||||
|
||||
depth = pdu_json['depth']
|
||||
if not isinstance(depth, six.integer_types):
|
||||
raise SynapseError(400, "Depth %r not an intger" % (depth, ),
|
||||
raise SynapseError(400, "Depth %r not an integer" % (depth, ),
|
||||
Codes.BAD_JSON)
|
||||
|
||||
if depth < 0:
|
||||
@@ -325,6 +326,31 @@ def event_from_pdu_json(pdu_json, outlier=False):
|
||||
elif depth > MAX_DEPTH:
|
||||
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
|
||||
|
||||
event_id = pdu_json["event_id"]
|
||||
if event_id[0] != "$":
|
||||
pdu_json["event_id"] = EventID(
|
||||
event_id,
|
||||
get_domain_from_id(pdu_json["sender"]),
|
||||
).to_string()
|
||||
event_id = pdu_json["event_id"]
|
||||
|
||||
if "auth_events" in pdu_json:
|
||||
pdu_json["auth_events"] = [
|
||||
(e, {}) if isinstance(e, six.string_types) else e
|
||||
for e in pdu_json["auth_events"]
|
||||
]
|
||||
|
||||
if "prev_events" in pdu_json:
|
||||
pdu_json["prev_events"] = [
|
||||
(e, {}) if isinstance(e, six.string_types) else e
|
||||
for e in pdu_json["prev_events"]
|
||||
]
|
||||
|
||||
if "origin" not in pdu_json:
|
||||
pdu_json["origin"] = get_domain_from_id(pdu_json["sender"])
|
||||
|
||||
logger.info("Unmangled event to: %s", pdu_json)
|
||||
|
||||
event = FrozenEvent(
|
||||
pdu_json
|
||||
)
|
||||
|
||||
@@ -39,6 +39,7 @@ from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
from synapse.federation.units import _mangle_pdu
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -696,7 +697,7 @@ class FederationClient(FederationBase):
|
||||
destination=destination,
|
||||
room_id=room_id,
|
||||
event_id=event_id,
|
||||
content=pdu.get_pdu_json(time_now),
|
||||
content=_mangle_pdu(pdu.get_pdu_json(time_now)),
|
||||
)
|
||||
except HttpResponseException as e:
|
||||
if e.code == 403:
|
||||
|
||||
@@ -36,7 +36,7 @@ from synapse.api.errors import (
|
||||
from synapse.crypto.event_signing import compute_event_signature
|
||||
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
|
||||
from synapse.federation.persistence import TransactionActions
|
||||
from synapse.federation.units import Edu, Transaction
|
||||
from synapse.federation.units import Edu, Transaction, _mangle_pdu
|
||||
from synapse.http.endpoint import parse_server_name
|
||||
from synapse.replication.http.federation import (
|
||||
ReplicationFederationSendEduRestServlet,
|
||||
@@ -49,6 +49,7 @@ from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.util.logcontext import nested_logging_context
|
||||
from synapse.util.logutils import log_function
|
||||
|
||||
|
||||
# when processing incoming transactions, we try to handle multiple rooms in
|
||||
# parallel, up to this limit.
|
||||
TRANSACTION_CONCURRENCY_LIMIT = 10
|
||||
@@ -233,7 +234,16 @@ class FederationServer(FederationBase):
|
||||
)
|
||||
|
||||
if hasattr(transaction, "edus"):
|
||||
for edu in (Edu(**x) for x in transaction.edus):
|
||||
logger.info("Got edus: %s", transaction.edus)
|
||||
|
||||
edus = []
|
||||
for x in transaction.edus:
|
||||
try:
|
||||
edus.append(Edu(**x))
|
||||
except Exception:
|
||||
logger.exception("Failed to handle EDU: %s", x)
|
||||
|
||||
for edu in edus:
|
||||
yield self.received_edu(
|
||||
origin,
|
||||
edu.edu_type,
|
||||
@@ -329,8 +339,8 @@ class FederationServer(FederationBase):
|
||||
)
|
||||
|
||||
defer.returnValue({
|
||||
"pdus": [pdu.get_pdu_json() for pdu in pdus],
|
||||
"auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
|
||||
"pdus": [_mangle_pdu(pdu.get_pdu_json()) for pdu in pdus],
|
||||
"auth_chain": [_mangle_pdu(pdu.get_pdu_json()) for pdu in auth_chain],
|
||||
})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -375,7 +385,7 @@ class FederationServer(FederationBase):
|
||||
yield self.check_server_matches_acl(origin_host, pdu.room_id)
|
||||
ret_pdu = yield self.handler.on_invite_request(origin, pdu)
|
||||
time_now = self._clock.time_msec()
|
||||
defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
|
||||
defer.returnValue((200, {"event": _mangle_pdu(ret_pdu.get_pdu_json(time_now))}))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_send_join_request(self, origin, content):
|
||||
@@ -389,9 +399,9 @@ class FederationServer(FederationBase):
|
||||
res_pdus = yield self.handler.on_send_join_request(origin, pdu)
|
||||
time_now = self._clock.time_msec()
|
||||
defer.returnValue((200, {
|
||||
"state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
|
||||
"state": [_mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["state"]],
|
||||
"auth_chain": [
|
||||
p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]
|
||||
_mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["auth_chain"]
|
||||
],
|
||||
}))
|
||||
|
||||
@@ -424,7 +434,7 @@ class FederationServer(FederationBase):
|
||||
time_now = self._clock.time_msec()
|
||||
auth_pdus = yield self.handler.on_event_auth(event_id)
|
||||
res = {
|
||||
"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
|
||||
"auth_chain": [_mangle_pdu(a.get_pdu_json(time_now)) for a in auth_pdus],
|
||||
}
|
||||
defer.returnValue((200, res))
|
||||
|
||||
@@ -473,7 +483,7 @@ class FederationServer(FederationBase):
|
||||
time_now = self._clock.time_msec()
|
||||
send_content = {
|
||||
"auth_chain": [
|
||||
e.get_pdu_json(time_now)
|
||||
_mangle_pdu(e.get_pdu_json(time_now))
|
||||
for e in ret["auth_chain"]
|
||||
],
|
||||
"rejects": ret.get("rejects", []),
|
||||
@@ -549,7 +559,7 @@ class FederationServer(FederationBase):
|
||||
time_now = self._clock.time_msec()
|
||||
|
||||
defer.returnValue({
|
||||
"events": [ev.get_pdu_json(time_now) for ev in missing_events],
|
||||
"events": [_mangle_pdu(ev.get_pdu_json(time_now)) for ev in missing_events],
|
||||
})
|
||||
|
||||
@log_function
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
# limitations under the License.
|
||||
import datetime
|
||||
import logging
|
||||
import string
|
||||
import json
|
||||
|
||||
from six import itervalues
|
||||
|
||||
@@ -42,6 +44,8 @@ from .units import Edu, Transaction
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
pdu_logger = logging.getLogger("synapse.federation.pdu_destination_logger")
|
||||
|
||||
sent_pdus_destination_dist_count = Counter(
|
||||
"synapse_federation_client_sent_pdu_destinations:count", ""
|
||||
)
|
||||
@@ -128,7 +132,7 @@ class TransactionQueue(object):
|
||||
self.last_device_list_stream_id_by_dest = {}
|
||||
|
||||
# HACK to get unique tx id
|
||||
self._next_txn_id = int(self.clock.time_msec())
|
||||
self._next_txn_id = 1
|
||||
|
||||
self._order = 1
|
||||
|
||||
@@ -260,6 +264,15 @@ class TransactionQueue(object):
|
||||
destinations.discard(self.server_name)
|
||||
logger.debug("Sending to: %s", str(destinations))
|
||||
|
||||
pdu_logger.info(
|
||||
"SendingPDU",
|
||||
extra={
|
||||
"event_id": pdu.event_id, "room_id": pdu.room_id,
|
||||
"destinations": json.dumps(list(destinations)),
|
||||
"server": self.server_name,
|
||||
},
|
||||
)
|
||||
|
||||
if not destinations:
|
||||
return
|
||||
|
||||
@@ -439,16 +452,22 @@ class TransactionQueue(object):
|
||||
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
|
||||
|
||||
# We can only include at most 50 PDUs per transactions
|
||||
pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:]
|
||||
pending_pdus, leftover_pdus = pending_pdus[-5:], pending_pdus[:-5]
|
||||
if leftover_pdus:
|
||||
self.pending_pdus_by_dest[destination] = leftover_pdus
|
||||
# self.pending_pdus_by_dest[destination] = leftover_pdus
|
||||
for _, _, p_span in leftover_pdus:
|
||||
p_span.set_tag("success", False)
|
||||
p_span.log_kv({"result": "dropped"})
|
||||
p_span.finish()
|
||||
|
||||
logger.info("TX [%s] Sending PDUs: %s", destination, pending_pdus)
|
||||
|
||||
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
||||
|
||||
# We can only include at most 100 EDUs per transactions
|
||||
pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
|
||||
if leftover_edus:
|
||||
self.pending_edus_by_dest[destination] = leftover_edus
|
||||
pending_edus, leftover_edus = pending_edus[-5:], pending_edus[:-5]
|
||||
# if leftover_edus:
|
||||
# self.pending_edus_by_dest[destination] = leftover_edus
|
||||
|
||||
pending_presence = self.pending_presence_by_dest.pop(destination, {})
|
||||
|
||||
@@ -519,7 +538,7 @@ class TransactionQueue(object):
|
||||
except FederationDeniedError as e:
|
||||
logger.info(e)
|
||||
except Exception as e:
|
||||
logger.warn(
|
||||
logger.exception(
|
||||
"TX [%s] Failed to send transaction: %s",
|
||||
destination,
|
||||
e,
|
||||
@@ -574,6 +593,7 @@ class TransactionQueue(object):
|
||||
|
||||
success = True
|
||||
|
||||
logger.debug("TX [%s] _attempt_new_transaction", destination)
|
||||
logger.debug("TX [%s] _attempt_new_transaction", destination)
|
||||
|
||||
txn_id = str(self._next_txn_id)
|
||||
@@ -669,3 +689,19 @@ class TransactionQueue(object):
|
||||
success = False
|
||||
|
||||
defer.returnValue(success)
|
||||
|
||||
|
||||
def _numberToBase(n, b):
|
||||
if n == 0:
|
||||
return [0]
|
||||
digits = []
|
||||
while n:
|
||||
digits.append(int(n % b))
|
||||
n //= b
|
||||
return digits[::-1]
|
||||
|
||||
|
||||
def _encode_id(i):
|
||||
digits = string.digits + string.ascii_letters
|
||||
val_slice = _numberToBase(i, len(digits))
|
||||
return "".join(digits[x] for x in val_slice)
|
||||
|
||||
@@ -125,7 +125,7 @@ class Authenticator(object):
|
||||
401, "Missing Authorization headers", Codes.UNAUTHORIZED,
|
||||
)
|
||||
|
||||
yield self.keyring.verify_json_for_server(origin, json_request)
|
||||
# yield self.keyring.verify_json_for_server(origin, json_request)
|
||||
|
||||
logger.info("Request from %s", origin)
|
||||
request.authenticated_entity = origin
|
||||
|
||||
@@ -19,6 +19,7 @@ server protocol.
|
||||
|
||||
import logging
|
||||
|
||||
from synapse.types import get_localpart_from_id, get_domain_from_id
|
||||
from synapse.util.jsonobject import JsonEncodedObject
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -76,15 +77,14 @@ class Transaction(JsonEncodedObject):
|
||||
]
|
||||
|
||||
internal_keys = [
|
||||
"transaction_id",
|
||||
"destination",
|
||||
]
|
||||
|
||||
required_keys = [
|
||||
"transaction_id",
|
||||
"origin",
|
||||
"destination",
|
||||
"origin_server_ts",
|
||||
"previous_ids",
|
||||
]
|
||||
|
||||
required_keys = [
|
||||
"pdus",
|
||||
]
|
||||
|
||||
@@ -108,15 +108,33 @@ class Transaction(JsonEncodedObject):
|
||||
""" Used to create a new transaction. Will auto fill out
|
||||
transaction_id and origin_server_ts keys.
|
||||
"""
|
||||
if "origin_server_ts" not in kwargs:
|
||||
raise KeyError(
|
||||
"Require 'origin_server_ts' to construct a Transaction"
|
||||
)
|
||||
if "transaction_id" not in kwargs:
|
||||
raise KeyError(
|
||||
"Require 'transaction_id' to construct a Transaction"
|
||||
)
|
||||
|
||||
kwargs["pdus"] = [p.get_pdu_json() for p in pdus]
|
||||
kwargs["pdus"] = [
|
||||
_mangle_pdu(p.get_pdu_json())
|
||||
for p in pdus
|
||||
]
|
||||
|
||||
return Transaction(**kwargs)
|
||||
|
||||
|
||||
def _mangle_pdu(pdu_json):
|
||||
pdu_json.pop("origin", None)
|
||||
pdu_json.pop("hashes", None)
|
||||
pdu_json.pop("signatures", None)
|
||||
pdu_json.get("unsigned", {}).pop("age_ts", None)
|
||||
pdu_json.get("unsigned", {}).pop("age", None)
|
||||
|
||||
pdu_json["auth_events"] = list(_strip_hashes(pdu_json["auth_events"]))
|
||||
pdu_json["prev_events"] = list(_strip_hashes(pdu_json["prev_events"]))
|
||||
|
||||
if get_domain_from_id(pdu_json["event_id"]) == get_domain_from_id(pdu_json["sender"]):
|
||||
pdu_json["event_id"] = get_localpart_from_id(pdu_json["event_id"])
|
||||
|
||||
logger.info("Mangled PDU: %s", pdu_json)
|
||||
|
||||
return pdu_json
|
||||
|
||||
|
||||
def _strip_hashes(iterable):
|
||||
return (
|
||||
e for e, hashes in iterable
|
||||
)
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import base64
|
||||
import secrets
|
||||
import logging
|
||||
import unicodedata
|
||||
|
||||
@@ -748,7 +750,9 @@ class AuthHandler(BaseHandler):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def issue_access_token(self, user_id, device_id=None):
|
||||
access_token = self.macaroon_gen.generate_access_token(user_id)
|
||||
# access_token = self.macaroon_gen.generate_access_token(user_id)
|
||||
access_token = base64.b64encode(secrets.token_bytes(8))
|
||||
|
||||
yield self.store.add_access_token_to_user(user_id, access_token,
|
||||
device_id)
|
||||
defer.returnValue(access_token)
|
||||
|
||||
@@ -278,6 +278,7 @@ class DeviceHandler(BaseHandler):
|
||||
"device_list_key", position, rooms=room_ids,
|
||||
)
|
||||
|
||||
return
|
||||
if hosts:
|
||||
logger.info("Sending device list update notif to: %r", hosts)
|
||||
for host in hosts:
|
||||
|
||||
@@ -259,10 +259,8 @@ class DirectoryHandler(BaseHandler):
|
||||
servers = result["servers"]
|
||||
|
||||
if not room_id:
|
||||
raise SynapseError(
|
||||
404,
|
||||
raise NotFoundError(
|
||||
"Room alias %s not found" % (room_alias.to_string(),),
|
||||
Codes.NOT_FOUND
|
||||
)
|
||||
|
||||
users = yield self.state.get_current_user_in_room(room_id)
|
||||
@@ -302,10 +300,8 @@ class DirectoryHandler(BaseHandler):
|
||||
"servers": result.servers,
|
||||
})
|
||||
else:
|
||||
raise SynapseError(
|
||||
404,
|
||||
raise NotFoundError(
|
||||
"Room alias %r not found" % (room_alias.to_string(),),
|
||||
Codes.NOT_FOUND
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
||||
@@ -48,13 +48,14 @@ from synapse.crypto.event_signing import (
|
||||
compute_event_signature,
|
||||
)
|
||||
from synapse.events.validator import EventValidator
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.replication.http.federation import (
|
||||
ReplicationCleanRoomRestServlet,
|
||||
ReplicationFederationSendEventsRestServlet,
|
||||
)
|
||||
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
|
||||
from synapse.state import StateResolutionStore, resolve_events_with_store
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.types import UserID, get_domain_from_id, create_requester
|
||||
from synapse.util import logcontext, unwrapFirstError
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.distributor import user_joined_room
|
||||
@@ -67,6 +68,7 @@ from ._base import BaseHandler
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
pdu_logger = logging.getLogger("synapse.federation.pdu_destination_logger")
|
||||
|
||||
def shortstr(iterable, maxitems=5):
|
||||
"""If iterable has maxitems or fewer, return the stringification of a list
|
||||
@@ -105,6 +107,7 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
self.hs = hs
|
||||
|
||||
self.clock = hs.get_clock()
|
||||
self.store = hs.get_datastore() # type: synapse.storage.DataStore
|
||||
self.federation_client = hs.get_federation_client()
|
||||
self.state_handler = hs.get_state_handler()
|
||||
@@ -174,6 +177,15 @@ class FederationHandler(BaseHandler):
|
||||
or pdu.internal_metadata.is_outlier()
|
||||
)
|
||||
)
|
||||
pdu_logger.info(
|
||||
"ReceivedPDU",
|
||||
extra={
|
||||
"event_id": pdu.event_id, "room_id": pdu.room_id,
|
||||
"origin": origin, "already_seen": already_seen,
|
||||
"server": self.server_name,
|
||||
},
|
||||
)
|
||||
|
||||
if already_seen:
|
||||
logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
|
||||
return
|
||||
@@ -302,20 +314,20 @@ class FederationHandler(BaseHandler):
|
||||
# but there is an interaction with min_depth that I'm not really
|
||||
# following.
|
||||
|
||||
if sent_to_us_directly:
|
||||
logger.warn(
|
||||
"[%s %s] Rejecting: failed to fetch %d prev events: %s",
|
||||
room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
|
||||
)
|
||||
raise FederationError(
|
||||
"ERROR",
|
||||
403,
|
||||
(
|
||||
"Your server isn't divulging details about prev_events "
|
||||
"referenced in this event."
|
||||
),
|
||||
affected=pdu.event_id,
|
||||
)
|
||||
# if sent_to_us_directly:
|
||||
# logger.warn(
|
||||
# "[%s %s] Rejecting: failed to fetch %d prev events: %s",
|
||||
# room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
|
||||
# )
|
||||
# raise FederationError(
|
||||
# "ERROR",
|
||||
# 403,
|
||||
# (
|
||||
# "Your server isn't divulging details about prev_events "
|
||||
# "referenced in this event."
|
||||
# ),
|
||||
# affected=pdu.event_id,
|
||||
# )
|
||||
|
||||
# Calculate the state after each of the previous events, and
|
||||
# resolve them to find the correct state at the current event.
|
||||
@@ -504,9 +516,9 @@ class FederationHandler(BaseHandler):
|
||||
room_id,
|
||||
earliest_events_ids=list(latest),
|
||||
latest_events=[pdu],
|
||||
limit=10,
|
||||
limit=5,
|
||||
min_depth=min_depth,
|
||||
timeout=60000,
|
||||
timeout=15000,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
@@ -1300,8 +1312,38 @@ class FederationHandler(BaseHandler):
|
||||
context = yield self.state_handler.compute_event_context(event)
|
||||
yield self.persist_events_and_notify([(event, context)])
|
||||
|
||||
sender = UserID.from_string(event.sender)
|
||||
target = UserID.from_string(event.state_key)
|
||||
if (sender.localpart == target.localpart):
|
||||
run_as_background_process(
|
||||
"_auto_accept_invite",
|
||||
self._auto_accept_invite,
|
||||
sender, target, event.room_id,
|
||||
)
|
||||
|
||||
event.unsigned.pop("invite_room_state", None)
|
||||
defer.returnValue(event)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _auto_accept_invite(self, sender, target, room_id):
|
||||
joined = False
|
||||
for attempt in range(0, 10):
|
||||
try:
|
||||
yield self.hs.get_room_member_handler().update_membership(
|
||||
requester=create_requester(target.to_string()),
|
||||
target=target,
|
||||
room_id=room_id,
|
||||
action="join",
|
||||
)
|
||||
joined = True
|
||||
break
|
||||
except Exception:
|
||||
# We're going to retry, but we should log the error
|
||||
logger.exception("Error auto-accepting invite on attempt %d" % attempt)
|
||||
yield self.clock.sleep(1)
|
||||
if not joined:
|
||||
logger.error("Giving up on trying to auto-accept invite: too many attempts")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
|
||||
origin, event = yield self._make_and_verify_event(
|
||||
|
||||
@@ -626,6 +626,7 @@ class PresenceHandler(object):
|
||||
Args:
|
||||
states (list(UserPresenceState))
|
||||
"""
|
||||
return
|
||||
self.federation.send_presence(states)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -816,6 +817,7 @@ class PresenceHandler(object):
|
||||
if self.is_mine(observed_user):
|
||||
yield self.invite_presence(observed_user, observer_user)
|
||||
else:
|
||||
return
|
||||
yield self.federation.send_edu(
|
||||
destination=observed_user.domain,
|
||||
edu_type="m.presence_invite",
|
||||
@@ -836,6 +838,7 @@ class PresenceHandler(object):
|
||||
if self.is_mine(observer_user):
|
||||
yield self.accept_presence(observed_user, observer_user)
|
||||
else:
|
||||
return
|
||||
self.federation.send_edu(
|
||||
destination=observer_user.domain,
|
||||
edu_type="m.presence_accept",
|
||||
|
||||
@@ -147,6 +147,8 @@ class ReceiptsHandler(BaseHandler):
|
||||
|
||||
logger.debug("Sending receipt to: %r", remotedomains)
|
||||
|
||||
return
|
||||
|
||||
for domain in remotedomains:
|
||||
self.federation.send_edu(
|
||||
destination=domain,
|
||||
|
||||
@@ -79,6 +79,8 @@ class RoomCreationHandler(BaseHandler):
|
||||
# linearizer to stop two upgrades happening at once
|
||||
self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
|
||||
|
||||
self._next_room_id = 0
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def upgrade_room(self, requester, old_room_id, new_version):
|
||||
"""Replace a room with a new room with a different version
|
||||
@@ -741,7 +743,9 @@ class RoomCreationHandler(BaseHandler):
|
||||
attempts = 0
|
||||
while attempts < 5:
|
||||
try:
|
||||
random_string = stringutils.random_string(18)
|
||||
i = self._next_room_id
|
||||
self._next_room_id += 1
|
||||
random_string = stringutils.random_string(3) + str(i)
|
||||
gen_room_id = RoomID(
|
||||
random_string,
|
||||
self.hs.hostname,
|
||||
|
||||
@@ -29,7 +29,8 @@ import synapse.server
|
||||
import synapse.types
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import AuthError, Codes, SynapseError
|
||||
from synapse.types import RoomID, UserID
|
||||
from synapse.types import RoomID, UserID, RoomAlias
|
||||
from synapse.util import logcontext
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.distributor import user_joined_room, user_left_room
|
||||
|
||||
@@ -416,6 +417,10 @@ class RoomMemberHandler(object):
|
||||
ret = yield self._remote_join(
|
||||
requester, remote_room_hosts, room_id, target, content
|
||||
)
|
||||
logcontext.run_in_background(
|
||||
self._send_merged_user_invites,
|
||||
requester, room_id,
|
||||
)
|
||||
defer.returnValue(ret)
|
||||
|
||||
elif effective_membership_state == Membership.LEAVE:
|
||||
@@ -450,8 +455,44 @@ class RoomMemberHandler(object):
|
||||
prev_events_and_hashes=prev_events_and_hashes,
|
||||
content=content,
|
||||
)
|
||||
if effective_membership_state == Membership.JOIN:
|
||||
logcontext.run_in_background(
|
||||
self._send_merged_user_invites,
|
||||
requester, room_id,
|
||||
)
|
||||
defer.returnValue(res)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _send_merged_user_invites(self, requester, room_id):
|
||||
try:
|
||||
profile_alias = "#_profile_" + requester.user.localpart + ":" + self.hs.hostname
|
||||
profile_alias = RoomAlias.from_string(profile_alias)
|
||||
profile_room_id, remote_room_hosts = yield self.lookup_room_alias(profile_alias)
|
||||
if profile_room_id:
|
||||
linked_accounts = yield self.state_handler.get_current_state(
|
||||
room_id=profile_room_id.to_string(),
|
||||
event_type="m.linked_accounts",
|
||||
state_key="",
|
||||
)
|
||||
if not linked_accounts or not linked_accounts.content['all_children']:
|
||||
return
|
||||
for child_id in linked_accounts.content['all_children']:
|
||||
child = UserID.from_string(child_id)
|
||||
if self.hs.is_mine(child) or child_id == requester.user.to_string():
|
||||
# TODO: Handle auto-invite for local users (not a priority)
|
||||
continue
|
||||
try:
|
||||
yield self.update_membership(
|
||||
requester=requester,
|
||||
target=child,
|
||||
room_id=room_id,
|
||||
action="invite",
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to invite %s to %s" % (child_id, room_id))
|
||||
except Exception:
|
||||
logger.exception("Failed to send invites to children of %s in %s" % (requester.user.to_string(), room_id))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def send_membership_event(
|
||||
self,
|
||||
|
||||
@@ -931,10 +931,7 @@ class SyncHandler(object):
|
||||
newly_joined_rooms, newly_joined_users, _, _ = res
|
||||
_, _, newly_left_rooms, newly_left_users = res
|
||||
|
||||
block_all_presence_data = (
|
||||
since_token is None and
|
||||
sync_config.filter_collection.blocks_all_presence()
|
||||
)
|
||||
block_all_presence_data = True
|
||||
if self.hs_config.use_presence and not block_all_presence_data:
|
||||
yield self._generate_sync_entry_for_presence(
|
||||
sync_result_builder, newly_joined_rooms, newly_joined_users
|
||||
@@ -1231,10 +1228,7 @@ class SyncHandler(object):
|
||||
`(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)`
|
||||
"""
|
||||
user_id = sync_result_builder.sync_config.user.to_string()
|
||||
block_all_room_ephemeral = (
|
||||
sync_result_builder.since_token is None and
|
||||
sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
|
||||
)
|
||||
block_all_room_ephemeral = True
|
||||
|
||||
if block_all_room_ephemeral:
|
||||
ephemeral_by_room = {}
|
||||
|
||||
@@ -231,6 +231,7 @@ class TypingHandler(object):
|
||||
for domain in set(get_domain_from_id(u) for u in users):
|
||||
if domain != self.server_name:
|
||||
logger.debug("sending typing update to %s", domain)
|
||||
return
|
||||
self.federation.send_edu(
|
||||
destination=domain,
|
||||
edu_type="m.typing",
|
||||
|
||||
@@ -194,8 +194,11 @@ class _WrappedConnection(object):
|
||||
# In Twisted >18.4; the TLS connection will be None if it has closed
|
||||
# which will make abortConnection() throw. Check that the TLS connection
|
||||
# is not None before trying to close it.
|
||||
if self.transport.getHandle() is not None:
|
||||
self.transport.abortConnection()
|
||||
try:
|
||||
if self.transport.getHandle() is not None:
|
||||
self.transport.abortConnection()
|
||||
except:
|
||||
logger.warning("Failed to abort connection")
|
||||
|
||||
def request(self, request):
|
||||
self.last_request = time.time()
|
||||
|
||||
@@ -18,6 +18,7 @@ import logging
|
||||
import random
|
||||
import sys
|
||||
from io import BytesIO
|
||||
import os
|
||||
|
||||
from six import PY3, string_types
|
||||
from six.moves import urllib
|
||||
@@ -55,6 +56,7 @@ outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_request
|
||||
incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses",
|
||||
"", ["method", "code"])
|
||||
|
||||
USE_PROXY = "SYNAPSE_USE_PROXY" in os.environ
|
||||
|
||||
MAX_LONG_RETRIES = 10
|
||||
MAX_SHORT_RETRIES = 3
|
||||
@@ -65,6 +67,18 @@ else:
|
||||
MAXINT = sys.maxint
|
||||
|
||||
|
||||
class ProxyMatrixFederationEndpointFactory(object):
|
||||
def __init__(self, hs):
|
||||
self.reactor = hs.get_reactor()
|
||||
self.tls_client_options_factory = hs.tls_client_options_factory
|
||||
|
||||
def endpointForURI(self, uri):
|
||||
return matrix_federation_endpoint(
|
||||
self.reactor, "localhost:8888", timeout=10,
|
||||
tls_client_options_factory=None
|
||||
)
|
||||
|
||||
|
||||
class MatrixFederationEndpointFactory(object):
|
||||
def __init__(self, hs):
|
||||
self.reactor = hs.get_reactor()
|
||||
@@ -190,8 +204,23 @@ class MatrixFederationHttpClient(object):
|
||||
pool.retryAutomatically = False
|
||||
pool.maxPersistentPerHost = 5
|
||||
pool.cachedConnectionTimeout = 2 * 60
|
||||
self.agent = Agent.usingEndpointFactory(
|
||||
reactor, MatrixFederationEndpointFactory(hs), pool=pool
|
||||
|
||||
if USE_PROXY:
|
||||
self.agent = Agent.usingEndpointFactory(
|
||||
reactor, ProxyMatrixFederationEndpointFactory(hs), pool=pool
|
||||
)
|
||||
else:
|
||||
self.agent = Agent.usingEndpointFactory(
|
||||
reactor, MatrixFederationEndpointFactory(hs), pool=pool
|
||||
)
|
||||
|
||||
file_pool = HTTPConnectionPool(reactor)
|
||||
file_pool.retryAutomatically = False
|
||||
file_pool.maxPersistentPerHost = 5
|
||||
file_pool.cachedConnectionTimeout = 10
|
||||
|
||||
self.file_agent = Agent.usingEndpointFactory(
|
||||
reactor, MatrixFederationEndpointFactory(hs), pool=file_pool
|
||||
)
|
||||
self.clock = hs.get_clock()
|
||||
self._store = hs.get_datastore()
|
||||
@@ -211,7 +240,8 @@ class MatrixFederationHttpClient(object):
|
||||
timeout=None,
|
||||
long_retries=False,
|
||||
ignore_backoff=False,
|
||||
backoff_on_404=False
|
||||
backoff_on_404=False,
|
||||
agent=None,
|
||||
):
|
||||
"""
|
||||
Sends a request to the given server.
|
||||
@@ -704,6 +734,7 @@ class MatrixFederationHttpClient(object):
|
||||
request,
|
||||
retry_on_dns_fail=retry_on_dns_fail,
|
||||
ignore_backoff=ignore_backoff,
|
||||
agent=self.file_agent,
|
||||
)
|
||||
|
||||
headers = dict(response.headers.getAllRawHeaders())
|
||||
|
||||
Reference in New Issue
Block a user