mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-13 01:50:46 +00:00
Compare commits
9 Commits
madlittlem
...
dmr/restri
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9eca6d8f4 | ||
|
|
e098fbf9d4 | ||
|
|
833eae2529 | ||
|
|
15093ea48b | ||
|
|
e41a90e890 | ||
|
|
473de81b2d | ||
|
|
eb8e679974 | ||
|
|
4acecc5ea0 | ||
|
|
6298d77eb1 |
@@ -18,6 +18,7 @@ for port in 8080 8081 8082; do
|
|||||||
echo "Starting server on port $port... "
|
echo "Starting server on port $port... "
|
||||||
|
|
||||||
https_port=$((port + 400))
|
https_port=$((port + 400))
|
||||||
|
proxy_port=$((port + 100))
|
||||||
mkdir -p demo/$port
|
mkdir -p demo/$port
|
||||||
pushd demo/$port || exit
|
pushd demo/$port || exit
|
||||||
|
|
||||||
@@ -65,11 +66,21 @@ for port in 8080 8081 8082; do
|
|||||||
resources:
|
resources:
|
||||||
- names: [client, federation]
|
- names: [client, federation]
|
||||||
compress: false
|
compress: false
|
||||||
|
|
||||||
|
- port: $proxy_port
|
||||||
|
bind_addresses: ['::1', '127.0.0.1']
|
||||||
|
type: outbound_federation_proxy
|
||||||
|
|
||||||
PORTLISTENERS
|
PORTLISTENERS
|
||||||
)
|
)
|
||||||
|
|
||||||
echo "${listeners}"
|
echo "${listeners}"
|
||||||
|
|
||||||
|
echo "outbound_federation_proxied_via:"
|
||||||
|
echo " master:"
|
||||||
|
echo " host: localhost"
|
||||||
|
echo " port: $proxy_port"
|
||||||
|
|
||||||
# Disable TLS for the servers
|
# Disable TLS for the servers
|
||||||
printf '\n\n# Disable TLS for the servers.'
|
printf '\n\n# Disable TLS for the servers.'
|
||||||
echo '# DO NOT USE IN PRODUCTION'
|
echo '# DO NOT USE IN PRODUCTION'
|
||||||
|
|||||||
@@ -20,6 +20,11 @@ worker_listeners:
|
|||||||
- {{ resource }}
|
- {{ resource }}
|
||||||
{%- endfor %}
|
{%- endfor %}
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
{% if outbound_federation_proxy_port %}
|
||||||
|
- type: outbound_federation_proxy
|
||||||
|
port: {{ outbound_federation_proxy_port }}
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
|
||||||
worker_log_config: {{ worker_log_config_filepath }}
|
worker_log_config: {{ worker_log_config_filepath }}
|
||||||
|
|
||||||
|
|||||||
@@ -328,6 +328,7 @@ def add_worker_roles_to_shared_config(
|
|||||||
worker_type: str,
|
worker_type: str,
|
||||||
worker_name: str,
|
worker_name: str,
|
||||||
worker_port: int,
|
worker_port: int,
|
||||||
|
worker_proxy_port: int,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Given a dictionary representing a config file shared across all workers,
|
"""Given a dictionary representing a config file shared across all workers,
|
||||||
append appropriate worker information to it for the current worker_type instance.
|
append appropriate worker information to it for the current worker_type instance.
|
||||||
@@ -340,6 +341,7 @@ def add_worker_roles_to_shared_config(
|
|||||||
"""
|
"""
|
||||||
# The instance_map config field marks the workers that write to various replication streams
|
# The instance_map config field marks the workers that write to various replication streams
|
||||||
instance_map = shared_config.setdefault("instance_map", {})
|
instance_map = shared_config.setdefault("instance_map", {})
|
||||||
|
proxied_via = shared_config.setdefault("outbound_federation_proxied_via", {})
|
||||||
|
|
||||||
# Worker-type specific sharding config
|
# Worker-type specific sharding config
|
||||||
if worker_type == "pusher":
|
if worker_type == "pusher":
|
||||||
@@ -347,6 +349,7 @@ def add_worker_roles_to_shared_config(
|
|||||||
|
|
||||||
elif worker_type == "federation_sender":
|
elif worker_type == "federation_sender":
|
||||||
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
|
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
|
||||||
|
proxied_via[worker_name] = {"host": "localhost", "port": worker_proxy_port}
|
||||||
|
|
||||||
elif worker_type == "event_persister":
|
elif worker_type == "event_persister":
|
||||||
# Event persisters write to the events stream, so we need to update
|
# Event persisters write to the events stream, so we need to update
|
||||||
@@ -388,6 +391,17 @@ def generate_base_homeserver_config() -> None:
|
|||||||
subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
|
subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
|
||||||
|
|
||||||
|
|
||||||
|
# Start worker ports from this arbitrary port
|
||||||
|
_worker_port = 18009
|
||||||
|
|
||||||
|
|
||||||
|
def claim_port() -> int:
|
||||||
|
global _worker_port
|
||||||
|
port = _worker_port
|
||||||
|
_worker_port += 1
|
||||||
|
return port
|
||||||
|
|
||||||
|
|
||||||
def generate_worker_files(
|
def generate_worker_files(
|
||||||
environ: Mapping[str, str], config_path: str, data_dir: str
|
environ: Mapping[str, str], config_path: str, data_dir: str
|
||||||
) -> None:
|
) -> None:
|
||||||
@@ -465,9 +479,6 @@ def generate_worker_files(
|
|||||||
# Create the worker configuration directory if it doesn't already exist
|
# Create the worker configuration directory if it doesn't already exist
|
||||||
os.makedirs("/conf/workers", exist_ok=True)
|
os.makedirs("/conf/workers", exist_ok=True)
|
||||||
|
|
||||||
# Start worker ports from this arbitrary port
|
|
||||||
worker_port = 18009
|
|
||||||
|
|
||||||
# A counter of worker_type -> int. Used for determining the name for a given
|
# A counter of worker_type -> int. Used for determining the name for a given
|
||||||
# worker type when generating its config file, as each worker's name is just
|
# worker type when generating its config file, as each worker's name is just
|
||||||
# worker_type + instance #
|
# worker_type + instance #
|
||||||
@@ -479,6 +490,8 @@ def generate_worker_files(
|
|||||||
|
|
||||||
# For each worker type specified by the user, create config values
|
# For each worker type specified by the user, create config values
|
||||||
for worker_type in worker_types:
|
for worker_type in worker_types:
|
||||||
|
worker_port = claim_port()
|
||||||
|
worker_proxy_port = claim_port()
|
||||||
worker_config = WORKERS_CONFIG.get(worker_type)
|
worker_config = WORKERS_CONFIG.get(worker_type)
|
||||||
if worker_config:
|
if worker_config:
|
||||||
worker_config = worker_config.copy()
|
worker_config = worker_config.copy()
|
||||||
@@ -495,6 +508,9 @@ def generate_worker_files(
|
|||||||
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
|
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if worker_type == "federation_sender":
|
||||||
|
worker_config["outbound_federation_proxy_port"] = worker_proxy_port
|
||||||
|
|
||||||
# Update the shared config with any worker-type specific options
|
# Update the shared config with any worker-type specific options
|
||||||
shared_config.update(worker_config["shared_extra_conf"])
|
shared_config.update(worker_config["shared_extra_conf"])
|
||||||
|
|
||||||
@@ -505,7 +521,7 @@ def generate_worker_files(
|
|||||||
|
|
||||||
# Update the shared config with sharding-related options if necessary
|
# Update the shared config with sharding-related options if necessary
|
||||||
add_worker_roles_to_shared_config(
|
add_worker_roles_to_shared_config(
|
||||||
shared_config, worker_type, worker_name, worker_port
|
shared_config, worker_type, worker_name, worker_port, worker_proxy_port
|
||||||
)
|
)
|
||||||
|
|
||||||
# Enable the worker in supervisord
|
# Enable the worker in supervisord
|
||||||
@@ -538,8 +554,6 @@ def generate_worker_files(
|
|||||||
worker_log_config_filepath=log_config_filepath,
|
worker_log_config_filepath=log_config_filepath,
|
||||||
)
|
)
|
||||||
|
|
||||||
worker_port += 1
|
|
||||||
|
|
||||||
# Build the nginx location config blocks
|
# Build the nginx location config blocks
|
||||||
nginx_location_config = ""
|
nginx_location_config = ""
|
||||||
for endpoint, upstream in nginx_locations.items():
|
for endpoint, upstream in nginx_locations.items():
|
||||||
|
|||||||
@@ -433,9 +433,16 @@ See the docs [request log format](../administration/request_log.md).
|
|||||||
|
|
||||||
* `type`: the type of listener. Normally `http`, but other valid options are:
|
* `type`: the type of listener. Normally `http`, but other valid options are:
|
||||||
|
|
||||||
* `manhole`: (see the docs [here](../../manhole.md)),
|
* `manhole`: see the docs [here](../../manhole.md).
|
||||||
|
|
||||||
* `metrics`: (see the docs [here](../../metrics-howto.md)),
|
* `metrics`: see the docs [here](../../metrics-howto.md).
|
||||||
|
|
||||||
|
* `outbound_federation_proxy`: for use with worker deployments only. Allows
|
||||||
|
this worker to make federation requests on behalf of other workers. This
|
||||||
|
should only be used with [`outbound_federation_proxied_via`](#outbound_federation_proxied_via),
|
||||||
|
matching the port number for the host specified there.
|
||||||
|
|
||||||
|
_New in Synapse 1.79._
|
||||||
|
|
||||||
* `tls`: set to true to enable TLS for this listener. Will use the TLS key/cert specified in tls_private_key_path / tls_certificate_path.
|
* `tls`: set to true to enable TLS for this listener. Will use the TLS key/cert specified in tls_private_key_path / tls_certificate_path.
|
||||||
|
|
||||||
@@ -3832,6 +3839,38 @@ federation_sender_instances:
|
|||||||
- federation_sender1
|
- federation_sender1
|
||||||
- federation_sender2
|
- federation_sender2
|
||||||
```
|
```
|
||||||
|
---
|
||||||
|
### `outbound_federation_proxied_via`
|
||||||
|
|
||||||
|
A map from worker names to objects specifying hostnames and port numbers.
|
||||||
|
|
||||||
|
By default, Synapse workers make
|
||||||
|
[federation requests](https://spec.matrix.org/v1.6/server-server-api/)
|
||||||
|
on-demand. This option lets server operators limit this ability to a subset of
|
||||||
|
"proxy" workers. Non-proxy workers will not make federation requests directly;
|
||||||
|
they will do so indirectly via a proxy worker.
|
||||||
|
|
||||||
|
By default this mapping is empty, which means that every worker can make federation
|
||||||
|
requests for themselves.
|
||||||
|
|
||||||
|
Example configuration:
|
||||||
|
```yaml
|
||||||
|
outbound_federation_proxied_via:
|
||||||
|
# Master and fed serders can make federation requests freely.
|
||||||
|
# All other workers must proxy via one of those three.
|
||||||
|
master:
|
||||||
|
host: localhost
|
||||||
|
port: 8001
|
||||||
|
federation_sender1:
|
||||||
|
host: localhost
|
||||||
|
port: 8002
|
||||||
|
federation_sender2:
|
||||||
|
host: localhost
|
||||||
|
port: 8003
|
||||||
|
```
|
||||||
|
|
||||||
|
_New in Synapse 1.79._
|
||||||
|
|
||||||
---
|
---
|
||||||
### `instance_map`
|
### `instance_map`
|
||||||
|
|
||||||
|
|||||||
@@ -47,6 +47,8 @@ from twisted.internet.tcp import Port
|
|||||||
from twisted.logger import LoggingFile, LogLevel
|
from twisted.logger import LoggingFile, LogLevel
|
||||||
from twisted.protocols.tls import TLSMemoryBIOFactory
|
from twisted.protocols.tls import TLSMemoryBIOFactory
|
||||||
from twisted.python.threadpool import ThreadPool
|
from twisted.python.threadpool import ThreadPool
|
||||||
|
from twisted.web.http import HTTPFactory
|
||||||
|
from twisted.web.proxy import Proxy
|
||||||
from twisted.web.resource import Resource
|
from twisted.web.resource import Resource
|
||||||
|
|
||||||
import synapse.util.caches
|
import synapse.util.caches
|
||||||
@@ -62,6 +64,7 @@ from synapse.events.presence_router import load_legacy_presence_router
|
|||||||
from synapse.events.spamcheck import load_legacy_spam_checkers
|
from synapse.events.spamcheck import load_legacy_spam_checkers
|
||||||
from synapse.events.third_party_rules import load_legacy_third_party_event_rules
|
from synapse.events.third_party_rules import load_legacy_third_party_event_rules
|
||||||
from synapse.handlers.auth import load_legacy_password_auth_providers
|
from synapse.handlers.auth import load_legacy_password_auth_providers
|
||||||
|
from synapse.http.outbound_federation_proxy import OutboundFederationProxyFactory
|
||||||
from synapse.http.site import SynapseSite
|
from synapse.http.site import SynapseSite
|
||||||
from synapse.logging.context import PreserveLoggingContext
|
from synapse.logging.context import PreserveLoggingContext
|
||||||
from synapse.logging.opentracing import init_tracer
|
from synapse.logging.opentracing import init_tracer
|
||||||
@@ -351,7 +354,7 @@ def listen_tcp(
|
|||||||
return r # type: ignore[return-value]
|
return r # type: ignore[return-value]
|
||||||
|
|
||||||
|
|
||||||
def listen_http(
|
def listen_http_for_resource(
|
||||||
listener_config: ListenerConfig,
|
listener_config: ListenerConfig,
|
||||||
root_resource: Resource,
|
root_resource: Resource,
|
||||||
version_string: str,
|
version_string: str,
|
||||||
@@ -360,7 +363,6 @@ def listen_http(
|
|||||||
reactor: ISynapseReactor = reactor,
|
reactor: ISynapseReactor = reactor,
|
||||||
) -> List[Port]:
|
) -> List[Port]:
|
||||||
port = listener_config.port
|
port = listener_config.port
|
||||||
bind_addresses = listener_config.bind_addresses
|
|
||||||
tls = listener_config.tls
|
tls = listener_config.tls
|
||||||
|
|
||||||
assert listener_config.http_options is not None
|
assert listener_config.http_options is not None
|
||||||
@@ -369,7 +371,7 @@ def listen_http(
|
|||||||
if site_tag is None:
|
if site_tag is None:
|
||||||
site_tag = str(port)
|
site_tag = str(port)
|
||||||
|
|
||||||
site = SynapseSite(
|
factory = SynapseSite(
|
||||||
"synapse.access.%s.%s" % ("https" if tls else "http", site_tag),
|
"synapse.access.%s.%s" % ("https" if tls else "http", site_tag),
|
||||||
site_tag,
|
site_tag,
|
||||||
listener_config,
|
listener_config,
|
||||||
@@ -378,13 +380,34 @@ def listen_http(
|
|||||||
max_request_body_size=max_request_body_size,
|
max_request_body_size=max_request_body_size,
|
||||||
reactor=reactor,
|
reactor=reactor,
|
||||||
)
|
)
|
||||||
|
return listen_http(listener_config, factory, context_factory, reactor)
|
||||||
|
|
||||||
|
|
||||||
|
def listen_outbound_fed_proxy(
|
||||||
|
listener_config: ListenerConfig,
|
||||||
|
context_factory: Optional[IOpenSSLContextFactory],
|
||||||
|
reactor: ISynapseReactor = reactor,
|
||||||
|
) -> None:
|
||||||
|
listen_http(listener_config, OutboundFederationProxyFactory, context_factory, reactor)
|
||||||
|
|
||||||
|
|
||||||
|
def listen_http(
|
||||||
|
listener_config: ListenerConfig,
|
||||||
|
factory: ServerFactory,
|
||||||
|
context_factory: Optional[IOpenSSLContextFactory],
|
||||||
|
reactor: ISynapseReactor = reactor,
|
||||||
|
) -> List[Port]:
|
||||||
|
port = listener_config.port
|
||||||
|
bind_addresses = listener_config.bind_addresses
|
||||||
|
tls = listener_config.tls
|
||||||
|
|
||||||
if tls:
|
if tls:
|
||||||
# refresh_certificate should have been called before this.
|
# refresh_certificate should have been called before this.
|
||||||
assert context_factory is not None
|
assert context_factory is not None
|
||||||
ports = listen_ssl(
|
ports = listen_ssl(
|
||||||
bind_addresses,
|
bind_addresses,
|
||||||
port,
|
port,
|
||||||
site,
|
factory,
|
||||||
context_factory,
|
context_factory,
|
||||||
reactor=reactor,
|
reactor=reactor,
|
||||||
)
|
)
|
||||||
@@ -393,7 +416,7 @@ def listen_http(
|
|||||||
ports = listen_tcp(
|
ports = listen_tcp(
|
||||||
bind_addresses,
|
bind_addresses,
|
||||||
port,
|
port,
|
||||||
site,
|
factory,
|
||||||
reactor=reactor,
|
reactor=reactor,
|
||||||
)
|
)
|
||||||
logger.info("Synapse now listening on TCP port %d", port)
|
logger.info("Synapse now listening on TCP port %d", port)
|
||||||
|
|||||||
@@ -222,7 +222,7 @@ class GenericWorkerServer(HomeServer):
|
|||||||
|
|
||||||
root_resource = create_resource_tree(resources, OptionsResource())
|
root_resource = create_resource_tree(resources, OptionsResource())
|
||||||
|
|
||||||
_base.listen_http(
|
_base.listen_http_for_resource(
|
||||||
listener_config,
|
listener_config,
|
||||||
root_resource,
|
root_resource,
|
||||||
self.version_string,
|
self.version_string,
|
||||||
@@ -253,6 +253,19 @@ class GenericWorkerServer(HomeServer):
|
|||||||
listener.bind_addresses,
|
listener.bind_addresses,
|
||||||
listener.port,
|
listener.port,
|
||||||
)
|
)
|
||||||
|
elif listener.type == "outbound_federation_proxy":
|
||||||
|
if (
|
||||||
|
self.get_instance_name()
|
||||||
|
not in self.config.worker.outbound_fed_restricted_to
|
||||||
|
):
|
||||||
|
logger.warning(
|
||||||
|
"Outbound federation proxy listener configured, but this "
|
||||||
|
"worker is not permitted to send outgoing federation traffic!"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
_base.listen_outbound_fed_proxy(
|
||||||
|
listener, self.tls_server_context_factory
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
logger.warning("Unsupported listener type: %s", listener.type)
|
logger.warning("Unsupported listener type: %s", listener.type)
|
||||||
|
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ from synapse.api.urls import (
|
|||||||
from synapse.app import _base
|
from synapse.app import _base
|
||||||
from synapse.app._base import (
|
from synapse.app._base import (
|
||||||
handle_startup_exception,
|
handle_startup_exception,
|
||||||
listen_http,
|
listen_http_for_resource,
|
||||||
max_request_body_size,
|
max_request_body_size,
|
||||||
redirect_stdio_to_logs,
|
redirect_stdio_to_logs,
|
||||||
register_start,
|
register_start,
|
||||||
@@ -139,7 +139,7 @@ class SynapseHomeServer(HomeServer):
|
|||||||
else:
|
else:
|
||||||
root_resource = OptionsResource()
|
root_resource = OptionsResource()
|
||||||
|
|
||||||
ports = listen_http(
|
ports = listen_http_for_resource(
|
||||||
listener_config,
|
listener_config,
|
||||||
create_resource_tree(resources, root_resource),
|
create_resource_tree(resources, root_resource),
|
||||||
self.version_string,
|
self.version_string,
|
||||||
@@ -269,6 +269,19 @@ class SynapseHomeServer(HomeServer):
|
|||||||
listener.bind_addresses,
|
listener.bind_addresses,
|
||||||
listener.port,
|
listener.port,
|
||||||
)
|
)
|
||||||
|
elif listener.type == "outbound_federation_proxy":
|
||||||
|
if (
|
||||||
|
self.get_instance_name()
|
||||||
|
not in self.config.worker.outbound_fed_restricted_to
|
||||||
|
):
|
||||||
|
logger.warning(
|
||||||
|
"Outbound federation proxy listener configured, but this "
|
||||||
|
"worker is not permitted to send outgoing federation traffic!"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
_base.listen_outbound_fed_proxy(
|
||||||
|
listener, self.tls_server_context_factory
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
# this shouldn't happen, as the listener type should have been checked
|
# this shouldn't happen, as the listener type should have been checked
|
||||||
# during parsing
|
# during parsing
|
||||||
|
|||||||
@@ -171,6 +171,7 @@ KNOWN_LISTENER_TYPES = {
|
|||||||
"http",
|
"http",
|
||||||
"metrics",
|
"metrics",
|
||||||
"manhole",
|
"manhole",
|
||||||
|
"outbound_federation_proxy",
|
||||||
}
|
}
|
||||||
|
|
||||||
KNOWN_RESOURCES = {
|
KNOWN_RESOURCES = {
|
||||||
|
|||||||
@@ -15,7 +15,7 @@
|
|||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import logging
|
import logging
|
||||||
from typing import Any, Dict, List, Union
|
from typing import Any, Dict, List, Mapping, Union
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
|
||||||
@@ -276,6 +276,8 @@ class WorkerConfig(Config):
|
|||||||
new_option_name="update_user_directory_from_worker",
|
new_option_name="update_user_directory_from_worker",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self.outbound_fed_restricted_to = self._get_outbound_fed_restrictions(config)
|
||||||
|
|
||||||
def _should_this_worker_perform_duty(
|
def _should_this_worker_perform_duty(
|
||||||
self,
|
self,
|
||||||
config: Dict[str, Any],
|
config: Dict[str, Any],
|
||||||
@@ -426,6 +428,39 @@ class WorkerConfig(Config):
|
|||||||
|
|
||||||
return worker_instances
|
return worker_instances
|
||||||
|
|
||||||
|
def _get_outbound_fed_restrictions(
|
||||||
|
self, config: Dict[str, Any]
|
||||||
|
) -> Mapping[str, InstanceLocationConfig]:
|
||||||
|
proxied_via = config.get("outbound_federation_proxied_via", {})
|
||||||
|
if not isinstance(proxied_via, dict):
|
||||||
|
raise ConfigError(
|
||||||
|
f"outbound_federation_restricted_to should be a mapping, "
|
||||||
|
f"not {type(proxied_via)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
for instance_name, listener_location in proxied_via.items():
|
||||||
|
if "host" not in listener_location:
|
||||||
|
raise ConfigError(
|
||||||
|
f"outbound_federation_restricted_to/{instance_name} is missing a host"
|
||||||
|
)
|
||||||
|
if "port" not in listener_location:
|
||||||
|
raise ConfigError(
|
||||||
|
f"outbound_federation_restricted_to/{instance_name} is missing a port"
|
||||||
|
)
|
||||||
|
if not isinstance(listener_location["host"], str):
|
||||||
|
raise ConfigError(
|
||||||
|
f"outbound_federation_restricted_to/{instance_name} should be a string"
|
||||||
|
)
|
||||||
|
if not isinstance(listener_location["port"], int):
|
||||||
|
raise ConfigError(
|
||||||
|
f"outbound_federation_restricted_to/{instance_name} should be an integer"
|
||||||
|
)
|
||||||
|
|
||||||
|
return {
|
||||||
|
instance_name: InstanceLocationConfig(listener["host"], listener["port"])
|
||||||
|
for instance_name, listener in proxied_via.items()
|
||||||
|
}
|
||||||
|
|
||||||
def read_arguments(self, args: argparse.Namespace) -> None:
|
def read_arguments(self, args: argparse.Namespace) -> None:
|
||||||
# We support a bunch of command line arguments that override options in
|
# We support a bunch of command line arguments that override options in
|
||||||
# the config. A lot of these options have a worker_* prefix when running
|
# the config. A lot of these options have a worker_* prefix when running
|
||||||
|
|||||||
@@ -11,9 +11,10 @@
|
|||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
import abc
|
||||||
import logging
|
import logging
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
from typing import Any, Generator, List, Optional
|
from typing import Any, Generator, List, Mapping, Optional
|
||||||
from urllib.request import ( # type: ignore[attr-defined]
|
from urllib.request import ( # type: ignore[attr-defined]
|
||||||
getproxies_environment,
|
getproxies_environment,
|
||||||
proxy_bypass_environment,
|
proxy_bypass_environment,
|
||||||
@@ -30,10 +31,11 @@ from twisted.internet.interfaces import (
|
|||||||
IReactorCore,
|
IReactorCore,
|
||||||
IStreamClientEndpoint,
|
IStreamClientEndpoint,
|
||||||
)
|
)
|
||||||
from twisted.web.client import URI, Agent, HTTPConnectionPool
|
from twisted.web.client import URI, Agent, HTTPConnectionPool, _AgentBase
|
||||||
from twisted.web.http_headers import Headers
|
from twisted.web.http_headers import Headers
|
||||||
from twisted.web.iweb import IAgent, IAgentEndpointFactory, IBodyProducer, IResponse
|
from twisted.web.iweb import IAgent, IAgentEndpointFactory, IBodyProducer, IResponse
|
||||||
|
|
||||||
|
from synapse.config.workers import InstanceLocationConfig
|
||||||
from synapse.crypto.context_factory import FederationPolicyForHTTPS
|
from synapse.crypto.context_factory import FederationPolicyForHTTPS
|
||||||
from synapse.http import proxyagent
|
from synapse.http import proxyagent
|
||||||
from synapse.http.client import BlacklistingAgentWrapper, BlacklistingReactorWrapper
|
from synapse.http.client import BlacklistingAgentWrapper, BlacklistingReactorWrapper
|
||||||
@@ -49,10 +51,11 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
@implementer(IAgent)
|
@implementer(IAgent)
|
||||||
class MatrixFederationAgent:
|
class BaseMatrixFederationAgent(abc.ABC):
|
||||||
"""An Agent-like thing which provides a `request` method which correctly
|
"""An Agent-like thing which provides a `request` method that accepts matrix://
|
||||||
handles resolving matrix server names when using matrix://. Handles standard
|
URIs.
|
||||||
https URIs as normal.
|
|
||||||
|
Handles standard https URIs as normal.
|
||||||
|
|
||||||
Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)
|
Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)
|
||||||
|
|
||||||
@@ -82,19 +85,16 @@ class MatrixFederationAgent:
|
|||||||
default implementation.
|
default implementation.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
_agent: IAgent
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
reactor: ISynapseReactor,
|
reactor: ISynapseReactor,
|
||||||
tls_client_options_factory: Optional[FederationPolicyForHTTPS],
|
|
||||||
user_agent: bytes,
|
user_agent: bytes,
|
||||||
ip_whitelist: IPSet,
|
ip_whitelist: IPSet,
|
||||||
ip_blacklist: IPSet,
|
ip_blacklist: IPSet,
|
||||||
_srv_resolver: Optional[SrvResolver] = None,
|
_srv_resolver: Optional[SrvResolver] = None,
|
||||||
_well_known_resolver: Optional[WellKnownResolver] = None,
|
|
||||||
):
|
):
|
||||||
# proxy_reactor is not blacklisted
|
|
||||||
proxy_reactor = reactor
|
|
||||||
|
|
||||||
# We need to use a DNS resolver which filters out blacklisted IP
|
# We need to use a DNS resolver which filters out blacklisted IP
|
||||||
# addresses, to prevent DNS rebinding.
|
# addresses, to prevent DNS rebinding.
|
||||||
reactor = BlacklistingReactorWrapper(reactor, ip_whitelist, ip_blacklist)
|
reactor = BlacklistingReactorWrapper(reactor, ip_whitelist, ip_blacklist)
|
||||||
@@ -105,35 +105,8 @@ class MatrixFederationAgent:
|
|||||||
self._pool.maxPersistentPerHost = 5
|
self._pool.maxPersistentPerHost = 5
|
||||||
self._pool.cachedConnectionTimeout = 2 * 60
|
self._pool.cachedConnectionTimeout = 2 * 60
|
||||||
|
|
||||||
self._agent = Agent.usingEndpointFactory(
|
|
||||||
reactor,
|
|
||||||
MatrixHostnameEndpointFactory(
|
|
||||||
reactor,
|
|
||||||
proxy_reactor,
|
|
||||||
tls_client_options_factory,
|
|
||||||
_srv_resolver,
|
|
||||||
),
|
|
||||||
pool=self._pool,
|
|
||||||
)
|
|
||||||
self.user_agent = user_agent
|
self.user_agent = user_agent
|
||||||
|
self._reactor = reactor
|
||||||
if _well_known_resolver is None:
|
|
||||||
_well_known_resolver = WellKnownResolver(
|
|
||||||
reactor,
|
|
||||||
agent=BlacklistingAgentWrapper(
|
|
||||||
ProxyAgent(
|
|
||||||
reactor,
|
|
||||||
proxy_reactor,
|
|
||||||
pool=self._pool,
|
|
||||||
contextFactory=tls_client_options_factory,
|
|
||||||
use_proxy=True,
|
|
||||||
),
|
|
||||||
ip_blacklist=ip_blacklist,
|
|
||||||
),
|
|
||||||
user_agent=self.user_agent,
|
|
||||||
)
|
|
||||||
|
|
||||||
self._well_known_resolver = _well_known_resolver
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def request(
|
def request(
|
||||||
@@ -164,40 +137,7 @@ class MatrixFederationAgent:
|
|||||||
# explicit port.
|
# explicit port.
|
||||||
parsed_uri = urllib.parse.urlparse(uri)
|
parsed_uri = urllib.parse.urlparse(uri)
|
||||||
|
|
||||||
# There must be a valid hostname.
|
parsed_uri = yield self.postprocess_uri(parsed_uri)
|
||||||
assert parsed_uri.hostname
|
|
||||||
|
|
||||||
# If this is a matrix:// URI check if the server has delegated matrix
|
|
||||||
# traffic using well-known delegation.
|
|
||||||
#
|
|
||||||
# We have to do this here and not in the endpoint as we need to rewrite
|
|
||||||
# the host header with the delegated server name.
|
|
||||||
delegated_server = None
|
|
||||||
if (
|
|
||||||
parsed_uri.scheme == b"matrix"
|
|
||||||
and not _is_ip_literal(parsed_uri.hostname)
|
|
||||||
and not parsed_uri.port
|
|
||||||
):
|
|
||||||
well_known_result = yield defer.ensureDeferred(
|
|
||||||
self._well_known_resolver.get_well_known(parsed_uri.hostname)
|
|
||||||
)
|
|
||||||
delegated_server = well_known_result.delegated_server
|
|
||||||
|
|
||||||
if delegated_server:
|
|
||||||
# Ok, the server has delegated matrix traffic to somewhere else, so
|
|
||||||
# lets rewrite the URL to replace the server with the delegated
|
|
||||||
# server name.
|
|
||||||
uri = urllib.parse.urlunparse(
|
|
||||||
(
|
|
||||||
parsed_uri.scheme,
|
|
||||||
delegated_server,
|
|
||||||
parsed_uri.path,
|
|
||||||
parsed_uri.params,
|
|
||||||
parsed_uri.query,
|
|
||||||
parsed_uri.fragment,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
parsed_uri = urllib.parse.urlparse(uri)
|
|
||||||
|
|
||||||
# We need to make sure the host header is set to the netloc of the
|
# We need to make sure the host header is set to the netloc of the
|
||||||
# server and that a user-agent is provided.
|
# server and that a user-agent is provided.
|
||||||
@@ -217,6 +157,101 @@ class MatrixFederationAgent:
|
|||||||
|
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def postprocess_uri(
|
||||||
|
self, parsed_uri: "urllib.parse.ParseResultBytes"
|
||||||
|
) -> Generator["defer.Deferred", Any, "urllib.parse.ParseResultBytes"]:
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
class MatrixFederationAgent(BaseMatrixFederationAgent):
|
||||||
|
"""A federation agent that resolves server delegation by itself, using
|
||||||
|
SRV or .well-known lookups."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
reactor: ISynapseReactor,
|
||||||
|
tls_client_options_factory: Optional[FederationPolicyForHTTPS],
|
||||||
|
user_agent: bytes,
|
||||||
|
ip_whitelist: IPSet,
|
||||||
|
ip_blacklist: IPSet,
|
||||||
|
_srv_resolver: Optional[SrvResolver] = None,
|
||||||
|
_well_known_resolver: Optional[WellKnownResolver] = None,
|
||||||
|
):
|
||||||
|
super().__init__(reactor, user_agent, ip_whitelist, ip_blacklist)
|
||||||
|
|
||||||
|
# proxy_reactor is not blacklisted
|
||||||
|
proxy_reactor = reactor
|
||||||
|
|
||||||
|
self._agent = Agent.usingEndpointFactory(
|
||||||
|
self._reactor,
|
||||||
|
MatrixHostnameEndpointFactory(
|
||||||
|
self._reactor,
|
||||||
|
proxy_reactor,
|
||||||
|
tls_client_options_factory,
|
||||||
|
_srv_resolver,
|
||||||
|
),
|
||||||
|
pool=self._pool,
|
||||||
|
)
|
||||||
|
|
||||||
|
if _well_known_resolver is None:
|
||||||
|
_well_known_resolver = WellKnownResolver(
|
||||||
|
reactor,
|
||||||
|
agent=BlacklistingAgentWrapper(
|
||||||
|
ProxyAgent(
|
||||||
|
self._reactor,
|
||||||
|
proxy_reactor,
|
||||||
|
pool=self._pool,
|
||||||
|
contextFactory=tls_client_options_factory,
|
||||||
|
use_proxy=True,
|
||||||
|
),
|
||||||
|
ip_blacklist=ip_blacklist,
|
||||||
|
),
|
||||||
|
user_agent=self.user_agent,
|
||||||
|
)
|
||||||
|
|
||||||
|
self._well_known_resolver = _well_known_resolver
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def postprocess_uri(
|
||||||
|
self, parsed_uri: "urllib.parse.ParseResultBytes"
|
||||||
|
) -> Generator[defer.Deferred, Any, "urllib.parse.ParseResultBytes"]:
|
||||||
|
# There must be a valid hostname.
|
||||||
|
assert parsed_uri.hostname
|
||||||
|
|
||||||
|
# If this is a matrix:// URI check if the server has delegated matrix
|
||||||
|
# traffic using well-known delegation.
|
||||||
|
#
|
||||||
|
# We have to do this here and not in the endpoint as we need to rewrite
|
||||||
|
# the host header with the delegated server name.
|
||||||
|
delegated_server = None
|
||||||
|
if (
|
||||||
|
parsed_uri.scheme == b"matrix"
|
||||||
|
and not _is_ip_literal(parsed_uri.hostname)
|
||||||
|
and not parsed_uri.port
|
||||||
|
):
|
||||||
|
well_known_result = yield defer.ensureDeferred(
|
||||||
|
self._well_known_resolver.get_well_known(parsed_uri.hostname)
|
||||||
|
)
|
||||||
|
delegated_server = well_known_result.delegated_server
|
||||||
|
if delegated_server:
|
||||||
|
# Ok, the server has delegated matrix traffic to somewhere else, so
|
||||||
|
# lets rewrite the URL to replace the server with the delegated
|
||||||
|
# server name.
|
||||||
|
uri = urllib.parse.urlunparse(
|
||||||
|
(
|
||||||
|
parsed_uri.scheme,
|
||||||
|
delegated_server,
|
||||||
|
parsed_uri.path,
|
||||||
|
parsed_uri.params,
|
||||||
|
parsed_uri.query,
|
||||||
|
parsed_uri.fragment,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
parsed_uri = urllib.parse.urlparse(uri)
|
||||||
|
return parsed_uri
|
||||||
|
|
||||||
|
|
||||||
@implementer(IAgentEndpointFactory)
|
@implementer(IAgentEndpointFactory)
|
||||||
class MatrixHostnameEndpointFactory:
|
class MatrixHostnameEndpointFactory:
|
||||||
@@ -430,3 +465,79 @@ def _is_ip_literal(host: bytes) -> bool:
|
|||||||
return True
|
return True
|
||||||
except AddrFormatError:
|
except AddrFormatError:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
@implementer(IAgent)
|
||||||
|
class InternalProxyMatrixFederationAgentInner(_AgentBase):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
reactor: IReactorCore,
|
||||||
|
endpointFactory: IAgentEndpointFactory,
|
||||||
|
pool: HTTPConnectionPool,
|
||||||
|
):
|
||||||
|
_AgentBase.__init__(self, reactor, pool)
|
||||||
|
self._endpointFactory = endpointFactory
|
||||||
|
|
||||||
|
def request(
|
||||||
|
self,
|
||||||
|
method: bytes,
|
||||||
|
uri: bytes,
|
||||||
|
headers: Optional[Headers] = None,
|
||||||
|
bodyProducer: Optional[IBodyProducer] = None,
|
||||||
|
) -> "defer.Deferred[IResponse]":
|
||||||
|
# Cache *all* connections under the same key, since we are only
|
||||||
|
# connecting to a single destination, the proxy:
|
||||||
|
# TODO make second entry an endpoint
|
||||||
|
key = ("http-proxy", None)
|
||||||
|
parsed_uri = URI.fromBytes(uri)
|
||||||
|
return self._requestWithEndpoint(
|
||||||
|
key,
|
||||||
|
self._endpointFactory.endpointForURI(parsed_uri),
|
||||||
|
method,
|
||||||
|
parsed_uri,
|
||||||
|
headers,
|
||||||
|
bodyProducer,
|
||||||
|
uri,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class InternalProxyMatrixFederationAgent(BaseMatrixFederationAgent):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
reactor: ISynapseReactor,
|
||||||
|
user_agent: bytes,
|
||||||
|
ip_whitelist: IPSet,
|
||||||
|
ip_blacklist: IPSet,
|
||||||
|
proxy_workers: Mapping[str, InstanceLocationConfig],
|
||||||
|
):
|
||||||
|
super().__init__(reactor, user_agent, ip_whitelist, ip_blacklist)
|
||||||
|
self._agent = InternalProxyMatrixFederationAgentInner(
|
||||||
|
self._reactor,
|
||||||
|
InternalProxyMatrixHostnameEndpointFactory(reactor, proxy_workers),
|
||||||
|
pool=self._pool,
|
||||||
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def postprocess_uri(
|
||||||
|
self, parsed_uri: "urllib.parse.ParseResultBytes"
|
||||||
|
) -> Generator[defer.Deferred, Any, urllib.parse.ParseResultBytes]:
|
||||||
|
yield None
|
||||||
|
return parsed_uri
|
||||||
|
|
||||||
|
|
||||||
|
@implementer(IAgentEndpointFactory)
|
||||||
|
class InternalProxyMatrixHostnameEndpointFactory:
|
||||||
|
def __init__(
|
||||||
|
self, reactor: IReactorCore, proxy_workers: Mapping[str, InstanceLocationConfig]
|
||||||
|
):
|
||||||
|
self._reactor = reactor
|
||||||
|
self._proxy_workers = proxy_workers
|
||||||
|
|
||||||
|
def endpointForURI(self, uri: URI) -> IStreamClientEndpoint:
|
||||||
|
# key = uri.toBytes()
|
||||||
|
# TODO chose instance based on key
|
||||||
|
proxy_location = next(iter(self._proxy_workers.values()))
|
||||||
|
# TODO does this need wrapping with wrapClientTLS?
|
||||||
|
logger.warning(f"DMR: make endpoint for {uri}, {self._proxy_workers=}")
|
||||||
|
rv = HostnameEndpoint(self._reactor, proxy_location.host, proxy_location.port)
|
||||||
|
return rv
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ from twisted.internet.interfaces import IReactorTime
|
|||||||
from twisted.internet.task import Cooperator
|
from twisted.internet.task import Cooperator
|
||||||
from twisted.web.client import ResponseFailed
|
from twisted.web.client import ResponseFailed
|
||||||
from twisted.web.http_headers import Headers
|
from twisted.web.http_headers import Headers
|
||||||
from twisted.web.iweb import IBodyProducer, IResponse
|
from twisted.web.iweb import IAgent, IBodyProducer, IResponse
|
||||||
|
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
import synapse.util.retryutils
|
import synapse.util.retryutils
|
||||||
@@ -70,7 +70,10 @@ from synapse.http.client import (
|
|||||||
encode_query_args,
|
encode_query_args,
|
||||||
read_body_with_max_size,
|
read_body_with_max_size,
|
||||||
)
|
)
|
||||||
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
|
from synapse.http.federation.matrix_federation_agent import (
|
||||||
|
InternalProxyMatrixFederationAgent,
|
||||||
|
MatrixFederationAgent,
|
||||||
|
)
|
||||||
from synapse.http.types import QueryParams
|
from synapse.http.types import QueryParams
|
||||||
from synapse.logging import opentracing
|
from synapse.logging import opentracing
|
||||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||||
@@ -348,13 +351,29 @@ class MatrixFederationHttpClient:
|
|||||||
if hs.config.server.user_agent_suffix:
|
if hs.config.server.user_agent_suffix:
|
||||||
user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)
|
user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)
|
||||||
|
|
||||||
federation_agent = MatrixFederationAgent(
|
if (
|
||||||
self.reactor,
|
hs.config.worker.outbound_fed_restricted_to
|
||||||
tls_client_options_factory,
|
and hs.get_instance_name()
|
||||||
user_agent.encode("ascii"),
|
not in hs.config.worker.outbound_fed_restricted_to
|
||||||
hs.config.server.federation_ip_range_whitelist,
|
):
|
||||||
hs.config.server.federation_ip_range_blacklist,
|
logger.warning("DMR: Using InternalProxyMatrixFederationAgent")
|
||||||
)
|
# We must
|
||||||
|
federation_agent: IAgent = InternalProxyMatrixFederationAgent(
|
||||||
|
self.reactor,
|
||||||
|
user_agent.encode("ascii"),
|
||||||
|
hs.config.server.federation_ip_range_whitelist,
|
||||||
|
hs.config.server.federation_ip_range_blacklist,
|
||||||
|
hs.config.worker.outbound_fed_restricted_to,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warning("DMR: Using MatrixFederationAgent")
|
||||||
|
federation_agent = MatrixFederationAgent(
|
||||||
|
self.reactor,
|
||||||
|
tls_client_options_factory,
|
||||||
|
user_agent.encode("ascii"),
|
||||||
|
hs.config.server.federation_ip_range_whitelist,
|
||||||
|
hs.config.server.federation_ip_range_blacklist,
|
||||||
|
)
|
||||||
|
|
||||||
# Use a BlacklistingAgentWrapper to prevent circumventing the IP
|
# Use a BlacklistingAgentWrapper to prevent circumventing the IP
|
||||||
# blacklist via IP literals in server names
|
# blacklist via IP literals in server names
|
||||||
|
|||||||
22
synapse/http/outbound_federation_proxy.py
Normal file
22
synapse/http/outbound_federation_proxy.py
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
from twisted.web.http import HTTPFactory
|
||||||
|
from twisted.web.proxy import Proxy, ProxyClient, ProxyClientFactory, ProxyRequest
|
||||||
|
|
||||||
|
|
||||||
|
class FederationOutboundProxyClient(ProxyClient):
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
class FederationOutboundProxyClientFactory(ProxyClientFactory):
|
||||||
|
protocol = FederationOutboundProxyClient
|
||||||
|
|
||||||
|
|
||||||
|
class FederationOutboundProxyRequest(ProxyRequest):
|
||||||
|
protocols = {b"matrix": FederationOutboundProxyClientFactory}
|
||||||
|
ports = {b"matrix": 80}
|
||||||
|
|
||||||
|
|
||||||
|
class FederationOutboundProxy(Proxy):
|
||||||
|
requestFactory = FederationOutboundProxyRequest
|
||||||
|
|
||||||
|
|
||||||
|
OutboundFederationProxyFactory = HTTPFactory.forProtocol(FederationOutboundProxy)
|
||||||
Reference in New Issue
Block a user