Mirror of https://github.com/element-hq/synapse.git, synced 2025-12-07 01:20:16 +00:00

Compare commits (9 commits): bbz/improv... → dmr/restri...
| Author | SHA1 | Date |
|---|---|---|
| | e9eca6d8f4 | |
| | e098fbf9d4 | |
| | 833eae2529 | |
| | 15093ea48b | |
| | e41a90e890 | |
| | 473de81b2d | |
| | eb8e679974 | |
| | 4acecc5ea0 | |
| | 6298d77eb1 | |
@@ -18,6 +18,7 @@ for port in 8080 8081 8082; do
    echo "Starting server on port $port... "

    https_port=$((port + 400))
    proxy_port=$((port + 100))
    mkdir -p demo/$port
    pushd demo/$port || exit

@@ -65,11 +66,21 @@ for port in 8080 8081 8082; do
        resources:
          - names: [client, federation]
            compress: false

      - port: $proxy_port
        bind_addresses: ['::1', '127.0.0.1']
        type: outbound_federation_proxy

    PORTLISTENERS
    )

    echo "${listeners}"

    echo "outbound_federation_proxied_via:"
    echo "  master:"
    echo "    host: localhost"
    echo "    port: $proxy_port"

    # Disable TLS for the servers
    printf '\n\n# Disable TLS for the servers.'
    echo '# DO NOT USE IN PRODUCTION'
@@ -20,6 +20,11 @@ worker_listeners:
        - {{ resource }}
{%- endfor %}
{% endif %}
{% if outbound_federation_proxy_port %}
  - type: outbound_federation_proxy
    port: {{ outbound_federation_proxy_port }}
{% endif %}


worker_log_config: {{ worker_log_config_filepath }}
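For illustration only, a minimal sketch (not part of the branch) of how the new template branch renders once a proxy port is supplied. The inlined template string and the port value are stand-ins for the real `worker.yaml.j2`:

```python
# Illustrative only: render a cut-down version of the template branch above.
# The real image renders worker.yaml.j2 with many more variables.
from jinja2 import Template

snippet = Template(
    "worker_listeners:\n"
    "{% if outbound_federation_proxy_port %}"
    "  - type: outbound_federation_proxy\n"
    "    port: {{ outbound_federation_proxy_port }}\n"
    "{% endif %}"
)

# 18010 is an arbitrary example port; omitting the variable drops the listener.
print(snippet.render(outbound_federation_proxy_port=18010))
```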
@@ -328,6 +328,7 @@ def add_worker_roles_to_shared_config(
    worker_type: str,
    worker_name: str,
    worker_port: int,
    worker_proxy_port: int,
) -> None:
    """Given a dictionary representing a config file shared across all workers,
    append appropriate worker information to it for the current worker_type instance.
@@ -340,6 +341,7 @@ def add_worker_roles_to_shared_config(
    """
    # The instance_map config field marks the workers that write to various replication streams
    instance_map = shared_config.setdefault("instance_map", {})
    proxied_via = shared_config.setdefault("outbound_federation_proxied_via", {})

    # Worker-type specific sharding config
    if worker_type == "pusher":
@@ -347,6 +349,7 @@ def add_worker_roles_to_shared_config(

    elif worker_type == "federation_sender":
        shared_config.setdefault("federation_sender_instances", []).append(worker_name)
        proxied_via[worker_name] = {"host": "localhost", "port": worker_proxy_port}

    elif worker_type == "event_persister":
        # Event persisters write to the events stream, so we need to update
@@ -388,6 +391,17 @@ def generate_base_homeserver_config() -> None:
    subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)


# Start worker ports from this arbitrary port
_worker_port = 18009


def claim_port() -> int:
    global _worker_port
    port = _worker_port
    _worker_port += 1
    return port


def generate_worker_files(
    environ: Mapping[str, str], config_path: str, data_dir: str
) -> None:
@@ -465,9 +479,6 @@ def generate_worker_files(
    # Create the worker configuration directory if it doesn't already exist
    os.makedirs("/conf/workers", exist_ok=True)

    # Start worker ports from this arbitrary port
    worker_port = 18009

    # A counter of worker_type -> int. Used for determining the name for a given
    # worker type when generating its config file, as each worker's name is just
    # worker_type + instance #
@@ -479,6 +490,8 @@ def generate_worker_files(

    # For each worker type specified by the user, create config values
    for worker_type in worker_types:
        worker_port = claim_port()
        worker_proxy_port = claim_port()
        worker_config = WORKERS_CONFIG.get(worker_type)
        if worker_config:
            worker_config = worker_config.copy()
@@ -495,6 +508,9 @@ def generate_worker_files(
            {"name": worker_name, "port": str(worker_port), "config_path": config_path}
        )

        if worker_type == "federation_sender":
            worker_config["outbound_federation_proxy_port"] = worker_proxy_port

        # Update the shared config with any worker-type specific options
        shared_config.update(worker_config["shared_extra_conf"])
@@ -505,7 +521,7 @@ def generate_worker_files(

        # Update the shared config with sharding-related options if necessary
        add_worker_roles_to_shared_config(
            shared_config, worker_type, worker_name, worker_port
            shared_config, worker_type, worker_name, worker_port, worker_proxy_port
        )

        # Enable the worker in supervisord
@@ -538,8 +554,6 @@ def generate_worker_files(
            worker_log_config_filepath=log_config_filepath,
        )

        worker_port += 1

    # Build the nginx location config blocks
    nginx_location_config = ""
    for endpoint, upstream in nginx_locations.items():
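To make the effect of the two `claim_port()` calls concrete, here is a small sketch (illustrative values only, not taken from the branch itself) of the shared-config fragment that ends up being written for a single federation sender:

```python
# Illustrative only: the shared config fragment produced for one
# federation_sender worker. claim_port() hands out consecutive ports starting
# at 18009, so the first worker gets 18009 (HTTP) and 18010 (outbound proxy).
shared_config = {
    "federation_sender_instances": ["federation_sender1"],
    "outbound_federation_proxied_via": {
        "federation_sender1": {"host": "localhost", "port": 18010},
    },
}
```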
@@ -433,9 +433,16 @@ See the docs [request log format](../administration/request_log.md).

* `type`: the type of listener. Normally `http`, but other valid options are:

  * `manhole`: (see the docs [here](../../manhole.md)),
  * `manhole`: see the docs [here](../../manhole.md).

  * `metrics`: (see the docs [here](../../metrics-howto.md)),
  * `metrics`: see the docs [here](../../metrics-howto.md).

  * `outbound_federation_proxy`: for use with worker deployments only. Allows
    this worker to make federation requests on behalf of other workers. It
    should only be used together with [`outbound_federation_proxied_via`](#outbound_federation_proxied_via),
    and the listener's port must match the port configured for this worker there.

    _New in Synapse 1.79._

* `tls`: set to true to enable TLS for this listener. Will use the TLS key/cert specified in tls_private_key_path / tls_certificate_path.

@@ -3832,6 +3839,38 @@ federation_sender_instances:
  - federation_sender1
  - federation_sender2
```
---
### `outbound_federation_proxied_via`

A map from worker names to objects specifying hostnames and port numbers.

By default, Synapse workers make
[federation requests](https://spec.matrix.org/v1.6/server-server-api/)
on demand. This option lets server operators limit that ability to a subset of
"proxy" workers. Non-proxy workers will not make federation requests directly;
instead they send them indirectly via one of the proxy workers.

By default this mapping is empty, which means that every worker can make federation
requests for itself.

Example configuration:
```yaml
outbound_federation_proxied_via:
  # Master and the fed senders can make federation requests freely.
  # All other workers must proxy via one of those three.
  master:
    host: localhost
    port: 8001
  federation_sender1:
    host: localhost
    port: 8002
  federation_sender2:
    host: localhost
    port: 8003
```

_New in Synapse 1.79._

---
### `instance_map`
@@ -47,6 +47,8 @@ from twisted.internet.tcp import Port
from twisted.logger import LoggingFile, LogLevel
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.python.threadpool import ThreadPool
from twisted.web.http import HTTPFactory
from twisted.web.proxy import Proxy
from twisted.web.resource import Resource

import synapse.util.caches
@@ -62,6 +64,7 @@ from synapse.events.presence_router import load_legacy_presence_router
from synapse.events.spamcheck import load_legacy_spam_checkers
from synapse.events.third_party_rules import load_legacy_third_party_event_rules
from synapse.handlers.auth import load_legacy_password_auth_providers
from synapse.http.outbound_federation_proxy import OutboundFederationProxyFactory
from synapse.http.site import SynapseSite
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import init_tracer
@@ -351,7 +354,7 @@ def listen_tcp(
    return r  # type: ignore[return-value]


def listen_http(
def listen_http_for_resource(
    listener_config: ListenerConfig,
    root_resource: Resource,
    version_string: str,
@@ -360,7 +363,6 @@ def listen_http(
    reactor: ISynapseReactor = reactor,
) -> List[Port]:
    port = listener_config.port
    bind_addresses = listener_config.bind_addresses
    tls = listener_config.tls

    assert listener_config.http_options is not None
@@ -369,7 +371,7 @@ def listen_http(
    if site_tag is None:
        site_tag = str(port)

    site = SynapseSite(
    factory = SynapseSite(
        "synapse.access.%s.%s" % ("https" if tls else "http", site_tag),
        site_tag,
        listener_config,
@@ -378,13 +380,34 @@ def listen_http(
        max_request_body_size=max_request_body_size,
        reactor=reactor,
    )
    return listen_http(listener_config, factory, context_factory, reactor)


def listen_outbound_fed_proxy(
    listener_config: ListenerConfig,
    context_factory: Optional[IOpenSSLContextFactory],
    reactor: ISynapseReactor = reactor,
) -> None:
    listen_http(listener_config, OutboundFederationProxyFactory, context_factory, reactor)


def listen_http(
    listener_config: ListenerConfig,
    factory: ServerFactory,
    context_factory: Optional[IOpenSSLContextFactory],
    reactor: ISynapseReactor = reactor,
) -> List[Port]:
    port = listener_config.port
    bind_addresses = listener_config.bind_addresses
    tls = listener_config.tls

    if tls:
        # refresh_certificate should have been called before this.
        assert context_factory is not None
        ports = listen_ssl(
            bind_addresses,
            port,
            site,
            factory,
            context_factory,
            reactor=reactor,
        )
@@ -393,7 +416,7 @@ def listen_http(
        ports = listen_tcp(
            bind_addresses,
            port,
            site,
            factory,
            reactor=reactor,
        )
        logger.info("Synapse now listening on TCP port %d", port)
@@ -222,7 +222,7 @@ class GenericWorkerServer(HomeServer):

        root_resource = create_resource_tree(resources, OptionsResource())

        _base.listen_http(
        _base.listen_http_for_resource(
            listener_config,
            root_resource,
            self.version_string,
@@ -253,6 +253,19 @@ class GenericWorkerServer(HomeServer):
                        listener.bind_addresses,
                        listener.port,
                    )
            elif listener.type == "outbound_federation_proxy":
                if (
                    self.get_instance_name()
                    not in self.config.worker.outbound_fed_restricted_to
                ):
                    logger.warning(
                        "Outbound federation proxy listener configured, but this "
                        "worker is not permitted to send outgoing federation traffic!"
                    )
                else:
                    _base.listen_outbound_fed_proxy(
                        listener, self.tls_server_context_factory
                    )
            else:
                logger.warning("Unsupported listener type: %s", listener.type)
@@ -37,7 +37,7 @@ from synapse.api.urls import (
from synapse.app import _base
from synapse.app._base import (
    handle_startup_exception,
    listen_http,
    listen_http_for_resource,
    max_request_body_size,
    redirect_stdio_to_logs,
    register_start,
@@ -139,7 +139,7 @@ class SynapseHomeServer(HomeServer):
        else:
            root_resource = OptionsResource()

        ports = listen_http(
        ports = listen_http_for_resource(
            listener_config,
            create_resource_tree(resources, root_resource),
            self.version_string,
@@ -269,6 +269,19 @@ class SynapseHomeServer(HomeServer):
                        listener.bind_addresses,
                        listener.port,
                    )
            elif listener.type == "outbound_federation_proxy":
                if (
                    self.get_instance_name()
                    not in self.config.worker.outbound_fed_restricted_to
                ):
                    logger.warning(
                        "Outbound federation proxy listener configured, but this "
                        "worker is not permitted to send outgoing federation traffic!"
                    )
                else:
                    _base.listen_outbound_fed_proxy(
                        listener, self.tls_server_context_factory
                    )
            else:
                # this shouldn't happen, as the listener type should have been checked
                # during parsing
@@ -171,6 +171,7 @@ KNOWN_LISTENER_TYPES = {
    "http",
    "metrics",
    "manhole",
    "outbound_federation_proxy",
}

KNOWN_RESOURCES = {
@@ -15,7 +15,7 @@

import argparse
import logging
from typing import Any, Dict, List, Union
from typing import Any, Dict, List, Mapping, Union

import attr

@@ -276,6 +276,8 @@ class WorkerConfig(Config):
            new_option_name="update_user_directory_from_worker",
        )

        self.outbound_fed_restricted_to = self._get_outbound_fed_restrictions(config)

    def _should_this_worker_perform_duty(
        self,
        config: Dict[str, Any],
@@ -426,6 +428,39 @@ class WorkerConfig(Config):

        return worker_instances

    def _get_outbound_fed_restrictions(
        self, config: Dict[str, Any]
    ) -> Mapping[str, InstanceLocationConfig]:
        proxied_via = config.get("outbound_federation_proxied_via", {})
        if not isinstance(proxied_via, dict):
            raise ConfigError(
                "outbound_federation_proxied_via should be a mapping, "
                f"not {type(proxied_via)}"
            )

        for instance_name, listener_location in proxied_via.items():
            if "host" not in listener_location:
                raise ConfigError(
                    f"outbound_federation_proxied_via/{instance_name} is missing a host"
                )
            if "port" not in listener_location:
                raise ConfigError(
                    f"outbound_federation_proxied_via/{instance_name} is missing a port"
                )
            if not isinstance(listener_location["host"], str):
                raise ConfigError(
                    f"outbound_federation_proxied_via/{instance_name}/host should be a string"
                )
            if not isinstance(listener_location["port"], int):
                raise ConfigError(
                    f"outbound_federation_proxied_via/{instance_name}/port should be an integer"
                )

        return {
            instance_name: InstanceLocationConfig(listener["host"], listener["port"])
            for instance_name, listener in proxied_via.items()
        }

    def read_arguments(self, args: argparse.Namespace) -> None:
        # We support a bunch of command line arguments that override options in
        # the config. A lot of these options have a worker_* prefix when running
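As a rough illustration, `_get_outbound_fed_restrictions` turns the YAML example from the documentation hunk above into a mapping of `InstanceLocationConfig` values, approximately:

```python
# Roughly what _get_outbound_fed_restrictions returns for the documented
# example; InstanceLocationConfig(host, port) is constructed exactly as in the
# dict comprehension shown in the hunk above.
from synapse.config.workers import InstanceLocationConfig

outbound_fed_restricted_to = {
    "master": InstanceLocationConfig("localhost", 8001),
    "federation_sender1": InstanceLocationConfig("localhost", 8002),
    "federation_sender2": InstanceLocationConfig("localhost", 8003),
}
```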
@@ -11,9 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
import urllib.parse
from typing import Any, Generator, List, Optional
from typing import Any, Generator, List, Mapping, Optional
from urllib.request import (  # type: ignore[attr-defined]
    getproxies_environment,
    proxy_bypass_environment,
@@ -30,10 +31,11 @@ from twisted.internet.interfaces import (
    IReactorCore,
    IStreamClientEndpoint,
)
from twisted.web.client import URI, Agent, HTTPConnectionPool
from twisted.web.client import URI, Agent, HTTPConnectionPool, _AgentBase
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent, IAgentEndpointFactory, IBodyProducer, IResponse

from synapse.config.workers import InstanceLocationConfig
from synapse.crypto.context_factory import FederationPolicyForHTTPS
from synapse.http import proxyagent
from synapse.http.client import BlacklistingAgentWrapper, BlacklistingReactorWrapper
@@ -49,10 +51,11 @@ logger = logging.getLogger(__name__)


@implementer(IAgent)
class MatrixFederationAgent:
    """An Agent-like thing which provides a `request` method which correctly
    handles resolving matrix server names when using matrix://. Handles standard
    https URIs as normal.
class BaseMatrixFederationAgent(abc.ABC):
    """An Agent-like thing which provides a `request` method that accepts matrix://
    URIs.

    Handles standard https URIs as normal.

    Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)

@@ -82,19 +85,16 @@ class MatrixFederationAgent:
        default implementation.
    """

    _agent: IAgent

    def __init__(
        self,
        reactor: ISynapseReactor,
        tls_client_options_factory: Optional[FederationPolicyForHTTPS],
        user_agent: bytes,
        ip_whitelist: IPSet,
        ip_blacklist: IPSet,
        _srv_resolver: Optional[SrvResolver] = None,
        _well_known_resolver: Optional[WellKnownResolver] = None,
    ):
        # proxy_reactor is not blacklisted
        proxy_reactor = reactor

        # We need to use a DNS resolver which filters out blacklisted IP
        # addresses, to prevent DNS rebinding.
        reactor = BlacklistingReactorWrapper(reactor, ip_whitelist, ip_blacklist)
@@ -105,35 +105,8 @@ class MatrixFederationAgent:
        self._pool.maxPersistentPerHost = 5
        self._pool.cachedConnectionTimeout = 2 * 60

        self._agent = Agent.usingEndpointFactory(
            reactor,
            MatrixHostnameEndpointFactory(
                reactor,
                proxy_reactor,
                tls_client_options_factory,
                _srv_resolver,
            ),
            pool=self._pool,
        )
        self.user_agent = user_agent

        if _well_known_resolver is None:
            _well_known_resolver = WellKnownResolver(
                reactor,
                agent=BlacklistingAgentWrapper(
                    ProxyAgent(
                        reactor,
                        proxy_reactor,
                        pool=self._pool,
                        contextFactory=tls_client_options_factory,
                        use_proxy=True,
                    ),
                    ip_blacklist=ip_blacklist,
                ),
                user_agent=self.user_agent,
            )

        self._well_known_resolver = _well_known_resolver
        self._reactor = reactor

    @defer.inlineCallbacks
    def request(
@@ -164,40 +137,7 @@ class MatrixFederationAgent:
        # explicit port.
        parsed_uri = urllib.parse.urlparse(uri)

        # There must be a valid hostname.
        assert parsed_uri.hostname

        # If this is a matrix:// URI check if the server has delegated matrix
        # traffic using well-known delegation.
        #
        # We have to do this here and not in the endpoint as we need to rewrite
        # the host header with the delegated server name.
        delegated_server = None
        if (
            parsed_uri.scheme == b"matrix"
            and not _is_ip_literal(parsed_uri.hostname)
            and not parsed_uri.port
        ):
            well_known_result = yield defer.ensureDeferred(
                self._well_known_resolver.get_well_known(parsed_uri.hostname)
            )
            delegated_server = well_known_result.delegated_server

        if delegated_server:
            # Ok, the server has delegated matrix traffic to somewhere else, so
            # lets rewrite the URL to replace the server with the delegated
            # server name.
            uri = urllib.parse.urlunparse(
                (
                    parsed_uri.scheme,
                    delegated_server,
                    parsed_uri.path,
                    parsed_uri.params,
                    parsed_uri.query,
                    parsed_uri.fragment,
                )
            )
            parsed_uri = urllib.parse.urlparse(uri)
        parsed_uri = yield self.postprocess_uri(parsed_uri)

        # We need to make sure the host header is set to the netloc of the
        # server and that a user-agent is provided.
@@ -217,6 +157,101 @@ class MatrixFederationAgent:

        return res

    @abc.abstractmethod
    @defer.inlineCallbacks
    def postprocess_uri(
        self, parsed_uri: "urllib.parse.ParseResultBytes"
    ) -> Generator["defer.Deferred", Any, "urllib.parse.ParseResultBytes"]:
        ...


class MatrixFederationAgent(BaseMatrixFederationAgent):
    """A federation agent that resolves server delegation by itself, using
    SRV or .well-known lookups."""

    def __init__(
        self,
        reactor: ISynapseReactor,
        tls_client_options_factory: Optional[FederationPolicyForHTTPS],
        user_agent: bytes,
        ip_whitelist: IPSet,
        ip_blacklist: IPSet,
        _srv_resolver: Optional[SrvResolver] = None,
        _well_known_resolver: Optional[WellKnownResolver] = None,
    ):
        super().__init__(reactor, user_agent, ip_whitelist, ip_blacklist)

        # proxy_reactor is not blacklisted
        proxy_reactor = reactor

        self._agent = Agent.usingEndpointFactory(
            self._reactor,
            MatrixHostnameEndpointFactory(
                self._reactor,
                proxy_reactor,
                tls_client_options_factory,
                _srv_resolver,
            ),
            pool=self._pool,
        )

        if _well_known_resolver is None:
            _well_known_resolver = WellKnownResolver(
                reactor,
                agent=BlacklistingAgentWrapper(
                    ProxyAgent(
                        self._reactor,
                        proxy_reactor,
                        pool=self._pool,
                        contextFactory=tls_client_options_factory,
                        use_proxy=True,
                    ),
                    ip_blacklist=ip_blacklist,
                ),
                user_agent=self.user_agent,
            )

        self._well_known_resolver = _well_known_resolver

    @defer.inlineCallbacks
    def postprocess_uri(
        self, parsed_uri: "urllib.parse.ParseResultBytes"
    ) -> Generator[defer.Deferred, Any, "urllib.parse.ParseResultBytes"]:
        # There must be a valid hostname.
        assert parsed_uri.hostname

        # If this is a matrix:// URI check if the server has delegated matrix
        # traffic using well-known delegation.
        #
        # We have to do this here and not in the endpoint as we need to rewrite
        # the host header with the delegated server name.
        delegated_server = None
        if (
            parsed_uri.scheme == b"matrix"
            and not _is_ip_literal(parsed_uri.hostname)
            and not parsed_uri.port
        ):
            well_known_result = yield defer.ensureDeferred(
                self._well_known_resolver.get_well_known(parsed_uri.hostname)
            )
            delegated_server = well_known_result.delegated_server
        if delegated_server:
            # Ok, the server has delegated matrix traffic to somewhere else, so
            # lets rewrite the URL to replace the server with the delegated
            # server name.
            uri = urllib.parse.urlunparse(
                (
                    parsed_uri.scheme,
                    delegated_server,
                    parsed_uri.path,
                    parsed_uri.params,
                    parsed_uri.query,
                    parsed_uri.fragment,
                )
            )
            parsed_uri = urllib.parse.urlparse(uri)
        return parsed_uri


@implementer(IAgentEndpointFactory)
class MatrixHostnameEndpointFactory:
@@ -430,3 +465,79 @@ def _is_ip_literal(host: bytes) -> bool:
        return True
    except AddrFormatError:
        return False


@implementer(IAgent)
class InternalProxyMatrixFederationAgentInner(_AgentBase):
    def __init__(
        self,
        reactor: IReactorCore,
        endpointFactory: IAgentEndpointFactory,
        pool: HTTPConnectionPool,
    ):
        _AgentBase.__init__(self, reactor, pool)
        self._endpointFactory = endpointFactory

    def request(
        self,
        method: bytes,
        uri: bytes,
        headers: Optional[Headers] = None,
        bodyProducer: Optional[IBodyProducer] = None,
    ) -> "defer.Deferred[IResponse]":
        # Cache *all* connections under the same key, since we are only
        # connecting to a single destination, the proxy:
        # TODO make second entry an endpoint
        key = ("http-proxy", None)
        parsed_uri = URI.fromBytes(uri)
        return self._requestWithEndpoint(
            key,
            self._endpointFactory.endpointForURI(parsed_uri),
            method,
            parsed_uri,
            headers,
            bodyProducer,
            uri,
        )


class InternalProxyMatrixFederationAgent(BaseMatrixFederationAgent):
    def __init__(
        self,
        reactor: ISynapseReactor,
        user_agent: bytes,
        ip_whitelist: IPSet,
        ip_blacklist: IPSet,
        proxy_workers: Mapping[str, InstanceLocationConfig],
    ):
        super().__init__(reactor, user_agent, ip_whitelist, ip_blacklist)
        self._agent = InternalProxyMatrixFederationAgentInner(
            self._reactor,
            InternalProxyMatrixHostnameEndpointFactory(reactor, proxy_workers),
            pool=self._pool,
        )

    @defer.inlineCallbacks
    def postprocess_uri(
        self, parsed_uri: "urllib.parse.ParseResultBytes"
    ) -> Generator[defer.Deferred, Any, urllib.parse.ParseResultBytes]:
        yield None
        return parsed_uri


@implementer(IAgentEndpointFactory)
class InternalProxyMatrixHostnameEndpointFactory:
    def __init__(
        self, reactor: IReactorCore, proxy_workers: Mapping[str, InstanceLocationConfig]
    ):
        self._reactor = reactor
        self._proxy_workers = proxy_workers

    def endpointForURI(self, uri: URI) -> IStreamClientEndpoint:
        # key = uri.toBytes()
        # TODO choose instance based on key
        proxy_location = next(iter(self._proxy_workers.values()))
        # TODO does this need wrapping with wrapClientTLS?
        logger.warning(f"DMR: make endpoint for {uri}, {self._proxy_workers=}")
        rv = HostnameEndpoint(self._reactor, proxy_location.host, proxy_location.port)
        return rv
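For orientation, a hedged sketch of constructing the new proxying agent directly. In the branch this happens inside `MatrixFederationHttpClient` (see the next hunk); the worker name, port and IP ranges below are made up, and the classes only exist on this branch:

```python
# Standalone construction sketch; Synapse builds this inside
# MatrixFederationHttpClient rather than by hand.
from netaddr import IPSet
from twisted.internet import reactor

from synapse.config.workers import InstanceLocationConfig
from synapse.http.federation.matrix_federation_agent import (
    InternalProxyMatrixFederationAgent,
)

agent = InternalProxyMatrixFederationAgent(
    reactor,
    b"Synapse/outbound-proxy-test",  # user_agent
    IPSet(),                         # ip_whitelist
    IPSet(["10.0.0.0/8"]),           # ip_blacklist
    {"federation_sender1": InstanceLocationConfig("localhost", 18010)},
)
```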
@@ -49,7 +49,7 @@ from twisted.internet.interfaces import IReactorTime
from twisted.internet.task import Cooperator
from twisted.web.client import ResponseFailed
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer, IResponse
from twisted.web.iweb import IAgent, IBodyProducer, IResponse

import synapse.metrics
import synapse.util.retryutils
@@ -70,7 +70,10 @@ from synapse.http.client import (
    encode_query_args,
    read_body_with_max_size,
)
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.http.federation.matrix_federation_agent import (
    InternalProxyMatrixFederationAgent,
    MatrixFederationAgent,
)
from synapse.http.types import QueryParams
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -348,13 +351,29 @@ class MatrixFederationHttpClient:
        if hs.config.server.user_agent_suffix:
            user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)

        federation_agent = MatrixFederationAgent(
            self.reactor,
            tls_client_options_factory,
            user_agent.encode("ascii"),
            hs.config.server.federation_ip_range_whitelist,
            hs.config.server.federation_ip_range_blacklist,
        )
        if (
            hs.config.worker.outbound_fed_restricted_to
            and hs.get_instance_name()
            not in hs.config.worker.outbound_fed_restricted_to
        ):
            logger.warning("DMR: Using InternalProxyMatrixFederationAgent")
            # We must route outgoing federation traffic via one of the
            # permitted proxy workers.
            federation_agent: IAgent = InternalProxyMatrixFederationAgent(
                self.reactor,
                user_agent.encode("ascii"),
                hs.config.server.federation_ip_range_whitelist,
                hs.config.server.federation_ip_range_blacklist,
                hs.config.worker.outbound_fed_restricted_to,
            )
        else:
            logger.warning("DMR: Using MatrixFederationAgent")
            federation_agent = MatrixFederationAgent(
                self.reactor,
                tls_client_options_factory,
                user_agent.encode("ascii"),
                hs.config.server.federation_ip_range_whitelist,
                hs.config.server.federation_ip_range_blacklist,
            )

        # Use a BlacklistingAgentWrapper to prevent circumventing the IP
        # blacklist via IP literals in server names
synapse/http/outbound_federation_proxy.py (new file, 22 lines)
@@ -0,0 +1,22 @@
from twisted.web.http import HTTPFactory
from twisted.web.proxy import Proxy, ProxyClient, ProxyClientFactory, ProxyRequest


class FederationOutboundProxyClient(ProxyClient):
    ...


class FederationOutboundProxyClientFactory(ProxyClientFactory):
    protocol = FederationOutboundProxyClient


class FederationOutboundProxyRequest(ProxyRequest):
    protocols = {b"matrix": FederationOutboundProxyClientFactory}
    ports = {b"matrix": 80}


class FederationOutboundProxy(Proxy):
    requestFactory = FederationOutboundProxyRequest


OutboundFederationProxyFactory = HTTPFactory.forProtocol(FederationOutboundProxy)
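Finally, a minimal sketch (not part of the branch) of serving the resulting factory by hand for local experimentation, assuming this new module is importable; in Synapse the listener is opened via `_base.listen_outbound_fed_proxy`, and the port here is arbitrary:

```python
# Standalone sketch: expose the outbound federation proxy on localhost for
# local experimentation. Synapse itself wires this up through
# _base.listen_outbound_fed_proxy / listen_http instead.
from twisted.internet import reactor

from synapse.http.outbound_federation_proxy import OutboundFederationProxyFactory

# 18110 is an arbitrary example port; bind to loopback only.
reactor.listenTCP(18110, OutboundFederationProxyFactory, interface="127.0.0.1")
reactor.run()
```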