Compare commits

...

9 Commits

Author SHA1 Message Date
David Robertson
e9eca6d8f4 WIP snapshot 2023-03-03 14:58:21 +00:00
David Robertson
e098fbf9d4 Testing:
- proxy via fed senders in complement
2023-03-02 16:21:45 +00:00
David Robertson
833eae2529 Pull out claim_port 2023-03-02 16:21:45 +00:00
David Robertson
15093ea48b Testing
- Force demo script to listen for proxy
- Refuse to make fed request if restricted
2023-03-02 16:21:45 +00:00
David Robertson
e41a90e890 WIP: make proxy requests 2023-03-02 16:21:36 +00:00
David Robertson
473de81b2d WIP: listen for proxy requests 2023-03-02 16:21:21 +00:00
David Robertson
eb8e679974 Pull out core listen_http machinery 2023-03-02 16:19:34 +00:00
David Robertson
4acecc5ea0 Parse new config 2023-03-02 16:19:33 +00:00
David Robertson
6298d77eb1 Define new config 2023-03-01 18:31:44 +00:00
12 changed files with 405 additions and 99 deletions

View File

@@ -18,6 +18,7 @@ for port in 8080 8081 8082; do
echo "Starting server on port $port... "
https_port=$((port + 400))
proxy_port=$((port + 100))
mkdir -p demo/$port
pushd demo/$port || exit
@@ -65,11 +66,21 @@ for port in 8080 8081 8082; do
resources:
- names: [client, federation]
compress: false
- port: $proxy_port
bind_addresses: ['::1', '127.0.0.1']
type: outbound_federation_proxy
PORTLISTENERS
)
echo "${listeners}"
echo "outbound_federation_proxied_via:"
echo " master:"
echo " host: localhost"
echo " port: $proxy_port"
# Disable TLS for the servers
printf '\n\n# Disable TLS for the servers.'
echo '# DO NOT USE IN PRODUCTION'

View File

@@ -20,6 +20,11 @@ worker_listeners:
- {{ resource }}
{%- endfor %}
{% endif %}
{% if outbound_federation_proxy_port %}
- type: outbound_federation_proxy
port: {{ outbound_federation_proxy_port }}
{% endif %}
worker_log_config: {{ worker_log_config_filepath }}

View File

@@ -328,6 +328,7 @@ def add_worker_roles_to_shared_config(
worker_type: str,
worker_name: str,
worker_port: int,
worker_proxy_port: int,
) -> None:
"""Given a dictionary representing a config file shared across all workers,
append appropriate worker information to it for the current worker_type instance.
@@ -340,6 +341,7 @@ def add_worker_roles_to_shared_config(
"""
# The instance_map config field marks the workers that write to various replication streams
instance_map = shared_config.setdefault("instance_map", {})
proxied_via = shared_config.setdefault("outbound_federation_proxied_via", {})
# Worker-type specific sharding config
if worker_type == "pusher":
@@ -347,6 +349,7 @@ def add_worker_roles_to_shared_config(
elif worker_type == "federation_sender":
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
proxied_via[worker_name] = {"host": "localhost", "port": worker_proxy_port}
elif worker_type == "event_persister":
# Event persisters write to the events stream, so we need to update
@@ -388,6 +391,17 @@ def generate_base_homeserver_config() -> None:
subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
# Start worker ports from this arbitrary port
_worker_port = 18009
def claim_port() -> int:
global _worker_port
port = _worker_port
_worker_port += 1
return port
def generate_worker_files(
environ: Mapping[str, str], config_path: str, data_dir: str
) -> None:
@@ -465,9 +479,6 @@ def generate_worker_files(
# Create the worker configuration directory if it doesn't already exist
os.makedirs("/conf/workers", exist_ok=True)
# Start worker ports from this arbitrary port
worker_port = 18009
# A counter of worker_type -> int. Used for determining the name for a given
# worker type when generating its config file, as each worker's name is just
# worker_type + instance #
@@ -479,6 +490,8 @@ def generate_worker_files(
# For each worker type specified by the user, create config values
for worker_type in worker_types:
worker_port = claim_port()
worker_proxy_port = claim_port()
worker_config = WORKERS_CONFIG.get(worker_type)
if worker_config:
worker_config = worker_config.copy()
@@ -495,6 +508,9 @@ def generate_worker_files(
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
)
if worker_type == "federation_sender":
worker_config["outbound_federation_proxy_port"] = worker_proxy_port
# Update the shared config with any worker-type specific options
shared_config.update(worker_config["shared_extra_conf"])
@@ -505,7 +521,7 @@ def generate_worker_files(
# Update the shared config with sharding-related options if necessary
add_worker_roles_to_shared_config(
shared_config, worker_type, worker_name, worker_port
shared_config, worker_type, worker_name, worker_port, worker_proxy_port
)
# Enable the worker in supervisord
@@ -538,8 +554,6 @@ def generate_worker_files(
worker_log_config_filepath=log_config_filepath,
)
worker_port += 1
# Build the nginx location config blocks
nginx_location_config = ""
for endpoint, upstream in nginx_locations.items():

View File

@@ -433,9 +433,16 @@ See the docs [request log format](../administration/request_log.md).
* `type`: the type of listener. Normally `http`, but other valid options are:
* `manhole`: (see the docs [here](../../manhole.md)),
* `manhole`: see the docs [here](../../manhole.md).
* `metrics`: (see the docs [here](../../metrics-howto.md)),
* `metrics`: see the docs [here](../../metrics-howto.md).
* `outbound_federation_proxy`: for use with worker deployments only. Allows
this worker to make federation requests on behalf of other workers. This
should only be used with [`outbound_federation_proxied_via`](#outbound_federation_proxied_via),
matching the port number for the host specified there.
_New in Synapse 1.79._
* `tls`: set to true to enable TLS for this listener. Will use the TLS key/cert specified in tls_private_key_path / tls_certificate_path.
@@ -3832,6 +3839,38 @@ federation_sender_instances:
- federation_sender1
- federation_sender2
```
---
### `outbound_federation_proxied_via`
A map from worker names to objects specifying hostnames and port numbers.
By default, Synapse workers make
[federation requests](https://spec.matrix.org/v1.6/server-server-api/)
on-demand. This option lets server operators limit this ability to a subset of
"proxy" workers. Non-proxy workers will not make federation requests directly;
they will do so indirectly via a proxy worker.
By default this mapping is empty, which means that every worker can make federation
requests for themselves.
Example configuration:
```yaml
outbound_federation_proxied_via:
# Master and fed serders can make federation requests freely.
# All other workers must proxy via one of those three.
master:
host: localhost
port: 8001
federation_sender1:
host: localhost
port: 8002
federation_sender2:
host: localhost
port: 8003
```
_New in Synapse 1.79._
---
### `instance_map`

View File

@@ -47,6 +47,8 @@ from twisted.internet.tcp import Port
from twisted.logger import LoggingFile, LogLevel
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.python.threadpool import ThreadPool
from twisted.web.http import HTTPFactory
from twisted.web.proxy import Proxy
from twisted.web.resource import Resource
import synapse.util.caches
@@ -62,6 +64,7 @@ from synapse.events.presence_router import load_legacy_presence_router
from synapse.events.spamcheck import load_legacy_spam_checkers
from synapse.events.third_party_rules import load_legacy_third_party_event_rules
from synapse.handlers.auth import load_legacy_password_auth_providers
from synapse.http.outbound_federation_proxy import OutboundFederationProxyFactory
from synapse.http.site import SynapseSite
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import init_tracer
@@ -351,7 +354,7 @@ def listen_tcp(
return r # type: ignore[return-value]
def listen_http(
def listen_http_for_resource(
listener_config: ListenerConfig,
root_resource: Resource,
version_string: str,
@@ -360,7 +363,6 @@ def listen_http(
reactor: ISynapseReactor = reactor,
) -> List[Port]:
port = listener_config.port
bind_addresses = listener_config.bind_addresses
tls = listener_config.tls
assert listener_config.http_options is not None
@@ -369,7 +371,7 @@ def listen_http(
if site_tag is None:
site_tag = str(port)
site = SynapseSite(
factory = SynapseSite(
"synapse.access.%s.%s" % ("https" if tls else "http", site_tag),
site_tag,
listener_config,
@@ -378,13 +380,34 @@ def listen_http(
max_request_body_size=max_request_body_size,
reactor=reactor,
)
return listen_http(listener_config, factory, context_factory, reactor)
def listen_outbound_fed_proxy(
listener_config: ListenerConfig,
context_factory: Optional[IOpenSSLContextFactory],
reactor: ISynapseReactor = reactor,
) -> None:
listen_http(listener_config, OutboundFederationProxyFactory, context_factory, reactor)
def listen_http(
listener_config: ListenerConfig,
factory: ServerFactory,
context_factory: Optional[IOpenSSLContextFactory],
reactor: ISynapseReactor = reactor,
) -> List[Port]:
port = listener_config.port
bind_addresses = listener_config.bind_addresses
tls = listener_config.tls
if tls:
# refresh_certificate should have been called before this.
assert context_factory is not None
ports = listen_ssl(
bind_addresses,
port,
site,
factory,
context_factory,
reactor=reactor,
)
@@ -393,7 +416,7 @@ def listen_http(
ports = listen_tcp(
bind_addresses,
port,
site,
factory,
reactor=reactor,
)
logger.info("Synapse now listening on TCP port %d", port)

View File

@@ -222,7 +222,7 @@ class GenericWorkerServer(HomeServer):
root_resource = create_resource_tree(resources, OptionsResource())
_base.listen_http(
_base.listen_http_for_resource(
listener_config,
root_resource,
self.version_string,
@@ -253,6 +253,19 @@ class GenericWorkerServer(HomeServer):
listener.bind_addresses,
listener.port,
)
elif listener.type == "outbound_federation_proxy":
if (
self.get_instance_name()
not in self.config.worker.outbound_fed_restricted_to
):
logger.warning(
"Outbound federation proxy listener configured, but this "
"worker is not permitted to send outgoing federation traffic!"
)
else:
_base.listen_outbound_fed_proxy(
listener, self.tls_server_context_factory
)
else:
logger.warning("Unsupported listener type: %s", listener.type)

View File

@@ -37,7 +37,7 @@ from synapse.api.urls import (
from synapse.app import _base
from synapse.app._base import (
handle_startup_exception,
listen_http,
listen_http_for_resource,
max_request_body_size,
redirect_stdio_to_logs,
register_start,
@@ -139,7 +139,7 @@ class SynapseHomeServer(HomeServer):
else:
root_resource = OptionsResource()
ports = listen_http(
ports = listen_http_for_resource(
listener_config,
create_resource_tree(resources, root_resource),
self.version_string,
@@ -269,6 +269,19 @@ class SynapseHomeServer(HomeServer):
listener.bind_addresses,
listener.port,
)
elif listener.type == "outbound_federation_proxy":
if (
self.get_instance_name()
not in self.config.worker.outbound_fed_restricted_to
):
logger.warning(
"Outbound federation proxy listener configured, but this "
"worker is not permitted to send outgoing federation traffic!"
)
else:
_base.listen_outbound_fed_proxy(
listener, self.tls_server_context_factory
)
else:
# this shouldn't happen, as the listener type should have been checked
# during parsing

View File

@@ -171,6 +171,7 @@ KNOWN_LISTENER_TYPES = {
"http",
"metrics",
"manhole",
"outbound_federation_proxy",
}
KNOWN_RESOURCES = {

View File

@@ -15,7 +15,7 @@
import argparse
import logging
from typing import Any, Dict, List, Union
from typing import Any, Dict, List, Mapping, Union
import attr
@@ -276,6 +276,8 @@ class WorkerConfig(Config):
new_option_name="update_user_directory_from_worker",
)
self.outbound_fed_restricted_to = self._get_outbound_fed_restrictions(config)
def _should_this_worker_perform_duty(
self,
config: Dict[str, Any],
@@ -426,6 +428,39 @@ class WorkerConfig(Config):
return worker_instances
def _get_outbound_fed_restrictions(
self, config: Dict[str, Any]
) -> Mapping[str, InstanceLocationConfig]:
proxied_via = config.get("outbound_federation_proxied_via", {})
if not isinstance(proxied_via, dict):
raise ConfigError(
f"outbound_federation_restricted_to should be a mapping, "
f"not {type(proxied_via)}"
)
for instance_name, listener_location in proxied_via.items():
if "host" not in listener_location:
raise ConfigError(
f"outbound_federation_restricted_to/{instance_name} is missing a host"
)
if "port" not in listener_location:
raise ConfigError(
f"outbound_federation_restricted_to/{instance_name} is missing a port"
)
if not isinstance(listener_location["host"], str):
raise ConfigError(
f"outbound_federation_restricted_to/{instance_name} should be a string"
)
if not isinstance(listener_location["port"], int):
raise ConfigError(
f"outbound_federation_restricted_to/{instance_name} should be an integer"
)
return {
instance_name: InstanceLocationConfig(listener["host"], listener["port"])
for instance_name, listener in proxied_via.items()
}
def read_arguments(self, args: argparse.Namespace) -> None:
# We support a bunch of command line arguments that override options in
# the config. A lot of these options have a worker_* prefix when running

View File

@@ -11,9 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
import urllib.parse
from typing import Any, Generator, List, Optional
from typing import Any, Generator, List, Mapping, Optional
from urllib.request import ( # type: ignore[attr-defined]
getproxies_environment,
proxy_bypass_environment,
@@ -30,10 +31,11 @@ from twisted.internet.interfaces import (
IReactorCore,
IStreamClientEndpoint,
)
from twisted.web.client import URI, Agent, HTTPConnectionPool
from twisted.web.client import URI, Agent, HTTPConnectionPool, _AgentBase
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent, IAgentEndpointFactory, IBodyProducer, IResponse
from synapse.config.workers import InstanceLocationConfig
from synapse.crypto.context_factory import FederationPolicyForHTTPS
from synapse.http import proxyagent
from synapse.http.client import BlacklistingAgentWrapper, BlacklistingReactorWrapper
@@ -49,10 +51,11 @@ logger = logging.getLogger(__name__)
@implementer(IAgent)
class MatrixFederationAgent:
"""An Agent-like thing which provides a `request` method which correctly
handles resolving matrix server names when using matrix://. Handles standard
https URIs as normal.
class BaseMatrixFederationAgent(abc.ABC):
"""An Agent-like thing which provides a `request` method that accepts matrix://
URIs.
Handles standard https URIs as normal.
Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)
@@ -82,19 +85,16 @@ class MatrixFederationAgent:
default implementation.
"""
_agent: IAgent
def __init__(
self,
reactor: ISynapseReactor,
tls_client_options_factory: Optional[FederationPolicyForHTTPS],
user_agent: bytes,
ip_whitelist: IPSet,
ip_blacklist: IPSet,
_srv_resolver: Optional[SrvResolver] = None,
_well_known_resolver: Optional[WellKnownResolver] = None,
):
# proxy_reactor is not blacklisted
proxy_reactor = reactor
# We need to use a DNS resolver which filters out blacklisted IP
# addresses, to prevent DNS rebinding.
reactor = BlacklistingReactorWrapper(reactor, ip_whitelist, ip_blacklist)
@@ -105,35 +105,8 @@ class MatrixFederationAgent:
self._pool.maxPersistentPerHost = 5
self._pool.cachedConnectionTimeout = 2 * 60
self._agent = Agent.usingEndpointFactory(
reactor,
MatrixHostnameEndpointFactory(
reactor,
proxy_reactor,
tls_client_options_factory,
_srv_resolver,
),
pool=self._pool,
)
self.user_agent = user_agent
if _well_known_resolver is None:
_well_known_resolver = WellKnownResolver(
reactor,
agent=BlacklistingAgentWrapper(
ProxyAgent(
reactor,
proxy_reactor,
pool=self._pool,
contextFactory=tls_client_options_factory,
use_proxy=True,
),
ip_blacklist=ip_blacklist,
),
user_agent=self.user_agent,
)
self._well_known_resolver = _well_known_resolver
self._reactor = reactor
@defer.inlineCallbacks
def request(
@@ -164,40 +137,7 @@ class MatrixFederationAgent:
# explicit port.
parsed_uri = urllib.parse.urlparse(uri)
# There must be a valid hostname.
assert parsed_uri.hostname
# If this is a matrix:// URI check if the server has delegated matrix
# traffic using well-known delegation.
#
# We have to do this here and not in the endpoint as we need to rewrite
# the host header with the delegated server name.
delegated_server = None
if (
parsed_uri.scheme == b"matrix"
and not _is_ip_literal(parsed_uri.hostname)
and not parsed_uri.port
):
well_known_result = yield defer.ensureDeferred(
self._well_known_resolver.get_well_known(parsed_uri.hostname)
)
delegated_server = well_known_result.delegated_server
if delegated_server:
# Ok, the server has delegated matrix traffic to somewhere else, so
# lets rewrite the URL to replace the server with the delegated
# server name.
uri = urllib.parse.urlunparse(
(
parsed_uri.scheme,
delegated_server,
parsed_uri.path,
parsed_uri.params,
parsed_uri.query,
parsed_uri.fragment,
)
)
parsed_uri = urllib.parse.urlparse(uri)
parsed_uri = yield self.postprocess_uri(parsed_uri)
# We need to make sure the host header is set to the netloc of the
# server and that a user-agent is provided.
@@ -217,6 +157,101 @@ class MatrixFederationAgent:
return res
@abc.abstractmethod
@defer.inlineCallbacks
def postprocess_uri(
self, parsed_uri: "urllib.parse.ParseResultBytes"
) -> Generator["defer.Deferred", Any, "urllib.parse.ParseResultBytes"]:
...
class MatrixFederationAgent(BaseMatrixFederationAgent):
"""A federation agent that resolves server delegation by itself, using
SRV or .well-known lookups."""
def __init__(
self,
reactor: ISynapseReactor,
tls_client_options_factory: Optional[FederationPolicyForHTTPS],
user_agent: bytes,
ip_whitelist: IPSet,
ip_blacklist: IPSet,
_srv_resolver: Optional[SrvResolver] = None,
_well_known_resolver: Optional[WellKnownResolver] = None,
):
super().__init__(reactor, user_agent, ip_whitelist, ip_blacklist)
# proxy_reactor is not blacklisted
proxy_reactor = reactor
self._agent = Agent.usingEndpointFactory(
self._reactor,
MatrixHostnameEndpointFactory(
self._reactor,
proxy_reactor,
tls_client_options_factory,
_srv_resolver,
),
pool=self._pool,
)
if _well_known_resolver is None:
_well_known_resolver = WellKnownResolver(
reactor,
agent=BlacklistingAgentWrapper(
ProxyAgent(
self._reactor,
proxy_reactor,
pool=self._pool,
contextFactory=tls_client_options_factory,
use_proxy=True,
),
ip_blacklist=ip_blacklist,
),
user_agent=self.user_agent,
)
self._well_known_resolver = _well_known_resolver
@defer.inlineCallbacks
def postprocess_uri(
self, parsed_uri: "urllib.parse.ParseResultBytes"
) -> Generator[defer.Deferred, Any, "urllib.parse.ParseResultBytes"]:
# There must be a valid hostname.
assert parsed_uri.hostname
# If this is a matrix:// URI check if the server has delegated matrix
# traffic using well-known delegation.
#
# We have to do this here and not in the endpoint as we need to rewrite
# the host header with the delegated server name.
delegated_server = None
if (
parsed_uri.scheme == b"matrix"
and not _is_ip_literal(parsed_uri.hostname)
and not parsed_uri.port
):
well_known_result = yield defer.ensureDeferred(
self._well_known_resolver.get_well_known(parsed_uri.hostname)
)
delegated_server = well_known_result.delegated_server
if delegated_server:
# Ok, the server has delegated matrix traffic to somewhere else, so
# lets rewrite the URL to replace the server with the delegated
# server name.
uri = urllib.parse.urlunparse(
(
parsed_uri.scheme,
delegated_server,
parsed_uri.path,
parsed_uri.params,
parsed_uri.query,
parsed_uri.fragment,
)
)
parsed_uri = urllib.parse.urlparse(uri)
return parsed_uri
@implementer(IAgentEndpointFactory)
class MatrixHostnameEndpointFactory:
@@ -430,3 +465,79 @@ def _is_ip_literal(host: bytes) -> bool:
return True
except AddrFormatError:
return False
@implementer(IAgent)
class InternalProxyMatrixFederationAgentInner(_AgentBase):
def __init__(
self,
reactor: IReactorCore,
endpointFactory: IAgentEndpointFactory,
pool: HTTPConnectionPool,
):
_AgentBase.__init__(self, reactor, pool)
self._endpointFactory = endpointFactory
def request(
self,
method: bytes,
uri: bytes,
headers: Optional[Headers] = None,
bodyProducer: Optional[IBodyProducer] = None,
) -> "defer.Deferred[IResponse]":
# Cache *all* connections under the same key, since we are only
# connecting to a single destination, the proxy:
# TODO make second entry an endpoint
key = ("http-proxy", None)
parsed_uri = URI.fromBytes(uri)
return self._requestWithEndpoint(
key,
self._endpointFactory.endpointForURI(parsed_uri),
method,
parsed_uri,
headers,
bodyProducer,
uri,
)
class InternalProxyMatrixFederationAgent(BaseMatrixFederationAgent):
def __init__(
self,
reactor: ISynapseReactor,
user_agent: bytes,
ip_whitelist: IPSet,
ip_blacklist: IPSet,
proxy_workers: Mapping[str, InstanceLocationConfig],
):
super().__init__(reactor, user_agent, ip_whitelist, ip_blacklist)
self._agent = InternalProxyMatrixFederationAgentInner(
self._reactor,
InternalProxyMatrixHostnameEndpointFactory(reactor, proxy_workers),
pool=self._pool,
)
@defer.inlineCallbacks
def postprocess_uri(
self, parsed_uri: "urllib.parse.ParseResultBytes"
) -> Generator[defer.Deferred, Any, urllib.parse.ParseResultBytes]:
yield None
return parsed_uri
@implementer(IAgentEndpointFactory)
class InternalProxyMatrixHostnameEndpointFactory:
def __init__(
self, reactor: IReactorCore, proxy_workers: Mapping[str, InstanceLocationConfig]
):
self._reactor = reactor
self._proxy_workers = proxy_workers
def endpointForURI(self, uri: URI) -> IStreamClientEndpoint:
# key = uri.toBytes()
# TODO chose instance based on key
proxy_location = next(iter(self._proxy_workers.values()))
# TODO does this need wrapping with wrapClientTLS?
logger.warning(f"DMR: make endpoint for {uri}, {self._proxy_workers=}")
rv = HostnameEndpoint(self._reactor, proxy_location.host, proxy_location.port)
return rv

View File

@@ -49,7 +49,7 @@ from twisted.internet.interfaces import IReactorTime
from twisted.internet.task import Cooperator
from twisted.web.client import ResponseFailed
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer, IResponse
from twisted.web.iweb import IAgent, IBodyProducer, IResponse
import synapse.metrics
import synapse.util.retryutils
@@ -70,7 +70,10 @@ from synapse.http.client import (
encode_query_args,
read_body_with_max_size,
)
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.http.federation.matrix_federation_agent import (
InternalProxyMatrixFederationAgent,
MatrixFederationAgent,
)
from synapse.http.types import QueryParams
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -348,13 +351,29 @@ class MatrixFederationHttpClient:
if hs.config.server.user_agent_suffix:
user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)
federation_agent = MatrixFederationAgent(
self.reactor,
tls_client_options_factory,
user_agent.encode("ascii"),
hs.config.server.federation_ip_range_whitelist,
hs.config.server.federation_ip_range_blacklist,
)
if (
hs.config.worker.outbound_fed_restricted_to
and hs.get_instance_name()
not in hs.config.worker.outbound_fed_restricted_to
):
logger.warning("DMR: Using InternalProxyMatrixFederationAgent")
# We must
federation_agent: IAgent = InternalProxyMatrixFederationAgent(
self.reactor,
user_agent.encode("ascii"),
hs.config.server.federation_ip_range_whitelist,
hs.config.server.federation_ip_range_blacklist,
hs.config.worker.outbound_fed_restricted_to,
)
else:
logger.warning("DMR: Using MatrixFederationAgent")
federation_agent = MatrixFederationAgent(
self.reactor,
tls_client_options_factory,
user_agent.encode("ascii"),
hs.config.server.federation_ip_range_whitelist,
hs.config.server.federation_ip_range_blacklist,
)
# Use a BlacklistingAgentWrapper to prevent circumventing the IP
# blacklist via IP literals in server names

View File

@@ -0,0 +1,22 @@
from twisted.web.http import HTTPFactory
from twisted.web.proxy import Proxy, ProxyClient, ProxyClientFactory, ProxyRequest
class FederationOutboundProxyClient(ProxyClient):
...
class FederationOutboundProxyClientFactory(ProxyClientFactory):
protocol = FederationOutboundProxyClient
class FederationOutboundProxyRequest(ProxyRequest):
protocols = {b"matrix": FederationOutboundProxyClientFactory}
ports = {b"matrix": 80}
class FederationOutboundProxy(Proxy):
requestFactory = FederationOutboundProxyRequest
OutboundFederationProxyFactory = HTTPFactory.forProtocol(FederationOutboundProxy)