mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Compare commits
36 Commits
hs/delayed
...
shay/fix_g
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
404a6e3d79 | ||
|
|
99aa2136c2 | ||
|
|
d950d99ab5 | ||
|
|
abc497bae6 | ||
|
|
2bb3af5310 | ||
|
|
fb5ac9208a | ||
|
|
5e72a85f33 | ||
|
|
b80daa3b2b | ||
|
|
86d135ac63 | ||
|
|
f0b65d057b | ||
|
|
466d2c5dfb | ||
|
|
0a91b06ede | ||
|
|
ab297fa0a1 | ||
|
|
b419e55d1e | ||
|
|
8f8a264fb2 | ||
|
|
7020aeac52 | ||
|
|
d06fcba002 | ||
|
|
69beef22c2 | ||
|
|
6630daba47 | ||
|
|
070f1bb720 | ||
|
|
059746dec4 | ||
|
|
2d06a39dd0 | ||
|
|
172b651832 | ||
|
|
b38f3e9217 | ||
|
|
28eca036bb | ||
|
|
49686ede02 | ||
|
|
af2a5f3893 | ||
|
|
066045f03e | ||
|
|
47bd7cff4e | ||
|
|
0da05c5a2c | ||
|
|
740a48de76 | ||
|
|
a7ed07944f | ||
|
|
b8ed35841d | ||
|
|
e215109b74 | ||
|
|
9adffc2c95 | ||
|
|
a08f32f8ed |
1
changelog.d/13487.misc
Normal file
1
changelog.d/13487.misc
Normal file
@@ -0,0 +1 @@
|
||||
Refactor ` _send_events_for_new_room` to separate creating and sending events.
|
||||
@@ -56,13 +56,16 @@ from synapse.logging import opentracing
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
|
||||
from synapse.replication.http.send_events import ReplicationSendEventsRestServlet
|
||||
from synapse.storage.databases.main.events import PartialStateConflictError
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import (
|
||||
MutableStateMap,
|
||||
PersistedEventPosition,
|
||||
Requester,
|
||||
RoomAlias,
|
||||
StateMap,
|
||||
StreamToken,
|
||||
UserID,
|
||||
create_requester,
|
||||
@@ -492,6 +495,7 @@ class EventCreationHandler:
|
||||
self.membership_types_to_include_profile_data_in.add(Membership.INVITE)
|
||||
|
||||
self.send_event = ReplicationSendEventRestServlet.make_client(hs)
|
||||
self.send_events = ReplicationSendEventsRestServlet.make_client(hs)
|
||||
|
||||
self.request_ratelimiter = hs.get_request_ratelimiter()
|
||||
|
||||
@@ -627,49 +631,10 @@ class EventCreationHandler:
|
||||
"""
|
||||
await self.auth_blocking.check_auth_blocking(requester=requester)
|
||||
|
||||
if event_dict["type"] == EventTypes.Create and event_dict["state_key"] == "":
|
||||
room_version_id = event_dict["content"]["room_version"]
|
||||
maybe_room_version_obj = KNOWN_ROOM_VERSIONS.get(room_version_id)
|
||||
if not maybe_room_version_obj:
|
||||
# this can happen if support is withdrawn for a room version
|
||||
raise UnsupportedRoomVersionError(room_version_id)
|
||||
room_version_obj = maybe_room_version_obj
|
||||
else:
|
||||
try:
|
||||
room_version_obj = await self.store.get_room_version(
|
||||
event_dict["room_id"]
|
||||
)
|
||||
except NotFoundError:
|
||||
raise AuthError(403, "Unknown room")
|
||||
|
||||
builder = self.event_builder_factory.for_room_version(
|
||||
room_version_obj, event_dict
|
||||
)
|
||||
|
||||
self.validator.validate_builder(builder)
|
||||
builder = await self._get_and_validate_builder(event_dict)
|
||||
|
||||
if builder.type == EventTypes.Member:
|
||||
membership = builder.content.get("membership", None)
|
||||
target = UserID.from_string(builder.state_key)
|
||||
|
||||
if membership in self.membership_types_to_include_profile_data_in:
|
||||
# If event doesn't include a display name, add one.
|
||||
profile = self.profile_handler
|
||||
content = builder.content
|
||||
|
||||
try:
|
||||
if "displayname" not in content:
|
||||
displayname = await profile.get_displayname(target)
|
||||
if displayname is not None:
|
||||
content["displayname"] = displayname
|
||||
if "avatar_url" not in content:
|
||||
avatar_url = await profile.get_avatar_url(target)
|
||||
if avatar_url is not None:
|
||||
content["avatar_url"] = avatar_url
|
||||
except Exception as e:
|
||||
logger.info(
|
||||
"Failed to get profile information for %r: %s", target, e
|
||||
)
|
||||
await self._build_profile_data(builder)
|
||||
|
||||
is_exempt = await self._is_exempt_from_privacy_policy(builder, requester)
|
||||
if require_consent and not is_exempt:
|
||||
@@ -735,6 +700,126 @@ class EventCreationHandler:
|
||||
|
||||
return event, context
|
||||
|
||||
async def create_event_for_batch(
|
||||
self,
|
||||
requester: Requester,
|
||||
event_dict: dict,
|
||||
prev_event_ids: List[str],
|
||||
depth: int,
|
||||
state_map: StateMap,
|
||||
txn_id: Optional[str] = None,
|
||||
require_consent: bool = True,
|
||||
outlier: bool = False,
|
||||
) -> EventBase:
|
||||
"""
|
||||
Given a dict from a client, create a new event. Notably does not create an event
|
||||
context. Adds display names to Join membership events.
|
||||
|
||||
Args:
|
||||
requester
|
||||
event_dict: An entire event
|
||||
txn_id
|
||||
prev_event_ids:
|
||||
the forward extremities to use as the prev_events for the
|
||||
new event.
|
||||
state_map: a state_map of previously created events for batching. Will be used
|
||||
to calculate the auth_ids for the event, as the previously created events for
|
||||
batching will not yet have been persisted to the db
|
||||
require_consent: Whether to check if the requester has
|
||||
consented to the privacy policy.
|
||||
outlier: Indicates whether the event is an `outlier`, i.e. if
|
||||
it's from an arbitrary point and floating in the DAG as
|
||||
opposed to being inline with the current DAG.
|
||||
depth: Override the depth used to order the event in the DAG.
|
||||
|
||||
Returns:
|
||||
the created event
|
||||
"""
|
||||
await self.auth_blocking.check_auth_blocking(requester=requester)
|
||||
|
||||
builder = await self._get_and_validate_builder(event_dict)
|
||||
|
||||
if builder.type == EventTypes.Member:
|
||||
await self._build_profile_data(builder)
|
||||
|
||||
is_exempt = await self._is_exempt_from_privacy_policy(builder, requester)
|
||||
if require_consent and not is_exempt:
|
||||
await self.assert_accepted_privacy_policy(requester)
|
||||
|
||||
if requester.access_token_id is not None:
|
||||
builder.internal_metadata.token_id = requester.access_token_id
|
||||
|
||||
if txn_id is not None:
|
||||
builder.internal_metadata.txn_id = txn_id
|
||||
|
||||
builder.internal_metadata.outlier = outlier
|
||||
|
||||
auth_ids = self._event_auth_handler.compute_auth_events(builder, state_map)
|
||||
event = await builder.build(
|
||||
prev_event_ids=prev_event_ids,
|
||||
auth_event_ids=auth_ids,
|
||||
depth=depth,
|
||||
)
|
||||
# Pass on the outlier property from the builder to the event
|
||||
# after it is created
|
||||
if builder.internal_metadata.outlier:
|
||||
event.internal_metadata.outlier = True
|
||||
|
||||
self.validator.validate_new(event, self.config)
|
||||
|
||||
return event
|
||||
|
||||
async def _build_profile_data(self, builder: EventBuilder) -> None:
|
||||
"""
|
||||
Helper method to add profile information to membership event
|
||||
"""
|
||||
membership = builder.content.get("membership", None)
|
||||
target = UserID.from_string(builder.state_key)
|
||||
|
||||
if membership in self.membership_types_to_include_profile_data_in:
|
||||
# If event doesn't include a display name, add one.
|
||||
profile = self.profile_handler
|
||||
content = builder.content
|
||||
|
||||
try:
|
||||
if "displayname" not in content:
|
||||
displayname = await profile.get_displayname(target)
|
||||
if displayname is not None:
|
||||
content["displayname"] = displayname
|
||||
if "avatar_url" not in content:
|
||||
avatar_url = await profile.get_avatar_url(target)
|
||||
if avatar_url is not None:
|
||||
content["avatar_url"] = avatar_url
|
||||
except Exception as e:
|
||||
logger.info("Failed to get profile information for %r: %s", target, e)
|
||||
|
||||
async def _get_and_validate_builder(self, event_dict: dict) -> EventBuilder:
|
||||
"""
|
||||
Helper method to create and validate a builder object when creating an event
|
||||
"""
|
||||
if event_dict["type"] == EventTypes.Create and event_dict["state_key"] == "":
|
||||
room_version_id = event_dict["content"]["room_version"]
|
||||
maybe_room_version_obj = KNOWN_ROOM_VERSIONS.get(room_version_id)
|
||||
if not maybe_room_version_obj:
|
||||
# this can happen if support is withdrawn for a room version
|
||||
raise UnsupportedRoomVersionError(room_version_id)
|
||||
room_version_obj = maybe_room_version_obj
|
||||
else:
|
||||
try:
|
||||
room_version_obj = await self.store.get_room_version(
|
||||
event_dict["room_id"]
|
||||
)
|
||||
except NotFoundError:
|
||||
raise AuthError(403, "Unknown room")
|
||||
|
||||
builder = self.event_builder_factory.for_room_version(
|
||||
room_version_obj, event_dict
|
||||
)
|
||||
|
||||
self.validator.validate_builder(builder)
|
||||
|
||||
return builder
|
||||
|
||||
async def _is_exempt_from_privacy_policy(
|
||||
self, builder: EventBuilder, requester: Requester
|
||||
) -> bool:
|
||||
@@ -1234,6 +1319,147 @@ class EventCreationHandler:
|
||||
400, "Cannot start threads from an event with a relation"
|
||||
)
|
||||
|
||||
async def handle_create_room_events(
|
||||
self,
|
||||
requester: Requester,
|
||||
events_and_ctx: List[Tuple[EventBase, EventContext]],
|
||||
ratelimit: bool = True,
|
||||
) -> EventBase:
|
||||
"""
|
||||
Process a batch of room creation events. For each event in the list it checks
|
||||
the authorization and that the event can be serialized. Returns the last event in the
|
||||
list once it has been persisted.
|
||||
Args:
|
||||
requester: the room creator
|
||||
events_and_ctx: a set of events and their associated contexts to persist
|
||||
ratelimit: whether to ratelimit this request
|
||||
"""
|
||||
for event, context in events_and_ctx:
|
||||
try:
|
||||
validate_event_for_room_version(event)
|
||||
await self._event_auth_handler.check_auth_rules_from_context(
|
||||
event, context
|
||||
)
|
||||
except AuthError as err:
|
||||
logger.warning("Denying new event %r because %s", event, err)
|
||||
raise err
|
||||
|
||||
# Ensure that we can round trip before trying to persist in db
|
||||
try:
|
||||
dump = json_encoder.encode(event.content)
|
||||
json_decoder.decode(dump)
|
||||
except Exception:
|
||||
logger.exception("Failed to encode content: %r", event.content)
|
||||
raise
|
||||
|
||||
# We now persist the events
|
||||
try:
|
||||
result = await self._persist_events_batch(
|
||||
requester, events_and_ctx, ratelimit
|
||||
)
|
||||
except Exception as e:
|
||||
logger.info(f"Encountered an error persisting events: {e}")
|
||||
|
||||
return result
|
||||
|
||||
async def _persist_events_batch(
|
||||
self,
|
||||
requester: Requester,
|
||||
events_and_ctx: List[Tuple[EventBase, EventContext]],
|
||||
ratelimit: bool = True,
|
||||
) -> EventBase:
|
||||
"""
|
||||
Processes the push actions and adds them to the push staging area before attempting to
|
||||
persist the batch of events.
|
||||
See handle_create_room_events for arguments
|
||||
Returns the last event in the list if persisted successfully
|
||||
"""
|
||||
for event, context in events_and_ctx:
|
||||
with opentracing.start_active_span("calculate_push_actions"):
|
||||
await self._bulk_push_rule_evaluator.action_for_event_by_user(
|
||||
event, context
|
||||
)
|
||||
try:
|
||||
# If we're a worker we need to hit out to the master.
|
||||
writer_instance = self._events_shard_config.get_instance(event.room_id)
|
||||
if writer_instance != self._instance_name:
|
||||
try:
|
||||
result = await self.send_events(
|
||||
instance_name=writer_instance,
|
||||
store=self.store,
|
||||
requester=requester,
|
||||
events_and_ctx=events_and_ctx,
|
||||
ratelimit=ratelimit,
|
||||
)
|
||||
except SynapseError as e:
|
||||
if e.code == HTTPStatus.CONFLICT:
|
||||
raise PartialStateConflictError()
|
||||
raise
|
||||
stream_id = result["stream_id"]
|
||||
|
||||
# If we newly persisted the event then we need to update its
|
||||
# stream_ordering entry manually (as it was persisted on
|
||||
# another worker).
|
||||
event.internal_metadata.stream_ordering = stream_id
|
||||
return event
|
||||
|
||||
last_event = await self.persist_and_notify_batched_events(
|
||||
requester, events_and_ctx, ratelimit
|
||||
)
|
||||
except Exception:
|
||||
# Ensure that we actually remove the entries in the push actions
|
||||
# staging area, if we calculated them.
|
||||
for event, _ in events_and_ctx:
|
||||
await self.store.remove_push_actions_from_staging(event.event_id)
|
||||
raise
|
||||
|
||||
return last_event
|
||||
|
||||
async def persist_and_notify_batched_events(
|
||||
self,
|
||||
requester: Requester,
|
||||
events_and_ctx: List[Tuple[EventBase, EventContext]],
|
||||
ratelimit: bool = True,
|
||||
) -> EventBase:
|
||||
"""
|
||||
Handles the actual persisting of a batch of events to the DB, and sends the appropriate
|
||||
notifications when this is done.
|
||||
Args:
|
||||
requester: the room creator
|
||||
events_and_ctx: list of events and their associated contexts to persist
|
||||
ratelimit: whether to apply ratelimiting to this request
|
||||
"""
|
||||
if ratelimit:
|
||||
await self.request_ratelimiter.ratelimit(requester)
|
||||
|
||||
for event, context in events_and_ctx:
|
||||
await self._actions_by_event_type(event, context)
|
||||
|
||||
assert self._storage_controllers.persistence is not None
|
||||
(
|
||||
persisted_events,
|
||||
max_stream_token,
|
||||
) = await self._storage_controllers.persistence.persist_events(events_and_ctx)
|
||||
|
||||
stream_ordering = persisted_events[-1].internal_metadata.stream_ordering
|
||||
assert stream_ordering is not None
|
||||
pos = PersistedEventPosition(self._instance_name, stream_ordering)
|
||||
|
||||
async def _notify() -> None:
|
||||
try:
|
||||
await self.notifier.on_new_room_event(
|
||||
persisted_events[-1], pos, max_stream_token
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Error notifying about new room event %s",
|
||||
event.event_id,
|
||||
)
|
||||
|
||||
run_in_background(_notify)
|
||||
|
||||
return persisted_events[-1]
|
||||
|
||||
@measure_func("handle_new_client_event")
|
||||
async def handle_new_client_event(
|
||||
self,
|
||||
@@ -1568,6 +1794,55 @@ class EventCreationHandler:
|
||||
requester, is_admin_redaction=is_admin_redaction
|
||||
)
|
||||
|
||||
# run checks/actions on event based on type
|
||||
await self._actions_by_event_type(event, context)
|
||||
|
||||
# Mark any `m.historical` messages as backfilled so they don't appear
|
||||
# in `/sync` and have the proper decrementing `stream_ordering` as we import
|
||||
backfilled = False
|
||||
if event.internal_metadata.is_historical():
|
||||
backfilled = True
|
||||
|
||||
# Note that this returns the event that was persisted, which may not be
|
||||
# the same as we passed in if it was deduplicated due transaction IDs.
|
||||
(
|
||||
event,
|
||||
event_pos,
|
||||
max_stream_token,
|
||||
) = await self._storage_controllers.persistence.persist_event(
|
||||
event, context=context, backfilled=backfilled
|
||||
)
|
||||
|
||||
if self._ephemeral_events_enabled:
|
||||
# If there's an expiry timestamp on the event, schedule its expiry.
|
||||
self._message_handler.maybe_schedule_expiry(event)
|
||||
|
||||
async def _notify() -> None:
|
||||
try:
|
||||
await self.notifier.on_new_room_event(
|
||||
event, event_pos, max_stream_token, extra_users=extra_users
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Error notifying about new room event %s",
|
||||
event.event_id,
|
||||
)
|
||||
|
||||
run_in_background(_notify)
|
||||
|
||||
if event.type == EventTypes.Message:
|
||||
# We don't want to block sending messages on any presence code. This
|
||||
# matters as sometimes presence code can take a while.
|
||||
run_in_background(self._bump_active_time, requester.user)
|
||||
|
||||
return event
|
||||
|
||||
async def _actions_by_event_type(
|
||||
self, event: EventBase, context: EventContext
|
||||
) -> None:
|
||||
"""
|
||||
Helper function to execute actions/checks based on the event type
|
||||
"""
|
||||
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
|
||||
(
|
||||
current_membership,
|
||||
@@ -1588,11 +1863,13 @@ class EventCreationHandler:
|
||||
|
||||
original_event_id = event.unsigned.get("replaces_state")
|
||||
if original_event_id:
|
||||
original_event = await self.store.get_event(original_event_id)
|
||||
original_alias_event = await self.store.get_event(original_event_id)
|
||||
|
||||
if original_event:
|
||||
original_alias = original_event.content.get("alias", None)
|
||||
original_alt_aliases = original_event.content.get("alt_aliases", [])
|
||||
if original_alias_event:
|
||||
original_alias = original_alias_event.content.get("alias", None)
|
||||
original_alt_aliases = original_alias_event.content.get(
|
||||
"alt_aliases", []
|
||||
)
|
||||
|
||||
# Check the alias is currently valid (if it has changed).
|
||||
room_alias_str = event.content.get("alias", None)
|
||||
@@ -1770,46 +2047,6 @@ class EventCreationHandler:
|
||||
errcode=Codes.INVALID_PARAM,
|
||||
)
|
||||
|
||||
# Mark any `m.historical` messages as backfilled so they don't appear
|
||||
# in `/sync` and have the proper decrementing `stream_ordering` as we import
|
||||
backfilled = False
|
||||
if event.internal_metadata.is_historical():
|
||||
backfilled = True
|
||||
|
||||
# Note that this returns the event that was persisted, which may not be
|
||||
# the same as we passed in if it was deduplicated due transaction IDs.
|
||||
(
|
||||
event,
|
||||
event_pos,
|
||||
max_stream_token,
|
||||
) = await self._storage_controllers.persistence.persist_event(
|
||||
event, context=context, backfilled=backfilled
|
||||
)
|
||||
|
||||
if self._ephemeral_events_enabled:
|
||||
# If there's an expiry timestamp on the event, schedule its expiry.
|
||||
self._message_handler.maybe_schedule_expiry(event)
|
||||
|
||||
async def _notify() -> None:
|
||||
try:
|
||||
await self.notifier.on_new_room_event(
|
||||
event, event_pos, max_stream_token, extra_users=extra_users
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Error notifying about new room event %s",
|
||||
event.event_id,
|
||||
)
|
||||
|
||||
run_in_background(_notify)
|
||||
|
||||
if event.type == EventTypes.Message:
|
||||
# We don't want to block sending messages on any presence code. This
|
||||
# matters as sometimes presence code can take a while.
|
||||
run_in_background(self._bump_active_time, requester.user)
|
||||
|
||||
return event
|
||||
|
||||
async def _maybe_kick_guest_users(
|
||||
self, event: EventBase, context: EventContext
|
||||
) -> None:
|
||||
|
||||
@@ -108,6 +108,7 @@ class EventContext:
|
||||
class RoomCreationHandler:
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.store = hs.get_datastores().main
|
||||
self.state = hs.get_state_handler()
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self.auth = hs.get_auth()
|
||||
self.auth_blocking = hs.get_auth_blocking()
|
||||
@@ -119,6 +120,7 @@ class RoomCreationHandler:
|
||||
self._event_auth_handler = hs.get_event_auth_handler()
|
||||
self.config = hs.config
|
||||
self.request_ratelimiter = hs.get_request_ratelimiter()
|
||||
self.builder = hs.get_event_builder_factory()
|
||||
|
||||
# Room state based off defined presets
|
||||
self._presets_dict: Dict[str, Dict[str, Any]] = {
|
||||
@@ -716,7 +718,7 @@ class RoomCreationHandler:
|
||||
|
||||
if (
|
||||
self._server_notices_mxid is not None
|
||||
and requester.user.to_string() == self._server_notices_mxid
|
||||
and user_id == self._server_notices_mxid
|
||||
):
|
||||
# allow the server notices mxid to create rooms
|
||||
is_requester_admin = True
|
||||
@@ -1053,13 +1055,21 @@ class RoomCreationHandler:
|
||||
"""
|
||||
|
||||
creator_id = creator.user.to_string()
|
||||
|
||||
event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""}
|
||||
|
||||
depth = 1
|
||||
|
||||
# the last event sent/persisted to the db
|
||||
last_sent_event_id: Optional[str] = None
|
||||
|
||||
def create(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
|
||||
# the most recently created event
|
||||
prev_event: List[str] = []
|
||||
|
||||
# a map of event types, state keys -> event_ids. We collect these mappings this
|
||||
# as events are created (but not persisted to the db) to determine state for
|
||||
# future created events (as this info can't be pulled from the db)
|
||||
state_map: dict = {}
|
||||
|
||||
def create_event_dict(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
|
||||
e = {"type": etype, "content": content}
|
||||
|
||||
e.update(event_keys)
|
||||
@@ -1067,32 +1077,49 @@ class RoomCreationHandler:
|
||||
|
||||
return e
|
||||
|
||||
async def send(etype: str, content: JsonDict, **kwargs: Any) -> int:
|
||||
nonlocal last_sent_event_id
|
||||
async def create_event(
|
||||
etype: str,
|
||||
content: JsonDict,
|
||||
**kwargs: Any,
|
||||
) -> EventBase:
|
||||
nonlocal depth
|
||||
nonlocal prev_event
|
||||
|
||||
event = create(etype, content, **kwargs)
|
||||
logger.debug("Sending %s in new room", etype)
|
||||
# Allow these events to be sent even if the user is shadow-banned to
|
||||
# allow the room creation to complete.
|
||||
(
|
||||
sent_event,
|
||||
last_stream_id,
|
||||
) = await self.event_creation_handler.create_and_send_nonmember_event(
|
||||
event_dict = create_event_dict(etype, content, **kwargs)
|
||||
|
||||
event = await self.event_creation_handler.create_event_for_batch(
|
||||
creator,
|
||||
event,
|
||||
event_dict,
|
||||
prev_event,
|
||||
depth,
|
||||
state_map,
|
||||
)
|
||||
depth += 1
|
||||
prev_event = [event.event_id]
|
||||
state_map[(event.type, event.state_key)] = event.event_id
|
||||
|
||||
return event
|
||||
|
||||
async def send(
|
||||
event: EventBase,
|
||||
context: synapse.events.snapshot.EventContext,
|
||||
creator: Requester,
|
||||
) -> int:
|
||||
nonlocal last_sent_event_id
|
||||
|
||||
ev = await self.event_creation_handler.handle_new_client_event(
|
||||
requester=creator,
|
||||
event=event,
|
||||
context=context,
|
||||
ratelimit=False,
|
||||
ignore_shadow_ban=True,
|
||||
# Note: we don't pass state_event_ids here because this triggers
|
||||
# an additional query per event to look them up from the events table.
|
||||
prev_event_ids=[last_sent_event_id] if last_sent_event_id else [],
|
||||
depth=depth,
|
||||
)
|
||||
|
||||
last_sent_event_id = sent_event.event_id
|
||||
depth += 1
|
||||
last_sent_event_id = ev.event_id
|
||||
|
||||
return last_stream_id
|
||||
# we know it was persisted, so must have a stream ordering
|
||||
assert ev.internal_metadata.stream_ordering
|
||||
return ev.internal_metadata.stream_ordering
|
||||
|
||||
try:
|
||||
config = self._presets_dict[preset_config]
|
||||
@@ -1102,9 +1129,15 @@ class RoomCreationHandler:
|
||||
)
|
||||
|
||||
creation_content.update({"creator": creator_id})
|
||||
await send(etype=EventTypes.Create, content=creation_content)
|
||||
creation_event = await create_event(
|
||||
EventTypes.Create,
|
||||
creation_content,
|
||||
)
|
||||
creation_context = await self.state.compute_event_context(creation_event)
|
||||
|
||||
logger.debug("Sending %s in new room", EventTypes.Member)
|
||||
await send(creation_event, creation_context, creator)
|
||||
|
||||
# Room create event must exist at this point
|
||||
assert last_sent_event_id is not None
|
||||
member_event_id, _ = await self.room_member_handler.update_membership(
|
||||
@@ -1118,15 +1151,23 @@ class RoomCreationHandler:
|
||||
prev_event_ids=[last_sent_event_id],
|
||||
depth=depth,
|
||||
)
|
||||
last_sent_event_id = member_event_id
|
||||
# last_sent_event_id = member_event_id
|
||||
prev_event = [member_event_id]
|
||||
|
||||
# update the depth and state map here as these are otherwise updated in
|
||||
# 'create_event' the membership event has been created through a different code
|
||||
# path
|
||||
depth += 1
|
||||
state_map[(EventTypes.Member, creator.user.to_string())] = member_event_id
|
||||
|
||||
# We treat the power levels override specially as this needs to be one
|
||||
# of the first events that get sent into a room.
|
||||
pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None)
|
||||
if pl_content is not None:
|
||||
last_sent_stream_id = await send(
|
||||
etype=EventTypes.PowerLevels, content=pl_content
|
||||
)
|
||||
power_event = await create_event(EventTypes.PowerLevels, pl_content)
|
||||
power_context = await self.state.compute_event_context(power_event)
|
||||
current_state_group = power_context._state_group
|
||||
await send(power_event, power_context, creator)
|
||||
else:
|
||||
power_level_content: JsonDict = {
|
||||
"users": {creator_id: 100},
|
||||
@@ -1169,48 +1210,92 @@ class RoomCreationHandler:
|
||||
# apply those.
|
||||
if power_level_content_override:
|
||||
power_level_content.update(power_level_content_override)
|
||||
|
||||
last_sent_stream_id = await send(
|
||||
etype=EventTypes.PowerLevels, content=power_level_content
|
||||
pl_event = await create_event(
|
||||
EventTypes.PowerLevels,
|
||||
power_level_content,
|
||||
)
|
||||
pl_context = await self.state.compute_event_context(pl_event)
|
||||
current_state_group = pl_context._state_group
|
||||
await send(pl_event, pl_context, creator)
|
||||
|
||||
events_to_send = []
|
||||
if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
|
||||
last_sent_stream_id = await send(
|
||||
etype=EventTypes.CanonicalAlias,
|
||||
content={"alias": room_alias.to_string()},
|
||||
room_alias_event = await create_event(
|
||||
EventTypes.CanonicalAlias,
|
||||
{"alias": room_alias.to_string()},
|
||||
)
|
||||
assert current_state_group is not None
|
||||
room_alias_context = await self.state.compute_event_context_for_batched(
|
||||
room_alias_event, state_map, current_state_group
|
||||
)
|
||||
current_state_group = room_alias_context._state_group
|
||||
events_to_send.append((room_alias_event, room_alias_context))
|
||||
|
||||
if (EventTypes.JoinRules, "") not in initial_state:
|
||||
last_sent_stream_id = await send(
|
||||
etype=EventTypes.JoinRules, content={"join_rule": config["join_rules"]}
|
||||
join_rules_event = await create_event(
|
||||
EventTypes.JoinRules,
|
||||
{"join_rule": config["join_rules"]},
|
||||
)
|
||||
assert current_state_group is not None
|
||||
join_rules_context = await self.state.compute_event_context_for_batched(
|
||||
join_rules_event, state_map, current_state_group
|
||||
)
|
||||
current_state_group = join_rules_context._state_group
|
||||
events_to_send.append((join_rules_event, join_rules_context))
|
||||
|
||||
if (EventTypes.RoomHistoryVisibility, "") not in initial_state:
|
||||
last_sent_stream_id = await send(
|
||||
etype=EventTypes.RoomHistoryVisibility,
|
||||
content={"history_visibility": config["history_visibility"]},
|
||||
visibility_event = await create_event(
|
||||
EventTypes.RoomHistoryVisibility,
|
||||
{"history_visibility": config["history_visibility"]},
|
||||
)
|
||||
assert current_state_group is not None
|
||||
visibility_context = await self.state.compute_event_context_for_batched(
|
||||
visibility_event, state_map, current_state_group
|
||||
)
|
||||
current_state_group = visibility_context._state_group
|
||||
events_to_send.append((visibility_event, visibility_context))
|
||||
|
||||
if config["guest_can_join"]:
|
||||
if (EventTypes.GuestAccess, "") not in initial_state:
|
||||
last_sent_stream_id = await send(
|
||||
etype=EventTypes.GuestAccess,
|
||||
content={EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
|
||||
guest_access_event = await create_event(
|
||||
EventTypes.GuestAccess,
|
||||
{EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
|
||||
)
|
||||
assert current_state_group is not None
|
||||
guest_access_context = (
|
||||
await self.state.compute_event_context_for_batched(
|
||||
guest_access_event, state_map, current_state_group
|
||||
)
|
||||
)
|
||||
current_state_group = guest_access_context._state_group
|
||||
events_to_send.append((guest_access_event, guest_access_context))
|
||||
|
||||
for (etype, state_key), content in initial_state.items():
|
||||
last_sent_stream_id = await send(
|
||||
etype=etype, state_key=state_key, content=content
|
||||
event = await create_event(etype, content, state_key=state_key)
|
||||
assert current_state_group is not None
|
||||
context = await self.state.compute_event_context_for_batched(
|
||||
event, state_map, current_state_group
|
||||
)
|
||||
current_state_group = context._state_group
|
||||
events_to_send.append((event, context))
|
||||
|
||||
if config["encrypted"]:
|
||||
last_sent_stream_id = await send(
|
||||
etype=EventTypes.RoomEncryption,
|
||||
encryption_event = await create_event(
|
||||
EventTypes.RoomEncryption,
|
||||
{"algorithm": RoomEncryptionAlgorithms.DEFAULT},
|
||||
state_key="",
|
||||
content={"algorithm": RoomEncryptionAlgorithms.DEFAULT},
|
||||
)
|
||||
assert current_state_group is not None
|
||||
encryption_context = await self.state.compute_event_context_for_batched(
|
||||
encryption_event, state_map, current_state_group
|
||||
)
|
||||
events_to_send.append((encryption_event, encryption_context))
|
||||
|
||||
return last_sent_stream_id, last_sent_event_id, depth
|
||||
last_event = await self.event_creation_handler.handle_create_room_events(
|
||||
creator, events_to_send
|
||||
)
|
||||
assert last_event.internal_metadata.stream_ordering is not None
|
||||
return last_event.internal_metadata.stream_ordering, last_event.event_id, depth
|
||||
|
||||
def _generate_room_id(self) -> str:
|
||||
"""Generates a random room ID.
|
||||
|
||||
@@ -25,6 +25,7 @@ from synapse.replication.http import (
|
||||
push,
|
||||
register,
|
||||
send_event,
|
||||
send_events,
|
||||
state,
|
||||
streams,
|
||||
)
|
||||
@@ -43,6 +44,7 @@ class ReplicationRestResource(JsonResource):
|
||||
|
||||
def register_servlets(self, hs: "HomeServer") -> None:
|
||||
send_event.register_servlets(hs, self)
|
||||
send_events.register_servlets(hs, self)
|
||||
federation.register_servlets(hs, self)
|
||||
presence.register_servlets(hs, self)
|
||||
membership.register_servlets(hs, self)
|
||||
|
||||
165
synapse/replication/http/send_events.py
Normal file
165
synapse/replication/http/send_events.py
Normal file
@@ -0,0 +1,165 @@
|
||||
# Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, List, Tuple
|
||||
|
||||
from twisted.web.server import Request
|
||||
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
from synapse.events import EventBase, make_event_from_dict
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.http.servlet import parse_json_object_from_request
|
||||
from synapse.replication.http._base import ReplicationEndpoint
|
||||
from synapse.types import JsonDict, Requester
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.databases.main import DataStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ReplicationSendEventsRestServlet(ReplicationEndpoint):
|
||||
"""Handles batches of newly created events on workers, including persisting and
|
||||
notifying.
|
||||
|
||||
The API looks like:
|
||||
|
||||
POST /_synapse/replication/send_events/:txn_id
|
||||
|
||||
{
|
||||
"events": [{
|
||||
"event": { .. serialized event .. },
|
||||
"room_version": .., // "1", "2", "3", etc: the version of the room
|
||||
// containing the event
|
||||
"event_format_version": .., // 1,2,3 etc: the event format version
|
||||
"internal_metadata": { .. serialized internal_metadata .. },
|
||||
"outlier": true|false,
|
||||
"rejected_reason": .., // The event.rejected_reason field
|
||||
"context": { .. serialized event context .. },
|
||||
"requester": { .. serialized requester .. },
|
||||
"ratelimit": true,
|
||||
}]
|
||||
}
|
||||
|
||||
200 OK
|
||||
|
||||
{ "stream_id": 12345, "event_id": "$abcdef..." }
|
||||
|
||||
Responds with a 409 when a `PartialStateConflictError` is raised due to an event
|
||||
context that needs to be recomputed due to the un-partial stating of a room.
|
||||
|
||||
"""
|
||||
|
||||
NAME = "send_events"
|
||||
PATH_ARGS = ()
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
|
||||
self.event_creation_handler = hs.get_event_creation_handler()
|
||||
self.store = hs.get_datastores().main
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
@staticmethod
|
||||
async def _serialize_payload( # type: ignore[override]
|
||||
store: "DataStore",
|
||||
events_and_ctx: List[Tuple[EventBase, EventContext]],
|
||||
requester: Requester,
|
||||
ratelimit: bool,
|
||||
) -> JsonDict:
|
||||
"""
|
||||
Args:
|
||||
store
|
||||
requester
|
||||
events_and_ctx
|
||||
ratelimit
|
||||
"""
|
||||
serialized_events = []
|
||||
|
||||
for event, context in events_and_ctx:
|
||||
serialized_context = await context.serialize(event, store)
|
||||
serialized_event = {
|
||||
"event": event.get_pdu_json(),
|
||||
"room_version": event.room_version.identifier,
|
||||
"event_format_version": event.format_version,
|
||||
"internal_metadata": event.internal_metadata.get_dict(),
|
||||
"outlier": event.internal_metadata.is_outlier(),
|
||||
"rejected_reason": event.rejected_reason,
|
||||
"context": serialized_context,
|
||||
"requester": requester.serialize(),
|
||||
"ratelimit": ratelimit,
|
||||
}
|
||||
serialized_events.append(serialized_event)
|
||||
|
||||
payload = {"events": serialized_events}
|
||||
|
||||
return payload
|
||||
|
||||
async def _handle_request( # type: ignore[override]
|
||||
self, request: Request
|
||||
) -> Tuple[int, JsonDict]:
|
||||
with Measure(self.clock, "repl_send_events_parse"):
|
||||
payload = parse_json_object_from_request(request)
|
||||
events_and_ctx = []
|
||||
events = payload["events"]
|
||||
|
||||
for event_payload in events:
|
||||
event_dict = event_payload["event"]
|
||||
room_ver = KNOWN_ROOM_VERSIONS[event_payload["room_version"]]
|
||||
internal_metadata = event_payload["internal_metadata"]
|
||||
rejected_reason = event_payload["rejected_reason"]
|
||||
|
||||
event = make_event_from_dict(
|
||||
event_dict, room_ver, internal_metadata, rejected_reason
|
||||
)
|
||||
event.internal_metadata.outlier = event_payload["outlier"]
|
||||
|
||||
requester = Requester.deserialize(
|
||||
self.store, event_payload["requester"]
|
||||
)
|
||||
context = EventContext.deserialize(
|
||||
self._storage_controllers, event_payload["context"]
|
||||
)
|
||||
|
||||
ratelimit = event_payload["ratelimit"]
|
||||
events_and_ctx.append((event, context))
|
||||
|
||||
logger.info(
|
||||
"Got batch of events to send, last ID of batch is: %s, sending into room: %s",
|
||||
event.event_id,
|
||||
event.room_id,
|
||||
)
|
||||
|
||||
last_event = (
|
||||
await self.event_creation_handler.persist_and_notify_batched_events(
|
||||
requester, events_and_ctx, ratelimit
|
||||
)
|
||||
)
|
||||
|
||||
return (
|
||||
200,
|
||||
{
|
||||
"stream_id": last_event.internal_metadata.stream_ordering,
|
||||
"event_id": last_event.event_id,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
ReplicationSendEventsRestServlet(hs).register(http_server)
|
||||
@@ -282,7 +282,6 @@ class StateHandler:
|
||||
RuntimeError if `state_ids_before_event` is not provided and one or more
|
||||
prev events are missing or outliers.
|
||||
"""
|
||||
|
||||
assert not event.internal_metadata.is_outlier()
|
||||
|
||||
#
|
||||
@@ -333,6 +332,7 @@ class StateHandler:
|
||||
logger.debug("calling resolve_state_groups from compute_event_context")
|
||||
# we've already taken into account partial state, so no need to wait for
|
||||
# complete state here.
|
||||
|
||||
entry = await self.resolve_state_groups_for_events(
|
||||
event.room_id,
|
||||
event.prev_event_ids(),
|
||||
@@ -420,6 +420,69 @@ class StateHandler:
|
||||
partial_state=partial_state,
|
||||
)
|
||||
|
||||
async def compute_event_context_for_batched(
|
||||
self,
|
||||
event: EventBase,
|
||||
state_ids_before_event: StateMap[str],
|
||||
current_state_group: int,
|
||||
) -> EventContext:
|
||||
"""
|
||||
Generate an event context for an event that has not yet been persisted to the
|
||||
database. Intended for use with events that are created to be persisted in a batch.
|
||||
Args:
|
||||
event: the event the context is being computed for
|
||||
state_ids_before_event: a state map consisting of the state ids of the events
|
||||
created prior to this event.
|
||||
current_state_group: the current state group before the event.
|
||||
"""
|
||||
state_group_before_event_prev_group = None
|
||||
deltas_to_state_group_before_event = None
|
||||
|
||||
state_group_before_event = current_state_group
|
||||
|
||||
# if the event is not state, we are set
|
||||
if not event.is_state():
|
||||
return EventContext.with_state(
|
||||
storage=self._storage_controllers,
|
||||
state_group_before_event=state_group_before_event,
|
||||
state_group=state_group_before_event,
|
||||
state_delta_due_to_event={},
|
||||
prev_group=state_group_before_event_prev_group,
|
||||
delta_ids=deltas_to_state_group_before_event,
|
||||
partial_state=False,
|
||||
)
|
||||
|
||||
# otherwise, we'll need to create a new state group for after the event
|
||||
key = (event.type, event.state_key)
|
||||
|
||||
if state_ids_before_event is not None:
|
||||
replaces = state_ids_before_event.get(key)
|
||||
|
||||
if replaces and replaces != event.event_id:
|
||||
event.unsigned["replaces_state"] = replaces
|
||||
|
||||
delta_ids = {key: event.event_id}
|
||||
|
||||
state_group_after_event = (
|
||||
await self._state_storage_controller.store_state_group(
|
||||
event.event_id,
|
||||
event.room_id,
|
||||
prev_group=state_group_before_event,
|
||||
delta_ids=delta_ids,
|
||||
current_state_ids=None,
|
||||
)
|
||||
)
|
||||
|
||||
return EventContext.with_state(
|
||||
storage=self._storage_controllers,
|
||||
state_group=state_group_after_event,
|
||||
state_group_before_event=state_group_before_event,
|
||||
state_delta_due_to_event=delta_ids,
|
||||
prev_group=state_group_before_event,
|
||||
delta_ids=delta_ids,
|
||||
partial_state=False,
|
||||
)
|
||||
|
||||
@measure_func()
|
||||
async def resolve_state_groups_for_events(
|
||||
self, room_id: str, event_ids: Collection[str], await_full_state: bool = True
|
||||
|
||||
@@ -222,9 +222,10 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
|
||||
)
|
||||
self.assertEqual(len(alice_sync_result.joined), 1)
|
||||
self.assertEqual(alice_sync_result.joined[0].room_id, room_id)
|
||||
last_room_creation_event_id = (
|
||||
alice_sync_result.joined[0].timeline.events[-1].event_id
|
||||
)
|
||||
last_room_creation_event_ids = [
|
||||
alice_sync_result.joined[0].timeline.events[-1].event_id,
|
||||
alice_sync_result.joined[0].timeline.events[-2].event_id,
|
||||
]
|
||||
|
||||
# Eve, a ne'er-do-well, registers.
|
||||
eve = self.register_user("eve", "password")
|
||||
@@ -250,7 +251,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
|
||||
self.hs.get_datastores().main,
|
||||
"get_prev_events_for_room",
|
||||
new_callable=MagicMock,
|
||||
return_value=make_awaitable([last_room_creation_event_id]),
|
||||
return_value=make_awaitable(last_room_creation_event_ids),
|
||||
)
|
||||
with mocked_get_prev_events:
|
||||
self.helper.join(room_id, eve, tok=eve_token)
|
||||
|
||||
@@ -710,7 +710,7 @@ class RoomsCreateTestCase(RoomBase):
|
||||
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
|
||||
self.assertTrue("room_id" in channel.json_body)
|
||||
assert channel.resource_usage is not None
|
||||
self.assertEqual(44, channel.resource_usage.db_txn_count)
|
||||
self.assertEqual(36, channel.resource_usage.db_txn_count)
|
||||
|
||||
def test_post_room_initial_state(self) -> None:
|
||||
# POST with initial_state config key, expect new room id
|
||||
@@ -723,7 +723,7 @@ class RoomsCreateTestCase(RoomBase):
|
||||
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
|
||||
self.assertTrue("room_id" in channel.json_body)
|
||||
assert channel.resource_usage is not None
|
||||
self.assertEqual(50, channel.resource_usage.db_txn_count)
|
||||
self.assertEqual(39, channel.resource_usage.db_txn_count)
|
||||
|
||||
def test_post_room_visibility_key(self) -> None:
|
||||
# POST with visibility config key, expect new room id
|
||||
|
||||
@@ -46,10 +46,9 @@ class MessageAcceptTests(unittest.HomeserverTestCase):
|
||||
user_id = UserID("us", "test")
|
||||
our_user = create_requester(user_id)
|
||||
room_creator = self.homeserver.get_room_creation_handler()
|
||||
config = {"preset": "public_chat"}
|
||||
self.room_id = self.get_success(
|
||||
room_creator.create_room(
|
||||
our_user, room_creator._presets_dict["public_chat"], ratelimit=False
|
||||
)
|
||||
room_creator.create_room(our_user, config, ratelimit=False)
|
||||
)[0]["room_id"]
|
||||
|
||||
self.store = self.homeserver.get_datastores().main
|
||||
@@ -99,10 +98,8 @@ class MessageAcceptTests(unittest.HomeserverTestCase):
|
||||
)
|
||||
|
||||
# Make sure we actually joined the room
|
||||
self.assertEqual(
|
||||
self.get_success(self.store.get_latest_event_ids_in_room(self.room_id))[0],
|
||||
"$join:test.serv",
|
||||
)
|
||||
res = self.get_success(self.store.get_latest_event_ids_in_room(self.room_id))
|
||||
assert "$join:test.serv" in res
|
||||
|
||||
def test_cant_hide_direct_ancestors(self):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user