Compare commits

...

23 Commits

Author SHA1 Message Date
Brendan Abolivier
64805e6811 Remove stranded code 2019-05-16 18:35:25 +01:00
Brendan Abolivier
115b79948f Remove old proxy binary 2019-02-13 21:58:39 +00:00
Travis Ralston
202fc2cc00 Catch room profile errors and anything else that can go wrong
Fixes an issue where things become unhappy when the room profile for a user is missing.
2019-02-13 21:01:32 +00:00
Brendan Abolivier
94342ce65d Add logging for meshsim 2019-02-13 20:54:34 +00:00
Brendan Abolivier
4ec2df6dda Fix unnoticed merge conflict 2019-02-13 20:54:34 +00:00
Erik Johnston
d85851bcca Fix fetching media when using proxy 2019-02-13 20:54:34 +00:00
Erik Johnston
422da6c52c Reduce send invite request size 2019-02-13 20:54:34 +00:00
Erik Johnston
a0c8c1fc49 Compress some client data 2019-02-13 20:54:34 +00:00
Erik Johnston
eca7ece93f Handle slow/lossy connections better when sending transactions 2019-02-13 20:54:34 +00:00
Erik Johnston
8a090731c9 Actually fix exceptions 2019-02-13 20:54:34 +00:00
Erik Johnston
f8b891c5e7 Reduce size of fed transaction IDs 2019-02-13 20:54:34 +00:00
Erik Johnston
0d7ec185cb Make event_ids smaller 2019-02-13 20:54:34 +00:00
Erik Johnston
dbf7545ccd Mangle some more PDU fields 2019-02-13 20:54:34 +00:00
Erik Johnston
7f9a087947 Change access tokens to be base64'ed 4 bytes 2019-02-13 20:54:34 +00:00
Travis Ralston
b1db74b39a Merge pull request #4218 from matrix-org/travis/account-merging
Proof of concept for auto-accepting invites on merged accounts
2019-02-13 20:54:34 +00:00
Erik Johnston
3cba0dccb3 Disable presence/typing/receipts. Don't die if we can't parse an EDU 2019-02-13 20:54:34 +00:00
Erik Johnston
256333718b Make using proxy optional 2019-02-13 20:54:34 +00:00
Erik Johnston
1167fe1ad4 Drop unnecessary keys from transactions 2019-02-13 20:54:34 +00:00
Erik Johnston
7c7b084d50 Make room ID smaller 2019-02-13 20:54:34 +00:00
Erik Johnston
8d316f0060 Reduce event ID size 2019-02-13 20:54:34 +00:00
Erik Johnston
b8deaa077e Strip signatures and hashes on outgoing events 2019-02-13 20:54:27 +00:00
Brendan Abolivier
7fa5156292 Make synapse talk HTTP to the local proxy only when federating 2019-02-13 18:05:47 +00:00
Erik Johnston
68105fca36 Don't verify stuff 2019-02-13 18:04:13 +00:00
20 changed files with 308 additions and 79 deletions

View File

@@ -46,7 +46,7 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
if not hasattr(event, "room_id"):
raise AuthError(500, "Event has no room_id: %s" % event)
if do_sig_check:
if False and do_sig_check: # Disable all sig checks for meshsim
sender_domain = get_domain_from_id(event.sender)
event_id_domain = get_domain_from_id(event.event_id)

View File

@@ -14,9 +14,9 @@
# limitations under the License.
import copy
import string
from synapse.types import EventID
from synapse.util.stringutils import random_string
from . import EventBase, FrozenEvent, _event_dict_property
@@ -49,10 +49,10 @@ class EventBuilderFactory(object):
self.event_id_count = 0
def create_event_id(self):
i = str(self.event_id_count)
i = self.event_id_count
self.event_id_count += 1
local_part = str(int(self.clock.time())) + i + random_string(5)
local_part = _encode_id(i)
e_id = EventID(local_part, self.hostname)
@@ -73,3 +73,19 @@ class EventBuilderFactory(object):
key_values["signatures"] = {}
return EventBuilder(key_values=key_values,)
def _numberToBase(n, b):
if n == 0:
return [0]
digits = []
while n:
digits.append(int(n % b))
n //= b
return digits[::-1]
def _encode_id(i):
    """Encode integer ``i`` as a compact base-62 string.

    The alphabet is ``0-9a-zA-Z``, so successive counters produce very
    short identifiers (used to shrink generated event IDs).
    """
    alphabet = string.digits + string.ascii_letters
    return "".join(alphabet[d] for d in _numberToBase(i, len(alphabet)))

View File

@@ -26,7 +26,7 @@ from synapse.crypto.event_signing import check_event_content_hash
from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
from synapse.http.servlet import assert_params_in_dict
from synapse.types import get_domain_from_id
from synapse.types import get_domain_from_id, EventID
from synapse.util import logcontext, unwrapFirstError
logger = logging.getLogger(__name__)
@@ -136,6 +136,7 @@ class FederationBase(object):
* throws a SynapseError if the signature check failed.
The deferreds run their callbacks in the sentinel logcontext.
"""
return [defer.succeed(p) for p in pdus]
deferreds = _check_sigs_on_pdus(self.keyring, pdus)
ctx = logcontext.LoggingContext.current_context()
@@ -317,7 +318,7 @@ def event_from_pdu_json(pdu_json, outlier=False):
depth = pdu_json['depth']
if not isinstance(depth, six.integer_types):
raise SynapseError(400, "Depth %r not an intger" % (depth, ),
raise SynapseError(400, "Depth %r not an integer" % (depth, ),
Codes.BAD_JSON)
if depth < 0:
@@ -325,6 +326,31 @@ def event_from_pdu_json(pdu_json, outlier=False):
elif depth > MAX_DEPTH:
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
event_id = pdu_json["event_id"]
if event_id[0] != "$":
pdu_json["event_id"] = EventID(
event_id,
get_domain_from_id(pdu_json["sender"]),
).to_string()
event_id = pdu_json["event_id"]
if "auth_events" in pdu_json:
pdu_json["auth_events"] = [
(e, {}) if isinstance(e, six.string_types) else e
for e in pdu_json["auth_events"]
]
if "prev_events" in pdu_json:
pdu_json["prev_events"] = [
(e, {}) if isinstance(e, six.string_types) else e
for e in pdu_json["prev_events"]
]
if "origin" not in pdu_json:
pdu_json["origin"] = get_domain_from_id(pdu_json["sender"])
logger.info("Unmangled event to: %s", pdu_json)
event = FrozenEvent(
pdu_json
)

View File

@@ -39,6 +39,7 @@ from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
from synapse.federation.units import _mangle_pdu
logger = logging.getLogger(__name__)
@@ -696,7 +697,7 @@ class FederationClient(FederationBase):
destination=destination,
room_id=room_id,
event_id=event_id,
content=pdu.get_pdu_json(time_now),
content=_mangle_pdu(pdu.get_pdu_json(time_now)),
)
except HttpResponseException as e:
if e.code == 403:

View File

@@ -36,7 +36,7 @@ from synapse.api.errors import (
from synapse.crypto.event_signing import compute_event_signature
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.federation.units import Edu, Transaction, _mangle_pdu
from synapse.http.endpoint import parse_server_name
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
@@ -49,6 +49,7 @@ from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import nested_logging_context
from synapse.util.logutils import log_function
# when processing incoming transactions, we try to handle multiple rooms in
# parallel, up to this limit.
TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -233,7 +234,16 @@ class FederationServer(FederationBase):
)
if hasattr(transaction, "edus"):
for edu in (Edu(**x) for x in transaction.edus):
logger.info("Got edus: %s", transaction.edus)
edus = []
for x in transaction.edus:
try:
edus.append(Edu(**x))
except Exception:
logger.exception("Failed to handle EDU: %s", x)
for edu in edus:
yield self.received_edu(
origin,
edu.edu_type,
@@ -329,8 +339,8 @@ class FederationServer(FederationBase):
)
defer.returnValue({
"pdus": [pdu.get_pdu_json() for pdu in pdus],
"auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
"pdus": [_mangle_pdu(pdu.get_pdu_json()) for pdu in pdus],
"auth_chain": [_mangle_pdu(pdu.get_pdu_json()) for pdu in auth_chain],
})
@defer.inlineCallbacks
@@ -375,7 +385,7 @@ class FederationServer(FederationBase):
yield self.check_server_matches_acl(origin_host, pdu.room_id)
ret_pdu = yield self.handler.on_invite_request(origin, pdu)
time_now = self._clock.time_msec()
defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
defer.returnValue((200, {"event": _mangle_pdu(ret_pdu.get_pdu_json(time_now))}))
@defer.inlineCallbacks
def on_send_join_request(self, origin, content):
@@ -389,9 +399,9 @@ class FederationServer(FederationBase):
res_pdus = yield self.handler.on_send_join_request(origin, pdu)
time_now = self._clock.time_msec()
defer.returnValue((200, {
"state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
"state": [_mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["state"]],
"auth_chain": [
p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]
_mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["auth_chain"]
],
}))
@@ -424,7 +434,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
auth_pdus = yield self.handler.on_event_auth(event_id)
res = {
"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
"auth_chain": [_mangle_pdu(a.get_pdu_json(time_now)) for a in auth_pdus],
}
defer.returnValue((200, res))
@@ -473,7 +483,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
send_content = {
"auth_chain": [
e.get_pdu_json(time_now)
_mangle_pdu(e.get_pdu_json(time_now))
for e in ret["auth_chain"]
],
"rejects": ret.get("rejects", []),
@@ -549,7 +559,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
defer.returnValue({
"events": [ev.get_pdu_json(time_now) for ev in missing_events],
"events": [_mangle_pdu(ev.get_pdu_json(time_now)) for ev in missing_events],
})
@log_function

View File

@@ -14,6 +14,8 @@
# limitations under the License.
import datetime
import logging
import string
import json
from six import itervalues
@@ -42,6 +44,8 @@ from .units import Edu, Transaction
logger = logging.getLogger(__name__)
pdu_logger = logging.getLogger("synapse.federation.pdu_destination_logger")
sent_pdus_destination_dist_count = Counter(
"synapse_federation_client_sent_pdu_destinations:count", ""
)
@@ -128,7 +132,7 @@ class TransactionQueue(object):
self.last_device_list_stream_id_by_dest = {}
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
self._next_txn_id = 1
self._order = 1
@@ -260,6 +264,15 @@ class TransactionQueue(object):
destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
pdu_logger.info(
"SendingPDU",
extra={
"event_id": pdu.event_id, "room_id": pdu.room_id,
"destinations": json.dumps(list(destinations)),
"server": self.server_name,
},
)
if not destinations:
return
@@ -439,16 +452,22 @@ class TransactionQueue(object):
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
# We can only include at most 50 PDUs per transactions
pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:]
pending_pdus, leftover_pdus = pending_pdus[-5:], pending_pdus[:-5]
if leftover_pdus:
self.pending_pdus_by_dest[destination] = leftover_pdus
# self.pending_pdus_by_dest[destination] = leftover_pdus
for _, _, p_span in leftover_pdus:
p_span.set_tag("success", False)
p_span.log_kv({"result": "dropped"})
p_span.finish()
logger.info("TX [%s] Sending PDUs: %s", destination, pending_pdus)
pending_edus = self.pending_edus_by_dest.pop(destination, [])
# We can only include at most 100 EDUs per transactions
pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
if leftover_edus:
self.pending_edus_by_dest[destination] = leftover_edus
pending_edus, leftover_edus = pending_edus[-5:], pending_edus[:-5]
# if leftover_edus:
# self.pending_edus_by_dest[destination] = leftover_edus
pending_presence = self.pending_presence_by_dest.pop(destination, {})
@@ -519,7 +538,7 @@ class TransactionQueue(object):
except FederationDeniedError as e:
logger.info(e)
except Exception as e:
logger.warn(
logger.exception(
"TX [%s] Failed to send transaction: %s",
destination,
e,
@@ -574,6 +593,7 @@ class TransactionQueue(object):
success = True
logger.debug("TX [%s] _attempt_new_transaction", destination)
logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id)
@@ -669,3 +689,19 @@ class TransactionQueue(object):
success = False
defer.returnValue(success)
def _numberToBase(n, b):
if n == 0:
return [0]
digits = []
while n:
digits.append(int(n % b))
n //= b
return digits[::-1]
def _encode_id(i):
    """Encode integer ``i`` as a compact base-62 string.

    The alphabet is ``0-9a-zA-Z``, so successive counters produce very
    short identifiers (used to shrink transaction IDs).
    """
    alphabet = string.digits + string.ascii_letters
    return "".join(alphabet[d] for d in _numberToBase(i, len(alphabet)))

View File

@@ -125,7 +125,7 @@ class Authenticator(object):
401, "Missing Authorization headers", Codes.UNAUTHORIZED,
)
yield self.keyring.verify_json_for_server(origin, json_request)
# yield self.keyring.verify_json_for_server(origin, json_request)
logger.info("Request from %s", origin)
request.authenticated_entity = origin

View File

@@ -19,6 +19,7 @@ server protocol.
import logging
from synapse.types import get_localpart_from_id, get_domain_from_id
from synapse.util.jsonobject import JsonEncodedObject
logger = logging.getLogger(__name__)
@@ -76,15 +77,14 @@ class Transaction(JsonEncodedObject):
]
internal_keys = [
"transaction_id",
"destination",
]
required_keys = [
"transaction_id",
"origin",
"destination",
"origin_server_ts",
"previous_ids",
]
required_keys = [
"pdus",
]
@@ -108,15 +108,33 @@ class Transaction(JsonEncodedObject):
""" Used to create a new transaction. Will auto fill out
transaction_id and origin_server_ts keys.
"""
if "origin_server_ts" not in kwargs:
raise KeyError(
"Require 'origin_server_ts' to construct a Transaction"
)
if "transaction_id" not in kwargs:
raise KeyError(
"Require 'transaction_id' to construct a Transaction"
)
kwargs["pdus"] = [p.get_pdu_json() for p in pdus]
kwargs["pdus"] = [
_mangle_pdu(p.get_pdu_json())
for p in pdus
]
return Transaction(**kwargs)
def _mangle_pdu(pdu_json):
    """Shrink a PDU dict in place for the meshsim low-bandwidth experiment.

    Drops redundant top-level keys, the transient age fields, and the
    per-event hashes on auth/prev event references; collapses the event_id
    to its localpart when it shares the sender's domain. Returns the same
    (mutated) dict.
    """
    for top_key in ("origin", "hashes", "signatures"):
        pdu_json.pop(top_key, None)

    # Transient fields that the receiver can recompute.
    unsigned = pdu_json.get("unsigned", {})
    unsigned.pop("age_ts", None)
    unsigned.pop("age", None)

    for ref_key in ("auth_events", "prev_events"):
        pdu_json[ref_key] = list(_strip_hashes(pdu_json[ref_key]))

    event_id = pdu_json["event_id"]
    if get_domain_from_id(event_id) == get_domain_from_id(pdu_json["sender"]):
        # Domain is implied by the sender, so send only the localpart.
        pdu_json["event_id"] = get_localpart_from_id(event_id)

    logger.info("Mangled PDU: %s", pdu_json)

    return pdu_json
def _strip_hashes(iterable):
return (
e for e, hashes in iterable
)

View File

@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import secrets
import logging
import unicodedata
@@ -748,7 +750,9 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def issue_access_token(self, user_id, device_id=None):
access_token = self.macaroon_gen.generate_access_token(user_id)
# access_token = self.macaroon_gen.generate_access_token(user_id)
access_token = base64.b64encode(secrets.token_bytes(8))
yield self.store.add_access_token_to_user(user_id, access_token,
device_id)
defer.returnValue(access_token)

View File

@@ -278,6 +278,7 @@ class DeviceHandler(BaseHandler):
"device_list_key", position, rooms=room_ids,
)
return
if hosts:
logger.info("Sending device list update notif to: %r", hosts)
for host in hosts:

View File

@@ -259,10 +259,8 @@ class DirectoryHandler(BaseHandler):
servers = result["servers"]
if not room_id:
raise SynapseError(
404,
raise NotFoundError(
"Room alias %s not found" % (room_alias.to_string(),),
Codes.NOT_FOUND
)
users = yield self.state.get_current_user_in_room(room_id)
@@ -302,10 +300,8 @@ class DirectoryHandler(BaseHandler):
"servers": result.servers,
})
else:
raise SynapseError(
404,
raise NotFoundError(
"Room alias %r not found" % (room_alias.to_string(),),
Codes.NOT_FOUND
)
@defer.inlineCallbacks

View File

@@ -48,13 +48,14 @@ from synapse.crypto.event_signing import (
compute_event_signature,
)
from synapse.events.validator import EventValidator
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
ReplicationFederationSendEventsRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.types import UserID, get_domain_from_id
from synapse.types import UserID, get_domain_from_id, create_requester
from synapse.util import logcontext, unwrapFirstError
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room
@@ -67,6 +68,7 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
pdu_logger = logging.getLogger("synapse.federation.pdu_destination_logger")
def shortstr(iterable, maxitems=5):
"""If iterable has maxitems or fewer, return the stringification of a list
@@ -105,6 +107,7 @@ class FederationHandler(BaseHandler):
self.hs = hs
self.clock = hs.get_clock()
self.store = hs.get_datastore() # type: synapse.storage.DataStore
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
@@ -174,6 +177,15 @@ class FederationHandler(BaseHandler):
or pdu.internal_metadata.is_outlier()
)
)
pdu_logger.info(
"ReceivedPDU",
extra={
"event_id": pdu.event_id, "room_id": pdu.room_id,
"origin": origin, "already_seen": already_seen,
"server": self.server_name,
},
)
if already_seen:
logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
return
@@ -302,20 +314,20 @@ class FederationHandler(BaseHandler):
# but there is an interaction with min_depth that I'm not really
# following.
if sent_to_us_directly:
logger.warn(
"[%s %s] Rejecting: failed to fetch %d prev events: %s",
room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
)
raise FederationError(
"ERROR",
403,
(
"Your server isn't divulging details about prev_events "
"referenced in this event."
),
affected=pdu.event_id,
)
# if sent_to_us_directly:
# logger.warn(
# "[%s %s] Rejecting: failed to fetch %d prev events: %s",
# room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
# )
# raise FederationError(
# "ERROR",
# 403,
# (
# "Your server isn't divulging details about prev_events "
# "referenced in this event."
# ),
# affected=pdu.event_id,
# )
# Calculate the state after each of the previous events, and
# resolve them to find the correct state at the current event.
@@ -504,9 +516,9 @@ class FederationHandler(BaseHandler):
room_id,
earliest_events_ids=list(latest),
latest_events=[pdu],
limit=10,
limit=5,
min_depth=min_depth,
timeout=60000,
timeout=15000,
)
logger.info(
@@ -1300,8 +1312,38 @@ class FederationHandler(BaseHandler):
context = yield self.state_handler.compute_event_context(event)
yield self.persist_events_and_notify([(event, context)])
sender = UserID.from_string(event.sender)
target = UserID.from_string(event.state_key)
if (sender.localpart == target.localpart):
run_as_background_process(
"_auto_accept_invite",
self._auto_accept_invite,
sender, target, event.room_id,
)
event.unsigned.pop("invite_room_state", None)
defer.returnValue(event)
@defer.inlineCallbacks
def _auto_accept_invite(self, sender, target, room_id):
# Auto-join `target` to `room_id` following an invite between merged
# accounts. Retries up to 10 times with a 1s sleep between attempts,
# then gives up with an error log.
# NOTE(review): `sender` is unused here — TODO confirm it is needed.
joined = False
for attempt in range(0, 10):
try:
# Perform the join on behalf of the invited user; create_requester
# fabricates a requester for `target`.
yield self.hs.get_room_member_handler().update_membership(
requester=create_requester(target.to_string()),
target=target,
room_id=room_id,
action="join",
)
joined = True
break
except Exception:
# We're going to retry, but we should log the error
logger.exception("Error auto-accepting invite on attempt %d" % attempt)
yield self.clock.sleep(1)
if not joined:
logger.error("Giving up on trying to auto-accept invite: too many attempts")
@defer.inlineCallbacks
def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
origin, event = yield self._make_and_verify_event(

View File

@@ -626,6 +626,7 @@ class PresenceHandler(object):
Args:
states (list(UserPresenceState))
"""
return
self.federation.send_presence(states)
@defer.inlineCallbacks
@@ -816,6 +817,7 @@ class PresenceHandler(object):
if self.is_mine(observed_user):
yield self.invite_presence(observed_user, observer_user)
else:
return
yield self.federation.send_edu(
destination=observed_user.domain,
edu_type="m.presence_invite",
@@ -836,6 +838,7 @@ class PresenceHandler(object):
if self.is_mine(observer_user):
yield self.accept_presence(observed_user, observer_user)
else:
return
self.federation.send_edu(
destination=observer_user.domain,
edu_type="m.presence_accept",

View File

@@ -147,6 +147,8 @@ class ReceiptsHandler(BaseHandler):
logger.debug("Sending receipt to: %r", remotedomains)
return
for domain in remotedomains:
self.federation.send_edu(
destination=domain,

View File

@@ -79,6 +79,8 @@ class RoomCreationHandler(BaseHandler):
# linearizer to stop two upgrades happening at once
self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
self._next_room_id = 0
@defer.inlineCallbacks
def upgrade_room(self, requester, old_room_id, new_version):
"""Replace a room with a new room with a different version
@@ -741,7 +743,9 @@ class RoomCreationHandler(BaseHandler):
attempts = 0
while attempts < 5:
try:
random_string = stringutils.random_string(18)
i = self._next_room_id
self._next_room_id += 1
random_string = stringutils.random_string(3) + str(i)
gen_room_id = RoomID(
random_string,
self.hs.hostname,

View File

@@ -29,7 +29,8 @@ import synapse.server
import synapse.types
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.types import RoomID, UserID
from synapse.types import RoomID, UserID, RoomAlias
from synapse.util import logcontext
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
@@ -416,6 +417,10 @@ class RoomMemberHandler(object):
ret = yield self._remote_join(
requester, remote_room_hosts, room_id, target, content
)
logcontext.run_in_background(
self._send_merged_user_invites,
requester, room_id,
)
defer.returnValue(ret)
elif effective_membership_state == Membership.LEAVE:
@@ -450,8 +455,44 @@ class RoomMemberHandler(object):
prev_events_and_hashes=prev_events_and_hashes,
content=content,
)
if effective_membership_state == Membership.JOIN:
logcontext.run_in_background(
self._send_merged_user_invites,
requester, room_id,
)
defer.returnValue(res)
@defer.inlineCallbacks
def _send_merged_user_invites(self, requester, room_id):
# Best-effort: after `requester` joins `room_id`, invite the other
# ("child") accounts linked to them via their profile room. All
# failures are logged and swallowed so the join itself is never blocked.
try:
# The profile room alias is derived from the joining user's localpart.
profile_alias = "#_profile_" + requester.user.localpart + ":" + self.hs.hostname
profile_alias = RoomAlias.from_string(profile_alias)
profile_room_id, remote_room_hosts = yield self.lookup_room_alias(profile_alias)
if profile_room_id:
# The m.linked_accounts state event lists every merged account.
linked_accounts = yield self.state_handler.get_current_state(
room_id=profile_room_id.to_string(),
event_type="m.linked_accounts",
state_key="",
)
if not linked_accounts or not linked_accounts.content['all_children']:
return
for child_id in linked_accounts.content['all_children']:
child = UserID.from_string(child_id)
# Skip local children and the joining user themselves.
if self.hs.is_mine(child) or child_id == requester.user.to_string():
# TODO: Handle auto-invite for local users (not a priority)
continue
try:
yield self.update_membership(
requester=requester,
target=child,
room_id=room_id,
action="invite",
)
except Exception:
logger.exception("Failed to invite %s to %s" % (child_id, room_id))
except Exception:
logger.exception("Failed to send invites to children of %s in %s" % (requester.user.to_string(), room_id))
@defer.inlineCallbacks
def send_membership_event(
self,

View File

@@ -931,10 +931,7 @@ class SyncHandler(object):
newly_joined_rooms, newly_joined_users, _, _ = res
_, _, newly_left_rooms, newly_left_users = res
block_all_presence_data = (
since_token is None and
sync_config.filter_collection.blocks_all_presence()
)
block_all_presence_data = True
if self.hs_config.use_presence and not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users
@@ -1231,10 +1228,7 @@ class SyncHandler(object):
`(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)`
"""
user_id = sync_result_builder.sync_config.user.to_string()
block_all_room_ephemeral = (
sync_result_builder.since_token is None and
sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
)
block_all_room_ephemeral = True
if block_all_room_ephemeral:
ephemeral_by_room = {}

View File

@@ -231,6 +231,7 @@ class TypingHandler(object):
for domain in set(get_domain_from_id(u) for u in users):
if domain != self.server_name:
logger.debug("sending typing update to %s", domain)
return
self.federation.send_edu(
destination=domain,
edu_type="m.typing",

View File

@@ -194,8 +194,11 @@ class _WrappedConnection(object):
# In Twisted >18.4; the TLS connection will be None if it has closed
# which will make abortConnection() throw. Check that the TLS connection
# is not None before trying to close it.
if self.transport.getHandle() is not None:
self.transport.abortConnection()
try:
if self.transport.getHandle() is not None:
self.transport.abortConnection()
except:
logger.warning("Failed to abort connection")
def request(self, request):
self.last_request = time.time()

View File

@@ -18,6 +18,7 @@ import logging
import random
import sys
from io import BytesIO
import os
from six import PY3, string_types
from six.moves import urllib
@@ -55,6 +56,7 @@ outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_request
incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses",
"", ["method", "code"])
USE_PROXY = "SYNAPSE_USE_PROXY" in os.environ
MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3
@@ -65,6 +67,18 @@ else:
MAXINT = sys.maxint
class ProxyMatrixFederationEndpointFactory(object):
# Endpoint factory that ignores the requested URI and always connects to
# a local proxy on localhost:8888 (enabled via the SYNAPSE_USE_PROXY
# environment variable — see USE_PROXY above).
def __init__(self, hs):
self.reactor = hs.get_reactor()
self.tls_client_options_factory = hs.tls_client_options_factory
def endpointForURI(self, uri):
# TLS is disabled (tls_client_options_factory=None): the local proxy
# speaks plain HTTP. 10 second connect timeout.
return matrix_federation_endpoint(
self.reactor, "localhost:8888", timeout=10,
tls_client_options_factory=None
)
class MatrixFederationEndpointFactory(object):
def __init__(self, hs):
self.reactor = hs.get_reactor()
@@ -190,8 +204,23 @@ class MatrixFederationHttpClient(object):
pool.retryAutomatically = False
pool.maxPersistentPerHost = 5
pool.cachedConnectionTimeout = 2 * 60
self.agent = Agent.usingEndpointFactory(
reactor, MatrixFederationEndpointFactory(hs), pool=pool
if USE_PROXY:
self.agent = Agent.usingEndpointFactory(
reactor, ProxyMatrixFederationEndpointFactory(hs), pool=pool
)
else:
self.agent = Agent.usingEndpointFactory(
reactor, MatrixFederationEndpointFactory(hs), pool=pool
)
file_pool = HTTPConnectionPool(reactor)
file_pool.retryAutomatically = False
file_pool.maxPersistentPerHost = 5
file_pool.cachedConnectionTimeout = 10
self.file_agent = Agent.usingEndpointFactory(
reactor, MatrixFederationEndpointFactory(hs), pool=file_pool
)
self.clock = hs.get_clock()
self._store = hs.get_datastore()
@@ -211,7 +240,8 @@ class MatrixFederationHttpClient(object):
timeout=None,
long_retries=False,
ignore_backoff=False,
backoff_on_404=False
backoff_on_404=False,
agent=None,
):
"""
Sends a request to the given server.
@@ -704,6 +734,7 @@ class MatrixFederationHttpClient(object):
request,
retry_on_dns_fail=retry_on_dns_fail,
ignore_backoff=ignore_backoff,
agent=self.file_agent,
)
headers = dict(response.headers.getAllRawHeaders())