Compare commits

...

50 Commits

Author SHA1 Message Date
Olivier 'reivilibre
cd78f3d2ee Add a noddy token introspection cache
For demonstration purposes only
2024-09-30 17:27:20 +01:00
Olivier 'reivilibre
2057fee653 Support nginx listening on multiple inbound ports 2024-09-28 16:34:39 +01:00
Olivier 'reivilibre
09831a666e Merge remote-tracking branch 'origin/erikj/ss_record_rooms' into anoa/configure_workers_updates 2024-09-27 18:14:34 +01:00
Olivier 'reivilibre
d29144cb31 Merge remote-tracking branch 'origin/erikj/fast_path_e2ee' into anoa/configure_workers_updates 2024-09-27 17:04:12 +01:00
Olivier 'reivilibre
29d0c44b16 Merge remote-tracking branch 'origin/erikj/ss_logs' into anoa/configure_workers_updates 2024-09-27 17:04:03 +01:00
Erik Johnston
9b39b11817 Fix lint 2024-09-27 16:47:10 +01:00
Erik Johnston
7d639bbecf Add some sliding sync logging 2024-09-27 16:46:13 +01:00
Erik Johnston
075e8d3ca8 Newsfile 2024-09-27 16:40:34 +01:00
Erik Johnston
706bfbbb88 Add fast path for sliding sync streams that only ask for extensions
Principally useful for EX e2ee sliding sync connections.
2024-09-27 16:39:58 +01:00
Olivier 'reivilibre
7d398d52b7 Use HTTP/1.1 for nginx to Synapse 2024-09-27 16:23:25 +01:00
Olivier 'reivilibre
be472547bb Merge remote-tracking branch 'origin/erikj/stream_change_cache' into anoa/configure_workers_updates 2024-09-27 15:29:02 +01:00
Erik Johnston
1845e4ced6 Newsfile 2024-09-27 15:28:21 +01:00
Erik Johnston
b1f707a802 Fix perf when streams don't change often
There is a bug with the `StreamChangeCache` where it would incorrectly
return that all entities had changed if asked for entities changed
*since* the earliest stream position.
2024-09-27 15:25:14 +01:00
Olivier 'reivilibre
3c67fda366 Merge remote-tracking branch 'origin/erikj/optimise_notifier2' into anoa/configure_workers_updates 2024-09-27 10:36:45 +01:00
Erik Johnston
e49ee3dd1c Newsfile 2024-09-27 10:36:13 +01:00
Olivier 'reivilibre
9d02bd5b1c Merge remote-tracking branch 'origin/erikj/optimise_notifier' into anoa/configure_workers_updates 2024-09-27 10:34:57 +01:00
Erik Johnston
e52725b2ac Optimise notifier mk2 2024-09-27 10:32:58 +01:00
Erik Johnston
210f76f4b9 Fix up doc string 2024-09-27 10:23:37 +01:00
Erik Johnston
e85f8f77c2 Fix typing tests 2024-09-27 10:19:42 +01:00
Erik Johnston
6452e22e8f Newsfile 2024-09-27 09:53:33 +01:00
Erik Johnston
9d3e8d7fcd Calculate new current token once
Turns out doing `.copy_and_advance` can be expensive
2024-09-27 09:13:43 +01:00
Erik Johnston
a3f8ec284a Move metrics out of hot path
We can update the counter once outside of the loop.
2024-09-27 09:12:54 +01:00
Olivier 'reivilibre
843e048673 Merge branch 'develop' into anoa/configure_workers_updates 2024-09-26 17:45:27 +01:00
Andrew Morgan
1d101ce7a8 log invalid SSS to_device.since fields 2024-09-25 16:12:15 +01:00
Erik Johnston
ac3918d8ac Newsfile 2024-09-24 14:00:18 +01:00
Erik Johnston
dab15bf7d7 Minor perf speed up for large acccounts on SSS
This works as instead of passing *all* rooms to `record_sent_rooms` we
only need to pass rooms that were previously not in the LIVE state.
2024-09-24 13:57:55 +01:00
Olivier 'reivilibre
d5f42f4655 Use sticky routing for synchrotrons 2024-09-24 13:21:06 +01:00
Andrew Morgan
9d12232f07 Merge branch 'develop' of github.com:element-hq/synapse into anoa/configure_workers_updates 2024-09-11 12:42:31 +01:00
Olivier 'reivilibre
b21487300c Add application_names to the postgres connections based on the worker names 2024-09-09 16:15:52 +01:00
Andrew Morgan
1e73e7dde2 Merge branch 'develop' of github.com:element-hq/synapse into anoa/configure_workers_updates 2024-09-09 12:12:04 +01:00
Andrew Morgan
b2141a3640 Merge branch 'develop' of github.com:element-hq/synapse into anoa/configure_workers_updates 2024-09-04 12:34:18 +01:00
Olivier 'reivilibre
5d2241e372 Configure sliding-sync workers with sticky load balancing 2024-09-02 16:27:02 +01:00
Olivier 'reivilibre
bd46391d31 Merge branch 'develop' into anoa/configure_workers_updates 2024-08-30 14:51:48 +01:00
Till Faelligen
e64a35cb00 Route key queries to the client_reader 2024-08-30 12:01:31 +02:00
Till Faelligen
58c8a2c6fd Merge branch 'develop' of github.com:element-hq/synapse into anoa/configure_workers_updates 2024-08-29 15:40:21 +02:00
Olivier 'reivilibre
56bcba2b16 Add metrics listener config for main process 2024-08-29 11:48:22 +01:00
Olivier 'reivilibre
40a684e169 Gen metrics scraping configurations, for use with file_sd_configs 2024-08-28 18:58:02 +01:00
Till Faelligen
84880cbe94 Route createRoom to event creators, add sliding_sync worker type 2024-08-21 11:56:08 +02:00
Till Faelligen
50bb114032 Add metrics 2024-08-19 14:53:24 +02:00
Till Faelligen
e466404b4f Fix paths, remove logging (for now), disable passwords 2024-08-15 14:22:50 +02:00
Till Faelligen
4dd02db657 Fix parameter ordering 2024-08-15 11:33:10 +02:00
Andrew Morgan
333f806d06 fixup: Add SYNAPSE_CONFIG_TEMPLATE_DIR env var 2024-08-14 19:00:57 +01:00
Andrew Morgan
550d2a1e77 start.py: Support SYNAPSE_CONFIG_TEMPLATE_DIR 2024-08-14 18:57:23 +01:00
Andrew Morgan
61349bd860 create data directory if it doesn't exist
ideally this would be done in `main`
2024-08-14 18:52:47 +01:00
Andrew Morgan
20bd26ecad Allow configuring data dir in start.py 2024-08-14 18:47:47 +01:00
Andrew Morgan
e91cd9c722 Allow running python from a venv 2024-08-14 17:27:36 +01:00
Andrew Morgan
18a331a18c Document additional environment variables 2024-08-14 17:27:34 +01:00
Andrew Morgan
d7e3925a14 Allow specifying where generated config files are stored
* Store generated worker config files in SYNAPSE_CONFIG_DIR, rather than hardcoding
  `/conf`.
* Move the `workers_have_been_configured` filepath to the config dir.
2024-08-14 17:27:30 +01:00
Andrew Morgan
9df46343b9 Remove unused config_path dict entry 2024-08-14 17:27:27 +01:00
Andrew Morgan
a035d84ce0 Add SYNAPSE_CONFIG_TEMPLATE_DIR env var to worker config script
This allows specifying a directory other than `/conf` where config templates should live. This allows this
script to be used outside of the context of a docker container where /conf is bound to a dir on the host.
2024-08-14 17:27:24 +01:00
21 changed files with 400 additions and 155 deletions

1
changelog.d/17751.misc Normal file
View File

@@ -0,0 +1 @@
Minor performance increase for large accounts using sliding sync.

1
changelog.d/17765.misc Normal file
View File

@@ -0,0 +1 @@
Increase performance of the notifier when there are many syncing users.

1
changelog.d/17766.misc Normal file
View File

@@ -0,0 +1 @@
Increase performance of the notifier when there are many syncing users.

1
changelog.d/17767.misc Normal file
View File

@@ -0,0 +1 @@
Fix perf when streams don't change often.

1
changelog.d/17768.misc Normal file
View File

@@ -0,0 +1 @@
Improve performance of sliding sync connections that do not ask for any rooms.

View File

@@ -6,8 +6,10 @@
server {
# Listen on an unoccupied port number
listen 8008;
listen [::]:8008;
{% for port in range(8008, 8008 + num_inbound_ports) %}
listen {{ port }};
listen [::]:{{ port }};
{% endfor %}
{% if tls_cert_path is not none and tls_key_path is not none %}
listen 8448 ssl;

View File

@@ -11,24 +11,16 @@ files = /etc/supervisor/conf.d/*.conf
[program:nginx]
command=/usr/local/bin/prefix-log /usr/sbin/nginx -g "daemon off;"
priority=500
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
username=www-data
autorestart=true
[program:redis]
{% if using_unix_sockets %}
command=/usr/local/bin/prefix-log /usr/local/bin/redis-server --unixsocket /tmp/redis.sock
command=/usr/local/bin/prefix-log /usr/bin/redis-server --unixsocket /tmp/redis.sock
{% else %}
command=/usr/local/bin/prefix-log /usr/local/bin/redis-server
command=/usr/local/bin/prefix-log /usr/bin/redis-server
{% endif %}
priority=1
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
username=redis
autorestart=true

View File

@@ -1,52 +1,38 @@
{% if use_forking_launcher %}
[program:synapse_fork]
command=/usr/local/bin/python -m synapse.app.complement_fork_starter
command=/usr/local/bin/prefix-log /root/synapse/env/bin/python -m synapse.app.complement_fork_starter
{{ main_config_path }}
synapse.app.homeserver
--config-path="{{ main_config_path }}"
--config-path=/conf/workers/shared.yaml
--config-path=/root/synapse/config/workers/shared.yaml
{%- for worker in workers %}
-- {{ worker.app }}
--config-path="{{ main_config_path }}"
--config-path=/conf/workers/shared.yaml
--config-path=/conf/workers/{{ worker.name }}.yaml
--config-path=/root/synapse/config/workers/shared.yaml
--config-path=/root/synapse/config/workers/{{ worker.name }}.yaml
{%- endfor %}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
autorestart=unexpected
exitcodes=0
{% else %}
[program:synapse_main]
command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.homeserver
command=/usr/local/bin/prefix-log /root/synapse/env/bin/python -m synapse.app.homeserver
--config-path="{{ main_config_path }}"
--config-path=/conf/workers/shared.yaml
--config-path=/root/synapse/config/workers/shared.yaml
priority=10
# Log startup failures to supervisord's stdout/err
# Regular synapse logs will still go in the configured data directory
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
autorestart=unexpected
exitcodes=0
{% for worker in workers %}
[program:synapse_{{ worker.name }}]
command=/usr/local/bin/prefix-log /usr/local/bin/python -m {{ worker.app }}
command=/usr/local/bin/prefix-log /root/synapse/env/bin/python -m {{ worker.app }}
--config-path="{{ main_config_path }}"
--config-path=/conf/workers/shared.yaml
--config-path=/conf/workers/{{ worker.name }}.yaml
--config-path=/root/synapse/config/workers/shared.yaml
--config-path=/root/synapse/config/workers/{{ worker.name }}.yaml
autorestart=unexpected
priority=500
exitcodes=0
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
{% endfor %}
{% endif %}

View File

@@ -20,7 +20,26 @@ worker_listeners:
- {{ resource }}
{%- endfor %}
{% endif %}
- type: metrics
port: {{ port + 1 }}
resources:
- metrics
worker_log_config: {{ worker_log_config_filepath }}
database:
name: "psycopg2"
args:
user: "{{ POSTGRES_USER or "synapse" }}"
password: "{{ POSTGRES_PASSWORD }}"
database: "{{ POSTGRES_DB or "synapse" }}"
{% if not SYNAPSE_USE_UNIX_SOCKET %}
{# Synapse will use a default unix socket for Postgres when host/port is not specified (behavior from `psycopg2`). #}
host: "{{ POSTGRES_HOST or "db" }}"
port: "{{ POSTGRES_PORT or "5432" }}"
{% endif %}
cp_min: {{ POSTGRES_CP_MIN or 5 }}
cp_max: {{ POSTGRES_CP_MAX or 10 }}
application_name: "{{ name }}"
{{ worker_extra_conf }}

View File

@@ -69,6 +69,7 @@ database:
{% endif %}
cp_min: {{ POSTGRES_CP_MIN or 5 }}
cp_max: {{ POSTGRES_CP_MAX or 10 }}
application_name: synapse
{% else %}
database:
name: "sqlite3"
@@ -92,7 +93,7 @@ federation_rc_concurrent: 3
## Files ##
media_store_path: "/data/media"
media_store_path: "/root/synapse/data/media"
max_upload_size: "{{ SYNAPSE_MAX_UPLOAD_SIZE or "50M" }}"
max_image_pixels: "32M"
dynamic_thumbnails: false
@@ -179,7 +180,7 @@ macaroon_secret_key: "{{ SYNAPSE_MACAROON_SECRET_KEY }}"
## Signing Keys ##
signing_key_path: "/data/{{ SYNAPSE_SERVER_NAME }}.signing.key"
signing_key_path: "/root/synapse/data/{{ SYNAPSE_SERVER_NAME }}.signing.key"
old_signing_keys: {}
key_refresh_interval: "1d" # 1 Day.
@@ -190,4 +191,4 @@ trusted_key_servers:
"ed25519:auto": "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
password_config:
enabled: true
enabled: false

View File

@@ -24,6 +24,15 @@
# nginx and supervisord configs depending on the workers requested.
#
# The environment variables it reads are:
# * SYNAPSE_CONFIG_PATH: The path where the generated `homeserver.yaml` will
# be stored.
# * SYNAPSE_CONFIG_DIR: The directory where generated config will be stored.
# If `SYNAPSE_CONFIG_PATH` is not set, it will default to
# SYNAPSE_CONFIG_DIR/homeserver.yaml.
# * SYNAPSE_DATA_DIR: Where the generated config will put persistent data
# such as the database and media store.
# * SYNAPSE_CONFIG_TEMPLATE_DIR: The directory containing jinja2 templates for
# configuration that this script will generate config from.
# * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
# * SYNAPSE_REPORT_STATS: Whether to report stats.
# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKERS_CONFIG
@@ -35,6 +44,8 @@
# SYNAPSE_WORKER_TYPES='event_persister, federation_sender, client_reader'
# SYNAPSE_WORKER_TYPES='event_persister:2, federation_sender:2, client_reader'
# SYNAPSE_WORKER_TYPES='stream_writers=account_data+presence+typing'
# * SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK: Whether worker logs should be written to disk,
# in addition to stdout.
# * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
# will be treated as Application Service registration files.
# * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
@@ -48,12 +59,15 @@
# * SYNAPSE_LOG_SENSITIVE: If unset, SQL and SQL values won't be logged,
# regardless of the SYNAPSE_LOG_LEVEL setting.
# * SYNAPSE_LOG_TESTING: if set, Synapse will log additional information useful
# for testing.
# for testing.
# * SYNAPSE_USE_UNIX_SOCKET: if set, workers will communicate via unix socket
# rather than TCP.
#
# NOTE: According to Complement's ENTRYPOINT expectations for a homeserver image (as defined
# in the project's README), this script may be run multiple times, and functionality should
# continue to work if so.
import json
import os
import platform
import re
@@ -82,6 +96,7 @@ MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
MAIN_PROCESS_INSTANCE_NAME = "main"
MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1"
MAIN_PROCESS_REPLICATION_PORT = 9093
MAIN_PROCESS_METRICS_PORT = 9094
# Obviously, these would only be used with the UNIX socket option
MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock"
MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
@@ -162,6 +177,17 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
],
"shared_extra_conf": {},
"worker_extra_conf": "",
"nginx_upstream_extra_conf": "hash $http_authorization consistent;",
},
"sliding_sync": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client"],
"endpoint_patterns": [
"^/_matrix/client/unstable/org.matrix.simplified_msc3575/sync$",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
"nginx_upstream_extra_conf": "hash $http_authorization consistent;",
},
"client_reader": {
"app": "synapse.app.generic_worker",
@@ -194,6 +220,8 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$",
"^/_matrix/client/(r0|v3|unstable)/capabilities$",
"^/_matrix/client/(r0|v3|unstable)/notifications$",
"^/_matrix/client/(r0|v3|unstable)/keys/query$"
"^/_matrix/client/(r0|v3|unstable)/keys/changes$"
],
"shared_extra_conf": {},
"worker_extra_conf": "",
@@ -260,6 +288,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/knock/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/createRoom$",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
@@ -327,6 +356,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
NGINX_LOCATION_CONFIG_BLOCK = """
location ~* {endpoint} {{
proxy_pass {upstream};
proxy_http_version 1.1;
proxy_set_header X-Forwarded-For $remote_addr;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header Host $host;
@@ -604,7 +634,13 @@ def generate_base_homeserver_config() -> None:
# start.py already does this for us, so just call that.
# note that this script is copied in in the official, monolith dockerfile
os.environ["SYNAPSE_HTTP_PORT"] = str(MAIN_PROCESS_HTTP_LISTENER_PORT)
subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
# This script makes use of the `SYNAPSE_CONFIG_DIR` environment variable to
# determine where to place the generated homeserver config.
# We use "python" instead of supplying an absolute path here to allow those
# running the script to use python from a virtual env.
subprocess.run(["python", "/start.py", "migrate_config"], check=True)
def parse_worker_types(
@@ -730,11 +766,39 @@ def parse_worker_types(
return dict_to_return
def generate_metrics_scrape_config(
metrics_ports: Dict[str, int]
) -> None:
out_scrape_configs = []
for worker_name, worker_metrics_port in metrics_ports.items():
job = worker_name.rstrip("0123456789")
if len(job) < len(worker_name):
index = int(worker_name[len(job):])
else:
# worker name without a number: assume index=1
index = 1
out_scrape_configs.append({
"targets": [f"127.0.0.1:{worker_metrics_port}"],
"labels": {
"job": f"synapse-{job}",
"index": f"{index}",
# we want all instance names to be the same, even on different scrape addresses
"instance": "synapse",
}
})
with open("/out_scrape.json", "w") as fout:
json.dump(out_scrape_configs, fout)
def generate_worker_files(
environ: Mapping[str, str],
config_dir: str,
config_path: str,
data_dir: str,
template_dir: str,
requested_worker_types: Dict[str, Set[str]],
) -> None:
"""Read the desired workers(if any) that is passed in and generate shared
@@ -742,9 +806,13 @@ def generate_worker_files(
Args:
environ: os.environ instance.
config_path: The location of the generated Synapse main worker config file.
data_dir: The location of the synapse data directory. Where log and
user-facing config files live.
config_dir: The location of the configuration directory, where generated
worker config files are written to.
config_path: The location of the base Synapse homeserver config file.
data_dir: The location of the synapse data directory. Where logs will be
stored (if `SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK` is set).
template_dir: The location of the template directory. Where jinja2
templates for config files live.
requested_worker_types: A Dict containing requested workers in the format of
{'worker_name1': {'worker_type', ...}}
"""
@@ -774,6 +842,13 @@ def generate_worker_files(
"resources": [{"names": ["replication"]}],
}
]
listeners.append({
"port": MAIN_PROCESS_METRICS_PORT,
"type": "metrics",
"bind_address": "::",
})
with open(config_path) as file_stream:
original_config = yaml.safe_load(file_stream)
original_listeners = original_config.get("listeners")
@@ -806,8 +881,14 @@ def generate_worker_files(
# with nginx_upstreams and placed in /etc/nginx/conf.d.
nginx_locations: Dict[str, str] = {}
# A map from worker name to which HTTP port is in use for its metrics.
metrics_ports: Dict[str, int] = {
MAIN_PROCESS_INSTANCE_NAME: MAIN_PROCESS_METRICS_PORT,
}
# Create the worker configuration directory if it doesn't already exist
os.makedirs("/conf/workers", exist_ok=True)
workers_config_dir = os.path.join(config_dir, "workers")
os.makedirs(workers_config_dir, exist_ok=True)
# Start worker ports from this arbitrary port
worker_port = 18009
@@ -854,9 +935,11 @@ def generate_worker_files(
worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)
worker_config.update(
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
{"name": worker_name, "port": worker_port}
)
metrics_ports[worker_name] = worker_port + 1
# Update the shared config with any worker_type specific options. The first of a
# given worker_type needs to stay assigned and not be replaced.
worker_config["shared_extra_conf"].update(shared_config)
@@ -877,13 +960,21 @@ def generate_worker_files(
worker_descriptors.append(worker_config)
# Write out the worker's logging config file
log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)
log_config_filepath = generate_worker_log_config(
environ, worker_name, workers_config_dir, template_dir, data_dir
)
extra_env: Dict[str, str] = {}
for extra_env_var in ("POSTGRES_HOST", "POSTGRES_PORT", "POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_SOCKET", "POSTGRES_DB", "SYNAPSE_USE_UNIX_SOCKET", "POSTGRES_CP_MIN", "POSTGRES_CP_MAX"):
if extra_env_var in os.environ:
extra_env[extra_env_var] = os.environ[extra_env_var]
# Then a worker config file
convert(
"/conf/worker.yaml.j2",
f"/conf/workers/{worker_name}.yaml",
os.path.join(template_dir, "worker.yaml.j2"),
os.path.join(workers_config_dir, f"{worker_name}.yaml"),
**worker_config,
**extra_env,
worker_log_config_filepath=log_config_filepath,
using_unix_sockets=using_unix_sockets,
)
@@ -892,7 +983,7 @@ def generate_worker_files(
for worker_type in worker_types_set:
nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
worker_port += 1
worker_port += 2
# Build the nginx location config blocks
nginx_location_config = ""
@@ -914,6 +1005,11 @@ def generate_worker_files(
for port in upstream_worker_ports:
body += f" server localhost:{port};\n"
# Append extra lines required for the specialised type of worker
extra_upstream_conf: Optional[str] = WORKERS_CONFIG[upstream_worker_base_name].get("nginx_upstream_extra_conf")
if extra_upstream_conf:
body += "".join(f"\n {line}" for line in extra_upstream_conf.split("\n"))
# Add to the list of configured upstreams
nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
upstream_worker_base_name=upstream_worker_base_name,
@@ -923,7 +1019,9 @@ def generate_worker_files(
# Finally, we'll write out the config files.
# log config for the master process
master_log_config = generate_worker_log_config(environ, "master", data_dir)
master_log_config = generate_worker_log_config(
environ, "master", workers_config_dir, template_dir, data_dir
)
shared_config["log_config"] = master_log_config
# Find application service registrations
@@ -954,8 +1052,8 @@ def generate_worker_files(
# Shared homeserver config
convert(
"/conf/shared.yaml.j2",
"/conf/workers/shared.yaml",
os.path.join(template_dir, "shared.yaml.j2"),
os.path.join(workers_config_dir, "shared.yaml"),
shared_worker_config=yaml.dump(shared_config),
appservice_registrations=appservice_registrations,
enable_redis=workers_in_use,
@@ -965,19 +1063,20 @@ def generate_worker_files(
# Nginx config
convert(
"/conf/nginx.conf.j2",
os.path.join(template_dir, "nginx.conf.j2"),
"/etc/nginx/conf.d/matrix-synapse.conf",
worker_locations=nginx_location_config,
upstream_directives=nginx_upstream_config,
tls_cert_path=os.environ.get("SYNAPSE_TLS_CERT"),
tls_key_path=os.environ.get("SYNAPSE_TLS_KEY"),
using_unix_sockets=using_unix_sockets,
num_inbound_ports=int(os.environ.get("NGINX_INBOUND_PORTS", "1"))
)
# Supervisord config
os.makedirs("/etc/supervisor", exist_ok=True)
convert(
"/conf/supervisord.conf.j2",
os.path.join(template_dir, "supervisord.conf.j2"),
"/etc/supervisor/supervisord.conf",
main_config_path=config_path,
enable_redis=workers_in_use,
@@ -985,7 +1084,7 @@ def generate_worker_files(
)
convert(
"/conf/synapse.supervisord.conf.j2",
os.path.join(template_dir, "synapse.supervisord.conf.j2"),
"/etc/supervisor/conf.d/synapse.conf",
workers=worker_descriptors,
main_config_path=config_path,
@@ -994,7 +1093,7 @@ def generate_worker_files(
# healthcheck config
convert(
"/conf/healthcheck.sh.j2",
os.path.join(template_dir, "healthcheck.sh.j2"),
"/healthcheck.sh",
healthcheck_urls=healthcheck_urls,
)
@@ -1004,12 +1103,29 @@ def generate_worker_files(
if not os.path.exists(log_dir):
os.mkdir(log_dir)
# Add metrics scraping configurations
generate_metrics_scrape_config(metrics_ports)
def generate_worker_log_config(
environ: Mapping[str, str], worker_name: str, data_dir: str
environ: Mapping[str, str],
worker_name: str,
workers_config_dir: str,
template_dir: str,
data_dir: str,
) -> str:
"""Generate a log.config file for the given worker.
Args:
environ: A mapping representing the environment variables that this script
is running with.
worker_name: The name of the worker. Used in generated file paths.
workers_config_dir: The location of the worker configuration directory,
where the generated worker log config will be saved.
template_dir: The directory containing jinja2 template files.
data_dir: The directory where log files will be written (if
`SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK` is set).
Returns: the path to the generated file
"""
# Check whether we should write worker logs to disk, in addition to the console
@@ -1024,9 +1140,9 @@ def generate_worker_log_config(
extra_log_template_args["SYNAPSE_LOG_TESTING"] = environ.get("SYNAPSE_LOG_TESTING")
# Render and write the file
log_config_filepath = f"/conf/workers/{worker_name}.log.config"
log_config_filepath = os.path.join(workers_config_dir, f"{worker_name}.log.config")
convert(
"/conf/log.config",
os.path.join(template_dir, "log.config"),
log_config_filepath,
worker_name=worker_name,
**extra_log_template_args,
@@ -1049,6 +1165,7 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
template_dir = environ.get("SYNAPSE_CONFIG_TEMPLATE_DIR", "/conf")
# override SYNAPSE_NO_TLS, we don't support TLS in worker mode,
# this needs to be handled by a frontend proxy
@@ -1060,9 +1177,10 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
generate_base_homeserver_config()
else:
log("Base homeserver config exists—not regenerating")
# This script may be run multiple times (mostly by Complement, see note at top of
# file). Don't re-configure workers in this instance.
mark_filepath = "/conf/workers_have_been_configured"
mark_filepath = os.path.join(config_dir, "workers_have_been_configured")
if not os.path.exists(mark_filepath):
# Collect and validate worker_type requests
# Read the desired worker configuration from the environment
@@ -1079,7 +1197,9 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
# Always regenerate all other config files
log("Generating worker config files")
generate_worker_files(environ, config_path, data_dir, requested_worker_types)
generate_worker_files(
environ, config_dir, config_path, data_dir, template_dir, requested_worker_types
)
# Mark workers as being configured
with open(mark_filepath, "w") as f:
@@ -1114,3 +1234,4 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
if __name__ == "__main__":
main(sys.argv[1:], os.environ)

View File

@@ -42,6 +42,8 @@ def convert(src: str, dst: str, environ: Mapping[str, object]) -> None:
def generate_config_from_template(
data_dir: str,
template_dir: str,
config_dir: str,
config_path: str,
os_environ: Mapping[str, str],
@@ -50,6 +52,9 @@ def generate_config_from_template(
"""Generate a homeserver.yaml from environment variables
Args:
data_dir: where persistent data is stored
template_dir: The location of the template directory. Where jinja2
templates for config files live.
config_dir: where to put generated config files
config_path: where to put the main config file
os_environ: environment mapping
@@ -70,9 +75,13 @@ def generate_config_from_template(
"macaroon": "SYNAPSE_MACAROON_SECRET_KEY",
}
if not os.path.exists(data_dir):
os.mkdir(data_dir)
synapse_server_name = environ["SYNAPSE_SERVER_NAME"]
for name, secret in secrets.items():
if secret not in environ:
filename = "/data/%s.%s.key" % (environ["SYNAPSE_SERVER_NAME"], name)
filename = os.path.join(data_dir, f"{synapse_server_name}.{name}.key")
# if the file already exists, load in the existing value; otherwise,
# generate a new secret and write it to a file
@@ -88,7 +97,7 @@ def generate_config_from_template(
handle.write(value)
environ[secret] = value
environ["SYNAPSE_APPSERVICES"] = glob.glob("/data/appservices/*.yaml")
environ["SYNAPSE_APPSERVICES"] = glob.glob(os.path.join(data_dir, "appservices", "*.yaml"))
if not os.path.exists(config_dir):
os.mkdir(config_dir)
@@ -111,12 +120,12 @@ def generate_config_from_template(
environ["SYNAPSE_LOG_CONFIG"] = config_dir + "/log.config"
log("Generating synapse config file " + config_path)
convert("/conf/homeserver.yaml", config_path, environ)
convert(os.path.join(template_dir, "homeserver.yaml"), config_path, environ)
log_config_file = environ["SYNAPSE_LOG_CONFIG"]
log("Generating log config file " + log_config_file)
convert(
"/conf/log.config",
os.path.join(template_dir, "log.config"),
log_config_file,
{**environ, "include_worker_name_in_log_line": False},
)
@@ -128,15 +137,15 @@ def generate_config_from_template(
"synapse.app.homeserver",
"--config-path",
config_path,
# tell synapse to put generated keys in /data rather than /compiled
# tell synapse to put generated keys in the data directory rather than /compiled
"--keys-directory",
config_dir,
"--generate-keys",
]
if ownership is not None:
log(f"Setting ownership on /data to {ownership}")
subprocess.run(["chown", "-R", ownership, "/data"], check=True)
log(f"Setting ownership on the data dir to {ownership}")
subprocess.run(["chown", "-R", ownership, data_dir], check=True)
args = ["gosu", ownership] + args
subprocess.run(args, check=True)
@@ -159,12 +168,13 @@ def run_generate_config(environ: Mapping[str, str], ownership: Optional[str]) ->
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
template_dir = environ.get("SYNAPSE_CONFIG_TEMPLATE_DIR", "/conf")
# create a suitable log config from our template
log_config_file = "%s/%s.log.config" % (config_dir, server_name)
if not os.path.exists(log_config_file):
log("Creating log config %s" % (log_config_file,))
convert("/conf/log.config", log_config_file, environ)
convert(os.path.join(template_dir, "log.config"), log_config_file, environ)
# generate the main config file, and a signing key.
args = [
@@ -216,12 +226,14 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
if mode == "migrate_config":
# generate a config based on environment vars.
data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
config_path = environ.get(
"SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml"
)
template_dir = environ.get("SYNAPSE_CONFIG_TEMPLATE_DIR", "/conf")
return generate_config_from_template(
config_dir, config_path, environ, ownership
data_dir, template_dir, config_dir, config_path, environ, ownership
)
if mode != "run":

View File

@@ -47,6 +47,7 @@ from synapse.logging.context import make_deferred_yieldable
from synapse.types import Requester, UserID, create_requester
from synapse.util import json_decoder
from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
from synapse.util.caches.expiringcache import ExpiringCache
if TYPE_CHECKING:
from synapse.rest.admin.experimental_features import ExperimentalFeature
@@ -120,6 +121,13 @@ class MSC3861DelegatedAuth(BaseAuth):
self._http_client = hs.get_proxied_http_client()
self._hostname = hs.hostname
self._admin_token = self._config.admin_token
self._introspection_cache = ExpiringCache(
"introspection_cache",
self._clock,
max_len=50000,
expiry_ms=120_000,
reset_expiry_on_get=False,
)
self._issuer_metadata = RetryOnExceptionCachedCall[OpenIDProviderMetadata](
self._load_metadata
@@ -202,6 +210,9 @@ class MSC3861DelegatedAuth(BaseAuth):
Returns:
The introspection response
"""
cached_response = self._introspection_cache.get(token, None)
if cached_response is not None:
return cached_response
introspection_endpoint = await self._introspection_endpoint()
raw_headers: Dict[str, str] = {
"Content-Type": "application/x-www-form-urlencoded",
@@ -256,7 +267,11 @@ class MSC3861DelegatedAuth(BaseAuth):
"The introspection endpoint returned an invalid JSON response."
)
return IntrospectionToken(**resp)
out = IntrospectionToken(**resp)
# Cache the response for a short time
self._introspection_cache[token] = out
return out
async def is_server_admin(self, requester: Requester) -> bool:
return "urn:synapse:admin:*" in requester.scope

View File

@@ -341,6 +341,38 @@ class SlidingSyncHandler:
extensions=extensions,
)
if rooms:
live_rooms = 0
previously_rooms = 0
never_rooms = 0
initial_rooms = 0
limited_rooms = 0
for room_id, room in rooms.items():
if room.initial:
initial_rooms += 1
if room.limited:
limited_rooms += 1
status = previous_connection_state.rooms.have_sent_room(room_id)
if status.status == HaveSentRoomFlag.LIVE:
live_rooms += 1
elif status.status == HaveSentRoomFlag.PREVIOUSLY:
previously_rooms += 1
elif status.status == HaveSentRoomFlag.NEVER:
never_rooms += 1
else:
assert_never(status.status)
logger.info(
"Room results: live: %s, previously: %s, never: %s, initial: %s, limited: %s",
live_rooms,
previously_rooms,
never_rooms,
initial_rooms,
limited_rooms,
)
# Make it easy to find traces for syncs that aren't empty
set_tag(SynapseTags.RESULT_PREFIX + "result", bool(sliding_sync_result))
set_tag(SynapseTags.FUNC_ARG_PREFIX + "sync_config.user", user_id)

View File

@@ -572,7 +572,8 @@ class SlidingSyncExtensionHandler:
# Now record which rooms are now up to data, and which rooms have
# pending updates to send.
new_connection_state.account_data.record_sent_rooms(relevant_room_ids)
new_connection_state.account_data.record_sent_rooms(previously_rooms.keys())
new_connection_state.account_data.record_sent_rooms(initial_rooms)
missing_updates = (
all_updates_since_the_from_token.keys() - relevant_room_ids
)
@@ -763,9 +764,10 @@ class SlidingSyncExtensionHandler:
room_id_to_receipt_map[room_id] = {"type": type, "content": content}
# Now we update the per-connection state to track which receipts we have
# and haven't sent down.
new_connection_state.receipts.record_sent_rooms(relevant_room_ids)
# Update the per-connection state to track which rooms we have sent
# all the receipts for.
new_connection_state.receipts.record_sent_rooms(previously_rooms.keys())
new_connection_state.receipts.record_sent_rooms(initial_rooms)
if from_token:
# Now find the set of rooms that may have receipts that we're not sending

View File

@@ -123,6 +123,19 @@ class SlidingSyncInterestedRooms:
newly_left_rooms: AbstractSet[str]
dm_room_ids: AbstractSet[str]
@staticmethod
def empty() -> "SlidingSyncInterestedRooms":
return SlidingSyncInterestedRooms(
lists={},
relevant_room_map={},
relevant_rooms_to_send_map={},
all_rooms=set(),
room_membership_for_user_map={},
newly_joined_rooms=set(),
newly_left_rooms=set(),
dm_room_ids=set(),
)
def filter_membership_for_sync(
*,
@@ -181,6 +194,14 @@ class SlidingSyncRoomLists:
from_token: Optional[StreamToken],
) -> SlidingSyncInterestedRooms:
"""Fetch the set of rooms that match the request"""
has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
has_room_subscriptions = (
sync_config.room_subscriptions is not None
and len(sync_config.room_subscriptions) > 0
)
if not has_lists and not has_room_subscriptions:
return SlidingSyncInterestedRooms.empty()
if await self.store.have_finished_sliding_sync_background_jobs():
return await self._compute_interested_rooms_new_tables(

View File

@@ -41,6 +41,7 @@ import attr
from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.defer import Deferred
from synapse.api.constants import EduTypes, EventTypes, HistoryVisibility, Membership
from synapse.api.errors import AuthError
@@ -52,6 +53,7 @@ from synapse.logging.opentracing import log_kv, start_active_span
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
from synapse.types import (
ISynapseReactor,
JsonDict,
MultiWriterStreamToken,
PersistedEventPosition,
@@ -61,8 +63,11 @@ from synapse.types import (
StreamToken,
UserID,
)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.async_helpers import (
timeout_deferred,
)
from synapse.util.metrics import Measure
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
@@ -89,18 +94,6 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
return n
class _NotificationListener:
"""This represents a single client connection to the events stream.
The events stream handler will have yielded to the deferred, so to
notify the handler it is sufficient to resolve the deferred.
"""
__slots__ = ["deferred"]
def __init__(self, deferred: "defer.Deferred"):
self.deferred = deferred
class _NotifierUserStream:
"""This represents a user connected to the event stream.
It tracks the most recent stream token for that user.
@@ -113,59 +106,49 @@ class _NotifierUserStream:
def __init__(
self,
reactor: ISynapseReactor,
user_id: str,
rooms: StrCollection,
current_token: StreamToken,
time_now_ms: int,
):
self.reactor = reactor
self.user_id = user_id
self.rooms = set(rooms)
self.current_token = current_token
# The last token for which we should wake up any streams that have a
# token that comes before it. This gets updated every time we get poked.
# We start it at the current token since if we get any streams
# that have a token from before we have no idea whether they should be
# woken up or not, so lets just wake them up.
self.last_notified_token = current_token
self.current_token = current_token
self.last_notified_ms = time_now_ms
self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred(
defer.Deferred()
)
# Set of listeners that we need to wake up when there has been a change.
self.listeners: Set[Deferred[StreamToken]] = set()
def notify(
def update_and_fetch_deferreds(
self,
stream_key: StreamKeyType,
stream_id: Union[int, RoomStreamToken, MultiWriterStreamToken],
current_token: StreamToken,
time_now_ms: int,
) -> None:
"""Notify any listeners for this user of a new event from an
event source.
) -> Collection["Deferred[StreamToken]"]:
"""Update the stream for this user because of a new event from an
event source, and return the set of deferreds to wake up.
Args:
stream_key: The stream the event came from.
stream_id: The new id for the stream the event came from.
current_token: The new current token.
time_now_ms: The current time in milliseconds.
Returns:
The set of deferreds that need to be called.
"""
self.current_token = self.current_token.copy_and_advance(stream_key, stream_id)
self.last_notified_token = self.current_token
self.current_token = current_token
self.last_notified_ms = time_now_ms
notify_deferred = self.notify_deferred
log_kv(
{
"notify": self.user_id,
"stream": stream_key,
"stream_id": stream_id,
"listeners": self.count_listeners(),
}
)
listeners = self.listeners
self.listeners = set()
users_woken_by_stream_counter.labels(stream_key).inc()
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
notify_deferred.callback(self.current_token)
return listeners
def remove(self, notifier: "Notifier") -> None:
"""Remove this listener from all the indexes in the Notifier
@@ -179,9 +162,9 @@ class _NotifierUserStream:
notifier.user_to_user_stream.pop(self.user_id)
def count_listeners(self) -> int:
return len(self.notify_deferred.observers())
return len(self.listeners)
def new_listener(self, token: StreamToken) -> _NotificationListener:
def new_listener(self, token: StreamToken) -> "Deferred[StreamToken]":
"""Returns a deferred that is resolved when there is a new token
greater than the given token.
@@ -191,10 +174,17 @@ class _NotifierUserStream:
"""
# Immediately wake up stream if something has already since happened
# since their last token.
if self.last_notified_token != token:
return _NotificationListener(defer.succeed(self.current_token))
else:
return _NotificationListener(self.notify_deferred.observe())
if token != self.current_token:
return defer.succeed(self.current_token)
# Create a new deferred and add it to the set of listeners. We add a
# cancel handler to remove it from the set again, to handle timeouts.
deferred: "Deferred[StreamToken]" = Deferred(
canceller=lambda d: self.listeners.discard(d)
)
self.listeners.add(deferred)
return deferred
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -247,6 +237,7 @@ class Notifier:
# List of callbacks to be notified when a lock is released
self._lock_released_callback: List[Callable[[str, str, str], None]] = []
self.reactor = hs.get_reactor()
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
self._pusher_pool = hs.get_pusherpool()
@@ -342,14 +333,25 @@ class Notifier:
# Wake up all related user stream notifiers
user_streams = self.room_to_user_streams.get(room_id, set())
time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
listeners: List["Deferred[StreamToken]"] = []
for user_stream in user_streams:
try:
user_stream.notify(
StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
listeners.extend(
user_stream.update_and_fetch_deferreds(current_token, time_now_ms)
)
except Exception:
logger.exception("Failed to notify listener")
with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)
users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc(
len(user_streams)
)
# Poke the replication so that other workers also see the write to
# the un-partial-stated rooms stream.
self.notify_replication()
@@ -519,12 +521,16 @@ class Notifier:
rooms = rooms or []
with Measure(self.clock, "on_new_event"):
user_streams = set()
user_streams: Set[_NotifierUserStream] = set()
log_kv(
{
"waking_up_explicit_users": len(users),
"waking_up_explicit_rooms": len(rooms),
"users": shortstr(users),
"rooms": shortstr(rooms),
"stream": stream_key,
"stream_id": new_token,
}
)
@@ -544,12 +550,27 @@ class Notifier:
)
time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
listeners: List["Deferred[StreamToken]"] = []
for user_stream in user_streams:
try:
user_stream.notify(stream_key, new_token, time_now_ms)
listeners.extend(
user_stream.update_and_fetch_deferreds(
current_token, time_now_ms
)
)
except Exception:
logger.exception("Failed to notify listener")
# We resolve all these deferreds in one go so that we only need to
# call `PreserveLoggingContext` once, as it has a bunch of overhead
# (to calculate performance stats)
with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
self.notify_replication()
# Notify appservices.
@@ -586,6 +607,7 @@ class Notifier:
if room_ids is None:
room_ids = await self.store.get_rooms_for_user(user_id)
user_stream = _NotifierUserStream(
reactor=self.reactor,
user_id=user_id,
rooms=room_ids,
current_token=current_token,
@@ -608,8 +630,8 @@ class Notifier:
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
listener.deferred = timeout_deferred(
listener.deferred,
listener = timeout_deferred(
listener,
(end_time - now) / 1000.0,
self.hs.get_reactor(),
)
@@ -622,7 +644,7 @@ class Notifier:
)
with PreserveLoggingContext():
await listener.deferred
await listener
log_kv(
{

View File

@@ -299,7 +299,7 @@ class SlidingSyncBody(RequestBodyModel):
int(value)
except ValueError:
raise ValueError(
"'extensions.to_device.since' is invalid (should look like an int)"
f"'extensions.to_device.since' is invalid (found '{value}', expected to be able to parse an int)"
)
return value

View File

@@ -142,9 +142,9 @@ class StreamChangeCache:
"""
assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so
# _cache is not valid before the earliest known stream position, so
# return that the entity has changed.
if stream_pos <= self._earliest_known_stream_pos:
if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return True
@@ -186,7 +186,7 @@ class StreamChangeCache:
This will be all entities if the given stream position is at or earlier
than the earliest known stream position.
"""
if not self._cache or stream_pos <= self._earliest_known_stream_pos:
if not self._cache or stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return set(entities)
@@ -238,9 +238,9 @@ class StreamChangeCache:
"""
assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so
# _cache is not valid before the earliest known stream position, so
# return that an entity has changed.
if stream_pos <= self._earliest_known_stream_pos:
if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return True
@@ -270,9 +270,9 @@ class StreamChangeCache:
"""
assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so
# _cache is not valid before the earliest known stream position, so
# return None to mark that it is unknown if an entity has changed.
if stream_pos <= self._earliest_known_stream_pos:
if stream_pos < self._earliest_known_stream_pos:
return AllEntitiesChangedResult(None)
changed_entities: List[EntityType] = []

View File

@@ -282,22 +282,33 @@ class SyncTypingTests(unittest.HomeserverTestCase):
self.assertEqual(200, channel.code)
next_batch = channel.json_body["next_batch"]
# This should time out! But it does not, because our stream token is
# ahead, and therefore it's saying the typing (that we've actually
# already seen) is new, since it's got a token above our new, now-reset
# stream token.
channel = self.make_request("GET", sync_url % (access_token, next_batch))
self.assertEqual(200, channel.code)
next_batch = channel.json_body["next_batch"]
# Clear the typing information, so that it doesn't think everything is
# in the future.
# in the future. This happens automatically when the typing stream
# resets.
typing._reset()
# Now it SHOULD fail as it never completes!
# Nothing new, so we time out.
with self.assertRaises(TimedOutException):
self.make_request("GET", sync_url % (access_token, next_batch))
# Sync and start typing again.
sync_channel = self.make_request(
"GET", sync_url % (access_token, next_batch), await_result=False
)
self.assertFalse(sync_channel.is_finished())
channel = self.make_request(
"PUT",
typing_url % (room, other_user_id, other_access_token),
b'{"typing": true, "timeout": 30000}',
)
self.assertEqual(200, channel.code)
# Sync should now return.
sync_channel.await_result()
self.assertEqual(200, sync_channel.code)
next_batch = sync_channel.json_body["next_batch"]
class SyncKnockTestCase(KnockingStrippedStateEventHelperMixin):
servlets = [

View File

@@ -53,8 +53,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
# return True, whether it's a known entity or not.
self.assertTrue(cache.has_entity_changed("user@foo.com", 0))
self.assertTrue(cache.has_entity_changed("not@here.website", 0))
self.assertTrue(cache.has_entity_changed("user@foo.com", 3))
self.assertTrue(cache.has_entity_changed("not@here.website", 3))
self.assertTrue(cache.has_entity_changed("user@foo.com", 2))
self.assertTrue(cache.has_entity_changed("not@here.website", 2))
def test_entity_has_changed_pops_off_start(self) -> None:
"""
@@ -76,9 +76,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
self.assertTrue("user@foo.com" not in cache._entity_to_key)
self.assertEqual(
cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"]
cache.get_all_entities_changed(2).entities,
["bar@baz.net", "user@elsewhere.org"],
)
self.assertFalse(cache.get_all_entities_changed(2).hit)
self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertTrue(cache.get_all_entities_changed(2).hit)
# If we update an existing entity, it keeps the two existing entities
cache.entity_has_changed("bar@baz.net", 5)
@@ -89,7 +91,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
cache.get_all_entities_changed(3).entities,
["user@elsewhere.org", "bar@baz.net"],
)
self.assertFalse(cache.get_all_entities_changed(2).hit)
self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertTrue(cache.get_all_entities_changed(2).hit)
def test_get_all_entities_changed(self) -> None:
"""
@@ -114,7 +117,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
self.assertEqual(
cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"]
)
self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertFalse(cache.get_all_entities_changed(0).hit)
self.assertTrue(cache.get_all_entities_changed(1).hit)
# ... later, things gest more updates
cache.entity_has_changed("user@foo.com", 5)
@@ -149,7 +153,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
# With no entities, it returns True for the past, present, and False for
# the future.
self.assertTrue(cache.has_any_entity_changed(0))
self.assertTrue(cache.has_any_entity_changed(1))
self.assertFalse(cache.has_any_entity_changed(1))
self.assertFalse(cache.has_any_entity_changed(2))
# We add an entity