Compare commits

...

27 Commits

Author SHA1 Message Date
Olivier Wilkinson (reivilibre)
b5a66b4c00 Move the worker type error checking upfront 2023-03-09 15:08:35 +00:00
Olivier Wilkinson (reivilibre)
aeefc5dcf2 Try to simplify the nginx generation code 2023-03-09 15:02:52 +00:00
Olivier Wilkinson (reivilibre)
65de783b13 Try to simplify parse_worker_types 2023-03-09 14:52:25 +00:00
Jason Little
97a0af000e Move final creation of a worker name to after the final check of why it can't exist. 2023-03-09 04:54:03 -06:00
Jason Little
e2fecab988 Try and apply review comments about sanitizing names.
1. Update a few comments for clarity.
2. Apply a replace() to remove whitespace in a requested worker name and use an underscore instead.
3. Stop-error if requesting a worker name, but forgetting to actually put one in.
4. If a worker name isn't requested, just use the set of worker types, sorted and concatenated with a hyphen. This avoids having to sanitize the worker name in this case.
5. Add regex to avoid nasty name surprises.
2023-03-08 20:49:49 -06:00
Jason Little
0cf5e80870 Merge branch 'develop' into comp-worker-shorthand 2023-03-08 06:22:27 -06:00
Jason Little
7c32bdb9de Remove condition based on number of ports
...so that even with a single port, a worker gets it's own upstream. Nginx says this can
lead to performance improvements if some features are enabled. Revisit this later.
2023-03-08 06:18:36 -06:00
Jason Little
204f61e022 Update how worker_type specific options are merged into shared_config, and update comments to match. 2023-03-08 05:24:42 -06:00
Jason Little
b8741d743e Update comments and a few other texty bits:
1. Update comments and edit(or create) docstrings and a single type hint that was incomplete(split_and_strip_string()).
2. Rename parse_worker_types_from_env to parse_worker_types as it's not coming from the environment.
2023-03-07 20:29:07 -06:00
Jason Little
98da503963 Merge branch 'develop' into comp-worker-shorthand 2023-03-04 20:53:12 -06:00
Jason Little
4442c0a1b6 Disambiguate a confusing mess of 'worker_roles', 'workers_roles', 'worker_type', and similar and that should have been 'worker_types' when in use by sets and a couple other places. 2023-03-04 20:52:48 -06:00
Jason Little
dd9773cb59 Change function name and fix docstring to be correct. 2023-03-04 19:29:22 -06:00
Jason Little
5dc3318299 Merge branch 'develop' into comp-worker-shorthand 2023-03-03 05:54:53 -06:00
Jason Little
47a965f720 Apply changes from review.
1. Factor processing of all worker types from the environment out and away from
    generating config files.
2. Fix up early templating of 'worker_config' to remove boilerplate.
3. Wrangle nginx upstream processing to accommodate odd combinations for overlapping
    workers. e.g. 'user_dir, user_dir+presence' should be able to handle the(in this case
    single) endpoint over either worker. I think this was promised in a previous commit,
    consider it delivered.
4. Update a bunch of comments, and adjust some pre-existing to fit inside the line
    length count of 88(aka, make the green squiggles go away)
5. Move processing of unusable characters for worker names to a later position so it
    will check the name produced if one was not requested as well.
2023-03-03 05:49:24 -06:00
Jason Little
3a7df064c9 Merge branch 'develop' into comp-worker-shorthand 2023-02-24 05:35:24 -06:00
Jason Little
aba5585156 Fix conditional and update error message and comments. 2023-02-18 22:53:09 -06:00
realtyem
f964a1a0aa Merge branch 'develop' into comp-worker-shorthand 2023-02-18 20:05:54 -06:00
Jason Little
8d1b37a6ef Apply changes from review:
1. Turn 'placeholder_name' into a magic constant.
2. Fix casting on a worker_type_list from 'list' to 'List[str]'.
3. In 'insert_worker_name_for_shared_config', use new WORKER_PLACEHOLDER_NAME constant and reduce complexity by using value from items() directly.
4. In 'is_sharding_allowed_for_worker_type', simplify the return value.
5. Adjust 'split_string_and_strip' to allow for using the 'maxsplit' kwarg on 'strip' and update comment.
6. Remove function 'increment_counter' and use 'defaultdict' instead to reduce complexity.
7. Remove function 'is_name_allowed_for_worker', as this can be accomplished with an 'if' statement and only checked for a worker base name in a
    dict. Adjust 'generate_worker_files' to accommadate.
8. Rename a few dict's to remove misnomer's and adjust comments for extra clarity.
9. In 'generate_worker_files':
  a. Use 'ValueError' exception handling to catch additional errors when parsing an integer and simplify counting of appended worker_type's.
  b. Add additional error cases to check(single and double quotes, spaces, and last character being an integer) for potential file name/nginx issues.
  c. Adjust various variable names to improve readability.
10. Update various comments throughout.
2023-02-18 20:01:34 -06:00
realtyem
beb2531a86 Merge branch 'develop' into comp-worker-shorthand 2023-02-14 05:08:34 -06:00
realtyem
497f01de11 Merge branch 'develop' into comp-worker-shorthand 2023-02-04 23:59:45 -06:00
realtyem
e00989f9d6 Merge branch 'develop' into comp-worker-shorthand 2023-01-26 05:58:48 -06:00
Jason Little
c858d45eb8 Changelog 2023-01-26 05:58:17 -06:00
Jason Little
bd6b73ee76 Caught a typo 2023-01-26 03:38:35 -06:00
Jason Little
02178ced59 Merge branch 'develop' into comp-worker-shorthand 2023-01-25 05:27:39 -06:00
Jason Little
8538cb6780 Install examples in start_for_complement.sh.
Instead of adding stream writer workers individually, utilize new functionality to make one worker with all included. Also utilitize new shorthand for event_persister's.
2023-01-25 05:23:32 -06:00
Jason Little
e29ef6f215 Allow for worker types to be merged and given a name.
In the Synapse-worker docker image, multiple worker types are defined but only allowed to be that one single definition.
This makes it so that any two(or more) worker types can be merged into a single worker. For example:
1. If you wish to have one worker with both client_reader and federation_sender functions enabled, you can set
SYNAPSE_WORKER_TYPES='client_reader+federation_sender'
2. If you wish to have all stream_writers(excepting event_persister) you can set
SYNAPSE_WORKER_TYPES='account_data+presence+receipts+to_device+typing'

Multiple types can be combined, but some error checking to dis-allow multiples of worker types that shouldn't be
enabled more than once has been added. For example:
SYNAPSE_WORKER_TYPES='background_worker+event_persister, background_worker+event_persister' will not work as
background_worker is only allowed to have a single worker for the entire deployment.

Giving a worker or a combination of workers a custom name is as simple as adding the name then an equal sign in front
of the given worker type(s). For example:
SYNAPSE_WORKER_TYPES='alice=federation_reader'
or
SYNAPSE_WORKER_TYPES='bob=federation_inbound+federation_sender'
or
SYNAPSE_WORKER_TYPES='charlie=event_persister:2, derek=media_repository + pusher + user_dir + appservice + event_creator'
2023-01-25 04:56:23 -06:00
Jason Little
4647d592cd Allow for a multipler to be added to a requested worker.
This allows for appending a ':' and a positive integer to a requested worker type for the SYNAPSE_WORKER_TYPES
environment variable(and likewise, the WORKER_TYPES) used in Complement. For example:

'event_persister:2' would be yield identical results of 'event_persister, event_persister'
2023-01-22 23:25:31 -06:00
3 changed files with 416 additions and 114 deletions

1
changelog.d/14921.misc Normal file
View File

@@ -0,0 +1 @@
Add additional functionality to declaring worker types when starting Complement in worker mode.

View File

@@ -51,8 +51,7 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
# -z True if the length of string is zero.
if [[ -z "$SYNAPSE_WORKER_TYPES" ]]; then
export SYNAPSE_WORKER_TYPES="\
event_persister, \
event_persister, \
event_persister:2, \
background_worker, \
frontend_proxy, \
event_creator, \
@@ -64,7 +63,8 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
synchrotron, \
client_reader, \
appservice, \
pusher"
pusher, \
stream_writers=account_data+presence+receipts+to_device+typing"
fi
log "Workers requested: $SYNAPSE_WORKER_TYPES"

View File

@@ -19,8 +19,15 @@
# The environment variables it reads are:
# * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
# * SYNAPSE_REPORT_STATS: Whether to report stats.
# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG
# below. Leave empty for no workers.
# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKERS_CONFIG
# below. Leave empty for no workers. Add a ':' and a number at the end to
# multiply that worker. Append multiple worker types with '+' to merge the
# worker types into a single worker. Add a name and a '=' to the front of a
# worker type to give this instance a name in logs and nginx.
# Examples:
# SYNAPSE_WORKER_TYPES='event_persister, federation_sender, client_reader'
# SYNAPSE_WORKER_TYPES='event_persister:2, federation_sender:2, client_reader'
# SYNAPSE_WORKER_TYPES='stream_writers=account_data+presence+typing'
# * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
# will be treated as Application Service registration files.
# * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
@@ -40,16 +47,33 @@
import os
import platform
import re
import subprocess
import sys
from collections import defaultdict
from itertools import chain
from pathlib import Path
from typing import Any, Dict, List, Mapping, MutableMapping, NoReturn, Optional, Set
from typing import (
Any,
Dict,
List,
Mapping,
MutableMapping,
NoReturn,
Optional,
Set,
SupportsIndex,
)
import yaml
from jinja2 import Environment, FileSystemLoader
MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
# during processing with the name of the worker.
WORKER_PLACEHOLDER_NAME = "placeholder_name"
# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
# Watching /_matrix/client needs a "client" listener
# Watching /_matrix/federation needs a "federation" listener
@@ -70,11 +94,13 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
],
"shared_extra_conf": {"update_user_directory_from_worker": "user_dir1"},
"shared_extra_conf": {
"update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME
},
"worker_extra_conf": "",
},
"media_repository": {
"app": "synapse.app.media_repository",
"app": "synapse.app.generic_worker",
"listener_resources": ["media"],
"endpoint_patterns": [
"^/_matrix/media/",
@@ -87,7 +113,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
# The first configured media worker will run the media background jobs
"shared_extra_conf": {
"enable_media_repo": False,
"media_instance_running_background_jobs": "media_repository1",
"media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME,
},
"worker_extra_conf": "enable_media_repo: true",
},
@@ -95,7 +121,9 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
"shared_extra_conf": {"notify_appservices_from_worker": "appservice1"},
"shared_extra_conf": {
"notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME
},
"worker_extra_conf": "",
},
"federation_sender": {
@@ -192,9 +220,9 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
# This worker cannot be sharded. Therefore there should only ever be one background
# worker, and it should be named background_worker1
"shared_extra_conf": {"run_background_tasks_on": "background_worker1"},
# This worker cannot be sharded. Therefore, there should only ever be one
# background worker. This is enforced for the safety of your database.
"shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME},
"worker_extra_conf": "",
},
"event_creator": {
@@ -275,7 +303,7 @@ NGINX_LOCATION_CONFIG_BLOCK = """
"""
NGINX_UPSTREAM_CONFIG_BLOCK = """
upstream {upstream_worker_type} {{
upstream {upstream_worker_base_name} {{
{body}
}}
"""
@@ -326,7 +354,7 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
def add_worker_roles_to_shared_config(
shared_config: dict,
worker_type: str,
worker_types_set: Set[str],
worker_name: str,
worker_port: int,
) -> None:
@@ -334,22 +362,36 @@ def add_worker_roles_to_shared_config(
append appropriate worker information to it for the current worker_type instance.
Args:
shared_config: The config dict that all worker instances share (after being converted to YAML)
worker_type: The type of worker (one of those defined in WORKERS_CONFIG).
shared_config: The config dict that all worker instances share (after being
converted to YAML)
worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG).
This list can be a single worker type or multiple.
worker_name: The name of the worker instance.
worker_port: The HTTP replication port that the worker instance is listening on.
"""
# The instance_map config field marks the workers that write to various replication streams
# The instance_map config field marks the workers that write to various replication
# streams
instance_map = shared_config.setdefault("instance_map", {})
# Worker-type specific sharding config
if worker_type == "pusher":
# This is a list of the stream_writers that there can be only one of. Events can be
# sharded, and therefore doesn't belong here.
singular_stream_writers = [
"account_data",
"presence",
"receipts",
"to_device",
"typing",
]
# Worker-type specific sharding config. Now a single worker can fulfill multiple
# roles, check each.
if "pusher" in worker_types_set:
shared_config.setdefault("pusher_instances", []).append(worker_name)
elif worker_type == "federation_sender":
if "federation_sender" in worker_types_set:
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
elif worker_type == "event_persister":
if "event_persister" in worker_types_set:
# Event persisters write to the events stream, so we need to update
# the list of event stream writers
shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
@@ -362,19 +404,154 @@ def add_worker_roles_to_shared_config(
"port": worker_port,
}
elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]:
# Update the list of stream writers
# It's convenient that the name of the worker type is the same as the stream to write
shared_config.setdefault("stream_writers", {}).setdefault(
worker_type, []
).append(worker_name)
# Update the list of stream writers. It's convenient that the name of the worker
# type is the same as the stream to write. Iterate over the whole list in case there
# is more than one.
for worker in worker_types_set:
if worker in singular_stream_writers:
shared_config.setdefault("stream_writers", {}).setdefault(
worker, []
).append(worker_name)
# Map of stream writer instance names to host/ports combos
# For now, all stream writers need http replication ports
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
# Map of stream writer instance names to host/ports combos
# For now, all stream writers need http replication ports
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
def merge_worker_template_configs(
existing_dict: Dict[str, Any] | None,
to_be_merged_dict: Dict[str, Any],
) -> Dict[str, Any]:
"""When given an existing dict of worker template configuration consisting with both
dicts and lists, merge new template data from WORKERS_CONFIG(or create) and
return new dict.
Args:
existing_dict: Either an existing worker template or a fresh blank one.
to_be_merged_dict: The template from WORKERS_CONFIGS to be merged into
existing_dict.
Returns: The newly merged together dict values.
"""
new_dict: Dict[str, Any] = {}
if not existing_dict:
# It doesn't exist yet, just use the new dict(but take a copy not a reference)
new_dict = to_be_merged_dict.copy()
else:
for i in to_be_merged_dict.keys():
if (i == "endpoint_patterns") or (i == "listener_resources"):
# merge the two lists, remove duplicates
new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i]))
elif i == "shared_extra_conf":
# merge dictionary's, the worker name will be replaced later
new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]}
elif i == "worker_extra_conf":
# There is only one worker type that has a 'worker_extra_conf' and it is
# the media_repo. Since duplicate worker types on the same worker don't
# work, this is fine.
new_dict[i] = existing_dict[i] + to_be_merged_dict[i]
else:
# Everything else should be identical, like "app", which only works
# because all apps are now generic_workers.
new_dict[i] = to_be_merged_dict[i]
return new_dict
def insert_worker_name_for_worker_config(
existing_dict: Dict[str, Any], worker_name: str
) -> Dict[str, Any]:
"""Insert a given worker name into the worker's configuration dict.
Args:
existing_dict: The worker_config dict that is imported into shared_config.
worker_name: The name of the worker to insert.
Returns: Copy of the dict with newly inserted worker name
"""
dict_to_edit = existing_dict.copy()
for k, v in dict_to_edit["shared_extra_conf"].items():
# Only proceed if it's the placeholder name string
if v == WORKER_PLACEHOLDER_NAME:
dict_to_edit["shared_extra_conf"][k] = worker_name
return dict_to_edit
def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
"""
Apply multiplier(if found) by returning a new expanded list with some basic error
checking.
Args:
worker_types: The unprocessed List of requested workers
Returns:
A new list with all requested workers expanded.
"""
# Checking performed:
# 1. if worker:2 or more is declared, it will create additional workers up to number
# 2. if worker:1, it will create a single copy of this worker as if no number was
# given
# 3. if worker:0 is declared, this worker will be ignored. This is to allow for
# scripting and automated expansion and is intended behaviour.
# 4. if worker:NaN or is a negative number, it will error and log it.
new_worker_types = []
for worker_type in worker_types:
if ":" in worker_type:
worker_type_components = split_and_strip_string(worker_type, ":", 1)
worker_count = 0
# Should only be 2 components, a type of worker(s) and an integer as a
# string. Cast the number as an int then it can be used as a counter.
try:
worker_count = int(worker_type_components[1])
except ValueError:
error(
f"Bad number in worker count for '{worker_type}': "
f"'{worker_type_components[1]}' is not an integer"
)
# As long as there are more than 0, we add one to the list to make below.
for _ in range(worker_count):
new_worker_types.append(worker_type_components[0])
else:
# If it's not a real worker_type, it will error out later.
new_worker_types.append(worker_type)
return new_worker_types
def is_sharding_allowed_for_worker_type(worker_type: str) -> bool:
"""Helper to check to make sure worker types that cannot have multiples do not.
Args:
worker_type: The type of worker to check against.
Returns: True if allowed, False if not
"""
return worker_type not in [
"background_worker",
"account_data",
"presence",
"receipts",
"typing",
"to_device",
]
def split_and_strip_string(
given_string: str, split_char: str, max_split: SupportsIndex = -1
) -> List[str]:
"""
Helper to split a string on split_char and strip whitespace from each end of each
element.
Args:
given_string: The string to split
split_char: The character to split the string on
max_split: kwarg for split() to limit how many times the split() happens
Returns:
A List of strings
"""
# Removes whitespace from ends of result strings before adding to list. Allow for
# overriding 'maxsplit' kwarg, default being -1 to signify no maximum.
return [x.strip() for x in given_string.split(split_char, maxsplit=max_split)]
def generate_base_homeserver_config() -> None:
@@ -389,29 +566,157 @@ def generate_base_homeserver_config() -> None:
subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
def parse_worker_types(
requested_worker_types: List[str],
) -> Dict[str, Set[str]]:
"""Read the desired list of requested workers and prepare the data for use in
generating worker config files while also checking for potential gotchas.
Args:
requested_worker_types: The list formed from the split environment variable
containing the unprocessed requests for workers.
Returns: A dict of worker names to set of worker types. Format:
{'worker_name':
{'worker_type', 'worker_type2'}
}
"""
# Checking performed:
# 1. If a requested name contains a space
# 2. If a requested name contains either kind of quote mark
# 3. If a requested name ends with a digit
# A counter of worker_base_name -> int. Used for determining the name for a given
# worker when generating its config file, as each worker's name is just
# worker_base_name followed by instance number
worker_base_name_counter: Dict[str, int] = defaultdict(int)
# Similar to above, but more finely grained. This is used to determine we don't have
# more than a single worker for cases where multiples would be bad(e.g. presence).
worker_type_shard_counter: Dict[str, int] = defaultdict(int)
# The final result of all this processing
dict_to_return: Dict[str, Set[str]] = {}
# Handle any multipliers requested for given workers.
multiple_processed_worker_types = apply_requested_multiplier_for_worker(
requested_worker_types
)
# Process each worker_type_string
# Examples of expected formats:
# - requested_name=type1+type2+type3
# - synchrotron
# - event_creator+event_persister
for worker_type_string in multiple_processed_worker_types:
# First, if a name is requested, use that — otherwise generate one.
worker_base_name: str = ""
if "=" in worker_type_string:
# Split on "=", remove extra whitespace from ends then make list
worker_type_split = split_and_strip_string(worker_type_string, "=")
if len(worker_type_split) > 2:
error(
"There should only be one '=' in the worker type string. "
f"Please fix: {worker_type_string}"
)
# Assign the name
worker_base_name = worker_type_split[0]
if not re.match(r"^[a-zA-Z0-9_+-]*[a-zA-Z0-9_+-]$", worker_base_name):
# Apply a fairly narrow regex to the worker names. Some characters
# aren't safe for use in file paths or nginx configurations.
# Don't allow to end with a number because we'll add a number
# ourselves in a moment.
error(
"Invalid worker name; please choose a name consisting of"
"alphanumeric letters, _ + -, but not ending with a digit: "
f"{worker_base_name!r}"
)
# Continue processing the remainder of the worker_type string
# with the name override removed.
worker_type_string = worker_type_split[1]
# Split the worker_type_string on "+", remove whitespace from ends then make
# the list a set so it's deduplicated.
worker_types_set: Set[str] = set(
split_and_strip_string(worker_type_string, "+")
)
if not worker_base_name:
# No base name specified: generate one deterministically from set of
# types
worker_base_name = "+".join(sorted(worker_types_set))
# At this point, we have:
# worker_base_name which is the name for the worker, without counter.
# worker_types_set which is the set of worker types for this worker.
# Make sure we don't allow sharding for a worker type that doesn't support it.
# Will error and stop if it is a problem, e.g. 'background_worker'.
for worker_type in worker_types_set:
# Verify this is a real defined worker type. If it's not, stop everything so
# it can be fixed.
if worker_type not in WORKERS_CONFIG:
error(
f"{worker_type} is an unknown worker type! Was found in "
f"'{worker_type_string}'. Please fix!"
)
if worker_type in worker_type_shard_counter:
if not is_sharding_allowed_for_worker_type(worker_type):
error(
f"There can be only a single worker with {worker_type} "
"type. Please recount and remove."
)
# Not in shard counter, must not have seen it yet, add it.
worker_type_shard_counter[worker_type] += 1
# Generate the number for the worker using incrementing counter
worker_base_name_counter[worker_base_name] += 1
worker_number = worker_base_name_counter[worker_base_name]
worker_name = f"{worker_base_name}{worker_number}"
if worker_number > 1:
# If this isn't the first worker, check that we don't have a confusing
# mixture of worker types with the same base name.
first_worker_with_base_name = dict_to_return[f"{worker_base_name}1"]
if first_worker_with_base_name != worker_types_set:
error(
f"Can not use worker_name: '{worker_name}' for worker_type(s): "
f"{worker_types_set!r}. It is already in use by "
f"worker_type(s): {first_worker_with_base_name!r}"
)
dict_to_return[worker_name] = worker_types_set
return dict_to_return
def generate_worker_files(
environ: Mapping[str, str], config_path: str, data_dir: str
environ: Mapping[str, str],
config_path: str,
data_dir: str,
requested_worker_types: Dict[str, Set[str]],
) -> None:
"""Read the desired list of workers from environment variables and generate
shared homeserver, nginx and supervisord configs.
"""Read the desired workers(if any) that is passed in and generate shared
homeserver, nginx and supervisord configs.
Args:
environ: os.environ instance.
config_path: The location of the generated Synapse main worker config file.
data_dir: The location of the synapse data directory. Where log and
user-facing config files live.
requested_worker_types: A Dict containing requested workers in the format of
{'worker_name1': {'worker_type', ...}}
"""
# Note that yaml cares about indentation, so care should be taken to insert lines
# into files at the correct indentation below.
# shared_config is the contents of a Synapse config file that will be shared amongst
# the main Synapse process as well as all workers.
# It is intended mainly for disabling functionality when certain workers are spun up,
# and adding a replication listener.
# First read the original config file and extract the listeners block. Then we'll add
# another listener for replication. Later we'll write out the result to the shared
# config file.
# First read the original config file and extract the listeners block. Then we'll
# add another listener for replication. Later we'll write out the result to the
# shared config file.
listeners = [
{
"port": 9093,
@@ -427,9 +732,9 @@ def generate_worker_files(
listeners += original_listeners
# The shared homeserver config. The contents of which will be inserted into the
# base shared worker jinja2 template.
#
# This config file will be passed to all workers, included Synapse's main process.
# base shared worker jinja2 template. This config file will be passed to all
# workers, included Synapse's main process. It is intended mainly for disabling
# functionality when certain workers are spun up, and adding a replication listener.
shared_config: Dict[str, Any] = {"listeners": listeners}
# List of dicts that describe workers.
@@ -437,31 +742,20 @@ def generate_worker_files(
# program blocks.
worker_descriptors: List[Dict[str, Any]] = []
# Upstreams for load-balancing purposes. This dict takes the form of a worker type to the
# ports of each worker. For example:
# Upstreams for load-balancing purposes. This dict takes the form of the worker
# type to the ports of each worker. For example:
# {
# worker_type: {1234, 1235, ...}}
# }
# and will be used to construct 'upstream' nginx directives.
nginx_upstreams: Dict[str, Set[int]] = {}
# A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what will be
# placed after the proxy_pass directive. The main benefit to representing this data as a
# dict over a str is that we can easily deduplicate endpoints across multiple instances
# of the same worker.
#
# An nginx site config that will be amended to depending on the workers that are
# spun up. To be placed in /etc/nginx/conf.d.
nginx_locations = {}
# Read the desired worker configuration from the environment
worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
if not worker_types_env:
# No workers, just the main process
worker_types = []
else:
# Split type names by comma, ignoring whitespace.
worker_types = [x.strip() for x in worker_types_env.split(",")]
# A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what
# will be placed after the proxy_pass directive. The main benefit to representing
# this data as a dict over a str is that we can easily deduplicate endpoints
# across multiple instances of the same worker. The final rendering will be combined
# with nginx_upstreams and placed in /etc/nginx/conf.d.
nginx_locations: Dict[str, str] = {}
# Create the worker configuration directory if it doesn't already exist
os.makedirs("/conf/workers", exist_ok=True)
@@ -469,66 +763,57 @@ def generate_worker_files(
# Start worker ports from this arbitrary port
worker_port = 18009
# A counter of worker_type -> int. Used for determining the name for a given
# worker type when generating its config file, as each worker's name is just
# worker_type + instance #
worker_type_counter: Dict[str, int] = {}
# A list of internal endpoints to healthcheck, starting with the main process
# which exists even if no workers do.
healthcheck_urls = ["http://localhost:8080/health"]
# For each worker type specified by the user, create config values
for worker_type in worker_types:
worker_config = WORKERS_CONFIG.get(worker_type)
if worker_config:
worker_config = worker_config.copy()
else:
error(worker_type + " is an unknown worker type! Please fix!")
# Get the set of all worker types that we have configured
all_worker_types_in_use = set(chain(*requested_worker_types.values()))
# Map locations to upstreams (corresponding to worker types) in Nginx
# but only if we use the appropriate worker type
for worker_type in all_worker_types_in_use:
for endpoint_pattern in WORKERS_CONFIG[worker_type]["endpoint_patterns"]:
nginx_locations[endpoint_pattern] = f"http://{worker_type}"
new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
worker_type_counter[worker_type] = new_worker_count
# For each worker type specified by the user, create config values and write it's
# yaml config file
for worker_name, worker_types_set in requested_worker_types.items():
# The collected and processed data will live here.
worker_config: Dict[str, Any] = {}
# Merge all worker config templates for this worker into a single config
for worker_type in worker_types_set:
copy_of_template_config = WORKERS_CONFIG[worker_type].copy()
# Merge worker type template configuration data. It's a combination of lists
# and dicts, so use this helper.
worker_config = merge_worker_template_configs(
worker_config, copy_of_template_config
)
# Replace placeholder names in the config template with the actual worker name.
worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)
# Name workers by their type concatenated with an incrementing number
# e.g. federation_reader1
worker_name = worker_type + str(new_worker_count)
worker_config.update(
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
)
# Update the shared config with any worker-type specific options
shared_config.update(worker_config["shared_extra_conf"])
# Update the shared config with any worker_type specific options. The first of a
# given worker_type needs to stay assigned and not be replaced.
worker_config["shared_extra_conf"].update(shared_config)
shared_config = worker_config["shared_extra_conf"]
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
# Check if more than one instance of this worker type has been specified
worker_type_total_count = worker_types.count(worker_type)
# Update the shared config with sharding-related options if necessary
add_worker_roles_to_shared_config(
shared_config, worker_type, worker_name, worker_port
shared_config, worker_types_set, worker_name, worker_port
)
# Enable the worker in supervisord
worker_descriptors.append(worker_config)
# Add nginx location blocks for this worker's endpoints (if any are defined)
for pattern in worker_config["endpoint_patterns"]:
# Determine whether we need to load-balance this worker
if worker_type_total_count > 1:
# Create or add to a load-balanced upstream for this worker
nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
# Upstreams are named after the worker_type
upstream = "http://" + worker_type
else:
upstream = "http://localhost:%d" % (worker_port,)
# Note that this endpoint should proxy to this upstream
nginx_locations[pattern] = upstream
# Write out the worker's logging config file
log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)
# Then a worker config file
@@ -539,6 +824,10 @@ def generate_worker_files(
worker_log_config_filepath=log_config_filepath,
)
# Save this worker's port number to the correct nginx upstreams
for worker_type in worker_types_set:
nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
worker_port += 1
# Build the nginx location config blocks
@@ -551,15 +840,14 @@ def generate_worker_files(
# Determine the load-balancing upstreams to configure
nginx_upstream_config = ""
for upstream_worker_type, upstream_worker_ports in nginx_upstreams.items():
for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items():
body = ""
for port in upstream_worker_ports:
body += " server localhost:%d;\n" % (port,)
body += f" server localhost:{port};\n"
# Add to the list of configured upstreams
nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
upstream_worker_type=upstream_worker_type,
upstream_worker_base_name=upstream_worker_base_name,
body=body,
)
@@ -580,7 +868,7 @@ def generate_worker_files(
if reg_path.suffix.lower() in (".yaml", ".yml")
]
workers_in_use = len(worker_types) > 0
workers_in_use = len(requested_worker_types) > 0
# Shared homeserver config
convert(
@@ -678,13 +966,26 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
generate_base_homeserver_config()
else:
log("Base homeserver config exists—not regenerating")
# This script may be run multiple times (mostly by Complement, see note at top of file).
# Don't re-configure workers in this instance.
# This script may be run multiple times (mostly by Complement, see note at top of
# file). Don't re-configure workers in this instance.
mark_filepath = "/conf/workers_have_been_configured"
if not os.path.exists(mark_filepath):
# Collect and validate worker_type requests
# Read the desired worker configuration from the environment
worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
# Only process worker_types if they exist
if not worker_types_env:
# No workers, just the main process
worker_types = []
requested_worker_types: Dict[str, Any] = {}
else:
# Split type names by comma, ignoring whitespace.
worker_types = split_and_strip_string(worker_types_env, ",")
requested_worker_types = parse_worker_types(worker_types)
# Always regenerate all other config files
log("Generating worker config files")
generate_worker_files(environ, config_path, data_dir)
generate_worker_files(environ, config_path, data_dir, requested_worker_types)
# Mark workers as being configured
with open(mark_filepath, "w") as f: