Compare commits


8 Commits

Author  SHA1  Message  Date
Andrew Morgan  9d567cc26c  change other 404 to a SynapseError  2019-08-06 13:08:35 +01:00
Andrew Morgan  dc361798e5  raise a SynapseError instead of tuple  2019-08-05 12:39:36 +01:00
Andrew Morgan  f536e7c6a7  Add M_NOT_FOUND errcode to response  2019-07-31 15:14:19 +01:00
Andrew Morgan  ad2c415965  lint  2019-07-31 14:28:37 +01:00
Andrew Morgan  2347e3c362  Hide event existence  2019-07-31 14:26:16 +01:00
Andrew Morgan  e906115472  lint  2019-07-31 14:18:35 +01:00
Andrew Morgan  ccddd9b9a8  Add changelog  2019-07-31 14:03:58 +01:00
Andrew Morgan  228c3c405f  Return 404 instead of 403 when retrieving an event without perms  2019-07-31 13:59:12 +01:00
87 changed files with 658 additions and 1220 deletions


@@ -3,6 +3,10 @@
Message history can be paginated
m.room.history_visibility == "world_readable" allows/forbids appropriately for Guest users
m.room.history_visibility == "world_readable" allows/forbids appropriately for Real users
Can re-join room if re-invited
/upgrade creates a new room

.gitignore (1 line changed)

@@ -16,7 +16,6 @@ _trial_temp*/
/*.log
/*.log.config
/*.pid
/.python-version
/*.signing.key
/env/
/homeserver*.yaml


@@ -1,102 +1,3 @@
Synapse 1.3.1 (2019-08-17)
==========================
Features
--------
- Drop hard dependency on `sdnotify` python package. ([\#5871](https://github.com/matrix-org/synapse/issues/5871))
Bugfixes
--------
- Fix startup issue (hang on ACME provisioning) due to ordering of Twisted reactor startup. Thanks to @chrismoos for supplying the fix. ([\#5867](https://github.com/matrix-org/synapse/issues/5867))
Synapse 1.3.0 (2019-08-15)
==========================
Bugfixes
--------
- Fix 500 Internal Server Error on `publicRooms` when the public room list was
cached. ([\#5851](https://github.com/matrix-org/synapse/issues/5851))
Synapse 1.3.0rc1 (2019-08-13)
==========================
Features
--------
- Use `M_USER_DEACTIVATED` instead of `M_UNKNOWN` for errcode when a deactivated user attempts to login. ([\#5686](https://github.com/matrix-org/synapse/issues/5686))
- Add sd_notify hooks to ease systemd integration and allows usage of Type=Notify. ([\#5732](https://github.com/matrix-org/synapse/issues/5732))
- Synapse will no longer serve any media repo admin endpoints when `enable_media_repo` is set to False in the configuration. If a media repo worker is used, the admin APIs relating to the media repo will be served from it instead. ([\#5754](https://github.com/matrix-org/synapse/issues/5754), [\#5848](https://github.com/matrix-org/synapse/issues/5848))
- Synapse can now be configured to not join remote rooms of a given "complexity" (currently, state events) over federation. This option can be used to prevent adverse performance on resource-constrained homeservers. ([\#5783](https://github.com/matrix-org/synapse/issues/5783))
- Allow defining HTML templates to serve the user on account renewal attempt when using the account validity feature. ([\#5807](https://github.com/matrix-org/synapse/issues/5807))
Bugfixes
--------
- Fix UISIs during homeserver outage. ([\#5693](https://github.com/matrix-org/synapse/issues/5693), [\#5789](https://github.com/matrix-org/synapse/issues/5789))
- Fix stack overflow in server key lookup code. ([\#5724](https://github.com/matrix-org/synapse/issues/5724))
- start.sh no longer uses deprecated cli option. ([\#5725](https://github.com/matrix-org/synapse/issues/5725))
- Log when we receive an event receipt from an unexpected origin. ([\#5743](https://github.com/matrix-org/synapse/issues/5743))
- Fix debian packaging scripts to correctly build sid packages. ([\#5775](https://github.com/matrix-org/synapse/issues/5775))
- Correctly handle redactions of redactions. ([\#5788](https://github.com/matrix-org/synapse/issues/5788))
- Return 404 instead of 403 when accessing /rooms/{roomId}/event/{eventId} for an event without the appropriate permissions. ([\#5798](https://github.com/matrix-org/synapse/issues/5798))
- Fix check that tombstone is a state event in push rules. ([\#5804](https://github.com/matrix-org/synapse/issues/5804))
- Fix error when trying to login as a deactivated user when using a worker to handle login. ([\#5806](https://github.com/matrix-org/synapse/issues/5806))
- Fix bug where user `/sync` stream could get wedged in rare circumstances. ([\#5825](https://github.com/matrix-org/synapse/issues/5825))
- The purge_remote_media.sh script was fixed. ([\#5839](https://github.com/matrix-org/synapse/issues/5839))
Deprecations and Removals
-------------------------
- Synapse now no longer accepts the `-v`/`--verbose`, `-f`/`--log-file`, or `--log-config` command line flags, and removes the deprecated `verbose` and `log_file` configuration file options. Users of these options should migrate their options into the dedicated log configuration. ([\#5678](https://github.com/matrix-org/synapse/issues/5678), [\#5729](https://github.com/matrix-org/synapse/issues/5729))
- Remove non-functional 'expire_access_token' setting. ([\#5782](https://github.com/matrix-org/synapse/issues/5782))
Internal Changes
----------------
- Make Jaeger fully configurable. ([\#5694](https://github.com/matrix-org/synapse/issues/5694))
- Add precautionary measures to prevent future abuse of `window.opener` in default welcome page. ([\#5695](https://github.com/matrix-org/synapse/issues/5695))
- Reduce database IO usage by optimising queries for current membership. ([\#5706](https://github.com/matrix-org/synapse/issues/5706), [\#5738](https://github.com/matrix-org/synapse/issues/5738), [\#5746](https://github.com/matrix-org/synapse/issues/5746), [\#5752](https://github.com/matrix-org/synapse/issues/5752), [\#5770](https://github.com/matrix-org/synapse/issues/5770), [\#5774](https://github.com/matrix-org/synapse/issues/5774), [\#5792](https://github.com/matrix-org/synapse/issues/5792), [\#5793](https://github.com/matrix-org/synapse/issues/5793))
- Improve caching when fetching `get_filtered_current_state_ids`. ([\#5713](https://github.com/matrix-org/synapse/issues/5713))
- Don't accept opentracing data from clients. ([\#5715](https://github.com/matrix-org/synapse/issues/5715))
- Speed up PostgreSQL unit tests in CI. ([\#5717](https://github.com/matrix-org/synapse/issues/5717))
- Update the coding style document. ([\#5719](https://github.com/matrix-org/synapse/issues/5719))
- Improve database query performance when recording retry intervals for remote hosts. ([\#5720](https://github.com/matrix-org/synapse/issues/5720))
- Add a set of opentracing utils. ([\#5722](https://github.com/matrix-org/synapse/issues/5722))
- Cache result of get_version_string to reduce overhead of `/version` federation requests. ([\#5730](https://github.com/matrix-org/synapse/issues/5730))
- Return 'user_type' in admin API user endpoints results. ([\#5731](https://github.com/matrix-org/synapse/issues/5731))
- Don't package the sytest test blacklist file. ([\#5733](https://github.com/matrix-org/synapse/issues/5733))
- Replace uses of returnValue with plain return, as returnValue is not needed on Python 3. ([\#5736](https://github.com/matrix-org/synapse/issues/5736))
- Blacklist some flakey tests in worker mode. ([\#5740](https://github.com/matrix-org/synapse/issues/5740))
- Fix some error cases in the caching layer. ([\#5749](https://github.com/matrix-org/synapse/issues/5749))
- Add a prometheus metric for pending cache lookups. ([\#5750](https://github.com/matrix-org/synapse/issues/5750))
- Stop trying to fetch events with event_id=None. ([\#5753](https://github.com/matrix-org/synapse/issues/5753))
- Convert RedactionTestCase to modern test style. ([\#5768](https://github.com/matrix-org/synapse/issues/5768))
- Allow looping calls to be given arguments. ([\#5780](https://github.com/matrix-org/synapse/issues/5780))
- Set the logs emitted when checking typing and presence timeouts to DEBUG level, not INFO. ([\#5785](https://github.com/matrix-org/synapse/issues/5785))
- Remove DelayedCall debugging from the test suite, as it is no longer required in the vast majority of Synapse's tests. ([\#5787](https://github.com/matrix-org/synapse/issues/5787))
- Remove some spurious exceptions from the logs where we failed to talk to a remote server. ([\#5790](https://github.com/matrix-org/synapse/issues/5790))
- Improve performance when making `.well-known` requests by sharing the SSL options between requests. ([\#5794](https://github.com/matrix-org/synapse/issues/5794))
- Disable codecov GitHub comments on PRs. ([\#5796](https://github.com/matrix-org/synapse/issues/5796))
- Don't allow clients to send tombstone events that reference the room it's sent in. ([\#5801](https://github.com/matrix-org/synapse/issues/5801))
- Deny redactions of events sent in a different room. ([\#5802](https://github.com/matrix-org/synapse/issues/5802))
- Deny sending well known state types as non-state events. ([\#5805](https://github.com/matrix-org/synapse/issues/5805))
- Handle incorrectly encoded query params correctly by returning a 400. ([\#5808](https://github.com/matrix-org/synapse/issues/5808))
- Handle pusher being deleted during processing rather than logging an exception. ([\#5809](https://github.com/matrix-org/synapse/issues/5809))
- Return 502 not 500 when failing to reach any remote server. ([\#5810](https://github.com/matrix-org/synapse/issues/5810))
- Reduce global pauses in the events stream caused by expensive state resolution during persistence. ([\#5826](https://github.com/matrix-org/synapse/issues/5826))
- Add a lower bound to well-known lookup cache time to avoid repeated lookups. ([\#5836](https://github.com/matrix-org/synapse/issues/5836))
- Whitelist history visbility sytests in worker mode tests. ([\#5843](https://github.com/matrix-org/synapse/issues/5843))
Synapse 1.2.1 (2019-07-26)
==========================


@@ -419,11 +419,12 @@ If Synapse is not configured with an SMTP server, password reset via email will
## Registering a user
The easiest way to create a new user is to do so from a client like [Riot](https://riot.im).
You will need at least one user on your server in order to use a Matrix
client. Users can be registered either via a Matrix client, or via a
commandline script.
Alternatively you can do so from the command line if you have installed via pip.
This can be done as follows:
To get started, it is easiest to use the command line to register new
users. This can be done as follows:
```
$ source ~/synapse/env/bin/activate
```

changelog.d/5678.removal (new file): Synapse now no longer accepts the `-v`/`--verbose`, `-f`/`--log-file`, or `--log-config` command line flags, and removes the deprecated `verbose` and `log_file` configuration file options. Users of these options should migrate their options into the dedicated log configuration.
changelog.d/5693.bugfix (new file): Fix UISIs during homeserver outage.
changelog.d/5694.misc (new file): Make Jaeger fully configurable.
changelog.d/5695.misc (new file): Add precautionary measures to prevent future abuse of `window.opener` in default welcome page.
changelog.d/5706.misc (new file): Reduce database IO usage by optimising queries for current membership.
changelog.d/5713.misc (new file): Improve caching when fetching `get_filtered_current_state_ids`.
changelog.d/5715.misc (new file): Don't accept opentracing data from clients.
changelog.d/5717.misc (new file): Speed up PostgreSQL unit tests in CI.
changelog.d/5719.misc (new file): Update the coding style document.
changelog.d/5720.misc (new file): Improve database query performance when recording retry intervals for remote hosts.
changelog.d/5722.misc (new file): Add a set of opentracing utils.
changelog.d/5724.bugfix (new file): Fix stack overflow in server key lookup code.
changelog.d/5725.bugfix (new file): start.sh no longer uses deprecated cli option.
changelog.d/5729.removal (new file): Synapse now no longer accepts the `-v`/`--verbose`, `-f`/`--log-file`, or `--log-config` command line flags, and removes the deprecated `verbose` and `log_file` configuration file options. Users of these options should migrate their options into the dedicated log configuration.
changelog.d/5730.misc (new file): Cache result of get_version_string to reduce overhead of `/version` federation requests.
changelog.d/5731.misc (new file): Return 'user_type' in admin API user endpoints results.
changelog.d/5732.feature (new file): Add sd_notify hooks to ease systemd integration and allows usage of Type=Notify.
changelog.d/5733.misc (new file): Don't package the sytest test blacklist file.
changelog.d/5736.misc (new file): Replace uses of returnValue with plain return, as returnValue is not needed on Python 3.
changelog.d/5738.misc (new file): Reduce database IO usage by optimising queries for current membership.
changelog.d/5740.misc (new file): Blacklist some flakey tests in worker mode.
changelog.d/5743.bugfix (new file): Log when we receive an event receipt from an unexpected origin.
changelog.d/5746.misc (new file): Reduce database IO usage by optimising queries for current membership.
changelog.d/5749.misc (new file): Fix some error cases in the caching layer.
changelog.d/5750.misc (new file): Add a prometheus metric for pending cache lookups.
changelog.d/5752.misc (new file): Reduce database IO usage by optimising queries for current membership.
changelog.d/5753.misc (new file): Stop trying to fetch events with event_id=None.
changelog.d/5768.misc (new file): Convert RedactionTestCase to modern test style.
changelog.d/5770.misc (new file): Reduce database IO usage by optimising queries for current membership.
changelog.d/5774.misc (new file): Reduce database IO usage by optimising queries for current membership.
changelog.d/5775.bugfix (new file): Fix debian packaging scripts to correctly build sid packages.
changelog.d/5780.misc (new file): Allow looping calls to be given arguments.
changelog.d/5782.removal (new file): Remove non-functional 'expire_access_token' setting.
changelog.d/5783.feature (new file): Synapse can now be configured to not join remote rooms of a given "complexity" (currently, state events) over federation. This option can be used to prevent adverse performance on resource-constrained homeservers.
changelog.d/5785.misc (new file): Set the logs emitted when checking typing and presence timeouts to DEBUG level, not INFO.
changelog.d/5787.misc (new file): Remove DelayedCall debugging from the test suite, as it is no longer required in the vast majority of Synapse's tests.
changelog.d/5789.bugfix (new file): Fix UISIs during homeserver outage.
changelog.d/5792.misc (new file): Reduce database IO usage by optimising queries for current membership.
changelog.d/5793.misc (new file): Reduce database IO usage by optimising queries for current membership.
changelog.d/5794.misc (new file): Improve performance when making `.well-known` requests by sharing the SSL options between requests.
changelog.d/5796.misc (new file): Disable codecov GitHub comments on PRs.
changelog.d/5798.bugfix (new file): Return 404 instead of 403 when accessing /rooms/{roomId}/event/{eventId} for an event without the appropriate permissions.


@@ -51,4 +51,4 @@ TOKEN=$(sql "SELECT token FROM access_tokens WHERE user_id='$ADMIN' ORDER BY id
# finally start pruning media:
###############################################################################
set -x # for debugging the generated string
curl --header "Authorization: Bearer $TOKEN" -X POST "$API_URL/admin/purge_media_cache/?before_ts=$UNIX_TIMESTAMP"
curl --header "Authorization: Bearer $TOKEN" -v POST "$API_URL/admin/purge_media_cache/?before_ts=$UNIX_TIMESTAMP"

debian/changelog (16 lines changed)

@@ -1,18 +1,8 @@
matrix-synapse-py3 (1.3.1) stable; urgency=medium
matrix-synapse-py3 (1.2.1) stable; urgency=medium
* New synapse release 1.3.1.
* New synapse release 1.2.1.
-- Synapse Packaging team <packages@matrix.org> Sat, 17 Aug 2019 09:15:49 +0100
matrix-synapse-py3 (1.3.0) stable; urgency=medium
[ Andrew Morgan ]
* Remove libsqlite3-dev from required build dependencies.
[ Synapse Packaging team ]
* New synapse release 1.3.0.
-- Synapse Packaging team <packages@matrix.org> Thu, 15 Aug 2019 12:04:23 +0100
-- Synapse Packaging team <packages@matrix.org> Fri, 26 Jul 2019 11:32:47 +0100
matrix-synapse-py3 (1.2.0) stable; urgency=medium

debian/control (1 line changed)

@@ -15,6 +15,7 @@ Build-Depends:
python3-setuptools,
python3-pip,
python3-venv,
libsqlite3-dev,
tar,
Standards-Version: 3.9.8
Homepage: https://github.com/matrix-org/synapse


@@ -565,13 +565,6 @@ log_config: "CONFDIR/SERVERNAME.log.config"
## Media Store ##
# Enable the media store service in the Synapse master. Uncomment the
# following if you are using a separate media store worker.
#
#enable_media_repo: false
# Directory where uploaded images and attachments are stored.
#
media_store_path: "DATADIR/media_store"
@@ -809,16 +802,6 @@ uploads_path: "DATADIR/uploads"
# period: 6w
# renew_at: 1w
# renew_email_subject: "Renew your %(app)s account"
# # Directory in which Synapse will try to find the HTML files to serve to the
# # user when trying to renew an account. Optional, defaults to
# # synapse/res/templates.
# template_dir: "res/templates"
# # HTML to be displayed to the user after they successfully renewed their
# # account. Optional.
# account_renewed_html_path: "account_renewed.html"
# # HTML to be displayed when the user tries to renew an account with an invalid
# # renewal token. Optional.
# invalid_token_html_path: "invalid_token.html"
# Time that a user's session remains valid for, after they log in.
#


@@ -206,13 +206,6 @@ Handles the media repository. It can handle all endpoints starting with::
/_matrix/media/
And the following regular expressions matching media-specific administration
APIs::
^/_synapse/admin/v1/purge_media_cache$
^/_synapse/admin/v1/room/.*/media$
^/_synapse/admin/v1/quarantine_media/.*$
You should also set ``enable_media_repo: False`` in the shared configuration
file to stop the main synapse running background jobs related to managing the
media repository.


@@ -35,4 +35,4 @@ try:
except ImportError:
pass
__version__ = "1.3.1"
__version__ = "1.2.1"


@@ -61,7 +61,6 @@ class Codes(object):
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
WRONG_ROOM_KEYS_VERSION = "M_WRONG_ROOM_KEYS_VERSION"
EXPIRED_ACCOUNT = "ORG_MATRIX_EXPIRED_ACCOUNT"
USER_DEACTIVATED = "M_USER_DEACTIVATED"
class CodeMessageException(RuntimeError):
@@ -152,7 +151,7 @@ class UserDeactivatedError(SynapseError):
msg (str): The human-readable error message
"""
super(UserDeactivatedError, self).__init__(
code=http_client.FORBIDDEN, msg=msg, errcode=Codes.USER_DEACTIVATED
code=http_client.FORBIDDEN, msg=msg, errcode=Codes.UNKNOWN
)
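(Editor's note, for illustration only: the response bodies below are not part of the diff; they sketch the client-visible effect of the errcode change above, following the standard Matrix error JSON shape.)

```python
# Sketch: approximate login-rejection bodies for a deactivated user.
# The error strings are illustrative, not quoted from Synapse.
newer_branch = {
    "errcode": "M_USER_DEACTIVATED",  # dedicated code (see #5686 in the changelog)
    "error": "This account has been deactivated",
}
older_branch = {
    "errcode": "M_UNKNOWN",  # generic fallback used before the change
    "error": "This account has been deactivated",
}
```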


@@ -17,10 +17,10 @@ import gc
import logging
import os
import signal
import socket
import sys
import traceback
import sdnotify
from daemonize import Daemonize
from twisted.internet import defer, error, reactor
@@ -246,12 +246,13 @@ def start(hs, listeners=None):
def handle_sighup(*args, **kwargs):
# Tell systemd our state, if we're using it. This will silently fail if
# we're not using systemd.
sdnotify(b"RELOADING=1")
sd_channel = sdnotify.SystemdNotifier()
sd_channel.notify("RELOADING=1")
for i in _sighup_callbacks:
i(hs)
sdnotify(b"READY=1")
sd_channel.notify("READY=1")
signal.signal(signal.SIGHUP, handle_sighup)
@@ -307,12 +308,16 @@ def setup_sdnotify(hs):
# Tell systemd our state, if we're using it. This will silently fail if
# we're not using systemd.
sd_channel = sdnotify.SystemdNotifier()
hs.get_reactor().addSystemEventTrigger(
"after", "startup", sdnotify, b"READY=1\nMAINPID=%i" % (os.getpid(),)
"after",
"startup",
lambda: sd_channel.notify("READY=1\nMAINPID=%s" % (os.getpid())),
)
hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", sdnotify, b"STOPPING=1"
"before", "shutdown", lambda: sd_channel.notify("STOPPING=1")
)
@@ -409,35 +414,3 @@ class _DeferredResolutionReceiver(object):
def resolutionComplete(self):
self._deferred.callback(())
self._receiver.resolutionComplete()
sdnotify_sockaddr = os.getenv("NOTIFY_SOCKET")
def sdnotify(state):
"""
Send a notification to systemd, if the NOTIFY_SOCKET env var is set.
This function is based on the sdnotify python package, but since it's only a few
lines of code, it's easier to duplicate it here than to add a dependency on a
package which many OSes don't include as a matter of principle.
Args:
state (bytes): notification to send
"""
if not isinstance(state, bytes):
raise TypeError("sdnotify should be called with a bytes")
if not sdnotify_sockaddr:
return
addr = sdnotify_sockaddr
if addr[0] == "@":
addr = "\0" + addr[1:]
try:
with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
sock.connect(addr)
sock.sendall(state)
except Exception as e:
# this is a bit surprising, since we don't expect to have a NOTIFY_SOCKET
# unless systemd is expecting us to notify it.
logger.warning("Unable to send notification to systemd: %s", e)


@@ -447,7 +447,7 @@ def setup(config_options):
reactor.stop()
sys.exit(1)
reactor.callWhenRunning(start)
reactor.addSystemEventTrigger("before", "startup", start)
return hs


@@ -26,7 +26,6 @@ from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
@@ -36,7 +35,6 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -73,12 +71,6 @@ class MediaRepositoryServer(HomeServer):
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "media":
media_repo = self.get_media_repository_resource()
# We need to serve the admin servlets for media on the
# worker.
admin_resource = JsonResource(self, canonical_json=False)
register_servlets_for_media_repo(self, admin_resource)
resources.update(
{
MEDIA_PREFIX: media_repo,
@@ -86,7 +78,6 @@ class MediaRepositoryServer(HomeServer):
CONTENT_REPO_PREFIX: ContentRepoResource(
self, self.config.uploads_path
),
"/_synapse/admin": admin_resource,
}
)


@@ -13,11 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from distutils.util import strtobool
import pkg_resources
from synapse.config._base import Config, ConfigError
from synapse.types import RoomAlias
from synapse.util.stringutils import random_string_with_symbols
@@ -44,36 +41,8 @@ class AccountValidityConfig(Config):
self.startup_job_max_delta = self.period * 10.0 / 100.0
if self.renew_by_email_enabled:
if "public_baseurl" not in synapse_config:
raise ConfigError("Can't send renewal emails without 'public_baseurl'")
template_dir = config.get("template_dir")
if not template_dir:
template_dir = pkg_resources.resource_filename("synapse", "res/templates")
if "account_renewed_html_path" in config:
file_path = os.path.join(template_dir, config["account_renewed_html_path"])
self.account_renewed_html_content = self.read_file(
file_path, "account_validity.account_renewed_html_path"
)
else:
self.account_renewed_html_content = (
"<html><body>Your account has been successfully renewed.</body><html>"
)
if "invalid_token_html_path" in config:
file_path = os.path.join(template_dir, config["invalid_token_html_path"])
self.invalid_token_html_content = self.read_file(
file_path, "account_validity.invalid_token_html_path"
)
else:
self.invalid_token_html_content = (
"<html><body>Invalid renewal token.</body><html>"
)
if self.renew_by_email_enabled and "public_baseurl" not in synapse_config:
raise ConfigError("Can't send renewal emails without 'public_baseurl'")
class RegistrationConfig(Config):
@@ -176,16 +145,6 @@ class RegistrationConfig(Config):
# period: 6w
# renew_at: 1w
# renew_email_subject: "Renew your %%(app)s account"
# # Directory in which Synapse will try to find the HTML files to serve to the
# # user when trying to renew an account. Optional, defaults to
# # synapse/res/templates.
# template_dir: "res/templates"
# # HTML to be displayed to the user after they successfully renewed their
# # account. Optional.
# account_renewed_html_path: "account_renewed.html"
# # HTML to be displayed when the user tries to renew an account with an invalid
# # renewal token. Optional.
# invalid_token_html_path: "invalid_token.html"
# Time that a user's session remains valid for, after they log in.
#


@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from collections import namedtuple
@@ -88,18 +87,6 @@ def parse_thumbnail_requirements(thumbnail_sizes):
class ContentRepositoryConfig(Config):
def read_config(self, config, **kwargs):
# Only enable the media repo if either the media repo is enabled or the
# current worker app is the media repo.
if (
self.enable_media_repo is False
and config.get("worker_app") != "synapse.app.media_repository"
):
self.can_load_media_repo = False
return
else:
self.can_load_media_repo = True
self.max_upload_size = self.parse_size(config.get("max_upload_size", "10M"))
self.max_image_pixels = self.parse_size(config.get("max_image_pixels", "32M"))
self.max_spider_size = self.parse_size(config.get("max_spider_size", "10M"))
@@ -215,13 +202,6 @@ class ContentRepositoryConfig(Config):
return (
r"""
## Media Store ##
# Enable the media store service in the Synapse master. Uncomment the
# following if you are using a separate media store worker.
#
#enable_media_repo: false
# Directory where uploaded images and attachments are stored.
#
media_store_path: "%(media_store)s"


@@ -95,10 +95,10 @@ class EventValidator(object):
elif event.type == EventTypes.Topic:
self._ensure_strings(event.content, ["topic"])
self._ensure_state_event(event)
elif event.type == EventTypes.Name:
self._ensure_strings(event.content, ["name"])
self._ensure_state_event(event)
elif event.type == EventTypes.Member:
if "membership" not in event.content:
raise SynapseError(400, "Content has not membership key")
@@ -106,25 +106,9 @@ class EventValidator(object):
if event.content["membership"] not in Membership.LIST:
raise SynapseError(400, "Invalid membership key")
self._ensure_state_event(event)
elif event.type == EventTypes.Tombstone:
if "replacement_room" not in event.content:
raise SynapseError(400, "Content has no replacement_room key")
if event.content["replacement_room"] == event.room_id:
raise SynapseError(
400, "Tombstone cannot reference the room it was sent in"
)
self._ensure_state_event(event)
def _ensure_strings(self, d, keys):
for s in keys:
if s not in d:
raise SynapseError(400, "'%s' not in content" % (s,))
if not isinstance(d[s], string_types):
raise SynapseError(400, "'%s' not a string type" % (s,))
def _ensure_state_event(self, event):
if not event.is_state():
raise SynapseError(400, "'%s' must be state events" % (event.type,))
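(Editor's sketch, not part of the diff: example `m.room.tombstone` content dicts against the validator checks above; the room ids are invented.)

```python
# Illustration of the tombstone checks shown above (hypothetical room ids).
room_id = "!oldroom:example.com"  # room the event is sent in

accepted = {"replacement_room": "!newroom:example.com"}  # different room: passes
rejected_self_reference = {"replacement_room": room_id}  # same room: 400 error
rejected_missing_key = {}                                # no replacement_room: 400 error
```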


@@ -511,8 +511,9 @@ class FederationClient(FederationBase):
The [Deferred] result of callback, if it succeeds
Raises:
SynapseError if the chosen remote server returns a 300/400 code, or
no servers were reachable.
SynapseError if the chosen remote server returns a 300/400 code.
RuntimeError if no servers were reachable.
"""
for destination in destinations:
if destination == self.server_name:
@@ -537,7 +538,7 @@ class FederationClient(FederationBase):
except Exception:
logger.warn("Failed to %s via %s", description, destination, exc_info=1)
raise SynapseError(502, "Failed to %s via any server" % (description,))
raise RuntimeError("Failed to %s via any server" % (description,))
def make_membership_event(
self, destinations, room_id, user_id, membership, content, params


@@ -19,8 +19,6 @@ import functools
import logging
import re
from twisted.internet.defer import maybeDeferred
import synapse
import synapse.logging.opentracing as opentracing
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
@@ -747,12 +745,8 @@ class PublicRoomList(BaseFederationServlet):
else:
network_tuple = ThirdPartyInstanceID(None, None)
data = await maybeDeferred(
self.handler.get_local_public_room_list,
limit,
since_token,
network_tuple=network_tuple,
from_federation=True,
data = await self.handler.get_local_public_room_list(
limit, since_token, network_tuple=network_tuple, from_federation=True
)
return 200, data


@@ -226,19 +226,11 @@ class AccountValidityHandler(object):
Args:
renewal_token (str): Token sent with the renewal request.
Returns:
bool: Whether the provided token is valid.
"""
try:
user_id = yield self.store.get_user_from_renewal_token(renewal_token)
except StoreError:
defer.returnValue(False)
user_id = yield self.store.get_user_from_renewal_token(renewal_token)
logger.debug("Renewing an account for user %s", user_id)
yield self.renew_account_for_user(user_id)
defer.returnValue(True)
@defer.inlineCallbacks
def renew_account_for_user(self, user_id, expiration_ts=None, email_sent=False):
"""Renews the account attached to a given user by pushing back the


@@ -978,9 +978,6 @@ class FederationHandler(BaseHandler):
except NotRetryingDestination as e:
logger.info(str(e))
continue
except RequestSendFailed as e:
logger.info("Falied to get backfill from %s because %s", dom, e)
continue
except FederationDeniedError as e:
logger.info(e)
continue


@@ -126,12 +126,9 @@ class GroupsLocalHandler(object):
group_id, requester_user_id
)
else:
try:
res = yield self.transport_client.get_group_summary(
get_domain_from_id(group_id), group_id, requester_user_id
)
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
res = yield self.transport_client.get_group_summary(
get_domain_from_id(group_id), group_id, requester_user_id
)
group_server_name = get_domain_from_id(group_id)
@@ -186,12 +183,9 @@ class GroupsLocalHandler(object):
content["user_profile"] = yield self.profile_handler.get_profile(user_id)
try:
res = yield self.transport_client.create_group(
get_domain_from_id(group_id), group_id, user_id, content
)
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
res = yield self.transport_client.create_group(
get_domain_from_id(group_id), group_id, user_id, content
)
remote_attestation = res["attestation"]
yield self.attestations.verify_attestation(
@@ -227,12 +221,9 @@ class GroupsLocalHandler(object):
group_server_name = get_domain_from_id(group_id)
try:
res = yield self.transport_client.get_users_in_group(
get_domain_from_id(group_id), group_id, requester_user_id
)
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
res = yield self.transport_client.get_users_in_group(
get_domain_from_id(group_id), group_id, requester_user_id
)
chunk = res["chunk"]
valid_entries = []
@@ -267,12 +258,9 @@ class GroupsLocalHandler(object):
local_attestation = self.attestations.create_attestation(group_id, user_id)
content["attestation"] = local_attestation
try:
res = yield self.transport_client.join_group(
get_domain_from_id(group_id), group_id, user_id, content
)
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
res = yield self.transport_client.join_group(
get_domain_from_id(group_id), group_id, user_id, content
)
remote_attestation = res["attestation"]
@@ -311,12 +299,9 @@ class GroupsLocalHandler(object):
local_attestation = self.attestations.create_attestation(group_id, user_id)
content["attestation"] = local_attestation
try:
res = yield self.transport_client.accept_group_invite(
get_domain_from_id(group_id), group_id, user_id, content
)
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
res = yield self.transport_client.accept_group_invite(
get_domain_from_id(group_id), group_id, user_id, content
)
remote_attestation = res["attestation"]
@@ -353,16 +338,13 @@ class GroupsLocalHandler(object):
group_id, user_id, requester_user_id, content
)
else:
try:
res = yield self.transport_client.invite_to_group(
get_domain_from_id(group_id),
group_id,
user_id,
requester_user_id,
content,
)
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
res = yield self.transport_client.invite_to_group(
get_domain_from_id(group_id),
group_id,
user_id,
requester_user_id,
content,
)
return res
@@ -416,16 +398,13 @@ class GroupsLocalHandler(object):
)
else:
content["requester_user_id"] = requester_user_id
try:
res = yield self.transport_client.remove_user_from_group(
get_domain_from_id(group_id),
group_id,
requester_user_id,
user_id,
content,
)
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
res = yield self.transport_client.remove_user_from_group(
get_domain_from_id(group_id),
group_id,
requester_user_id,
user_id,
content,
)
return res
@@ -456,13 +435,9 @@ class GroupsLocalHandler(object):
return {"groups": result}
else:
try:
bulk_result = yield self.transport_client.bulk_get_publicised_groups(
get_domain_from_id(user_id), [user_id]
)
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
bulk_result = yield self.transport_client.bulk_get_publicised_groups(
get_domain_from_id(user_id), [user_id]
)
result = bulk_result.get("users", {}).get(user_id)
# TODO: Verify attestations
return {"groups": result}


@@ -795,6 +795,7 @@ class EventCreationHandler(object):
get_prev_content=False,
allow_rejected=False,
allow_none=True,
check_room_id=event.room_id,
)
# we can make some additional checks now if we have the original event.
@@ -802,9 +803,6 @@ class EventCreationHandler(object):
if original_event.type == EventTypes.Create:
raise AuthError(403, "Redacting create events is not permitted")
if original_event.room_id != event.room_id:
raise SynapseError(400, "Cannot redact event from a different room")
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
event, prev_state_ids, for_verification=True


@@ -781,17 +781,9 @@ class SyncHandler(object):
lazy_load_members=lazy_load_members,
)
elif batch.limited:
if batch:
state_at_timeline_start = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, state_filter=state_filter
)
else:
# Its not clear how we get here, but empirically we do
# (#5407). Logging has been added elsewhere to try and
# figure out where this state comes from.
state_at_timeline_start = yield self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
)
state_at_timeline_start = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, state_filter=state_filter
)
# for now, we disable LL for gappy syncs - see
# https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
@@ -811,17 +803,9 @@ class SyncHandler(object):
room_id, stream_position=since_token, state_filter=state_filter
)
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
batch.events[-1].event_id, state_filter=state_filter
)
else:
# Its not clear how we get here, but empirically we do
# (#5407). Logging has been added elsewhere to try and
# figure out where this state comes from.
current_state_ids = yield self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
)
current_state_ids = yield self.store.get_state_ids_for_event(
batch.events[-1].event_id, state_filter=state_filter
)
state_ids = _calculate_state(
timeline_contains=timeline_state,
@@ -1771,21 +1755,6 @@ class SyncHandler(object):
newly_joined_room=newly_joined,
)
if not batch and batch.limited:
# This resulted in #5407, which is weird, so lets log! We do it
# here as we have the maximum amount of information.
user_id = sync_result_builder.sync_config.user.to_string()
logger.info(
"Issue #5407: Found limited batch with no events. user %s, room %s,"
" sync_config %s, newly_joined %s, events %s, batch %s.",
user_id,
room_id,
sync_config,
newly_joined,
events,
batch,
)
if newly_joined:
# debug for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger.debug(


@@ -12,8 +12,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import random
import time
import attr
from netaddr import IPAddress
@@ -22,16 +24,31 @@ from zope.interface import implementer
from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet.interfaces import IStreamClientEndpoint
from twisted.web.client import URI, Agent, HTTPConnectionPool
from twisted.web.client import URI, Agent, HTTPConnectionPool, RedirectAgent, readBody
from twisted.web.http import stringToDatetime
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent
from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
from synapse.http.federation.well_known_resolver import WellKnownResolver
from synapse.logging.context import make_deferred_yieldable
from synapse.util import Clock
from synapse.util.caches.ttlcache import TTLCache
from synapse.util.metrics import Measure
# period to cache .well-known results for by default
WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
# jitter to add to the .well-known default cache ttl
WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
# period to cache failure to fetch .well-known for
WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
# cap for .well-known cache period
WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
logger = logging.getLogger(__name__)
well_known_cache = TTLCache("well-known")
@implementer(IAgent)
@@ -61,7 +78,7 @@ class MatrixFederationAgent(object):
reactor,
tls_client_options_factory,
_srv_resolver=None,
_well_known_cache=None,
_well_known_cache=well_known_cache,
):
self._reactor = reactor
self._clock = Clock(reactor)
@@ -76,15 +93,20 @@ class MatrixFederationAgent(object):
self._pool.maxPersistentPerHost = 5
self._pool.cachedConnectionTimeout = 2 * 60
self._well_known_resolver = WellKnownResolver(
self._reactor,
agent=Agent(
_well_known_agent = RedirectAgent(
Agent(
self._reactor,
pool=self._pool,
contextFactory=tls_client_options_factory,
),
well_known_cache=_well_known_cache,
)
)
self._well_known_agent = _well_known_agent
# our cache of .well-known lookup results, mapping from server name
# to delegated name. The values can be:
# `bytes`: a valid server-name
# `None`: there is no (valid) .well-known here
self._well_known_cache = _well_known_cache
@defer.inlineCallbacks
def request(self, method, uri, headers=None, bodyProducer=None):
@@ -195,10 +217,7 @@ class MatrixFederationAgent(object):
if lookup_well_known:
# try a .well-known lookup
well_known_result = yield self._well_known_resolver.get_well_known(
parsed_uri.host
)
well_known_server = well_known_result.delegated_server
well_known_server = yield self._get_well_known(parsed_uri.host)
if well_known_server:
# if we found a .well-known, start again, but don't do another
@@ -261,6 +280,85 @@ class MatrixFederationAgent(object):
target_port=port,
)
@defer.inlineCallbacks
def _get_well_known(self, server_name):
"""Attempt to fetch and parse a .well-known file for the given server
Args:
server_name (bytes): name of the server, from the requested url
Returns:
Deferred[bytes|None]: either the new server name, from the .well-known, or
None if there was no .well-known file.
"""
try:
result = self._well_known_cache[server_name]
except KeyError:
# TODO: should we linearise so that we don't end up doing two .well-known
# requests for the same server in parallel?
with Measure(self._clock, "get_well_known"):
result, cache_period = yield self._do_get_well_known(server_name)
if cache_period > 0:
self._well_known_cache.set(server_name, result, cache_period)
return result
@defer.inlineCallbacks
def _do_get_well_known(self, server_name):
"""Actually fetch and parse a .well-known, without checking the cache
Args:
server_name (bytes): name of the server, from the requested url
Returns:
Deferred[Tuple[bytes|None|object],int]:
result, cache period, where result is one of:
- the new server name from the .well-known (as a `bytes`)
- None if there was no .well-known file.
- INVALID_WELL_KNOWN if the .well-known was invalid
"""
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
uri_str = uri.decode("ascii")
logger.info("Fetching %s", uri_str)
try:
response = yield make_deferred_yieldable(
self._well_known_agent.request(b"GET", uri)
)
body = yield make_deferred_yieldable(readBody(response))
if response.code != 200:
raise Exception("Non-200 response %s" % (response.code,))
parsed_body = json.loads(body.decode("utf-8"))
logger.info("Response from .well-known: %s", parsed_body)
if not isinstance(parsed_body, dict):
raise Exception("not a dict")
if "m.server" not in parsed_body:
raise Exception("Missing key 'm.server'")
except Exception as e:
logger.info("Error fetching %s: %s", uri_str, e)
# add some randomness to the TTL to avoid a stampeding herd every hour
# after startup
cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
return (None, cache_period)
result = parsed_body["m.server"].encode("ascii")
cache_period = _cache_period_from_headers(
response.headers, time_now=self._reactor.seconds
)
if cache_period is None:
cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
# add some randomness to the TTL to avoid a stampeding herd every 24 hours
# after startup
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
else:
cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
return (result, cache_period)
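(Editor's note, for orientation: an example of a `.well-known` body that the parsing above accepts. The host name is invented; the required `m.server` key is the one checked in the code.)

```python
import json

# A delegation document the code above would accept (host name invented).
well_known_body = '{"m.server": "synapse.example.com:443"}'
delegated = json.loads(well_known_body)["m.server"].encode("ascii")
# -> b"synapse.example.com:443", which becomes the new server name to route to.
```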
@implementer(IStreamClientEndpoint)
class LoggingHostnameEndpoint(object):
@@ -276,6 +374,44 @@ class LoggingHostnameEndpoint(object):
return self.ep.connect(protocol_factory)
def _cache_period_from_headers(headers, time_now=time.time):
cache_controls = _parse_cache_control(headers)
if b"no-store" in cache_controls:
return 0
if b"max-age" in cache_controls:
try:
max_age = int(cache_controls[b"max-age"])
return max_age
except ValueError:
pass
expires = headers.getRawHeaders(b"expires")
if expires is not None:
try:
expires_date = stringToDatetime(expires[-1])
return expires_date - time_now()
except ValueError:
# RFC7234 says 'A cache recipient MUST interpret invalid date formats,
# especially the value "0", as representing a time in the past (i.e.,
# "already expired").
return 0
return None
def _parse_cache_control(headers):
cache_controls = {}
for hdr in headers.getRawHeaders(b"cache-control", []):
for directive in hdr.split(b","):
splits = [x.strip() for x in directive.split(b"=", 1)]
k = splits[0].lower()
v = splits[1] if len(splits) > 1 else None
cache_controls[k] = v
return cache_controls
@attr.s
class _RoutingResult(object):
"""The result returned by `_route_matrix_uri`.


@@ -1,187 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import random
import time
import attr
from twisted.internet import defer
from twisted.web.client import RedirectAgent, readBody
from twisted.web.http import stringToDatetime
from synapse.logging.context import make_deferred_yieldable
from synapse.util import Clock
from synapse.util.caches.ttlcache import TTLCache
from synapse.util.metrics import Measure
# period to cache .well-known results for by default
WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
# jitter to add to the .well-known default cache ttl
WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
# period to cache failure to fetch .well-known for
WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
# cap for .well-known cache period
WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
# lower bound for .well-known cache period
WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60
logger = logging.getLogger(__name__)
_well_known_cache = TTLCache("well-known")
@attr.s(slots=True, frozen=True)
class WellKnownLookupResult(object):
delegated_server = attr.ib()
class WellKnownResolver(object):
"""Handles well-known lookups for matrix servers.
"""
def __init__(self, reactor, agent, well_known_cache=None):
self._reactor = reactor
self._clock = Clock(reactor)
if well_known_cache is None:
well_known_cache = _well_known_cache
self._well_known_cache = well_known_cache
self._well_known_agent = RedirectAgent(agent)
@defer.inlineCallbacks
def get_well_known(self, server_name):
"""Attempt to fetch and parse a .well-known file for the given server
Args:
server_name (bytes): name of the server, from the requested url
Returns:
Deferred[WellKnownLookupResult]: The result of the lookup
"""
try:
result = self._well_known_cache[server_name]
except KeyError:
# TODO: should we linearise so that we don't end up doing two .well-known
# requests for the same server in parallel?
with Measure(self._clock, "get_well_known"):
result, cache_period = yield self._do_get_well_known(server_name)
if cache_period > 0:
self._well_known_cache.set(server_name, result, cache_period)
return WellKnownLookupResult(delegated_server=result)
@defer.inlineCallbacks
def _do_get_well_known(self, server_name):
"""Actually fetch and parse a .well-known, without checking the cache
Args:
server_name (bytes): name of the server, from the requested url
Returns:
Deferred[Tuple[bytes|None|object],int]:
result, cache period, where result is one of:
- the new server name from the .well-known (as a `bytes`)
- None if there was no .well-known file.
- INVALID_WELL_KNOWN if the .well-known was invalid
"""
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
uri_str = uri.decode("ascii")
logger.info("Fetching %s", uri_str)
try:
response = yield make_deferred_yieldable(
self._well_known_agent.request(b"GET", uri)
)
body = yield make_deferred_yieldable(readBody(response))
if response.code != 200:
raise Exception("Non-200 response %s" % (response.code,))
parsed_body = json.loads(body.decode("utf-8"))
logger.info("Response from .well-known: %s", parsed_body)
if not isinstance(parsed_body, dict):
raise Exception("not a dict")
if "m.server" not in parsed_body:
raise Exception("Missing key 'm.server'")
except Exception as e:
logger.info("Error fetching %s: %s", uri_str, e)
# add some randomness to the TTL to avoid a stampeding herd every hour
# after startup
cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
return (None, cache_period)
result = parsed_body["m.server"].encode("ascii")
cache_period = _cache_period_from_headers(
response.headers, time_now=self._reactor.seconds
)
if cache_period is None:
cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
# add some randomness to the TTL to avoid a stampeding herd every 24 hours
# after startup
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
else:
cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD)
return (result, cache_period)
def _cache_period_from_headers(headers, time_now=time.time):
cache_controls = _parse_cache_control(headers)
if b"no-store" in cache_controls:
return 0
if b"max-age" in cache_controls:
try:
max_age = int(cache_controls[b"max-age"])
return max_age
except ValueError:
pass
expires = headers.getRawHeaders(b"expires")
if expires is not None:
try:
expires_date = stringToDatetime(expires[-1])
return expires_date - time_now()
except ValueError:
# RFC7234 says 'A cache recipient MUST interpret invalid date formats,
# especially the value "0", as representing a time in the past (i.e.,
# "already expired").
return 0
return None
def _parse_cache_control(headers):
cache_controls = {}
for hdr in headers.getRawHeaders(b"cache-control", []):
for directive in hdr.split(b","):
splits = [x.strip() for x in directive.split(b"=", 1)]
k = splits[0].lower()
v = splits[1] if len(splits) > 1 else None
cache_controls[k] = v
return cache_controls
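(Editor's sketch, not from either version of the file: how the two cache helpers above treat a typical response header. The header values are invented.)

```python
from twisted.web.http_headers import Headers

# Invented example header for the helpers shown above.
headers = Headers({b"Cache-Control": [b"max-age=3600, private"]})
# _parse_cache_control(headers)       -> {b"max-age": b"3600", b"private": None}
# _cache_period_from_headers(headers) -> 3600 seconds, which the resolver then
# clamps between WELL_KNOWN_MIN_CACHE_PERIOD and WELL_KNOWN_MAX_CACHE_PERIOD.
```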


@@ -166,12 +166,7 @@ def parse_string_from_args(
value = args[name][0]
if encoding:
try:
value = value.decode(encoding)
except ValueError:
raise SynapseError(
400, "Query parameter %r must be %s" % (name, encoding)
)
value = value.decode(encoding)
if allowed_values is not None and value not in allowed_values:
message = "Query parameter %r must be one of [%s]" % (


@@ -245,13 +245,7 @@ BASE_APPEND_OVERRIDE_RULES = [
"key": "type",
"pattern": "m.room.tombstone",
"_id": "_tombstone",
},
{
"kind": "event_match",
"key": "state_key",
"pattern": "",
"_id": "_tombstone_statekey",
},
}
],
"actions": ["notify", {"set_tweak": "highlight", "value": True}],
},
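(Editor's reconstruction for reference, not a line from the diff: the complete tombstone override rule with both conditions. The rule id is assumed.)

```python
# Assumed full shape of the tombstone push rule discussed above; the rule_id is
# an assumption, the conditions and actions are those shown in the diff.
tombstone_rule = {
    "rule_id": "global/override/.m.rule.tombstone",
    "conditions": [
        {
            "kind": "event_match",
            "key": "type",
            "pattern": "m.room.tombstone",
            "_id": "_tombstone",
        },
        {
            "kind": "event_match",
            "key": "state_key",
            "pattern": "",
            "_id": "_tombstone_statekey",
        },
    ],
    "actions": ["notify", {"set_tweak": "highlight", "value": True}],
}
```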


@@ -234,19 +234,13 @@ class EmailPusher(object):
return
self.last_stream_ordering = last_stream_ordering
pusher_still_exists = (
yield self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.email,
self.user_id,
last_stream_ordering,
self.clock.time_msec(),
)
yield self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.email,
self.user_id,
last_stream_ordering,
self.clock.time_msec(),
)
if not pusher_still_exists:
# The pusher has been deleted while we were processing, so
# lets just stop and return.
self.on_stop()
def seconds_until(self, ts_msec):
secs = (ts_msec - self.clock.time_msec()) / 1000


@@ -199,21 +199,13 @@ class HttpPusher(object):
http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"]
pusher_still_exists = (
yield self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.pushkey,
self.user_id,
self.last_stream_ordering,
self.clock.time_msec(),
)
yield self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.pushkey,
self.user_id,
self.last_stream_ordering,
self.clock.time_msec(),
)
if not pusher_still_exists:
# The pusher has been deleted while we were processing, so
# lets just stop and return.
self.on_stop()
return
if self.failing_since:
self.failing_since = None
yield self.store.update_pusher_failing_since(
@@ -242,17 +234,12 @@ class HttpPusher(object):
)
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"]
pusher_still_exists = yield self.store.update_pusher_last_stream_ordering(
yield self.store.update_pusher_last_stream_ordering(
self.app_id,
self.pushkey,
self.user_id,
self.last_stream_ordering,
)
if not pusher_still_exists:
# The pusher has been deleted while we were processing, so
# lets just stop and return.
self.on_stop()
return
self.failing_since = None
yield self.store.update_pusher_failing_since(


@@ -72,6 +72,7 @@ REQUIREMENTS = [
"netaddr>=0.7.18",
"Jinja2>=2.9",
"bleach>=1.4.3",
"sdnotify>=0.3",
]
CONDITIONAL_REQUIREMENTS = {


@@ -1 +0,0 @@
<html><body>Your account has been successfully renewed.</body><html>


@@ -1 +0,0 @@
<html><body>Invalid renewal token.</body><html>


@@ -27,7 +27,7 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import Membership, UserTypes
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
from synapse.http.server import JsonResource
from synapse.http.servlet import (
RestServlet,
@@ -36,12 +36,7 @@ from synapse.http.servlet import (
parse_json_object_from_request,
parse_string,
)
from synapse.rest.admin._base import (
assert_requester_is_admin,
assert_user_is_admin,
historical_admin_path_patterns,
)
from synapse.rest.admin.media import register_servlets_for_media_repo
from synapse.rest.admin._base import assert_requester_is_admin, assert_user_is_admin
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.types import UserID, create_requester
from synapse.util.versionstring import get_version_string
@@ -49,6 +44,28 @@ from synapse.util.versionstring import get_version_string
logger = logging.getLogger(__name__)
def historical_admin_path_patterns(path_regex):
"""Returns the list of patterns for an admin endpoint, including historical ones
This is a backwards-compatibility hack. Previously, the Admin API was exposed at
various paths under /_matrix/client. This function returns a list of patterns
matching those paths (as well as the new one), so that existing scripts which rely
on the endpoints being available there are not broken.
Note that this should only be used for existing endpoints: new ones should just
register for the /_synapse/admin path.
"""
return list(
re.compile(prefix + path_regex)
for prefix in (
"^/_synapse/admin/v1",
"^/_matrix/client/api/v1/admin",
"^/_matrix/client/unstable/admin",
"^/_matrix/client/r0/admin",
)
)
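(Editor's illustration of the helper above, using the `UsersRestServlet` path regex that follows; the expanded patterns are implied by the prefixes listed in the function.)

```python
# What historical_admin_path_patterns("/users/(?P<user_id>[^/]*)") expands to:
expected_patterns = [
    "^/_synapse/admin/v1/users/(?P<user_id>[^/]*)",
    "^/_matrix/client/api/v1/admin/users/(?P<user_id>[^/]*)",
    "^/_matrix/client/unstable/admin/users/(?P<user_id>[^/]*)",
    "^/_matrix/client/r0/admin/users/(?P<user_id>[^/]*)",
]
```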
class UsersRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)")
@@ -238,6 +255,25 @@ class WhoisRestServlet(RestServlet):
return (200, ret)
class PurgeMediaCacheRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/purge_media_cache")
def __init__(self, hs):
self.media_repository = hs.get_media_repository()
self.auth = hs.get_auth()
@defer.inlineCallbacks
def on_POST(self, request):
yield assert_requester_is_admin(self.auth, request)
before_ts = parse_integer(request, "before_ts", required=True)
logger.info("before_ts: %r", before_ts)
ret = yield self.media_repository.delete_old_remote_media(before_ts)
return (200, ret)
class PurgeHistoryRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns(
"/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]+))?"
@@ -506,6 +542,50 @@ class ShutdownRoomRestServlet(RestServlet):
)
class QuarantineMediaInRoom(RestServlet):
"""Quarantines all media in a room so that no one can download it via
this server.
"""
PATTERNS = historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)")
def __init__(self, hs):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
@defer.inlineCallbacks
def on_POST(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
yield assert_user_is_admin(self.auth, requester.user)
num_quarantined = yield self.store.quarantine_media_ids_in_room(
room_id, requester.user.to_string()
)
return (200, {"num_quarantined": num_quarantined})
class ListMediaInRoom(RestServlet):
"""Lists all of the media in a given room.
"""
PATTERNS = historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media")
def __init__(self, hs):
self.store = hs.get_datastore()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)
if not is_admin:
raise AuthError(403, "You are not a server admin")
local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id)
return (200, {"local": local_mxcs, "remote": remote_mxcs})
class ResetPasswordRestServlet(RestServlet):
"""Post request to allow an administrator reset password for a user.
This needs user to have administrator access in Synapse.
@@ -745,6 +825,7 @@ def register_servlets(hs, http_server):
def register_servlets_for_client_rest_resource(hs, http_server):
"""Register only the servlets which need to be exposed on /_matrix/client/xxx"""
WhoisRestServlet(hs).register(http_server)
PurgeMediaCacheRestServlet(hs).register(http_server)
PurgeHistoryStatusRestServlet(hs).register(http_server)
DeactivateAccountRestServlet(hs).register(http_server)
PurgeHistoryRestServlet(hs).register(http_server)
@@ -753,13 +834,10 @@ def register_servlets_for_client_rest_resource(hs, http_server):
GetUsersPaginatedRestServlet(hs).register(http_server)
SearchUsersRestServlet(hs).register(http_server)
ShutdownRoomRestServlet(hs).register(http_server)
QuarantineMediaInRoom(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)
UserRegisterServlet(hs).register(http_server)
DeleteGroupAdminRestServlet(hs).register(http_server)
AccountValidityRenewServlet(hs).register(http_server)
# Load the media repo ones if we're using them.
if hs.config.can_load_media_repo:
register_servlets_for_media_repo(hs, http_server)
# don't add more things here: new servlets should only be exposed on
# /_synapse/admin so should not go here. Instead register them in AdminRestResource.

View File

@@ -12,36 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from twisted.internet import defer
from synapse.api.errors import AuthError
def historical_admin_path_patterns(path_regex):
"""Returns the list of patterns for an admin endpoint, including historical ones
This is a backwards-compatibility hack. Previously, the Admin API was exposed at
various paths under /_matrix/client. This function returns a list of patterns
matching those paths (as well as the new one), so that existing scripts which rely
on the endpoints being available there are not broken.
Note that this should only be used for existing endpoints: new ones should just
register for the /_synapse/admin path.
"""
return list(
re.compile(prefix + path_regex)
for prefix in (
"^/_synapse/admin/v1",
"^/_matrix/client/api/v1/admin",
"^/_matrix/client/unstable/admin",
"^/_matrix/client/r0/admin",
)
)
@defer.inlineCallbacks
def assert_requester_is_admin(auth, request):
"""Verify that the requester is an admin user

View File

@@ -1,101 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018-2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.api.errors import AuthError
from synapse.http.servlet import RestServlet, parse_integer
from synapse.rest.admin._base import (
assert_requester_is_admin,
assert_user_is_admin,
historical_admin_path_patterns,
)
logger = logging.getLogger(__name__)
class QuarantineMediaInRoom(RestServlet):
"""Quarantines all media in a room so that no one can download it via
this server.
"""
PATTERNS = historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)")
def __init__(self, hs):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
@defer.inlineCallbacks
def on_POST(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
yield assert_user_is_admin(self.auth, requester.user)
num_quarantined = yield self.store.quarantine_media_ids_in_room(
room_id, requester.user.to_string()
)
return (200, {"num_quarantined": num_quarantined})
class ListMediaInRoom(RestServlet):
"""Lists all of the media in a given room.
"""
PATTERNS = historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media")
def __init__(self, hs):
self.store = hs.get_datastore()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)
if not is_admin:
raise AuthError(403, "You are not a server admin")
local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id)
return (200, {"local": local_mxcs, "remote": remote_mxcs})
class PurgeMediaCacheRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/purge_media_cache")
def __init__(self, hs):
self.media_repository = hs.get_media_repository()
self.auth = hs.get_auth()
@defer.inlineCallbacks
def on_POST(self, request):
yield assert_requester_is_admin(self.auth, request)
before_ts = parse_integer(request, "before_ts", required=True)
logger.info("before_ts: %r", before_ts)
ret = yield self.media_repository.delete_old_remote_media(before_ts)
return (200, ret)
def register_servlets_for_media_repo(hs, http_server):
"""
Media repo specific APIs.
"""
PurgeMediaCacheRestServlet(hs).register(http_server)
QuarantineMediaInRoom(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)

View File

@@ -42,8 +42,6 @@ class AccountValidityRenewServlet(RestServlet):
self.hs = hs
self.account_activity_handler = hs.get_account_validity_handler()
self.auth = hs.get_auth()
self.success_html = hs.config.account_validity.account_renewed_html_content
self.failure_html = hs.config.account_validity.invalid_token_html_content
@defer.inlineCallbacks
def on_GET(self, request):
@@ -51,23 +49,16 @@ class AccountValidityRenewServlet(RestServlet):
raise SynapseError(400, "Missing renewal token")
renewal_token = request.args[b"token"][0]
token_valid = yield self.account_activity_handler.renew_account(
renewal_token.decode("utf8")
)
yield self.account_activity_handler.renew_account(renewal_token.decode("utf8"))
if token_valid:
status_code = 200
response = self.success_html
else:
status_code = 404
response = self.failure_html
request.setResponseCode(status_code)
request.setResponseCode(200)
request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
request.setHeader(b"Content-Length", b"%d" % (len(response),))
request.write(response.encode("utf8"))
request.setHeader(
b"Content-Length", b"%d" % (len(AccountValidityRenewServlet.SUCCESS_HTML),)
)
request.write(AccountValidityRenewServlet.SUCCESS_HTML)
finish_request(request)
defer.returnValue(None)
return None
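One side of the hunk above maps the result of the renew_account call onto an HTTP status and an HTML body: 200 with the success template when the token is accepted, 404 with the failure template otherwise. A minimal sketch of just that mapping, where render_renewal_response and its request argument are hypothetical stand-ins for the servlet plumbing:

def render_renewal_response(request, token_valid, success_html, failure_html):
    # 200 + success HTML for a valid token, 404 + failure HTML otherwise,
    # mirroring the branch shown in the hunk above.
    if token_valid:
        status_code, response = 200, success_html
    else:
        status_code, response = 404, failure_html
    body = response.encode("utf8")
    request.setResponseCode(status_code)
    request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
    request.setHeader(b"Content-Length", b"%d" % (len(body),))
    request.write(body)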
class AccountValiditySendMailServlet(RestServlet):
@@ -96,7 +87,7 @@ class AccountValiditySendMailServlet(RestServlet):
user_id = requester.user.to_string()
yield self.account_activity_handler.send_renewal_email_to_user(user_id)
defer.returnValue((200, {}))
return (200, {})
def register_servlets(hs, http_server):

View File

@@ -33,7 +33,6 @@ from synapse.api.errors import (
RequestSendFailed,
SynapseError,
)
from synapse.config._base import ConfigError
from synapse.logging.context import defer_to_thread
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.async_helpers import Linearizer
@@ -754,11 +753,8 @@ class MediaRepositoryResource(Resource):
"""
def __init__(self, hs):
# If we're not configured to use it, raise if we somehow got here.
if not hs.config.can_load_media_repo:
raise ConfigError("Synapse is not configured to use a media repo.")
Resource.__init__(self)
super().__init__()
media_repo = hs.get_media_repository()
self.putChild(b"upload", UploadResource(hs, media_repo))

View File

@@ -364,161 +364,147 @@ class EventsStore(
if not events_and_contexts:
return
chunks = [
events_and_contexts[x : x + 100]
for x in range(0, len(events_and_contexts), 100)
]
if backfilled:
stream_ordering_manager = self._backfill_id_gen.get_next_mult(
len(events_and_contexts)
)
else:
stream_ordering_manager = self._stream_id_gen.get_next_mult(
len(events_and_contexts)
)
for chunk in chunks:
# We can't easily parallelize these since different chunks
# might contain the same event. :(
with stream_ordering_manager as stream_orderings:
for (event, context), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
# NB: Assumes that we are only persisting events for one room
# at a time.
chunks = [
events_and_contexts[x : x + 100]
for x in range(0, len(events_and_contexts), 100)
]
# map room_id->list[event_ids] giving the new forward
# extremities in each room
new_forward_extremeties = {}
for chunk in chunks:
# We can't easily parallelize these since different chunks
# might contain the same event. :(
# map room_id->(type,state_key)->event_id tracking the full
# state in each room after adding these events.
# This is simply used to prefill the get_current_state_ids
# cache
current_state_for_room = {}
# NB: Assumes that we are only persisting events for one room
# at a time.
# map room_id->(to_delete, to_insert) where to_delete is a list
# of type/state keys to remove from current state, and to_insert
# is a map (type,key)->event_id giving the state delta in each
# room
state_delta_for_room = {}
# map room_id->list[event_ids] giving the new forward
# extremities in each room
new_forward_extremeties = {}
if not backfilled:
with Measure(self._clock, "_calculate_state_and_extrem"):
# Work out the new "current state" for each room.
# We do this by working out what the new extremities are and then
# calculating the state from that.
events_by_room = {}
for event, context in chunk:
events_by_room.setdefault(event.room_id, []).append(
(event, context)
)
# map room_id->(type,state_key)->event_id tracking the full
# state in each room after adding these events.
# This is simply used to prefill the get_current_state_ids
# cache
current_state_for_room = {}
for room_id, ev_ctx_rm in iteritems(events_by_room):
latest_event_ids = yield self.get_latest_event_ids_in_room(
room_id
)
new_latest_event_ids = yield self._calculate_new_extremities(
room_id, ev_ctx_rm, latest_event_ids
)
# map room_id->(to_delete, to_insert) where to_delete is a list
# of type/state keys to remove from current state, and to_insert
# is a map (type,key)->event_id giving the state delta in each
# room
state_delta_for_room = {}
latest_event_ids = set(latest_event_ids)
if new_latest_event_ids == latest_event_ids:
# No change in extremities, so no change in state
continue
# there should always be at least one forward extremity.
# (except during the initial persistence of the send_join
# results, in which case there will be no existing
# extremities, so we'll `continue` above and skip this bit.)
assert new_latest_event_ids, "No forward extremities left!"
new_forward_extremeties[room_id] = new_latest_event_ids
len_1 = (
len(latest_event_ids) == 1
and len(new_latest_event_ids) == 1
)
if len_1:
all_single_prev_not_state = all(
len(event.prev_event_ids()) == 1
and not event.is_state()
for event, ctx in ev_ctx_rm
if not backfilled:
with Measure(self._clock, "_calculate_state_and_extrem"):
# Work out the new "current state" for each room.
# We do this by working out what the new extremities are and then
# calculating the state from that.
events_by_room = {}
for event, context in chunk:
events_by_room.setdefault(event.room_id, []).append(
(event, context)
)
# Don't bother calculating state if they're just
# a long chain of single ancestor non-state events.
if all_single_prev_not_state:
for room_id, ev_ctx_rm in iteritems(events_by_room):
latest_event_ids = yield self.get_latest_event_ids_in_room(
room_id
)
new_latest_event_ids = yield self._calculate_new_extremities(
room_id, ev_ctx_rm, latest_event_ids
)
latest_event_ids = set(latest_event_ids)
if new_latest_event_ids == latest_event_ids:
# No change in extremities, so no change in state
continue
state_delta_counter.inc()
if len(new_latest_event_ids) == 1:
state_delta_single_event_counter.inc()
# there should always be at least one forward extremity.
# (except during the initial persistence of the send_join
# results, in which case there will be no existing
# extremities, so we'll `continue` above and skip this bit.)
assert new_latest_event_ids, "No forward extremities left!"
# This is a fairly handwavey check to see if we could
# have guessed what the delta would have been when
# processing one of these events.
# What we're interested in is if the latest extremities
# were the same when we created the event as they are
# now. When this server creates a new event (as opposed
# to receiving it over federation) it will use the
# forward extremities as the prev_events, so we can
# guess this by looking at the prev_events and checking
# if they match the current forward extremities.
for ev, _ in ev_ctx_rm:
prev_event_ids = set(ev.prev_event_ids())
if latest_event_ids == prev_event_ids:
state_delta_reuse_delta_counter.inc()
break
new_forward_extremeties[room_id] = new_latest_event_ids
logger.info("Calculating state delta for room %s", room_id)
with Measure(
self._clock, "persist_events.get_new_state_after_events"
):
res = yield self._get_new_state_after_events(
room_id,
ev_ctx_rm,
latest_event_ids,
new_latest_event_ids,
len_1 = (
len(latest_event_ids) == 1
and len(new_latest_event_ids) == 1
)
current_state, delta_ids = res
# If either are not None then there has been a change,
# and we need to work out the delta (or use that
# given)
if delta_ids is not None:
# If there is a delta we know that we've
# only added or replaced state, never
# removed keys entirely.
state_delta_for_room[room_id] = ([], delta_ids)
elif current_state is not None:
with Measure(
self._clock, "persist_events.calculate_state_delta"
):
delta = yield self._calculate_state_delta(
room_id, current_state
if len_1:
all_single_prev_not_state = all(
len(event.prev_event_ids()) == 1
and not event.is_state()
for event, ctx in ev_ctx_rm
)
state_delta_for_room[room_id] = delta
# Don't bother calculating state if they're just
# a long chain of single ancestor non-state events.
if all_single_prev_not_state:
continue
# If we have the current_state then let's prefill
# the cache with it.
if current_state is not None:
current_state_for_room[room_id] = current_state
state_delta_counter.inc()
if len(new_latest_event_ids) == 1:
state_delta_single_event_counter.inc()
# We want to calculate the stream orderings as late as possible, as
# we only notify after all events with a lesser stream ordering have
# been persisted. I.e. if we spend 10s inside the with block then
# that will delay all subsequent events from being notified about.
# Hence why we do it down here rather than wrapping the entire
# function.
#
# Its safe to do this after calculating the state deltas etc as we
# only need to protect the *persistence* of the events. This is to
# ensure that queries of the form "fetch events since X" don't
# return events and stream positions after events that are still in
# flight, as otherwise subsequent requests "fetch event since Y"
# will not return those events.
#
# Note: Multiple instances of this function cannot be in flight at
# the same time for the same room.
if backfilled:
stream_ordering_manager = self._backfill_id_gen.get_next_mult(
len(chunk)
)
else:
stream_ordering_manager = self._stream_id_gen.get_next_mult(len(chunk))
# This is a fairly handwavey check to see if we could
# have guessed what the delta would have been when
# processing one of these events.
# What we're interested in is if the latest extremities
# were the same when we created the event as they are
# now. When this server creates a new event (as opposed
# to receiving it over federation) it will use the
# forward extremities as the prev_events, so we can
# guess this by looking at the prev_events and checking
# if they match the current forward extremities.
for ev, _ in ev_ctx_rm:
prev_event_ids = set(ev.prev_event_ids())
if latest_event_ids == prev_event_ids:
state_delta_reuse_delta_counter.inc()
break
with stream_ordering_manager as stream_orderings:
for (event, context), stream in zip(chunk, stream_orderings):
event.internal_metadata.stream_ordering = stream
logger.info("Calculating state delta for room %s", room_id)
with Measure(
self._clock, "persist_events.get_new_state_after_events"
):
res = yield self._get_new_state_after_events(
room_id,
ev_ctx_rm,
latest_event_ids,
new_latest_event_ids,
)
current_state, delta_ids = res
# If either are not None then there has been a change,
# and we need to work out the delta (or use that
# given)
if delta_ids is not None:
# If there is a delta we know that we've
# only added or replaced state, never
# removed keys entirely.
state_delta_for_room[room_id] = ([], delta_ids)
elif current_state is not None:
with Measure(
self._clock, "persist_events.calculate_state_delta"
):
delta = yield self._calculate_state_delta(
room_id, current_state
)
state_delta_for_room[room_id] = delta
# If we have the current_state then let's prefill
# the cache with it.
if current_state is not None:
current_state_for_room[room_id] = current_state
yield self.runInteraction(
"persist_events",

View File

@@ -29,7 +29,12 @@ from synapse.api.room_versions import EventFormatVersions
from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
from synapse.logging.context import LoggingContext, PreserveLoggingContext
from synapse.logging.context import (
LoggingContext,
PreserveLoggingContext,
make_deferred_yieldable,
run_in_background,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import get_domain_from_id
from synapse.util import batch_iter
@@ -337,12 +342,13 @@ class EventsWorkerStore(SQLBaseStore):
log_ctx = LoggingContext.current_context()
log_ctx.record_event_fetch(len(missing_events_ids))
# Note that _get_events_from_db is also responsible for turning db rows
# Note that _enqueue_events is also responsible for turning db rows
# into FrozenEvents (via _get_event_from_row), which involves seeing if
# the events have been redacted, and if so pulling the redaction event out
# of the database to check it.
#
missing_events = yield self._get_events_from_db(
# _enqueue_events is a bit of a rubbish name but naming is hard.
missing_events = yield self._enqueue_events(
missing_events_ids, allow_rejected=allow_rejected
)
@@ -415,28 +421,28 @@ class EventsWorkerStore(SQLBaseStore):
The fetch requests. Each entry consists of a list of event
ids to be fetched, and a deferred to be completed once the
events have been fetched.
The deferreds are callbacked with a dictionary mapping from event id
to event row. Note that it may well contain additional events that
were not part of this request.
"""
with Measure(self._clock, "_fetch_event_list"):
try:
events_to_fetch = set(
event_id for events, _ in event_list for event_id in events
)
event_id_lists = list(zip(*event_list))[0]
event_ids = [item for sublist in event_id_lists for item in sublist]
row_dict = self._new_transaction(
conn, "do_fetch", [], [], self._fetch_event_rows, events_to_fetch
conn, "do_fetch", [], [], self._fetch_event_rows, event_ids
)
# We only want to resolve deferreds from the main thread
def fire():
for _, d in event_list:
d.callback(row_dict)
def fire(lst, res):
for ids, d in lst:
if not d.called:
try:
with PreserveLoggingContext():
d.callback([res[i] for i in ids if i in res])
except Exception:
logger.exception("Failed to callback")
with PreserveLoggingContext():
self.hs.get_reactor().callFromThread(fire)
self.hs.get_reactor().callFromThread(fire, event_list, row_dict)
except Exception as e:
logger.exception("do_fetch")
@@ -451,98 +457,13 @@ class EventsWorkerStore(SQLBaseStore):
self.hs.get_reactor().callFromThread(fire, event_list, e)
@defer.inlineCallbacks
def _get_events_from_db(self, event_ids, allow_rejected=False):
"""Fetch a bunch of events from the database.
Returned events will be added to the cache for future lookups.
Args:
event_ids (Iterable[str]): The event_ids of the events to fetch
allow_rejected (bool): Whether to include rejected events
Returns:
Deferred[Dict[str, _EventCacheEntry]]:
map from event id to result. May return extra events which
weren't asked for.
"""
fetched_events = {}
events_to_fetch = event_ids
while events_to_fetch:
row_map = yield self._enqueue_events(events_to_fetch)
# we need to recursively fetch any redactions of those events
redaction_ids = set()
for event_id in events_to_fetch:
row = row_map.get(event_id)
fetched_events[event_id] = row
if row:
redaction_ids.update(row["redactions"])
events_to_fetch = redaction_ids.difference(fetched_events.keys())
if events_to_fetch:
logger.debug("Also fetching redaction events %s", events_to_fetch)
# build a map from event_id to EventBase
event_map = {}
for event_id, row in fetched_events.items():
if not row:
continue
assert row["event_id"] == event_id
rejected_reason = row["rejected_reason"]
if not allow_rejected and rejected_reason:
continue
d = json.loads(row["json"])
internal_metadata = json.loads(row["internal_metadata"])
format_version = row["format_version"]
if format_version is None:
# This means that we stored the event before we had the concept
# of an event format version, so it must be a V1 event.
format_version = EventFormatVersions.V1
original_ev = event_type_from_format_version(format_version)(
event_dict=d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
event_map[event_id] = original_ev
# finally, we can decide whether each one needs redacting, and build
# the cache entries.
result_map = {}
for event_id, original_ev in event_map.items():
redactions = fetched_events[event_id]["redactions"]
redacted_event = self._maybe_redact_event_row(
original_ev, redactions, event_map
)
cache_entry = _EventCacheEntry(
event=original_ev, redacted_event=redacted_event
)
self._get_event_cache.prefill((event_id,), cache_entry)
result_map[event_id] = cache_entry
return result_map
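The loop at the top of _get_events_from_db above keeps fetching until every redaction referenced by a fetched row has also been loaded. A minimal synchronous sketch of that fixpoint, assuming a fetch_rows(ids) helper that returns a dict mapping event id to row, where each row carries a "redactions" list:

def fetch_with_redactions(event_ids, fetch_rows):
    fetched_events = {}
    events_to_fetch = set(event_ids)
    while events_to_fetch:
        row_map = fetch_rows(events_to_fetch)
        redaction_ids = set()
        for event_id in events_to_fetch:
            row = row_map.get(event_id)
            fetched_events[event_id] = row
            if row:
                redaction_ids.update(row["redactions"])
        # Only loop again for redactions we have not already fetched.
        events_to_fetch = redaction_ids.difference(fetched_events.keys())
    return fetched_events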
@defer.inlineCallbacks
def _enqueue_events(self, events):
def _enqueue_events(self, events, allow_rejected=False):
"""Fetches events from the database using the _event_fetch_list. This
allows batch and bulk fetching of events - it allows us to fetch events
without having to create a new transaction for each request for events.
Args:
events (Iterable[str]): events to be fetched.
Returns:
Deferred[Dict[str, Dict]]: map from event id to row data from the database.
May contain events that weren't requested.
"""
if not events:
return {}
events_d = defer.Deferred()
with self._event_fetch_lock:
@@ -561,12 +482,32 @@ class EventsWorkerStore(SQLBaseStore):
"fetch_events", self.runWithConnection, self._do_fetch
)
logger.debug("Loading %d events: %s", len(events), events)
logger.debug("Loading %d events", len(events))
with PreserveLoggingContext():
row_map = yield events_d
logger.debug("Loaded %d events (%d rows)", len(events), len(row_map))
rows = yield events_d
logger.debug("Loaded %d events (%d rows)", len(events), len(rows))
return row_map
if not allow_rejected:
rows[:] = [r for r in rows if r["rejected_reason"] is None]
res = yield make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self._get_event_from_row,
row["internal_metadata"],
row["json"],
row["redactions"],
rejected_reason=row["rejected_reason"],
format_version=row["format_version"],
)
for row in rows
],
consumeErrors=True,
)
)
return {e.event.event_id: e for e in res if e}
def _fetch_event_rows(self, txn, event_ids):
"""Fetch event rows from the database
@@ -639,7 +580,50 @@ class EventsWorkerStore(SQLBaseStore):
return event_dict
def _maybe_redact_event_row(self, original_ev, redactions, event_map):
@defer.inlineCallbacks
def _get_event_from_row(
self, internal_metadata, js, redactions, format_version, rejected_reason=None
):
"""Parse an event row which has been read from the database
Args:
internal_metadata (str): json-encoded internal_metadata column
js (str): json-encoded event body from event_json
redactions (list[str]): a list of the events which claim to have redacted
this event, from the redactions table
format_version: (str): the 'format_version' column
rejected_reason (str|None): the reason this event was rejected, if any
Returns:
_EventCacheEntry
"""
with Measure(self._clock, "_get_event_from_row"):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
if format_version is None:
# This means that we stored the event before we had the concept
# of an event format version, so it must be a V1 event.
format_version = EventFormatVersions.V1
original_ev = event_type_from_format_version(format_version)(
event_dict=d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
redacted_event = yield self._maybe_redact_event_row(original_ev, redactions)
cache_entry = _EventCacheEntry(
event=original_ev, redacted_event=redacted_event
)
self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
return cache_entry
@defer.inlineCallbacks
def _maybe_redact_event_row(self, original_ev, redactions):
"""Given an event object and a list of possible redacting event ids,
determine whether to honour any of those redactions and if so return a redacted
event.
@@ -647,8 +631,6 @@ class EventsWorkerStore(SQLBaseStore):
Args:
original_ev (EventBase):
redactions (iterable[str]): list of event ids of potential redaction events
event_map (dict[str, EventBase]): other events which have been fetched, in
which we can look up the redaction events. Map from event id to event.
Returns:
Deferred[EventBase|None]: if the event should be redacted, a pruned
@@ -658,9 +640,15 @@ class EventsWorkerStore(SQLBaseStore):
# we choose to ignore redactions of m.room.create events.
return None
if original_ev.type == "m.room.redaction":
# ... and redaction events
return None
redaction_map = yield self._get_events_from_cache_or_db(redactions)
for redaction_id in redactions:
redaction_event = event_map.get(redaction_id)
if not redaction_event or redaction_event.rejected_reason:
redaction_entry = redaction_map.get(redaction_id)
if not redaction_entry:
# we don't have the redaction event, or the redaction event was not
# authorized.
logger.debug(
@@ -670,6 +658,7 @@ class EventsWorkerStore(SQLBaseStore):
)
continue
redaction_event = redaction_entry.event
if redaction_event.room_id != original_ev.room_id:
logger.debug(
"%s was redacted by %s but redaction was in a different room!",

View File

@@ -308,36 +308,22 @@ class PusherStore(PusherWorkerStore):
def update_pusher_last_stream_ordering_and_success(
self, app_id, pushkey, user_id, last_stream_ordering, last_success
):
"""Update the last stream ordering position we've processed up to for
the given pusher.
Args:
app_id (str)
pushkey (str)
last_stream_ordering (int)
last_success (int)
Returns:
Deferred[bool]: True if the pusher still exists; False if it has been deleted.
"""
updated = yield self._simple_update(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
updatevalues={
yield self._simple_update_one(
"pushers",
{"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
{
"last_stream_ordering": last_stream_ordering,
"last_success": last_success,
},
desc="update_pusher_last_stream_ordering_and_success",
)
return bool(updated)
@defer.inlineCallbacks
def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
yield self._simple_update(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
updatevalues={"failing_since": failing_since},
yield self._simple_update_one(
"pushers",
{"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
{"failing_since": failing_since},
desc="update_pusher_failing_since",
)

View File

@@ -569,27 +569,6 @@ class RegistrationWorkerStore(SQLBaseStore):
desc="get_id_servers_user_bound",
)
@cachedInlineCallbacks()
def get_user_deactivated_status(self, user_id):
"""Retrieve the value for the `deactivated` property for the provided user.
Args:
user_id (str): The ID of the user to retrieve the status for.
Returns:
defer.Deferred(bool): The requested value.
"""
res = yield self._simple_select_one_onecol(
table="users",
keyvalues={"name": user_id},
retcol="deactivated",
desc="get_user_deactivated_status",
)
# Convert the integer into a boolean.
return res == 1
class RegistrationStore(
RegistrationWorkerStore, background_updates.BackgroundUpdateStore
@@ -1338,3 +1317,24 @@ class RegistrationStore(
user_id,
deactivated,
)
@cachedInlineCallbacks()
def get_user_deactivated_status(self, user_id):
"""Retrieve the value for the `deactivated` property for the provided user.
Args:
user_id (str): The ID of the user to retrieve the status for.
Returns:
defer.Deferred(bool): The requested value.
"""
res = yield self._simple_select_one_onecol(
table="users",
keyvalues={"name": user_id},
retcol="deactivated",
desc="get_user_deactivated_status",
)
# Convert the integer into a boolean.
return res == 1

View File

@@ -25,19 +25,17 @@ from twisted.internet._sslverify import ClientTLSOptions, OpenSSLCertificateOpti
from twisted.internet.protocol import Factory
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.web._newclient import ResponseNeverReceived
from twisted.web.client import Agent
from twisted.web.http import HTTPChannel
from twisted.web.http_headers import Headers
from twisted.web.iweb import IPolicyForHTTPS
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto.context_factory import ClientTLSOptionsFactory
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.http.federation.srv_resolver import Server
from synapse.http.federation.well_known_resolver import (
WellKnownResolver,
from synapse.http.federation.matrix_federation_agent import (
MatrixFederationAgent,
_cache_period_from_headers,
)
from synapse.http.federation.srv_resolver import Server
from synapse.logging.context import LoggingContext
from synapse.util.caches.ttlcache import TTLCache
@@ -81,10 +79,9 @@ class MatrixFederationAgentTests(TestCase):
self._config = config = HomeServerConfig()
config.parse_config_dict(config_dict, "", "")
self.tls_factory = ClientTLSOptionsFactory(config)
self.agent = MatrixFederationAgent(
reactor=self.reactor,
tls_client_options_factory=self.tls_factory,
tls_client_options_factory=ClientTLSOptionsFactory(config),
_srv_resolver=self.mock_resolver,
_well_known_cache=self.well_known_cache,
)
@@ -931,16 +928,20 @@ class MatrixFederationAgentTests(TestCase):
self.reactor.pump((0.1,))
self.successResultOf(test_d)
def test_well_known_cache(self):
well_known_resolver = WellKnownResolver(
self.reactor,
Agent(self.reactor, contextFactory=self.tls_factory),
well_known_cache=self.well_known_cache,
)
@defer.inlineCallbacks
def do_get_well_known(self, serv):
try:
result = yield self.agent._get_well_known(serv)
logger.info("Result from well-known fetch: %s", result)
except Exception as e:
logger.warning("Error fetching well-known: %s", e)
raise
return result
def test_well_known_cache(self):
self.reactor.lookups["testserv"] = "1.2.3.4"
fetch_d = well_known_resolver.get_well_known(b"testserv")
fetch_d = self.do_get_well_known(b"testserv")
# there should be an attempt to connect on port 443 for the .well-known
clients = self.reactor.tcpClients
@@ -952,26 +953,26 @@ class MatrixFederationAgentTests(TestCase):
well_known_server = self._handle_well_known_connection(
client_factory,
expected_sni=b"testserv",
response_headers={b"Cache-Control": b"max-age=1000"},
response_headers={b"Cache-Control": b"max-age=10"},
content=b'{ "m.server": "target-server" }',
)
r = self.successResultOf(fetch_d)
self.assertEqual(r.delegated_server, b"target-server")
self.assertEqual(r, b"target-server")
# close the tcp connection
well_known_server.loseConnection()
# repeat the request: it should hit the cache
fetch_d = well_known_resolver.get_well_known(b"testserv")
fetch_d = self.do_get_well_known(b"testserv")
r = self.successResultOf(fetch_d)
self.assertEqual(r.delegated_server, b"target-server")
self.assertEqual(r, b"target-server")
# expire the cache
self.reactor.pump((1000.0,))
self.reactor.pump((10.0,))
# now it should connect again
fetch_d = well_known_resolver.get_well_known(b"testserv")
fetch_d = self.do_get_well_known(b"testserv")
self.assertEqual(len(clients), 1)
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
@@ -985,7 +986,7 @@ class MatrixFederationAgentTests(TestCase):
)
r = self.successResultOf(fetch_d)
self.assertEqual(r.delegated_server, b"other-server")
self.assertEqual(r, b"other-server")
class TestCachePeriodFromHeaders(TestCase):

View File

@@ -323,8 +323,6 @@ class AccountValidityRenewalByEmailTestCase(unittest.HomeserverTestCase):
"renew_at": 172800000, # Time in ms for 2 days
"renew_by_email_enabled": True,
"renew_email_subject": "Renew your account",
"account_renewed_html_path": "account_renewed.html",
"invalid_token_html_path": "invalid_token.html",
}
# Email config.
@@ -375,19 +373,6 @@ class AccountValidityRenewalByEmailTestCase(unittest.HomeserverTestCase):
self.render(request)
self.assertEquals(channel.result["code"], b"200", channel.result)
# Check that we're getting HTML back.
content_type = None
for header in channel.result.get("headers", []):
if header[0] == b"Content-Type":
content_type = header[1]
self.assertEqual(content_type, b"text/html; charset=utf-8", channel.result)
# Check that the HTML we're getting is the one we expect on a successful renewal.
expected_html = self.hs.config.account_validity.account_renewed_html_content
self.assertEqual(
channel.result["body"], expected_html.encode("utf8"), channel.result
)
# Move 3 days forward. If the renewal failed, every authed request with
# our access token should be denied from now, otherwise they should
# succeed.
@@ -396,28 +381,6 @@ class AccountValidityRenewalByEmailTestCase(unittest.HomeserverTestCase):
self.render(request)
self.assertEquals(channel.result["code"], b"200", channel.result)
def test_renewal_invalid_token(self):
# Hit the renewal endpoint with an invalid token and check that it behaves as
# expected, i.e. that it responds with 404 Not Found and the correct HTML.
url = "/_matrix/client/unstable/account_validity/renew?token=123"
request, channel = self.make_request(b"GET", url)
self.render(request)
self.assertEquals(channel.result["code"], b"404", channel.result)
# Check that we're getting HTML back.
content_type = None
for header in channel.result.get("headers", []):
if header[0] == b"Content-Type":
content_type = header[1]
self.assertEqual(content_type, b"text/html; charset=utf-8", channel.result)
# Check that the HTML we're getting is the one we expect when using an
# invalid/unknown token.
expected_html = self.hs.config.account_validity.invalid_token_html_content
self.assertEqual(
channel.result["body"], expected_html.encode("utf8"), channel.result
)
def test_manual_email_send(self):
self.email_attempts = []

View File

@@ -17,8 +17,6 @@
from mock import Mock
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.room_versions import RoomVersions
from synapse.types import RoomID, UserID
@@ -218,71 +216,3 @@ class RedactionTestCase(unittest.HomeserverTestCase):
},
event.unsigned["redacted_because"],
)
def test_circular_redaction(self):
redaction_event_id1 = "$redaction1_id:test"
redaction_event_id2 = "$redaction2_id:test"
class EventIdManglingBuilder:
def __init__(self, base_builder, event_id):
self._base_builder = base_builder
self._event_id = event_id
@defer.inlineCallbacks
def build(self, prev_event_ids):
built_event = yield self._base_builder.build(prev_event_ids)
built_event.event_id = self._event_id
built_event._event_dict["event_id"] = self._event_id
return built_event
@property
def room_id(self):
return self._base_builder.room_id
event_1, context_1 = self.get_success(
self.event_creation_handler.create_new_client_event(
EventIdManglingBuilder(
self.event_builder_factory.for_room_version(
RoomVersions.V1,
{
"type": EventTypes.Redaction,
"sender": self.u_alice.to_string(),
"room_id": self.room1.to_string(),
"content": {"reason": "test"},
"redacts": redaction_event_id2,
},
),
redaction_event_id1,
)
)
)
self.get_success(self.store.persist_event(event_1, context_1))
event_2, context_2 = self.get_success(
self.event_creation_handler.create_new_client_event(
EventIdManglingBuilder(
self.event_builder_factory.for_room_version(
RoomVersions.V1,
{
"type": EventTypes.Redaction,
"sender": self.u_alice.to_string(),
"room_id": self.room1.to_string(),
"content": {"reason": "test"},
"redacts": redaction_event_id1,
},
),
redaction_event_id2,
)
)
)
self.get_success(self.store.persist_event(event_2, context_2))
# fetch one of the redactions
fetched = self.get_success(self.store.get_event(redaction_event_id1))
# it should have been redacted
self.assertEqual(fetched.unsigned["redacted_by"], redaction_event_id2)
self.assertEqual(
fetched.unsigned["redacted_because"].event_id, redaction_event_id2
)