Compare commits

...

5 Commits

Author SHA1 Message Date
Erik Johnston
b1e498d6e1 Handle prefill 2019-11-22 18:39:26 +00:00
Erik Johnston
9cca5ee743 Track cache hit ratios 2019-11-22 18:19:55 +00:00
Aaron Raimist
24cc31ee96 Fix link to user_dir_populate.sql in the user directory docs (#6388) 2019-11-21 17:38:14 +00:00
Andrew Morgan
3916e1b97a Clean up newline quote marks around the codebase (#6362) 2019-11-21 12:00:14 +00:00
Matthew Hodgson
9cc168e42e update macOS installation instructions 2019-11-20 18:44:45 +00:00
30 changed files with 154 additions and 51 deletions

View File

@@ -133,9 +133,9 @@ sudo yum install libtiff-devel libjpeg-devel libzip-devel freetype-devel \
sudo yum groupinstall "Development Tools"
```
#### Mac OS X
#### macOS
Installing prerequisites on Mac OS X:
Installing prerequisites on macOS:
```
xcode-select --install
@@ -144,6 +144,14 @@ sudo pip install virtualenv
brew install pkg-config libffi
```
On macOS Catalina (10.15) you may need to explicitly install OpenSSL
via brew and inform `pip` about it so that `psycopg2` builds:
```
brew install openssl@1.1
export LDFLAGS=-L/usr/local/Cellar/openssl\@1.1/1.1.1d/lib/
```
#### OpenSUSE
Installing prerequisites on openSUSE:

1
changelog.d/6362.misc Normal file
View File

@@ -0,0 +1 @@
Clean up some unnecessary quotation marks around the codebase.

1
changelog.d/6388.doc Normal file
View File

@@ -0,0 +1 @@
Fix link in the user directory documentation.

View File

@@ -7,7 +7,6 @@ who are present in a publicly viewable room present on the server.
The directory info is stored in various tables, which can (typically after
DB corruption) get stale or out of sync. If this happens, for now the
solution to fix it is to execute the SQL here
https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/delta/53/user_dir_populate.sql
solution to fix it is to execute the SQL [here](../synapse/storage/data_stores/main/schema/delta/53/user_dir_populate.sql)
and then restart synapse. This should then start a background task to
flush the current tables and regenerate the directory.

View File

@@ -69,7 +69,7 @@ class FederationSenderSlaveStore(
self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
def _get_federation_out_pos(self, db_conn):
sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?"
sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?"
sql = self.database_engine.convert_param_style(sql)
txn = db_conn.cursor()

View File

@@ -185,7 +185,7 @@ class ApplicationServiceApi(SimpleHttpClient):
if not _is_valid_3pe_metadata(info):
logger.warning(
"query_3pe_protocol to %s did not return a" " valid result", uri
"query_3pe_protocol to %s did not return a valid result", uri
)
return None

View File

@@ -134,7 +134,7 @@ def _load_appservice(hostname, as_info, config_filename):
for regex_obj in as_info["namespaces"][ns]:
if not isinstance(regex_obj, dict):
raise ValueError(
"Expected namespace entry in %s to be an object," " but got %s",
"Expected namespace entry in %s to be an object, but got %s",
ns,
regex_obj,
)

View File

@@ -170,7 +170,7 @@ class _RoomDirectoryRule(object):
self.action = action
else:
raise ConfigError(
"%s rules can only have action of 'allow'" " or 'deny'" % (option_name,)
"%s rules can only have action of 'allow' or 'deny'" % (option_name,)
)
self._alias_matches_all = alias == "*"

View File

@@ -223,7 +223,7 @@ class ServerConfig(Config):
self.federation_ip_range_blacklist.update(["0.0.0.0", "::"])
except Exception as e:
raise ConfigError(
"Invalid range(s) provided in " "federation_ip_range_blacklist: %s" % e
"Invalid range(s) provided in federation_ip_range_blacklist: %s" % e
)
if self.public_baseurl is not None:
@@ -787,14 +787,14 @@ class ServerConfig(Config):
"--print-pidfile",
action="store_true",
default=None,
help="Print the path to the pidfile just" " before daemonizing",
help="Print the path to the pidfile just before daemonizing",
)
server_group.add_argument(
"--manhole",
metavar="PORT",
dest="manhole",
type=int,
help="Turn on the twisted telnet manhole" " service on the given port.",
help="Turn on the twisted telnet manhole service on the given port.",
)

View File

@@ -44,7 +44,7 @@ class TransactionActions(object):
response code and response body.
"""
if not transaction.transaction_id:
raise RuntimeError("Cannot persist a transaction with no " "transaction_id")
raise RuntimeError("Cannot persist a transaction with no transaction_id")
return self.store.get_received_txn_response(transaction.transaction_id, origin)
@@ -56,7 +56,7 @@ class TransactionActions(object):
Deferred
"""
if not transaction.transaction_id:
raise RuntimeError("Cannot persist a transaction with no " "transaction_id")
raise RuntimeError("Cannot persist a transaction with no transaction_id")
return self.store.set_received_txn_response(
transaction.transaction_id, origin, code, response

View File

@@ -49,7 +49,7 @@ sent_pdus_destination_dist_count = Counter(
sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations:total",
"" "Total number of PDUs queued for sending across all destinations",
"Total number of PDUs queued for sending across all destinations",
)

View File

@@ -84,7 +84,7 @@ class TransactionManager(object):
txn_id = str(self._next_txn_id)
logger.debug(
"TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
"TX [%s] {%s} Attempting new transaction (pdus: %d, edus: %d)",
destination,
txn_id,
len(pdus),
@@ -103,7 +103,7 @@ class TransactionManager(object):
self._next_txn_id += 1
logger.info(
"TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
"TX [%s] {%s} Sending transaction [%s], (PDUs: %d, EDUs: %d)",
destination,
txn_id,
transaction.transaction_id,

View File

@@ -119,7 +119,7 @@ class DirectoryHandler(BaseHandler):
if not service.is_interested_in_alias(room_alias.to_string()):
raise SynapseError(
400,
"This application service has not reserved" " this kind of alias.",
"This application service has not reserved this kind of alias.",
errcode=Codes.EXCLUSIVE,
)
else:

View File

@@ -96,7 +96,7 @@ def parse_boolean_from_args(args, name, default=None, required=False):
return {b"true": True, b"false": False}[args[name][0]]
except Exception:
message = (
"Boolean query parameter %r must be one of" " ['true', 'false']"
"Boolean query parameter %r must be one of ['true', 'false']"
) % (name,)
raise SynapseError(400, message)
else:

View File

@@ -246,7 +246,7 @@ class HttpPusher(object):
# fixed, we don't suddenly deliver a load
# of old notifications.
logger.warning(
"Giving up on a notification to user %s, " "pushkey %s",
"Giving up on a notification to user %s, pushkey %s",
self.user_id,
self.pushkey,
)
@@ -299,8 +299,7 @@ class HttpPusher(object):
# for sanity, we only remove the pushkey if it
# was the one we actually sent...
logger.warning(
("Ignoring rejected pushkey %s because we" " didn't send it"),
pk,
("Ignoring rejected pushkey %s because we didn't send it"), pk,
)
else:
logger.info("Pushkey %s was rejected: removing", pk)

View File

@@ -43,7 +43,7 @@ logger = logging.getLogger(__name__)
MESSAGE_FROM_PERSON_IN_ROOM = (
"You have a message on %(app)s from %(person)s " "in the %(room)s room..."
"You have a message on %(app)s from %(person)s in the %(room)s room..."
)
MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..."
MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..."
@@ -55,7 +55,7 @@ MESSAGES_FROM_PERSON_AND_OTHERS = (
"You have messages on %(app)s from %(person)s and others..."
)
INVITE_FROM_PERSON_TO_ROOM = (
"%(person)s has invited you to join the " "%(room)s room on %(app)s..."
"%(person)s has invited you to join the %(room)s room on %(app)s..."
)
INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..."

View File

@@ -122,7 +122,7 @@ class PreviewUrlResource(DirectServeResource):
pattern = entry[attrib]
value = getattr(url_tuple, attrib)
logger.debug(
"Matching attrib '%s' with value '%s' against" " pattern '%s'",
"Matching attrib '%s' with value '%s' against pattern '%s'",
attrib,
value,
pattern,

View File

@@ -54,7 +54,7 @@ class ConsentServerNotices(object):
)
if "body" not in self._server_notice_content:
raise ConfigError(
"user_consent server_notice_consent must contain a 'body' " "key."
"user_consent server_notice_consent must contain a 'body' key."
)
self._consent_uri_builder = ConsentURIBuilder(hs.config)

View File

@@ -851,7 +851,7 @@ class SQLBaseStore(object):
allvalues.update(values)
latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
sql = ("INSERT INTO %s (%s) VALUES (%s) " "ON CONFLICT (%s) DO %s") % (
sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues),

View File

@@ -380,7 +380,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
devices = list(messages_by_device.keys())
if len(devices) == 1 and devices[0] == "*":
# Handle wildcard device_ids.
sql = "SELECT device_id FROM devices" " WHERE user_id = ?"
sql = "SELECT device_id FROM devices WHERE user_id = ?"
txn.execute(sql, (user_id,))
message_json = json.dumps(messages_by_device["*"])
for row in txn:

View File

@@ -138,9 +138,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
result.setdefault(user_id, {})[device_id] = None
# get signatures on the device
signature_sql = (
"SELECT * " " FROM e2e_cross_signing_signatures " " WHERE %s"
) % (" OR ".join("(" + q + ")" for q in signature_query_clauses))
signature_sql = ("SELECT * FROM e2e_cross_signing_signatures WHERE %s") % (
" OR ".join("(" + q + ")" for q in signature_query_clauses)
)
txn.execute(signature_sql, signature_query_params)
rows = self.cursor_to_dict(txn)

View File

@@ -713,9 +713,7 @@ class EventsStore(
metadata_json = encode_json(event.internal_metadata.get_dict())
sql = (
"UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?"
)
sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
txn.execute(sql, (metadata_json, event.event_id))
# Add an entry to the ex_outlier_stream table to replicate the
@@ -732,7 +730,7 @@ class EventsStore(
},
)
sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?"
sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
txn.execute(sql, (False, event.event_id))
# Update the event_backward_extremities table now that this
@@ -1479,7 +1477,7 @@ class EventsStore(
# We do joins against events_to_purge for e.g. calculating state
# groups to purge, etc., so lets make an index.
txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)")
txn.execute("CREATE INDEX events_to_purge_id ON events_to_purge(event_id)")
txn.execute("SELECT event_id, should_delete FROM events_to_purge")
event_rows = txn.fetchall()

View File

@@ -55,7 +55,7 @@ class FilteringStore(SQLBaseStore):
if filter_id_response is not None:
return filter_id_response[0]
sql = "SELECT MAX(filter_id) FROM user_filters " "WHERE user_id = ?"
sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?"
txn.execute(sql, (user_localpart,))
max_id = txn.fetchone()[0]
if max_id is None:

View File

@@ -337,7 +337,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
if len(media_ids) == 0:
return
sql = "DELETE FROM local_media_repository_url_cache" " WHERE media_id = ?"
sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"
def _delete_url_cache_txn(txn):
txn.executemany(sql, [(media_id,) for media_id in media_ids])
@@ -365,11 +365,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
return
def _delete_url_cache_media_txn(txn):
sql = "DELETE FROM local_media_repository" " WHERE media_id = ?"
sql = "DELETE FROM local_media_repository WHERE media_id = ?"
txn.executemany(sql, [(media_id,) for media_id in media_ids])
sql = "DELETE FROM local_media_repository_thumbnails" " WHERE media_id = ?"
sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"
txn.executemany(sql, [(media_id,) for media_id in media_ids])

View File

@@ -377,9 +377,7 @@ class RegistrationWorkerStore(SQLBaseStore):
"""
def f(txn):
sql = (
"SELECT name, password_hash FROM users" " WHERE lower(name) = lower(?)"
)
sql = "SELECT name, password_hash FROM users WHERE lower(name) = lower(?)"
txn.execute(sql, (user_id,))
return dict(txn)

View File

@@ -616,7 +616,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
def _get_max_topological_txn(self, txn, room_id):
txn.execute(
"SELECT MAX(topological_ordering) FROM events" " WHERE room_id = ?",
"SELECT MAX(topological_ordering) FROM events WHERE room_id = ?",
(room_id,),
)

View File

@@ -83,9 +83,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
)
def get_tag_content(txn, tag_ids):
sql = (
"SELECT tag, content" " FROM room_tags" " WHERE user_id=? AND room_id=?"
)
sql = "SELECT tag, content FROM room_tags WHERE user_id=? AND room_id=?"
results = []
for stream_id, user_id, room_id in tag_ids:
txn.execute(sql, (user_id, room_id))

View File

@@ -447,7 +447,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
# Mark as done.
cur.execute(
database_engine.convert_param_style(
"INSERT INTO applied_module_schemas (module_name, file)" " VALUES (?,?)"
"INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)"
),
(modname, name),
)

View File

@@ -88,9 +88,12 @@ class PaginationConfig(object):
raise SynapseError(400, "Invalid request.")
def __repr__(self):
return (
"PaginationConfig(from_tok=%r, to_tok=%r," " direction=%r, limit=%r)"
) % (self.from_token, self.to_token, self.direction, self.limit)
return ("PaginationConfig(from_tok=%r, to_tok=%r, direction=%r, limit=%r)") % (
self.from_token,
self.to_token,
self.direction,
self.limit,
)
def get_source_config(self, source_name):
keyname = "%s_key" % source_name

View File

@@ -16,13 +16,16 @@
import functools
import inspect
import logging
import math
import threading
from collections import deque
from typing import Any, Tuple, Union, cast
from weakref import WeakValueDictionary
from six import itervalues
from prometheus_client import Gauge
from bloom_filter import BloomFilter
from prometheus_client import Gauge, Histogram
from typing_extensions import Protocol
from twisted.internet import defer
@@ -38,6 +41,13 @@ from . import register_cache
logger = logging.getLogger(__name__)
cache_size_counts = Histogram(
"synapse_util_caches_miss_rate",
"",
["name"],
buckets=[0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2, "+Inf"],
)
CacheKey = Union[Tuple, Any]
@@ -87,6 +97,8 @@ class Cache(object):
"thread",
"metrics",
"_pending_deferred_cache",
"ratio_tracking",
"ratio_metric",
)
def __init__(self, name, max_entries=1000, keylen=1, tree=False, iterable=False):
@@ -104,6 +116,9 @@ class Cache(object):
self.name = name
self.keylen = keylen
self.thread = None
self.ratio_tracking = CacheHitRatioTracking(max_entries)
self.metrics = register_cache(
"cache",
name,
@@ -111,6 +126,8 @@ class Cache(object):
collect_callback=self._metrics_collection_callback,
)
self.ratio_metric = cache_size_counts.labels(name)
def _on_evicted(self, evicted_count):
self.metrics.inc_evictions(evicted_count)
@@ -148,6 +165,11 @@ class Cache(object):
self.metrics.inc_hits()
return val.deferred
ratio = self.ratio_tracking.add(str(hash(key)))
if ratio is None:
ratio = 10
self.ratio_metric.observe(ratio)
val = self.cache.get(key, _CacheSentinel, callbacks=callbacks)
if val is not _CacheSentinel:
self.metrics.inc_hits()
@@ -222,6 +244,9 @@ class Cache(object):
def prefill(self, key, value, callback=None):
callbacks = [callback] if callback else []
self.ratio_tracking.add(str(hash(key)))
self.cache.set(key, value, callbacks=callbacks)
def invalidate(self, key):
@@ -724,3 +749,76 @@ def cachedList(cached_method_name, list_name, num_args=None, inlineCallbacks=Fal
num_args=num_args,
inlineCallbacks=inlineCallbacks,
)
class CacheHitRatioTracking(object):
def __init__(self, target_size, buckets=20, error_rate=0.01):
self._bucket_size = 2 * target_size / buckets
self._error_rate = error_rate
self._target_size = target_size
self._buckets = deque(
(
[
BloomFilter(
max_elements=self._target_size, error_rate=self._error_rate
),
0,
]
for _ in range(buckets)
),
maxlen=buckets,
)
def add(self, key):
found_in_bucket = None
for i, (bucket, _) in enumerate(self._buckets):
if key in bucket:
found_in_bucket = i
break
else:
self._buckets[i][1] += 1
self._buckets[0][0].add(key)
while self._buckets[-1][1] > 2 * self._target_size:
self._buckets.pop()
if self._buckets[0][1] > self._bucket_size:
self._buckets.appendleft(
[
BloomFilter(
max_elements=self._target_size, error_rate=self._error_rate
),
0,
]
)
if found_in_bucket is not None:
return self._buckets[found_in_bucket][1] / self._target_size
def _count_set_bits(n):
count = 0
while n:
count += n & 1
n >>= 1
return count
def _num_bits_set(bloom):
count = 0
for c in bloom.backend.array_:
if c == 0:
continue
count += _count_set_bits(c)
return count
def _get_estimate_bloom_size(bloom):
X = _num_bits_set(bloom)
m = bloom.num_bits_m
k = bloom.num_probes_k
return -(m / k) * math.log(1 - X / m)