Compare commits

...

36 Commits

Author SHA1 Message Date
H. Shay
404a6e3d79 add support for persisting batched events over replication 2022-09-19 11:49:33 -07:00
H. Shay
99aa2136c2 add functions to persist events as a batch, encapsulate some logic in a helper function 2022-09-19 11:48:59 -07:00
H. Shay
d950d99ab5 add some comments and send events as a batch to be persisted 2022-09-19 11:48:35 -07:00
H. Shay
abc497bae6 Merge branch 'shay/batch_events' of https://github.com/matrix-org/synapse into shay/batch_events 2022-09-19 10:43:03 -07:00
H. Shay
2bb3af5310 fix tests to reflect new reality 2022-09-19 10:11:15 -07:00
H. Shay
fb5ac9208a ditch auth events and pass state map instead 2022-09-19 10:11:14 -07:00
H. Shay
5e72a85f33 split out creating events for batches and add helper methods for duplicated code 2022-09-19 10:11:14 -07:00
H. Shay
b80daa3b2b reduce duplicated code 2022-09-19 10:11:14 -07:00
H. Shay
86d135ac63 fix test to align with new behaviour 2022-09-19 10:11:14 -07:00
H. Shay
f0b65d057b add function to calculate event context without pulling from db 2022-09-19 10:11:14 -07:00
H. Shay
466d2c5dfb create events and contexts separately 2022-09-19 10:11:14 -07:00
H. Shay
0a91b06ede add function to create events without computing event context 2022-09-19 10:11:14 -07:00
H. Shay
ab297fa0a1 fix newsfragment 2022-09-19 10:11:14 -07:00
H. Shay
b419e55d1e newsfragment 2022-09-19 10:11:14 -07:00
H. Shay
8f8a264fb2 fix tests to accommodate new structure of DAG after room creation 2022-09-19 10:11:14 -07:00
H. Shay
7020aeac52 batch some events to send 2022-09-19 10:11:14 -07:00
H. Shay
d06fcba002 split out creating and sending event 2022-09-19 10:11:14 -07:00
Erik Johnston
69beef22c2 Merge remote-tracking branch 'origin/develop' into shay/batch_events 2022-09-14 11:13:37 +01:00
H. Shay
6630daba47 fix tests to reflect new reality 2022-09-01 14:47:10 -07:00
H. Shay
070f1bb720 ditch auth events and pass state map instead 2022-09-01 14:46:57 -07:00
H. Shay
059746dec4 split out creating events for batches and add helper methods for duplicated code 2022-09-01 14:46:25 -07:00
H. Shay
2d06a39dd0 reduce duplicated code 2022-08-31 15:30:05 -07:00
Shay
172b651832 Merge branch 'develop' into shay/batch_events 2022-08-24 14:18:31 -07:00
H. Shay
b38f3e9217 fix test to align with new behaviour 2022-08-24 14:08:47 -07:00
H. Shay
28eca036bb add function to calculate event context without pulling from db 2022-08-24 14:08:31 -07:00
H. Shay
49686ede02 create events and contexts separately 2022-08-24 14:08:04 -07:00
H. Shay
af2a5f3893 add function to create events without computing event context 2022-08-24 14:07:36 -07:00
H. Shay
066045f03e Merge branch 'shay/batch_events' of https://github.com/matrix-org/synapse into shay/batch_events 2022-08-11 21:29:58 -07:00
H. Shay
47bd7cff4e fix newsfragment 2022-08-11 21:29:35 -07:00
Shay
0da05c5a2c Merge branch 'develop' into shay/batch_events 2022-08-11 21:21:30 -07:00
H. Shay
740a48de76 newsfragment 2022-08-11 21:20:10 -07:00
H. Shay
a7ed07944f Merge branch 'shay/batch_events' of https://github.com/matrix-org/synapse into shay/batch_events 2022-08-11 21:15:57 -07:00
H. Shay
b8ed35841d fix tests to accommodate new structure of DAG after room creation 2022-08-11 21:15:16 -07:00
H. Shay
e215109b74 batch some events to send 2022-08-11 21:13:59 -07:00
Shay
9adffc2c95 Merge branch 'develop' into shay/batch_events 2022-08-09 09:04:46 -07:00
H. Shay
a08f32f8ed split out creating and sending event 2022-08-04 12:16:26 -07:00
9 changed files with 697 additions and 146 deletions

1
changelog.d/13487.misc Normal file
View File

@@ -0,0 +1 @@
Refactor ` _send_events_for_new_room` to separate creating and sending events.

View File

@@ -56,13 +56,16 @@ from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.replication.http.send_events import ReplicationSendEventsRestServlet
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import (
MutableStateMap,
PersistedEventPosition,
Requester,
RoomAlias,
StateMap,
StreamToken,
UserID,
create_requester,
@@ -492,6 +495,7 @@ class EventCreationHandler:
self.membership_types_to_include_profile_data_in.add(Membership.INVITE)
self.send_event = ReplicationSendEventRestServlet.make_client(hs)
self.send_events = ReplicationSendEventsRestServlet.make_client(hs)
self.request_ratelimiter = hs.get_request_ratelimiter()
@@ -627,49 +631,10 @@ class EventCreationHandler:
"""
await self.auth_blocking.check_auth_blocking(requester=requester)
if event_dict["type"] == EventTypes.Create and event_dict["state_key"] == "":
room_version_id = event_dict["content"]["room_version"]
maybe_room_version_obj = KNOWN_ROOM_VERSIONS.get(room_version_id)
if not maybe_room_version_obj:
# this can happen if support is withdrawn for a room version
raise UnsupportedRoomVersionError(room_version_id)
room_version_obj = maybe_room_version_obj
else:
try:
room_version_obj = await self.store.get_room_version(
event_dict["room_id"]
)
except NotFoundError:
raise AuthError(403, "Unknown room")
builder = self.event_builder_factory.for_room_version(
room_version_obj, event_dict
)
self.validator.validate_builder(builder)
builder = await self._get_and_validate_builder(event_dict)
if builder.type == EventTypes.Member:
membership = builder.content.get("membership", None)
target = UserID.from_string(builder.state_key)
if membership in self.membership_types_to_include_profile_data_in:
# If event doesn't include a display name, add one.
profile = self.profile_handler
content = builder.content
try:
if "displayname" not in content:
displayname = await profile.get_displayname(target)
if displayname is not None:
content["displayname"] = displayname
if "avatar_url" not in content:
avatar_url = await profile.get_avatar_url(target)
if avatar_url is not None:
content["avatar_url"] = avatar_url
except Exception as e:
logger.info(
"Failed to get profile information for %r: %s", target, e
)
await self._build_profile_data(builder)
is_exempt = await self._is_exempt_from_privacy_policy(builder, requester)
if require_consent and not is_exempt:
@@ -735,6 +700,126 @@ class EventCreationHandler:
return event, context
async def create_event_for_batch(
    self,
    requester: Requester,
    event_dict: dict,
    prev_event_ids: List[str],
    depth: int,
    state_map: StateMap,
    txn_id: Optional[str] = None,
    require_consent: bool = True,
    outlier: bool = False,
) -> EventBase:
    """
    Given a dict from a client, create a new event intended to be persisted as
    part of a batch. Notably does not create an event context — the auth events
    are computed from the supplied `state_map`, since earlier events in the
    batch will not yet have been persisted to the db. Adds display names to
    Join membership events.

    Args:
        requester: the user requesting the event
        event_dict: An entire event
        prev_event_ids:
            the forward extremities to use as the prev_events for the
            new event.
        depth: Override the depth used to order the event in the DAG.
        state_map: a state_map of previously created events for batching. Will be used
            to calculate the auth_ids for the event, as the previously created events for
            batching will not yet have been persisted to the db
        txn_id: optional client transaction id, stored in internal metadata
        require_consent: Whether to check if the requester has
            consented to the privacy policy.
        outlier: Indicates whether the event is an `outlier`, i.e. if
            it's from an arbitrary point and floating in the DAG as
            opposed to being inline with the current DAG.

    Returns:
        the created (unpersisted) event
    """
    await self.auth_blocking.check_auth_blocking(requester=requester)

    builder = await self._get_and_validate_builder(event_dict)

    # Membership events get display name / avatar filled in from the profile.
    if builder.type == EventTypes.Member:
        await self._build_profile_data(builder)

    is_exempt = await self._is_exempt_from_privacy_policy(builder, requester)
    if require_consent and not is_exempt:
        await self.assert_accepted_privacy_policy(requester)

    if requester.access_token_id is not None:
        builder.internal_metadata.token_id = requester.access_token_id

    if txn_id is not None:
        builder.internal_metadata.txn_id = txn_id

    builder.internal_metadata.outlier = outlier

    # Auth events are derived from the in-memory state map rather than the
    # database, because earlier events in the batch are not persisted yet.
    auth_ids = self._event_auth_handler.compute_auth_events(builder, state_map)

    event = await builder.build(
        prev_event_ids=prev_event_ids,
        auth_event_ids=auth_ids,
        depth=depth,
    )

    # Pass on the outlier property from the builder to the event
    # after it is created
    if builder.internal_metadata.outlier:
        event.internal_metadata.outlier = True

    self.validator.validate_new(event, self.config)

    return event
async def _build_profile_data(self, builder: EventBuilder) -> None:
    """
    Fill in profile information (display name / avatar URL) on a membership
    event builder, for membership types that should include it.

    Profile lookup failures are logged and otherwise ignored.
    """
    membership = builder.content.get("membership", None)
    user = UserID.from_string(builder.state_key)
    if membership not in self.membership_types_to_include_profile_data_in:
        return

    event_content = builder.content
    try:
        # Only add fields the client did not already supply.
        if "displayname" not in event_content:
            name = await self.profile_handler.get_displayname(user)
            if name is not None:
                event_content["displayname"] = name
        if "avatar_url" not in event_content:
            avatar = await self.profile_handler.get_avatar_url(user)
            if avatar is not None:
                event_content["avatar_url"] = avatar
    except Exception as e:
        logger.info("Failed to get profile information for %r: %s", user, e)
async def _get_and_validate_builder(self, event_dict: dict) -> EventBuilder:
    """
    Build and validate an `EventBuilder` for the given event dict.

    For `m.room.create` events the room version is read from the event
    content (the room does not exist yet); for everything else it is
    looked up in the database.

    Raises:
        UnsupportedRoomVersionError: if a create event names a room version
            we no longer support.
        AuthError: if the room for a non-create event is unknown.
    """
    is_create_event = (
        event_dict["type"] == EventTypes.Create and event_dict["state_key"] == ""
    )
    if is_create_event:
        version_id = event_dict["content"]["room_version"]
        room_version = KNOWN_ROOM_VERSIONS.get(version_id)
        if not room_version:
            # this can happen if support is withdrawn for a room version
            raise UnsupportedRoomVersionError(version_id)
    else:
        try:
            room_version = await self.store.get_room_version(
                event_dict["room_id"]
            )
        except NotFoundError:
            raise AuthError(403, "Unknown room")

    builder = self.event_builder_factory.for_room_version(room_version, event_dict)
    self.validator.validate_builder(builder)
    return builder
async def _is_exempt_from_privacy_policy(
self, builder: EventBuilder, requester: Requester
) -> bool:
@@ -1234,6 +1319,147 @@ class EventCreationHandler:
400, "Cannot start threads from an event with a relation"
)
async def handle_create_room_events(
    self,
    requester: Requester,
    events_and_ctx: List[Tuple[EventBase, EventContext]],
    ratelimit: bool = True,
) -> EventBase:
    """
    Process a batch of room creation events. For each event in the list it checks
    the authorization and that the event can be serialized. Returns the last event in the
    list once it has been persisted.

    Args:
        requester: the room creator
        events_and_ctx: a set of events and their associated contexts to persist
        ratelimit: whether to ratelimit this request

    Returns:
        the last event in the batch, once persisted

    Raises:
        AuthError: if any event in the batch fails the auth rules
    """
    for event, context in events_and_ctx:
        try:
            validate_event_for_room_version(event)
            await self._event_auth_handler.check_auth_rules_from_context(
                event, context
            )
        except AuthError as err:
            logger.warning("Denying new event %r because %s", event, err)
            raise err

        # Ensure that we can round trip before trying to persist in db
        try:
            dump = json_encoder.encode(event.content)
            json_decoder.decode(dump)
        except Exception:
            logger.exception("Failed to encode content: %r", event.content)
            raise

    # We now persist the events. Bug fix: the previous version caught the
    # exception, logged it at info level and fell through to `return result`,
    # which raised a confusing NameError (`result` unbound) and masked the
    # real failure. Log with the traceback and re-raise instead.
    try:
        result = await self._persist_events_batch(
            requester, events_and_ctx, ratelimit
        )
    except Exception:
        logger.exception("Encountered an error persisting events")
        raise
    return result
async def _persist_events_batch(
    self,
    requester: Requester,
    events_and_ctx: List[Tuple[EventBase, EventContext]],
    ratelimit: bool = True,
) -> EventBase:
    """
    Processes the push actions and adds them to the push staging area before attempting to
    persist the batch of events.

    See handle_create_room_events for arguments.

    Returns the last event in the list if persisted successfully.
    """
    # Stage push actions for every event before attempting persistence, so
    # they can be cleaned up as a whole if persistence fails below.
    for event, context in events_and_ctx:
        with opentracing.start_active_span("calculate_push_actions"):
            await self._bulk_push_rule_evaluator.action_for_event_by_user(
                event, context
            )
    try:
        # If we're a worker we need to hit out to the master.
        # NOTE(review): `event` here is the variable leaked from the loop
        # above (the last event in the batch); all events are assumed to be
        # in the same room, so its room_id selects the writer — confirm.
        writer_instance = self._events_shard_config.get_instance(event.room_id)
        if writer_instance != self._instance_name:
            try:
                result = await self.send_events(
                    instance_name=writer_instance,
                    store=self.store,
                    requester=requester,
                    events_and_ctx=events_and_ctx,
                    ratelimit=ratelimit,
                )
            except SynapseError as e:
                # A 409 from the writer signals a partial-state conflict.
                if e.code == HTTPStatus.CONFLICT:
                    raise PartialStateConflictError()
                raise
            stream_id = result["stream_id"]

            # If we newly persisted the event then we need to update its
            # stream_ordering entry manually (as it was persisted on
            # another worker).
            event.internal_metadata.stream_ordering = stream_id
            return event

        # We're the writer: persist locally and notify.
        last_event = await self.persist_and_notify_batched_events(
            requester, events_and_ctx, ratelimit
        )
    except Exception:
        # Ensure that we actually remove the entries in the push actions
        # staging area, if we calculated them.
        for event, _ in events_and_ctx:
            await self.store.remove_push_actions_from_staging(event.event_id)
        raise

    return last_event
async def persist_and_notify_batched_events(
    self,
    requester: Requester,
    events_and_ctx: List[Tuple[EventBase, EventContext]],
    ratelimit: bool = True,
) -> EventBase:
    """
    Handles the actual persisting of a batch of events to the DB, and sends the appropriate
    notifications when this is done.

    Args:
        requester: the room creator
        events_and_ctx: list of events and their associated contexts to persist
        ratelimit: whether to apply ratelimiting to this request

    Returns:
        the last event in the batch, as persisted
    """
    if ratelimit:
        await self.request_ratelimiter.ratelimit(requester)

    for event, context in events_and_ctx:
        # Run type-specific checks/actions (membership, aliases, etc.)
        await self._actions_by_event_type(event, context)

    assert self._storage_controllers.persistence is not None
    (
        persisted_events,
        max_stream_token,
    ) = await self._storage_controllers.persistence.persist_events(events_and_ctx)

    last_persisted = persisted_events[-1]
    stream_ordering = last_persisted.internal_metadata.stream_ordering
    assert stream_ordering is not None
    pos = PersistedEventPosition(self._instance_name, stream_ordering)

    async def _notify() -> None:
        try:
            await self.notifier.on_new_room_event(
                last_persisted, pos, max_stream_token
            )
        except Exception:
            # Bug fix: previously this logged `event.event_id`, the stale
            # variable leaked from the loop above, rather than the event we
            # actually notified about.
            logger.exception(
                "Error notifying about new room event %s",
                last_persisted.event_id,
            )

    run_in_background(_notify)

    return last_persisted
@measure_func("handle_new_client_event")
async def handle_new_client_event(
self,
@@ -1568,6 +1794,55 @@ class EventCreationHandler:
requester, is_admin_redaction=is_admin_redaction
)
# run checks/actions on event based on type
await self._actions_by_event_type(event, context)
# Mark any `m.historical` messages as backfilled so they don't appear
# in `/sync` and have the proper decrementing `stream_ordering` as we import
backfilled = False
if event.internal_metadata.is_historical():
backfilled = True
# Note that this returns the event that was persisted, which may not be
# the same as we passed in if it was deduplicated due transaction IDs.
(
event,
event_pos,
max_stream_token,
) = await self._storage_controllers.persistence.persist_event(
event, context=context, backfilled=backfilled
)
if self._ephemeral_events_enabled:
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)
async def _notify() -> None:
try:
await self.notifier.on_new_room_event(
event, event_pos, max_stream_token, extra_users=extra_users
)
except Exception:
logger.exception(
"Error notifying about new room event %s",
event.event_id,
)
run_in_background(_notify)
if event.type == EventTypes.Message:
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)
return event
async def _actions_by_event_type(
self, event: EventBase, context: EventContext
) -> None:
"""
Helper function to execute actions/checks based on the event type
"""
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
(
current_membership,
@@ -1588,11 +1863,13 @@ class EventCreationHandler:
original_event_id = event.unsigned.get("replaces_state")
if original_event_id:
original_event = await self.store.get_event(original_event_id)
original_alias_event = await self.store.get_event(original_event_id)
if original_event:
original_alias = original_event.content.get("alias", None)
original_alt_aliases = original_event.content.get("alt_aliases", [])
if original_alias_event:
original_alias = original_alias_event.content.get("alias", None)
original_alt_aliases = original_alias_event.content.get(
"alt_aliases", []
)
# Check the alias is currently valid (if it has changed).
room_alias_str = event.content.get("alias", None)
@@ -1770,46 +2047,6 @@ class EventCreationHandler:
errcode=Codes.INVALID_PARAM,
)
# Mark any `m.historical` messages as backfilled so they don't appear
# in `/sync` and have the proper decrementing `stream_ordering` as we import
backfilled = False
if event.internal_metadata.is_historical():
backfilled = True
# Note that this returns the event that was persisted, which may not be
# the same as we passed in if it was deduplicated due transaction IDs.
(
event,
event_pos,
max_stream_token,
) = await self._storage_controllers.persistence.persist_event(
event, context=context, backfilled=backfilled
)
if self._ephemeral_events_enabled:
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)
async def _notify() -> None:
try:
await self.notifier.on_new_room_event(
event, event_pos, max_stream_token, extra_users=extra_users
)
except Exception:
logger.exception(
"Error notifying about new room event %s",
event.event_id,
)
run_in_background(_notify)
if event.type == EventTypes.Message:
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)
return event
async def _maybe_kick_guest_users(
self, event: EventBase, context: EventContext
) -> None:

View File

@@ -108,6 +108,7 @@ class EventContext:
class RoomCreationHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.state = hs.get_state_handler()
self._storage_controllers = hs.get_storage_controllers()
self.auth = hs.get_auth()
self.auth_blocking = hs.get_auth_blocking()
@@ -119,6 +120,7 @@ class RoomCreationHandler:
self._event_auth_handler = hs.get_event_auth_handler()
self.config = hs.config
self.request_ratelimiter = hs.get_request_ratelimiter()
self.builder = hs.get_event_builder_factory()
# Room state based off defined presets
self._presets_dict: Dict[str, Dict[str, Any]] = {
@@ -716,7 +718,7 @@ class RoomCreationHandler:
if (
self._server_notices_mxid is not None
and requester.user.to_string() == self._server_notices_mxid
and user_id == self._server_notices_mxid
):
# allow the server notices mxid to create rooms
is_requester_admin = True
@@ -1053,13 +1055,21 @@ class RoomCreationHandler:
"""
creator_id = creator.user.to_string()
event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""}
depth = 1
# the last event sent/persisted to the db
last_sent_event_id: Optional[str] = None
def create(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
# the most recently created event
prev_event: List[str] = []
# a map of event types, state keys -> event_ids. We collect these mappings this
# as events are created (but not persisted to the db) to determine state for
# future created events (as this info can't be pulled from the db)
state_map: dict = {}
def create_event_dict(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
e = {"type": etype, "content": content}
e.update(event_keys)
@@ -1067,32 +1077,49 @@ class RoomCreationHandler:
return e
async def send(etype: str, content: JsonDict, **kwargs: Any) -> int:
nonlocal last_sent_event_id
async def create_event(
etype: str,
content: JsonDict,
**kwargs: Any,
) -> EventBase:
nonlocal depth
nonlocal prev_event
event = create(etype, content, **kwargs)
logger.debug("Sending %s in new room", etype)
# Allow these events to be sent even if the user is shadow-banned to
# allow the room creation to complete.
(
sent_event,
last_stream_id,
) = await self.event_creation_handler.create_and_send_nonmember_event(
event_dict = create_event_dict(etype, content, **kwargs)
event = await self.event_creation_handler.create_event_for_batch(
creator,
event,
event_dict,
prev_event,
depth,
state_map,
)
depth += 1
prev_event = [event.event_id]
state_map[(event.type, event.state_key)] = event.event_id
return event
async def send(
event: EventBase,
context: synapse.events.snapshot.EventContext,
creator: Requester,
) -> int:
nonlocal last_sent_event_id
ev = await self.event_creation_handler.handle_new_client_event(
requester=creator,
event=event,
context=context,
ratelimit=False,
ignore_shadow_ban=True,
# Note: we don't pass state_event_ids here because this triggers
# an additional query per event to look them up from the events table.
prev_event_ids=[last_sent_event_id] if last_sent_event_id else [],
depth=depth,
)
last_sent_event_id = sent_event.event_id
depth += 1
last_sent_event_id = ev.event_id
return last_stream_id
# we know it was persisted, so must have a stream ordering
assert ev.internal_metadata.stream_ordering
return ev.internal_metadata.stream_ordering
try:
config = self._presets_dict[preset_config]
@@ -1102,9 +1129,15 @@ class RoomCreationHandler:
)
creation_content.update({"creator": creator_id})
await send(etype=EventTypes.Create, content=creation_content)
creation_event = await create_event(
EventTypes.Create,
creation_content,
)
creation_context = await self.state.compute_event_context(creation_event)
logger.debug("Sending %s in new room", EventTypes.Member)
await send(creation_event, creation_context, creator)
# Room create event must exist at this point
assert last_sent_event_id is not None
member_event_id, _ = await self.room_member_handler.update_membership(
@@ -1118,15 +1151,23 @@ class RoomCreationHandler:
prev_event_ids=[last_sent_event_id],
depth=depth,
)
last_sent_event_id = member_event_id
# last_sent_event_id = member_event_id
prev_event = [member_event_id]
# update the depth and state map here as these are otherwise updated in
# 'create_event' the membership event has been created through a different code
# path
depth += 1
state_map[(EventTypes.Member, creator.user.to_string())] = member_event_id
# We treat the power levels override specially as this needs to be one
# of the first events that get sent into a room.
pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None)
if pl_content is not None:
last_sent_stream_id = await send(
etype=EventTypes.PowerLevels, content=pl_content
)
power_event = await create_event(EventTypes.PowerLevels, pl_content)
power_context = await self.state.compute_event_context(power_event)
current_state_group = power_context._state_group
await send(power_event, power_context, creator)
else:
power_level_content: JsonDict = {
"users": {creator_id: 100},
@@ -1169,48 +1210,92 @@ class RoomCreationHandler:
# apply those.
if power_level_content_override:
power_level_content.update(power_level_content_override)
last_sent_stream_id = await send(
etype=EventTypes.PowerLevels, content=power_level_content
pl_event = await create_event(
EventTypes.PowerLevels,
power_level_content,
)
pl_context = await self.state.compute_event_context(pl_event)
current_state_group = pl_context._state_group
await send(pl_event, pl_context, creator)
events_to_send = []
if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
last_sent_stream_id = await send(
etype=EventTypes.CanonicalAlias,
content={"alias": room_alias.to_string()},
room_alias_event = await create_event(
EventTypes.CanonicalAlias,
{"alias": room_alias.to_string()},
)
assert current_state_group is not None
room_alias_context = await self.state.compute_event_context_for_batched(
room_alias_event, state_map, current_state_group
)
current_state_group = room_alias_context._state_group
events_to_send.append((room_alias_event, room_alias_context))
if (EventTypes.JoinRules, "") not in initial_state:
last_sent_stream_id = await send(
etype=EventTypes.JoinRules, content={"join_rule": config["join_rules"]}
join_rules_event = await create_event(
EventTypes.JoinRules,
{"join_rule": config["join_rules"]},
)
assert current_state_group is not None
join_rules_context = await self.state.compute_event_context_for_batched(
join_rules_event, state_map, current_state_group
)
current_state_group = join_rules_context._state_group
events_to_send.append((join_rules_event, join_rules_context))
if (EventTypes.RoomHistoryVisibility, "") not in initial_state:
last_sent_stream_id = await send(
etype=EventTypes.RoomHistoryVisibility,
content={"history_visibility": config["history_visibility"]},
visibility_event = await create_event(
EventTypes.RoomHistoryVisibility,
{"history_visibility": config["history_visibility"]},
)
assert current_state_group is not None
visibility_context = await self.state.compute_event_context_for_batched(
visibility_event, state_map, current_state_group
)
current_state_group = visibility_context._state_group
events_to_send.append((visibility_event, visibility_context))
if config["guest_can_join"]:
if (EventTypes.GuestAccess, "") not in initial_state:
last_sent_stream_id = await send(
etype=EventTypes.GuestAccess,
content={EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
guest_access_event = await create_event(
EventTypes.GuestAccess,
{EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
)
assert current_state_group is not None
guest_access_context = (
await self.state.compute_event_context_for_batched(
guest_access_event, state_map, current_state_group
)
)
current_state_group = guest_access_context._state_group
events_to_send.append((guest_access_event, guest_access_context))
for (etype, state_key), content in initial_state.items():
last_sent_stream_id = await send(
etype=etype, state_key=state_key, content=content
event = await create_event(etype, content, state_key=state_key)
assert current_state_group is not None
context = await self.state.compute_event_context_for_batched(
event, state_map, current_state_group
)
current_state_group = context._state_group
events_to_send.append((event, context))
if config["encrypted"]:
last_sent_stream_id = await send(
etype=EventTypes.RoomEncryption,
encryption_event = await create_event(
EventTypes.RoomEncryption,
{"algorithm": RoomEncryptionAlgorithms.DEFAULT},
state_key="",
content={"algorithm": RoomEncryptionAlgorithms.DEFAULT},
)
assert current_state_group is not None
encryption_context = await self.state.compute_event_context_for_batched(
encryption_event, state_map, current_state_group
)
events_to_send.append((encryption_event, encryption_context))
return last_sent_stream_id, last_sent_event_id, depth
last_event = await self.event_creation_handler.handle_create_room_events(
creator, events_to_send
)
assert last_event.internal_metadata.stream_ordering is not None
return last_event.internal_metadata.stream_ordering, last_event.event_id, depth
def _generate_room_id(self) -> str:
"""Generates a random room ID.

View File

@@ -25,6 +25,7 @@ from synapse.replication.http import (
push,
register,
send_event,
send_events,
state,
streams,
)
@@ -43,6 +44,7 @@ class ReplicationRestResource(JsonResource):
def register_servlets(self, hs: "HomeServer") -> None:
send_event.register_servlets(hs, self)
send_events.register_servlets(hs, self)
federation.register_servlets(hs, self)
presence.register_servlets(hs, self)
membership.register_servlets(hs, self)

View File

@@ -0,0 +1,165 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, List, Tuple
from twisted.web.server import Request
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase, make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.http.server import HttpServer
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict, Requester
from synapse.util.metrics import Measure
if TYPE_CHECKING:
from synapse.server import HomeServer
from synapse.storage.databases.main import DataStore
logger = logging.getLogger(__name__)
class ReplicationSendEventsRestServlet(ReplicationEndpoint):
    """Handles batches of newly created events on workers, including persisting and
    notifying.

    The API looks like:

        POST /_synapse/replication/send_events/:txn_id

        {
            "events": [{
                "event": { .. serialized event .. },
                "room_version": .., // "1", "2", "3", etc: the version of the room
                                    // containing the event
                "event_format_version": .., // 1,2,3 etc: the event format version
                "internal_metadata": { .. serialized internal_metadata .. },
                "outlier": true|false,
                "rejected_reason": ..,   // The event.rejected_reason field
                "context": { .. serialized event context .. },
                "requester": { .. serialized requester .. },
                "ratelimit": true,
            }]
        }

        200 OK

        { "stream_id": 12345, "event_id": "$abcdef..." }

    Responds with a 409 when a `PartialStateConflictError` is raised due to an event
    context that needs to be recomputed due to the un-partial stating of a room.
    """

    NAME = "send_events"
    PATH_ARGS = ()

    def __init__(self, hs: "HomeServer"):
        super().__init__(hs)
        self.event_creation_handler = hs.get_event_creation_handler()
        self.store = hs.get_datastores().main
        self._storage_controllers = hs.get_storage_controllers()
        self.clock = hs.get_clock()

    @staticmethod
    async def _serialize_payload(  # type: ignore[override]
        store: "DataStore",
        events_and_ctx: List[Tuple[EventBase, EventContext]],
        requester: Requester,
        ratelimit: bool,
    ) -> JsonDict:
        """Serialize a batch of events (with contexts) into the JSON payload
        described in the class docstring.

        Args:
            store: datastore, used to serialize each event context
            events_and_ctx: the events and contexts to serialize
            requester: the requester, duplicated into every event entry
            ratelimit: whether the handling side should ratelimit, duplicated
                into every event entry
        """
        serialized_events = []
        for event, context in events_and_ctx:
            serialized_context = await context.serialize(event, store)
            serialized_event = {
                "event": event.get_pdu_json(),
                "room_version": event.room_version.identifier,
                "event_format_version": event.format_version,
                "internal_metadata": event.internal_metadata.get_dict(),
                "outlier": event.internal_metadata.is_outlier(),
                "rejected_reason": event.rejected_reason,
                "context": serialized_context,
                "requester": requester.serialize(),
                "ratelimit": ratelimit,
            }
            serialized_events.append(serialized_event)
        payload = {"events": serialized_events}
        return payload

    async def _handle_request(  # type: ignore[override]
        self, request: Request
    ) -> Tuple[int, JsonDict]:
        with Measure(self.clock, "repl_send_events_parse"):
            payload = parse_json_object_from_request(request)
            events_and_ctx = []
            events = payload["events"]
            for event_payload in events:
                event_dict = event_payload["event"]
                room_ver = KNOWN_ROOM_VERSIONS[event_payload["room_version"]]
                internal_metadata = event_payload["internal_metadata"]
                rejected_reason = event_payload["rejected_reason"]
                event = make_event_from_dict(
                    event_dict, room_ver, internal_metadata, rejected_reason
                )
                event.internal_metadata.outlier = event_payload["outlier"]
                # NOTE(review): `requester` and `ratelimit` are re-assigned on
                # every iteration and only the values from the last entry are
                # used below — all entries are presumably identical; confirm.
                requester = Requester.deserialize(
                    self.store, event_payload["requester"]
                )
                context = EventContext.deserialize(
                    self._storage_controllers, event_payload["context"]
                )
                ratelimit = event_payload["ratelimit"]
                events_and_ctx.append((event, context))

        # `event` here is the last event of the batch (loop variable).
        logger.info(
            "Got batch of events to send, last ID of batch is: %s, sending into room: %s",
            event.event_id,
            event.room_id,
        )

        last_event = (
            await self.event_creation_handler.persist_and_notify_batched_events(
                requester, events_and_ctx, ratelimit
            )
        )

        return (
            200,
            {
                "stream_id": last_event.internal_metadata.stream_ordering,
                "event_id": last_event.event_id,
            },
        )
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
    """Register the batched-event replication servlet on ``http_server``."""
    servlet = ReplicationSendEventsRestServlet(hs)
    servlet.register(http_server)

View File

@@ -282,7 +282,6 @@ class StateHandler:
RuntimeError if `state_ids_before_event` is not provided and one or more
prev events are missing or outliers.
"""
assert not event.internal_metadata.is_outlier()
#
@@ -333,6 +332,7 @@ class StateHandler:
logger.debug("calling resolve_state_groups from compute_event_context")
# we've already taken into account partial state, so no need to wait for
# complete state here.
entry = await self.resolve_state_groups_for_events(
event.room_id,
event.prev_event_ids(),
@@ -420,6 +420,69 @@ class StateHandler:
partial_state=partial_state,
)
async def compute_event_context_for_batched(
    self,
    event: EventBase,
    state_ids_before_event: StateMap[str],
    current_state_group: int,
) -> EventContext:
    """
    Generate an event context for an event that has not yet been persisted to the
    database. Intended for use with events that are created to be persisted in a batch.

    Unlike the general `compute_event_context`, this does not resolve state from
    the database: the caller supplies the state before the event directly.

    Args:
        event: the event the context is being computed for
        state_ids_before_event: a state map consisting of the state ids of the events
            created prior to this event.
        current_state_group: the current state group before the event.

    Returns:
        The EventContext for `event`. For a non-state event the state group is
        unchanged; for a state event a new state group is stored for the state
        after the event.
    """
    # These are always None here: batched events have no known prior state
    # group to delta from.
    state_group_before_event_prev_group = None
    deltas_to_state_group_before_event = None
    # The caller tells us the state group as it stands before this event.
    state_group_before_event = current_state_group
    # if the event is not state, we are set
    if not event.is_state():
        # Non-state event: state before == state after, and the event
        # contributes no state delta.
        return EventContext.with_state(
            storage=self._storage_controllers,
            state_group_before_event=state_group_before_event,
            state_group=state_group_before_event,
            state_delta_due_to_event={},
            prev_group=state_group_before_event_prev_group,
            delta_ids=deltas_to_state_group_before_event,
            partial_state=False,
        )
    # otherwise, we'll need to create a new state group for after the event
    key = (event.type, event.state_key)
    # NOTE(review): `state_ids_before_event` is annotated as non-Optional, so
    # this guard looks redundant — confirm no caller passes None.
    if state_ids_before_event is not None:
        replaces = state_ids_before_event.get(key)
        if replaces and replaces != event.event_id:
            # Record which state event this one supersedes, for clients.
            event.unsigned["replaces_state"] = replaces
    # The only state change caused by this event is its own (type, state_key).
    delta_ids = {key: event.event_id}
    # Store the new state group as a delta against the group before the event
    # (current_state_ids=None means "derive from prev_group + delta_ids").
    state_group_after_event = (
        await self._state_storage_controller.store_state_group(
            event.event_id,
            event.room_id,
            prev_group=state_group_before_event,
            delta_ids=delta_ids,
            current_state_ids=None,
        )
    )
    return EventContext.with_state(
        storage=self._storage_controllers,
        state_group=state_group_after_event,
        state_group_before_event=state_group_before_event,
        state_delta_due_to_event=delta_ids,
        prev_group=state_group_before_event,
        delta_ids=delta_ids,
        partial_state=False,
    )
@measure_func()
async def resolve_state_groups_for_events(
self, room_id: str, event_ids: Collection[str], await_full_state: bool = True

View File

@@ -222,9 +222,10 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
)
self.assertEqual(len(alice_sync_result.joined), 1)
self.assertEqual(alice_sync_result.joined[0].room_id, room_id)
last_room_creation_event_id = (
alice_sync_result.joined[0].timeline.events[-1].event_id
)
last_room_creation_event_ids = [
alice_sync_result.joined[0].timeline.events[-1].event_id,
alice_sync_result.joined[0].timeline.events[-2].event_id,
]
# Eve, a ne'er-do-well, registers.
eve = self.register_user("eve", "password")
@@ -250,7 +251,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.hs.get_datastores().main,
"get_prev_events_for_room",
new_callable=MagicMock,
return_value=make_awaitable([last_room_creation_event_id]),
return_value=make_awaitable(last_room_creation_event_ids),
)
with mocked_get_prev_events:
self.helper.join(room_id, eve, tok=eve_token)

View File

@@ -710,7 +710,7 @@ class RoomsCreateTestCase(RoomBase):
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
self.assertTrue("room_id" in channel.json_body)
assert channel.resource_usage is not None
self.assertEqual(44, channel.resource_usage.db_txn_count)
self.assertEqual(36, channel.resource_usage.db_txn_count)
def test_post_room_initial_state(self) -> None:
# POST with initial_state config key, expect new room id
@@ -723,7 +723,7 @@ class RoomsCreateTestCase(RoomBase):
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
self.assertTrue("room_id" in channel.json_body)
assert channel.resource_usage is not None
self.assertEqual(50, channel.resource_usage.db_txn_count)
self.assertEqual(39, channel.resource_usage.db_txn_count)
def test_post_room_visibility_key(self) -> None:
# POST with visibility config key, expect new room id

View File

@@ -46,10 +46,9 @@ class MessageAcceptTests(unittest.HomeserverTestCase):
user_id = UserID("us", "test")
our_user = create_requester(user_id)
room_creator = self.homeserver.get_room_creation_handler()
config = {"preset": "public_chat"}
self.room_id = self.get_success(
room_creator.create_room(
our_user, room_creator._presets_dict["public_chat"], ratelimit=False
)
room_creator.create_room(our_user, config, ratelimit=False)
)[0]["room_id"]
self.store = self.homeserver.get_datastores().main
@@ -99,10 +98,8 @@ class MessageAcceptTests(unittest.HomeserverTestCase):
)
# Make sure we actually joined the room
self.assertEqual(
self.get_success(self.store.get_latest_event_ids_in_room(self.room_id))[0],
"$join:test.serv",
)
res = self.get_success(self.store.get_latest_event_ids_in_room(self.room_id))
assert "$join:test.serv" in res
def test_cant_hide_direct_ancestors(self):
"""