Mirror of https://github.com/element-hq/synapse.git (synced 2025-12-09 01:30:18 +00:00)

Compare commits: madlittlem... to rei/noddy_ (50 commits)
Commits (SHA1):

- cd78f3d2ee
- 2057fee653
- 09831a666e
- d29144cb31
- 29d0c44b16
- 9b39b11817
- 7d639bbecf
- 075e8d3ca8
- 706bfbbb88
- 7d398d52b7
- be472547bb
- 1845e4ced6
- b1f707a802
- 3c67fda366
- e49ee3dd1c
- 9d02bd5b1c
- e52725b2ac
- 210f76f4b9
- e85f8f77c2
- 6452e22e8f
- 9d3e8d7fcd
- a3f8ec284a
- 843e048673
- 1d101ce7a8
- ac3918d8ac
- dab15bf7d7
- d5f42f4655
- 9d12232f07
- b21487300c
- 1e73e7dde2
- b2141a3640
- 5d2241e372
- bd46391d31
- e64a35cb00
- 58c8a2c6fd
- 56bcba2b16
- 40a684e169
- 84880cbe94
- 50bb114032
- e466404b4f
- 4dd02db657
- 333f806d06
- 550d2a1e77
- 61349bd860
- 20bd26ecad
- e91cd9c722
- 18a331a18c
- d7e3925a14
- 9df46343b9
- a035d84ce0
changelog.d/17751.misc (new file, +1 line)
@@ -0,0 +1 @@
Minor performance increase for large accounts using sliding sync.

changelog.d/17765.misc (new file, +1 line)
@@ -0,0 +1 @@
Increase performance of the notifier when there are many syncing users.

changelog.d/17766.misc (new file, +1 line)
@@ -0,0 +1 @@
Increase performance of the notifier when there are many syncing users.

changelog.d/17767.misc (new file, +1 line)
@@ -0,0 +1 @@
Fix perf when streams don't change often.

changelog.d/17768.misc (new file, +1 line)
@@ -0,0 +1 @@
Improve performance of sliding sync connections that do not ask for any rooms.
@@ -6,8 +6,10 @@
server {
    # Listen on an unoccupied port number
    listen 8008;
    listen [::]:8008;
    {% for port in range(8008, 8008 + num_inbound_ports) %}
    listen {{ port }};
    listen [::]:{{ port }};
    {% endfor %}

    {% if tls_cert_path is not none and tls_key_path is not none %}
        listen 8448 ssl;
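The fixed pair of `listen` directives above is replaced by a loop over `num_inbound_ports` (fed from the `NGINX_INBOUND_PORTS` environment variable further down in this change set). A minimal sketch of what the loop renders, using Jinja2 directly and an illustrative value of 3:

```python
# Illustrative only: renders the listen-port loop from the template above.
# num_inbound_ports=3 is an arbitrary example value, not a project default.
from jinja2 import Template

loop = Template(
    "{% for port in range(8008, 8008 + num_inbound_ports) %}"
    "listen {{ port }};\n"
    "listen [::]:{{ port }};\n"
    "{% endfor %}"
)
print(loop.render(num_inbound_ports=3))
# Emits listen / listen [::] pairs for ports 8008, 8009 and 8010.
```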
@@ -11,24 +11,16 @@ files = /etc/supervisor/conf.d/*.conf
[program:nginx]
command=/usr/local/bin/prefix-log /usr/sbin/nginx -g "daemon off;"
priority=500
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
username=www-data
autorestart=true

[program:redis]
{% if using_unix_sockets %}
command=/usr/local/bin/prefix-log /usr/local/bin/redis-server --unixsocket /tmp/redis.sock
command=/usr/local/bin/prefix-log /usr/bin/redis-server --unixsocket /tmp/redis.sock
{% else %}
command=/usr/local/bin/prefix-log /usr/local/bin/redis-server
command=/usr/local/bin/prefix-log /usr/bin/redis-server
{% endif %}
priority=1
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
username=redis
autorestart=true
@@ -1,52 +1,38 @@
{% if use_forking_launcher %}
[program:synapse_fork]
command=/usr/local/bin/python -m synapse.app.complement_fork_starter
command=/usr/local/bin/prefix-log /root/synapse/env/bin/python -m synapse.app.complement_fork_starter
    {{ main_config_path }}
    synapse.app.homeserver
    --config-path="{{ main_config_path }}"
    --config-path=/conf/workers/shared.yaml
    --config-path=/root/synapse/config/workers/shared.yaml
    {%- for worker in workers %}
    -- {{ worker.app }}
    --config-path="{{ main_config_path }}"
    --config-path=/conf/workers/shared.yaml
    --config-path=/conf/workers/{{ worker.name }}.yaml
    --config-path=/root/synapse/config/workers/shared.yaml
    --config-path=/root/synapse/config/workers/{{ worker.name }}.yaml
    {%- endfor %}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
autorestart=unexpected
exitcodes=0

{% else %}
[program:synapse_main]
command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.homeserver
command=/usr/local/bin/prefix-log /root/synapse/env/bin/python -m synapse.app.homeserver
    --config-path="{{ main_config_path }}"
    --config-path=/conf/workers/shared.yaml
    --config-path=/root/synapse/config/workers/shared.yaml
priority=10
# Log startup failures to supervisord's stdout/err
# Regular synapse logs will still go in the configured data directory
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
autorestart=unexpected
exitcodes=0


{% for worker in workers %}
[program:synapse_{{ worker.name }}]
command=/usr/local/bin/prefix-log /usr/local/bin/python -m {{ worker.app }}
command=/usr/local/bin/prefix-log /root/synapse/env/bin/python -m {{ worker.app }}
    --config-path="{{ main_config_path }}"
    --config-path=/conf/workers/shared.yaml
    --config-path=/conf/workers/{{ worker.name }}.yaml
    --config-path=/root/synapse/config/workers/shared.yaml
    --config-path=/root/synapse/config/workers/{{ worker.name }}.yaml
autorestart=unexpected
priority=500
exitcodes=0
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0

{% endfor %}
{% endif %}
@@ -20,7 +20,26 @@ worker_listeners:
      - {{ resource }}
  {%- endfor %}
  {% endif %}
  - type: metrics
    port: {{ port + 1 }}
    resources:
      - metrics

worker_log_config: {{ worker_log_config_filepath }}

database:
  name: "psycopg2"
  args:
    user: "{{ POSTGRES_USER or "synapse" }}"
    password: "{{ POSTGRES_PASSWORD }}"
    database: "{{ POSTGRES_DB or "synapse" }}"
    {% if not SYNAPSE_USE_UNIX_SOCKET %}
    {# Synapse will use a default unix socket for Postgres when host/port is not specified (behavior from `psycopg2`). #}
    host: "{{ POSTGRES_HOST or "db" }}"
    port: "{{ POSTGRES_PORT or "5432" }}"
    {% endif %}
    cp_min: {{ POSTGRES_CP_MIN or 5 }}
    cp_max: {{ POSTGRES_CP_MAX or 10 }}
    application_name: "{{ name }}"

{{ worker_extra_conf }}
@@ -69,6 +69,7 @@ database:
    {% endif %}
    cp_min: {{ POSTGRES_CP_MIN or 5 }}
    cp_max: {{ POSTGRES_CP_MAX or 10 }}
    application_name: synapse
{% else %}
database:
  name: "sqlite3"
@@ -92,7 +93,7 @@ federation_rc_concurrent: 3

## Files ##

media_store_path: "/data/media"
media_store_path: "/root/synapse/data/media"
max_upload_size: "{{ SYNAPSE_MAX_UPLOAD_SIZE or "50M" }}"
max_image_pixels: "32M"
dynamic_thumbnails: false
@@ -179,7 +180,7 @@ macaroon_secret_key: "{{ SYNAPSE_MACAROON_SECRET_KEY }}"

## Signing Keys ##

signing_key_path: "/data/{{ SYNAPSE_SERVER_NAME }}.signing.key"
signing_key_path: "/root/synapse/data/{{ SYNAPSE_SERVER_NAME }}.signing.key"
old_signing_keys: {}
key_refresh_interval: "1d" # 1 Day.

@@ -190,4 +191,4 @@ trusted_key_servers:
    "ed25519:auto": "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"

password_config:
  enabled: true
  enabled: false
@@ -24,6 +24,15 @@
# nginx and supervisord configs depending on the workers requested.
#
# The environment variables it reads are:
# * SYNAPSE_CONFIG_PATH: The path where the generated `homeserver.yaml` will
#   be stored.
# * SYNAPSE_CONFIG_DIR: The directory where generated config will be stored.
#   If `SYNAPSE_CONFIG_PATH` is not set, it will default to
#   SYNAPSE_CONFIG_DIR/homeserver.yaml.
# * SYNAPSE_DATA_DIR: Where the generated config will put persistent data
#   such as the database and media store.
# * SYNAPSE_CONFIG_TEMPLATE_DIR: The directory containing jinja2 templates for
#   configuration that this script will generate config from.
# * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
# * SYNAPSE_REPORT_STATS: Whether to report stats.
# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKERS_CONFIG
@@ -35,6 +44,8 @@
#   SYNAPSE_WORKER_TYPES='event_persister, federation_sender, client_reader'
#   SYNAPSE_WORKER_TYPES='event_persister:2, federation_sender:2, client_reader'
#   SYNAPSE_WORKER_TYPES='stream_writers=account_data+presence+typing'
# * SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK: Whether worker logs should be written to disk,
#   in addition to stdout.
# * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
#   will be treated as Application Service registration files.
# * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
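The `type:count` and `type+type` forms in the examples above are expanded by `parse_worker_types` later in this file. As a rough illustration of the expansion (a simplified sketch that ignores the `name=` form and other edge cases the real parser handles):

```python
# Simplified sketch of SYNAPSE_WORKER_TYPES expansion; not the script's parser.
from typing import Dict, Set


def expand_worker_types(spec: str) -> Dict[str, Set[str]]:
    """Expand 'type', 'type:count' and 'type1+type2' entries into name -> types."""
    workers: Dict[str, Set[str]] = {}
    for entry in (e.strip() for e in spec.split(",") if e.strip()):
        types_part, _, count = entry.partition(":")
        n = int(count) if count else 1
        for i in range(1, n + 1):
            name = types_part if n == 1 else f"{types_part}{i}"
            workers[name] = set(types_part.split("+"))
    return workers


print(expand_worker_types("event_persister:2, federation_sender:2, client_reader"))
# {'event_persister1': ..., 'event_persister2': ..., 'federation_sender1': ...,
#  'federation_sender2': ..., 'client_reader': ...}
```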
@@ -48,12 +59,15 @@
# * SYNAPSE_LOG_SENSITIVE: If unset, SQL and SQL values won't be logged,
#   regardless of the SYNAPSE_LOG_LEVEL setting.
# * SYNAPSE_LOG_TESTING: if set, Synapse will log additional information useful
#   for testing.
#   for testing.
# * SYNAPSE_USE_UNIX_SOCKET: if set, workers will communicate via unix socket
#   rather than TCP.
#
# NOTE: According to Complement's ENTRYPOINT expectations for a homeserver image (as defined
# in the project's README), this script may be run multiple times, and functionality should
# continue to work if so.

import json
import os
import platform
import re
@@ -82,6 +96,7 @@ MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
MAIN_PROCESS_INSTANCE_NAME = "main"
MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1"
MAIN_PROCESS_REPLICATION_PORT = 9093
MAIN_PROCESS_METRICS_PORT = 9094
# Obviously, these would only be used with the UNIX socket option
MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock"
MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
@@ -162,6 +177,17 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
        "nginx_upstream_extra_conf": "hash $http_authorization consistent;",
    },
    "sliding_sync": {
        "app": "synapse.app.generic_worker",
        "listener_resources": ["client"],
        "endpoint_patterns": [
            "^/_matrix/client/unstable/org.matrix.simplified_msc3575/sync$",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
        "nginx_upstream_extra_conf": "hash $http_authorization consistent;",
    },
    "client_reader": {
        "app": "synapse.app.generic_worker",
@@ -194,6 +220,8 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
            "^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$",
            "^/_matrix/client/(r0|v3|unstable)/capabilities$",
            "^/_matrix/client/(r0|v3|unstable)/notifications$",
            "^/_matrix/client/(r0|v3|unstable)/keys/query$",
            "^/_matrix/client/(r0|v3|unstable)/keys/changes$",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
@@ -260,6 +288,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
            "^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/knock/",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
            "^/_matrix/client/(api/v1|r0|v3|unstable)/createRoom$",
        ],
        "shared_extra_conf": {},
        "worker_extra_conf": "",
@@ -327,6 +356,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
NGINX_LOCATION_CONFIG_BLOCK = """
    location ~* {endpoint} {{
        proxy_pass {upstream};
        proxy_http_version 1.1;
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header Host $host;
@@ -604,7 +634,13 @@ def generate_base_homeserver_config() -> None:
    # start.py already does this for us, so just call that.
    # note that this script is copied in in the official, monolith dockerfile
    os.environ["SYNAPSE_HTTP_PORT"] = str(MAIN_PROCESS_HTTP_LISTENER_PORT)
    subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)

    # This script makes use of the `SYNAPSE_CONFIG_DIR` environment variable to
    # determine where to place the generated homeserver config.

    # We use "python" instead of supplying an absolute path here to allow those
    # running the script to use python from a virtual env.
    subprocess.run(["python", "/start.py", "migrate_config"], check=True)


def parse_worker_types(
@@ -730,11 +766,39 @@ def parse_worker_types(

    return dict_to_return


def generate_metrics_scrape_config(
    metrics_ports: Dict[str, int]
) -> None:
    out_scrape_configs = []

    for worker_name, worker_metrics_port in metrics_ports.items():
        job = worker_name.rstrip("0123456789")
        if len(job) < len(worker_name):
            index = int(worker_name[len(job):])
        else:
            # worker name without a number: assume index=1
            index = 1

        out_scrape_configs.append({
            "targets": [f"127.0.0.1:{worker_metrics_port}"],
            "labels": {
                "job": f"synapse-{job}",
                "index": f"{index}",
                # we want all instance names to be the same, even on different scrape addresses
                "instance": "synapse",
            }
        })

    with open("/out_scrape.json", "w") as fout:
        json.dump(out_scrape_configs, fout)
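For context, here is roughly what this new helper writes out for a hypothetical set of workers; the port numbers below are made up, while the real ones come from the `worker_port + 1` assignments added elsewhere in this diff:

```python
# Standalone sketch of the job/index derivation used by generate_metrics_scrape_config.
import json

metrics_ports = {"main": 9094, "client_reader1": 18010, "client_reader2": 18012}  # hypothetical

scrape_targets = []
for worker_name, port in metrics_ports.items():
    job = worker_name.rstrip("0123456789")  # strip a trailing instance number, if any
    index = int(worker_name[len(job):]) if len(job) < len(worker_name) else 1
    scrape_targets.append(
        {
            "targets": [f"127.0.0.1:{port}"],
            "labels": {"job": f"synapse-{job}", "index": str(index), "instance": "synapse"},
        }
    )

print(json.dumps(scrape_targets, indent=2))
# "main" becomes job "synapse-main" / index 1; "client_reader2" becomes
# job "synapse-client_reader" / index 2, all sharing instance "synapse".
```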

def generate_worker_files(
    environ: Mapping[str, str],
    config_dir: str,
    config_path: str,
    data_dir: str,
    template_dir: str,
    requested_worker_types: Dict[str, Set[str]],
) -> None:
    """Read the desired workers(if any) that is passed in and generate shared
@@ -742,9 +806,13 @@ def generate_worker_files(

    Args:
        environ: os.environ instance.
        config_path: The location of the generated Synapse main worker config file.
        data_dir: The location of the synapse data directory. Where log and
            user-facing config files live.
        config_dir: The location of the configuration directory, where generated
            worker config files are written to.
        config_path: The location of the base Synapse homeserver config file.
        data_dir: The location of the synapse data directory. Where logs will be
            stored (if `SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK` is set).
        template_dir: The location of the template directory. Where jinja2
            templates for config files live.
        requested_worker_types: A Dict containing requested workers in the format of
            {'worker_name1': {'worker_type', ...}}
    """
@@ -774,6 +842,13 @@
            "resources": [{"names": ["replication"]}],
        }
    ]

    listeners.append({
        "port": MAIN_PROCESS_METRICS_PORT,
        "type": "metrics",
        "bind_address": "::",
    })

    with open(config_path) as file_stream:
        original_config = yaml.safe_load(file_stream)
        original_listeners = original_config.get("listeners")
@@ -806,8 +881,14 @@
    # with nginx_upstreams and placed in /etc/nginx/conf.d.
    nginx_locations: Dict[str, str] = {}

    # A map from worker name to which HTTP port is in use for its metrics.
    metrics_ports: Dict[str, int] = {
        MAIN_PROCESS_INSTANCE_NAME: MAIN_PROCESS_METRICS_PORT,
    }

    # Create the worker configuration directory if it doesn't already exist
    os.makedirs("/conf/workers", exist_ok=True)
    workers_config_dir = os.path.join(config_dir, "workers")
    os.makedirs(workers_config_dir, exist_ok=True)

    # Start worker ports from this arbitrary port
    worker_port = 18009
@@ -854,9 +935,11 @@
        worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)

        worker_config.update(
            {"name": worker_name, "port": str(worker_port), "config_path": config_path}
            {"name": worker_name, "port": worker_port}
        )

        metrics_ports[worker_name] = worker_port + 1

        # Update the shared config with any worker_type specific options. The first of a
        # given worker_type needs to stay assigned and not be replaced.
        worker_config["shared_extra_conf"].update(shared_config)
@@ -877,13 +960,21 @@
        worker_descriptors.append(worker_config)

        # Write out the worker's logging config file
        log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)
        log_config_filepath = generate_worker_log_config(
            environ, worker_name, workers_config_dir, template_dir, data_dir
        )

        extra_env: Dict[str, str] = {}
        for extra_env_var in ("POSTGRES_HOST", "POSTGRES_PORT", "POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_SOCKET", "POSTGRES_DB", "SYNAPSE_USE_UNIX_SOCKET", "POSTGRES_CP_MIN", "POSTGRES_CP_MAX"):
            if extra_env_var in os.environ:
                extra_env[extra_env_var] = os.environ[extra_env_var]

        # Then a worker config file
        convert(
            "/conf/worker.yaml.j2",
            f"/conf/workers/{worker_name}.yaml",
            os.path.join(template_dir, "worker.yaml.j2"),
            os.path.join(workers_config_dir, f"{worker_name}.yaml"),
            **worker_config,
            **extra_env,
            worker_log_config_filepath=log_config_filepath,
            using_unix_sockets=using_unix_sockets,
        )
@@ -892,7 +983,7 @@
        for worker_type in worker_types_set:
            nginx_upstreams.setdefault(worker_type, set()).add(worker_port)

        worker_port += 1
        worker_port += 2

    # Build the nginx location config blocks
    nginx_location_config = ""
@@ -914,6 +1005,11 @@
        for port in upstream_worker_ports:
            body += f"    server localhost:{port};\n"

        # Append extra lines required for the specialised type of worker
        extra_upstream_conf: Optional[str] = WORKERS_CONFIG[upstream_worker_base_name].get("nginx_upstream_extra_conf")
        if extra_upstream_conf:
            body += "".join(f"\n    {line}" for line in extra_upstream_conf.split("\n"))

        # Add to the list of configured upstreams
        nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
            upstream_worker_base_name=upstream_worker_base_name,
@@ -923,7 +1019,9 @@
    # Finally, we'll write out the config files.

    # log config for the master process
    master_log_config = generate_worker_log_config(environ, "master", data_dir)
    master_log_config = generate_worker_log_config(
        environ, "master", workers_config_dir, template_dir, data_dir
    )
    shared_config["log_config"] = master_log_config

    # Find application service registrations
@@ -954,8 +1052,8 @@

    # Shared homeserver config
    convert(
        "/conf/shared.yaml.j2",
        "/conf/workers/shared.yaml",
        os.path.join(template_dir, "shared.yaml.j2"),
        os.path.join(workers_config_dir, "shared.yaml"),
        shared_worker_config=yaml.dump(shared_config),
        appservice_registrations=appservice_registrations,
        enable_redis=workers_in_use,
@@ -965,19 +1063,20 @@

    # Nginx config
    convert(
        "/conf/nginx.conf.j2",
        os.path.join(template_dir, "nginx.conf.j2"),
        "/etc/nginx/conf.d/matrix-synapse.conf",
        worker_locations=nginx_location_config,
        upstream_directives=nginx_upstream_config,
        tls_cert_path=os.environ.get("SYNAPSE_TLS_CERT"),
        tls_key_path=os.environ.get("SYNAPSE_TLS_KEY"),
        using_unix_sockets=using_unix_sockets,
        num_inbound_ports=int(os.environ.get("NGINX_INBOUND_PORTS", "1"))
    )

    # Supervisord config
    os.makedirs("/etc/supervisor", exist_ok=True)
    convert(
        "/conf/supervisord.conf.j2",
        os.path.join(template_dir, "supervisord.conf.j2"),
        "/etc/supervisor/supervisord.conf",
        main_config_path=config_path,
        enable_redis=workers_in_use,
@@ -985,7 +1084,7 @@
    )

    convert(
        "/conf/synapse.supervisord.conf.j2",
        os.path.join(template_dir, "synapse.supervisord.conf.j2"),
        "/etc/supervisor/conf.d/synapse.conf",
        workers=worker_descriptors,
        main_config_path=config_path,
@@ -994,7 +1093,7 @@

    # healthcheck config
    convert(
        "/conf/healthcheck.sh.j2",
        os.path.join(template_dir, "healthcheck.sh.j2"),
        "/healthcheck.sh",
        healthcheck_urls=healthcheck_urls,
    )
@@ -1004,12 +1103,29 @@
    if not os.path.exists(log_dir):
        os.mkdir(log_dir)

    # Add metrics scraping configurations
    generate_metrics_scrape_config(metrics_ports)


def generate_worker_log_config(
    environ: Mapping[str, str], worker_name: str, data_dir: str
    environ: Mapping[str, str],
    worker_name: str,
    workers_config_dir: str,
    template_dir: str,
    data_dir: str,
) -> str:
    """Generate a log.config file for the given worker.

    Args:
        environ: A mapping representing the environment variables that this script
            is running with.
        worker_name: The name of the worker. Used in generated file paths.
        workers_config_dir: The location of the worker configuration directory,
            where the generated worker log config will be saved.
        template_dir: The directory containing jinja2 template files.
        data_dir: The directory where log files will be written (if
            `SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK` is set).

    Returns: the path to the generated file
    """
    # Check whether we should write worker logs to disk, in addition to the console
@@ -1024,9 +1140,9 @@ def generate_worker_log_config(
    extra_log_template_args["SYNAPSE_LOG_TESTING"] = environ.get("SYNAPSE_LOG_TESTING")

    # Render and write the file
    log_config_filepath = f"/conf/workers/{worker_name}.log.config"
    log_config_filepath = os.path.join(workers_config_dir, f"{worker_name}.log.config")
    convert(
        "/conf/log.config",
        os.path.join(template_dir, "log.config"),
        log_config_filepath,
        worker_name=worker_name,
        **extra_log_template_args,
@@ -1049,6 +1165,7 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
    config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
    config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
    data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
    template_dir = environ.get("SYNAPSE_CONFIG_TEMPLATE_DIR", "/conf")

    # override SYNAPSE_NO_TLS, we don't support TLS in worker mode,
    # this needs to be handled by a frontend proxy
@@ -1060,9 +1177,10 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
        generate_base_homeserver_config()
    else:
        log("Base homeserver config exists—not regenerating")

    # This script may be run multiple times (mostly by Complement, see note at top of
    # file). Don't re-configure workers in this instance.
    mark_filepath = "/conf/workers_have_been_configured"
    mark_filepath = os.path.join(config_dir, "workers_have_been_configured")
    if not os.path.exists(mark_filepath):
        # Collect and validate worker_type requests
        # Read the desired worker configuration from the environment
@@ -1079,7 +1197,9 @@

        # Always regenerate all other config files
        log("Generating worker config files")
        generate_worker_files(environ, config_path, data_dir, requested_worker_types)
        generate_worker_files(
            environ, config_dir, config_path, data_dir, template_dir, requested_worker_types
        )

        # Mark workers as being configured
        with open(mark_filepath, "w") as f:
@@ -1114,3 +1234,4 @@

if __name__ == "__main__":
    main(sys.argv[1:], os.environ)
@@ -42,6 +42,8 @@ def convert(src: str, dst: str, environ: Mapping[str, object]) -> None:


def generate_config_from_template(
    data_dir: str,
    template_dir: str,
    config_dir: str,
    config_path: str,
    os_environ: Mapping[str, str],
@@ -50,6 +52,9 @@ def generate_config_from_template(
    """Generate a homeserver.yaml from environment variables

    Args:
        data_dir: where persistent data is stored
        template_dir: The location of the template directory. Where jinja2
            templates for config files live.
        config_dir: where to put generated config files
        config_path: where to put the main config file
        os_environ: environment mapping
@@ -70,9 +75,13 @@ def generate_config_from_template(
        "macaroon": "SYNAPSE_MACAROON_SECRET_KEY",
    }

    if not os.path.exists(data_dir):
        os.mkdir(data_dir)

    synapse_server_name = environ["SYNAPSE_SERVER_NAME"]
    for name, secret in secrets.items():
        if secret not in environ:
            filename = "/data/%s.%s.key" % (environ["SYNAPSE_SERVER_NAME"], name)
            filename = os.path.join(data_dir, f"{synapse_server_name}.{name}.key")

            # if the file already exists, load in the existing value; otherwise,
            # generate a new secret and write it to a file
@@ -88,7 +97,7 @@ def generate_config_from_template(
                handle.write(value)
            environ[secret] = value

    environ["SYNAPSE_APPSERVICES"] = glob.glob("/data/appservices/*.yaml")
    environ["SYNAPSE_APPSERVICES"] = glob.glob(os.path.join(data_dir, "appservices", "*.yaml"))
    if not os.path.exists(config_dir):
        os.mkdir(config_dir)
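The hunks above move the per-server secret files from a hard-coded `/data` prefix to the configurable `data_dir`, keeping the same read-or-generate behaviour. A hedged sketch of that pattern (the generation step here uses `secrets.token_hex` purely as a stand-in for however the real script produces values):

```python
# Sketch of the read-or-generate secret file pattern; not the script's exact code.
import os
import secrets


def load_or_create_secret(data_dir: str, server_name: str, name: str) -> str:
    filename = os.path.join(data_dir, f"{server_name}.{name}.key")
    if os.path.exists(filename):
        with open(filename) as fh:
            return fh.read().strip()  # reuse the existing secret
    value = secrets.token_hex(32)  # stand-in generation step
    with open(filename, "w") as fh:
        fh.write(value)
    return value
```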
@@ -111,12 +120,12 @@ def generate_config_from_template(
    environ["SYNAPSE_LOG_CONFIG"] = config_dir + "/log.config"

    log("Generating synapse config file " + config_path)
    convert("/conf/homeserver.yaml", config_path, environ)
    convert(os.path.join(template_dir, "homeserver.yaml"), config_path, environ)

    log_config_file = environ["SYNAPSE_LOG_CONFIG"]
    log("Generating log config file " + log_config_file)
    convert(
        "/conf/log.config",
        os.path.join(template_dir, "log.config"),
        log_config_file,
        {**environ, "include_worker_name_in_log_line": False},
    )
@@ -128,15 +137,15 @@ def generate_config_from_template(
        "synapse.app.homeserver",
        "--config-path",
        config_path,
        # tell synapse to put generated keys in /data rather than /compiled
        # tell synapse to put generated keys in the data directory rather than /compiled
        "--keys-directory",
        config_dir,
        "--generate-keys",
    ]

    if ownership is not None:
        log(f"Setting ownership on /data to {ownership}")
        subprocess.run(["chown", "-R", ownership, "/data"], check=True)
        log(f"Setting ownership on the data dir to {ownership}")
        subprocess.run(["chown", "-R", ownership, data_dir], check=True)
        args = ["gosu", ownership] + args

    subprocess.run(args, check=True)
@@ -159,12 +168,13 @@ def run_generate_config(environ: Mapping[str, str], ownership: Optional[str]) ->
    config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
    config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
    data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
    template_dir = environ.get("SYNAPSE_CONFIG_TEMPLATE_DIR", "/conf")

    # create a suitable log config from our template
    log_config_file = "%s/%s.log.config" % (config_dir, server_name)
    if not os.path.exists(log_config_file):
        log("Creating log config %s" % (log_config_file,))
        convert("/conf/log.config", log_config_file, environ)
        convert(os.path.join(template_dir, "log.config"), log_config_file, environ)

    # generate the main config file, and a signing key.
    args = [
@@ -216,12 +226,14 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:

    if mode == "migrate_config":
        # generate a config based on environment vars.
        data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
        config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
        config_path = environ.get(
            "SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml"
        )
        template_dir = environ.get("SYNAPSE_CONFIG_TEMPLATE_DIR", "/conf")
        return generate_config_from_template(
            config_dir, config_path, environ, ownership
            data_dir, template_dir, config_dir, config_path, environ, ownership
        )

    if mode != "run":
@@ -47,6 +47,7 @@ from synapse.logging.context import make_deferred_yieldable
from synapse.types import Requester, UserID, create_requester
from synapse.util import json_decoder
from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
from synapse.util.caches.expiringcache import ExpiringCache

if TYPE_CHECKING:
    from synapse.rest.admin.experimental_features import ExperimentalFeature
@@ -120,6 +121,13 @@ class MSC3861DelegatedAuth(BaseAuth):
        self._http_client = hs.get_proxied_http_client()
        self._hostname = hs.hostname
        self._admin_token = self._config.admin_token
        self._introspection_cache = ExpiringCache(
            "introspection_cache",
            self._clock,
            max_len=50000,
            expiry_ms=120_000,
            reset_expiry_on_get=False,
        )

        self._issuer_metadata = RetryOnExceptionCachedCall[OpenIDProviderMetadata](
            self._load_metadata
@@ -202,6 +210,9 @@ class MSC3861DelegatedAuth(BaseAuth):
        Returns:
            The introspection response
        """
        cached_response = self._introspection_cache.get(token, None)
        if cached_response is not None:
            return cached_response
        introspection_endpoint = await self._introspection_endpoint()
        raw_headers: Dict[str, str] = {
            "Content-Type": "application/x-www-form-urlencoded",
@@ -256,7 +267,11 @@ class MSC3861DelegatedAuth(BaseAuth):
                "The introspection endpoint returned an invalid JSON response."
            )

        return IntrospectionToken(**resp)
        out = IntrospectionToken(**resp)

        # Cache the response for a short time
        self._introspection_cache[token] = out
        return out

    async def is_server_admin(self, requester: Requester) -> bool:
        return "urn:synapse:admin:*" in requester.scope
@@ -341,6 +341,38 @@ class SlidingSyncHandler:
            extensions=extensions,
        )

        if rooms:
            live_rooms = 0
            previously_rooms = 0
            never_rooms = 0
            initial_rooms = 0
            limited_rooms = 0
            for room_id, room in rooms.items():
                if room.initial:
                    initial_rooms += 1

                if room.limited:
                    limited_rooms += 1

                status = previous_connection_state.rooms.have_sent_room(room_id)
                if status.status == HaveSentRoomFlag.LIVE:
                    live_rooms += 1
                elif status.status == HaveSentRoomFlag.PREVIOUSLY:
                    previously_rooms += 1
                elif status.status == HaveSentRoomFlag.NEVER:
                    never_rooms += 1
                else:
                    assert_never(status.status)

            logger.info(
                "Room results: live: %s, previously: %s, never: %s, initial: %s, limited: %s",
                live_rooms,
                previously_rooms,
                never_rooms,
                initial_rooms,
                limited_rooms,
            )

        # Make it easy to find traces for syncs that aren't empty
        set_tag(SynapseTags.RESULT_PREFIX + "result", bool(sliding_sync_result))
        set_tag(SynapseTags.FUNC_ARG_PREFIX + "sync_config.user", user_id)

@@ -572,7 +572,8 @@ class SlidingSyncExtensionHandler:

        # Now record which rooms are now up to data, and which rooms have
        # pending updates to send.
        new_connection_state.account_data.record_sent_rooms(relevant_room_ids)
        new_connection_state.account_data.record_sent_rooms(previously_rooms.keys())
        new_connection_state.account_data.record_sent_rooms(initial_rooms)
        missing_updates = (
            all_updates_since_the_from_token.keys() - relevant_room_ids
        )
@@ -763,9 +764,10 @@

            room_id_to_receipt_map[room_id] = {"type": type, "content": content}

        # Now we update the per-connection state to track which receipts we have
        # and haven't sent down.
        new_connection_state.receipts.record_sent_rooms(relevant_room_ids)
        # Update the per-connection state to track which rooms we have sent
        # all the receipts for.
        new_connection_state.receipts.record_sent_rooms(previously_rooms.keys())
        new_connection_state.receipts.record_sent_rooms(initial_rooms)

        if from_token:
            # Now find the set of rooms that may have receipts that we're not sending

@@ -123,6 +123,19 @@ class SlidingSyncInterestedRooms:
    newly_left_rooms: AbstractSet[str]
    dm_room_ids: AbstractSet[str]

    @staticmethod
    def empty() -> "SlidingSyncInterestedRooms":
        return SlidingSyncInterestedRooms(
            lists={},
            relevant_room_map={},
            relevant_rooms_to_send_map={},
            all_rooms=set(),
            room_membership_for_user_map={},
            newly_joined_rooms=set(),
            newly_left_rooms=set(),
            dm_room_ids=set(),
        )


def filter_membership_for_sync(
    *,
@@ -181,6 +194,14 @@ class SlidingSyncRoomLists:
        from_token: Optional[StreamToken],
    ) -> SlidingSyncInterestedRooms:
        """Fetch the set of rooms that match the request"""
        has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
        has_room_subscriptions = (
            sync_config.room_subscriptions is not None
            and len(sync_config.room_subscriptions) > 0
        )

        if not has_lists and not has_room_subscriptions:
            return SlidingSyncInterestedRooms.empty()

        if await self.store.have_finished_sliding_sync_background_jobs():
            return await self._compute_interested_rooms_new_tables(
@@ -41,6 +41,7 @@ import attr
from prometheus_client import Counter

from twisted.internet import defer
from twisted.internet.defer import Deferred

from synapse.api.constants import EduTypes, EventTypes, HistoryVisibility, Membership
from synapse.api.errors import AuthError
@@ -52,6 +53,7 @@ from synapse.logging.opentracing import log_kv, start_active_span
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
from synapse.types import (
    ISynapseReactor,
    JsonDict,
    MultiWriterStreamToken,
    PersistedEventPosition,
@@ -61,8 +63,11 @@ from synapse.types import (
    StreamToken,
    UserID,
)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.async_helpers import (
    timeout_deferred,
)
from synapse.util.metrics import Measure
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_client

if TYPE_CHECKING:
@@ -89,18 +94,6 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
    return n


class _NotificationListener:
    """This represents a single client connection to the events stream.
    The events stream handler will have yielded to the deferred, so to
    notify the handler it is sufficient to resolve the deferred.
    """

    __slots__ = ["deferred"]

    def __init__(self, deferred: "defer.Deferred"):
        self.deferred = deferred


class _NotifierUserStream:
    """This represents a user connected to the event stream.
    It tracks the most recent stream token for that user.
@@ -113,59 +106,49 @@ class _NotifierUserStream:

    def __init__(
        self,
        reactor: ISynapseReactor,
        user_id: str,
        rooms: StrCollection,
        current_token: StreamToken,
        time_now_ms: int,
    ):
        self.reactor = reactor
        self.user_id = user_id
        self.rooms = set(rooms)
        self.current_token = current_token

        # The last token for which we should wake up any streams that have a
        # token that comes before it. This gets updated every time we get poked.
        # We start it at the current token since if we get any streams
        # that have a token from before we have no idea whether they should be
        # woken up or not, so lets just wake them up.
        self.last_notified_token = current_token
        self.current_token = current_token
        self.last_notified_ms = time_now_ms

        self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred(
            defer.Deferred()
        )
        # Set of listeners that we need to wake up when there has been a change.
        self.listeners: Set[Deferred[StreamToken]] = set()

    def notify(
    def update_and_fetch_deferreds(
        self,
        stream_key: StreamKeyType,
        stream_id: Union[int, RoomStreamToken, MultiWriterStreamToken],
        current_token: StreamToken,
        time_now_ms: int,
    ) -> None:
        """Notify any listeners for this user of a new event from an
        event source.
    ) -> Collection["Deferred[StreamToken]"]:
        """Update the stream for this user because of a new event from an
        event source, and return the set of deferreds to wake up.

        Args:
            stream_key: The stream the event came from.
            stream_id: The new id for the stream the event came from.
            current_token: The new current token.
            time_now_ms: The current time in milliseconds.

        Returns:
            The set of deferreds that need to be called.
        """
        self.current_token = self.current_token.copy_and_advance(stream_key, stream_id)
        self.last_notified_token = self.current_token
        self.current_token = current_token
        self.last_notified_ms = time_now_ms
        notify_deferred = self.notify_deferred

        log_kv(
            {
                "notify": self.user_id,
                "stream": stream_key,
                "stream_id": stream_id,
                "listeners": self.count_listeners(),
            }
        )
        listeners = self.listeners
        self.listeners = set()

        users_woken_by_stream_counter.labels(stream_key).inc()

        with PreserveLoggingContext():
            self.notify_deferred = ObservableDeferred(defer.Deferred())
            notify_deferred.callback(self.current_token)
        return listeners

    def remove(self, notifier: "Notifier") -> None:
        """Remove this listener from all the indexes in the Notifier
@@ -179,9 +162,9 @@ class _NotifierUserStream:
        notifier.user_to_user_stream.pop(self.user_id)

    def count_listeners(self) -> int:
        return len(self.notify_deferred.observers())
        return len(self.listeners)

    def new_listener(self, token: StreamToken) -> _NotificationListener:
    def new_listener(self, token: StreamToken) -> "Deferred[StreamToken]":
        """Returns a deferred that is resolved when there is a new token
        greater than the given token.

@@ -191,10 +174,17 @@ class _NotifierUserStream:
        """
        # Immediately wake up stream if something has already since happened
        # since their last token.
        if self.last_notified_token != token:
            return _NotificationListener(defer.succeed(self.current_token))
        else:
            return _NotificationListener(self.notify_deferred.observe())
        if token != self.current_token:
            return defer.succeed(self.current_token)

        # Create a new deferred and add it to the set of listeners. We add a
        # cancel handler to remove it from the set again, to handle timeouts.
        deferred: "Deferred[StreamToken]" = Deferred(
            canceller=lambda d: self.listeners.discard(d)
        )
        self.listeners.add(deferred)

        return deferred


@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -247,6 +237,7 @@ class Notifier:
        # List of callbacks to be notified when a lock is released
        self._lock_released_callback: List[Callable[[str, str, str], None]] = []

        self.reactor = hs.get_reactor()
        self.clock = hs.get_clock()
        self.appservice_handler = hs.get_application_service_handler()
        self._pusher_pool = hs.get_pusherpool()
@@ -342,14 +333,25 @@
        # Wake up all related user stream notifiers
        user_streams = self.room_to_user_streams.get(room_id, set())
        time_now_ms = self.clock.time_msec()
        current_token = self.event_sources.get_current_token()

        listeners: List["Deferred[StreamToken]"] = []
        for user_stream in user_streams:
            try:
                user_stream.notify(
                    StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
                listeners.extend(
                    user_stream.update_and_fetch_deferreds(current_token, time_now_ms)
                )
            except Exception:
                logger.exception("Failed to notify listener")

        with PreserveLoggingContext():
            for listener in listeners:
                listener.callback(current_token)

        users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc(
            len(user_streams)
        )

        # Poke the replication so that other workers also see the write to
        # the un-partial-stated rooms stream.
        self.notify_replication()
@@ -519,12 +521,16 @@
        rooms = rooms or []

        with Measure(self.clock, "on_new_event"):
            user_streams = set()
            user_streams: Set[_NotifierUserStream] = set()

            log_kv(
                {
                    "waking_up_explicit_users": len(users),
                    "waking_up_explicit_rooms": len(rooms),
                    "users": shortstr(users),
                    "rooms": shortstr(rooms),
                    "stream": stream_key,
                    "stream_id": new_token,
                }
            )
@@ -544,12 +550,27 @@
                )

            time_now_ms = self.clock.time_msec()
            current_token = self.event_sources.get_current_token()
            listeners: List["Deferred[StreamToken]"] = []
            for user_stream in user_streams:
                try:
                    user_stream.notify(stream_key, new_token, time_now_ms)
                    listeners.extend(
                        user_stream.update_and_fetch_deferreds(
                            current_token, time_now_ms
                        )
                    )
                except Exception:
                    logger.exception("Failed to notify listener")

            # We resolve all these deferreds in one go so that we only need to
            # call `PreserveLoggingContext` once, as it has a bunch of overhead
            # (to calculate performance stats)
            with PreserveLoggingContext():
                for listener in listeners:
                    listener.callback(current_token)

            users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))

            self.notify_replication()

            # Notify appservices.
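Where the old `notify()` resolved each stream's `ObservableDeferred` individually (entering `PreserveLoggingContext` once per user), `update_and_fetch_deferreds()` just hands the raw deferreds back so the caller can fire them all inside a single context block, which is where the notifier speed-up comes from. A stripped-down sketch of that collect-then-fire shape, outside of Synapse's logcontext machinery:

```python
# Minimal sketch of the collect-then-fire pattern above, using plain Twisted
# deferreds and none of Synapse's Notifier/logcontext plumbing.
from typing import List, Set

from twisted.internet.defer import Deferred


class UserStream:
    def __init__(self) -> None:
        self.current_token = 0
        self.listeners: Set[Deferred] = set()

    def update_and_fetch_deferreds(self, token: int) -> List[Deferred]:
        self.current_token = token
        listeners, self.listeners = list(self.listeners), set()
        return listeners  # the caller decides when to fire these


def wake_all(streams: List[UserStream], token: int) -> None:
    to_fire: List[Deferred] = []
    for stream in streams:
        to_fire.extend(stream.update_and_fetch_deferreds(token))
    # one pass over every listener, analogous to the single PreserveLoggingContext block
    for d in to_fire:
        d.callback(token)
```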
@@ -586,6 +607,7 @@
        if room_ids is None:
            room_ids = await self.store.get_rooms_for_user(user_id)
        user_stream = _NotifierUserStream(
            reactor=self.reactor,
            user_id=user_id,
            rooms=room_ids,
            current_token=current_token,
@@ -608,8 +630,8 @@
            # Now we wait for the _NotifierUserStream to be told there
            # is a new token.
            listener = user_stream.new_listener(prev_token)
            listener.deferred = timeout_deferred(
                listener.deferred,
            listener = timeout_deferred(
                listener,
                (end_time - now) / 1000.0,
                self.hs.get_reactor(),
            )
@@ -622,7 +644,7 @@
            )

            with PreserveLoggingContext():
                await listener.deferred
                await listener

            log_kv(
                {
@@ -299,7 +299,7 @@ class SlidingSyncBody(RequestBodyModel):
                int(value)
            except ValueError:
                raise ValueError(
                    "'extensions.to_device.since' is invalid (should look like an int)"
                    f"'extensions.to_device.since' is invalid (found '{value}', expected to be able to parse an int)"
                )

            return value
@@ -142,9 +142,9 @@ class StreamChangeCache:
        """
        assert isinstance(stream_pos, int)

        # _cache is not valid at or before the earliest known stream position, so
        # _cache is not valid before the earliest known stream position, so
        # return that the entity has changed.
        if stream_pos <= self._earliest_known_stream_pos:
        if stream_pos < self._earliest_known_stream_pos:
            self.metrics.inc_misses()
            return True

@@ -186,7 +186,7 @@ class StreamChangeCache:
        This will be all entities if the given stream position is at or earlier
        than the earliest known stream position.
        """
        if not self._cache or stream_pos <= self._earliest_known_stream_pos:
        if not self._cache or stream_pos < self._earliest_known_stream_pos:
            self.metrics.inc_misses()
            return set(entities)

@@ -238,9 +238,9 @@ class StreamChangeCache:
        """
        assert isinstance(stream_pos, int)

        # _cache is not valid at or before the earliest known stream position, so
        # _cache is not valid before the earliest known stream position, so
        # return that an entity has changed.
        if stream_pos <= self._earliest_known_stream_pos:
        if stream_pos < self._earliest_known_stream_pos:
            self.metrics.inc_misses()
            return True

@@ -270,9 +270,9 @@ class StreamChangeCache:
        """
        assert isinstance(stream_pos, int)

        # _cache is not valid at or before the earliest known stream position, so
        # _cache is not valid before the earliest known stream position, so
        # return None to mark that it is unknown if an entity has changed.
        if stream_pos <= self._earliest_known_stream_pos:
        if stream_pos < self._earliest_known_stream_pos:
            return AllEntitiesChangedResult(None)

        changed_entities: List[EntityType] = []
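The comparison change from `<=` to `<` means a query made at exactly the earliest known stream position can now be answered from the cache instead of being treated as a miss; the test updates further down exercise this boundary. A small sketch with illustrative numbers:

```python
# Boundary sketch: with earliest_known_stream_pos = 2 the cache knows about all
# changes after position 2, so "changed since 2?" no longer has to assume True.
EARLIEST_KNOWN_STREAM_POS = 2


def must_assume_changed(stream_pos: int) -> bool:
    # New behaviour (strict <): only positions before the earliest known one miss.
    return stream_pos < EARLIEST_KNOWN_STREAM_POS


print(must_assume_changed(1), must_assume_changed(2), must_assume_changed(3))
# True False False  (previously the middle case was also a miss)
```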
@@ -282,22 +282,33 @@ class SyncTypingTests(unittest.HomeserverTestCase):
        self.assertEqual(200, channel.code)
        next_batch = channel.json_body["next_batch"]

        # This should time out! But it does not, because our stream token is
        # ahead, and therefore it's saying the typing (that we've actually
        # already seen) is new, since it's got a token above our new, now-reset
        # stream token.
        channel = self.make_request("GET", sync_url % (access_token, next_batch))
        self.assertEqual(200, channel.code)
        next_batch = channel.json_body["next_batch"]

        # Clear the typing information, so that it doesn't think everything is
        # in the future.
        # in the future. This happens automatically when the typing stream
        # resets.
        typing._reset()

        # Now it SHOULD fail as it never completes!
        # Nothing new, so we time out.
        with self.assertRaises(TimedOutException):
            self.make_request("GET", sync_url % (access_token, next_batch))

        # Sync and start typing again.
        sync_channel = self.make_request(
            "GET", sync_url % (access_token, next_batch), await_result=False
        )
        self.assertFalse(sync_channel.is_finished())

        channel = self.make_request(
            "PUT",
            typing_url % (room, other_user_id, other_access_token),
            b'{"typing": true, "timeout": 30000}',
        )
        self.assertEqual(200, channel.code)

        # Sync should now return.
        sync_channel.await_result()
        self.assertEqual(200, sync_channel.code)
        next_batch = sync_channel.json_body["next_batch"]


class SyncKnockTestCase(KnockingStrippedStateEventHelperMixin):
    servlets = [

@@ -53,8 +53,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
        # return True, whether it's a known entity or not.
        self.assertTrue(cache.has_entity_changed("user@foo.com", 0))
        self.assertTrue(cache.has_entity_changed("not@here.website", 0))
        self.assertTrue(cache.has_entity_changed("user@foo.com", 3))
        self.assertTrue(cache.has_entity_changed("not@here.website", 3))
        self.assertTrue(cache.has_entity_changed("user@foo.com", 2))
        self.assertTrue(cache.has_entity_changed("not@here.website", 2))

    def test_entity_has_changed_pops_off_start(self) -> None:
        """
@@ -76,9 +76,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
        self.assertTrue("user@foo.com" not in cache._entity_to_key)

        self.assertEqual(
            cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"]
            cache.get_all_entities_changed(2).entities,
            ["bar@baz.net", "user@elsewhere.org"],
        )
        self.assertFalse(cache.get_all_entities_changed(2).hit)
        self.assertFalse(cache.get_all_entities_changed(1).hit)
        self.assertTrue(cache.get_all_entities_changed(2).hit)

        # If we update an existing entity, it keeps the two existing entities
        cache.entity_has_changed("bar@baz.net", 5)
@@ -89,7 +91,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
            cache.get_all_entities_changed(3).entities,
            ["user@elsewhere.org", "bar@baz.net"],
        )
        self.assertFalse(cache.get_all_entities_changed(2).hit)
        self.assertFalse(cache.get_all_entities_changed(1).hit)
        self.assertTrue(cache.get_all_entities_changed(2).hit)

    def test_get_all_entities_changed(self) -> None:
        """
@@ -114,7 +117,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
        self.assertEqual(
            cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"]
        )
        self.assertFalse(cache.get_all_entities_changed(1).hit)
        self.assertFalse(cache.get_all_entities_changed(0).hit)
        self.assertTrue(cache.get_all_entities_changed(1).hit)

        # ... later, things gest more updates
        cache.entity_has_changed("user@foo.com", 5)
@@ -149,7 +153,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
        # With no entities, it returns True for the past, present, and False for
        # the future.
        self.assertTrue(cache.has_any_entity_changed(0))
        self.assertTrue(cache.has_any_entity_changed(1))
        self.assertFalse(cache.has_any_entity_changed(1))
        self.assertFalse(cache.has_any_entity_changed(2))

        # We add an entity