Compare commits

...

23 Commits

Author SHA1 Message Date
H. Shay
fd98493c86 remove duplicated code and update comment 2022-11-08 12:19:42 -08:00
H. Shay
9602493c13 Merge branch 'shay/batch_state_groups' of https://github.com/matrix-org/synapse into shay/batch_state_groups 2022-11-08 11:44:36 -08:00
H. Shay
143211fa26 lint 2022-11-08 11:44:26 -08:00
Shay
4df01d605e Merge branch 'develop' into shay/batch_state_groups 2022-11-08 11:35:39 -08:00
H. Shay
060c1a153c remove unnecessary event context method 2022-11-08 11:32:25 -08:00
H. Shay
06a3b9a921 call compute event context for batched from room code 2022-11-08 11:29:23 -08:00
H. Shay
8cd1196c4b update compute_event_context_for_batched to take list and assign state groups 2022-11-08 11:28:56 -08:00
H. Shay
b840328d36 create a skeleton context and update comments 2022-11-08 11:27:56 -08:00
H. Shay
64e16fef76 fix test 2022-11-04 13:50:02 -07:00
H. Shay
0da772fd34 fix newsfrag 2022-11-04 13:19:40 -07:00
H. Shay
1ba3c5e15a misc cleanup 2022-11-04 12:03:46 -07:00
H. Shay
8b4875423e fix bug 2022-11-04 10:58:23 -07:00
H. Shay
3c90093de3 Merge branch 'develop' into shay/batch_state_groups 2022-11-03 12:57:59 -07:00
H. Shay
b0aa16ddfb newsfragment 2022-11-03 12:33:16 -07:00
H. Shay
d6211b5547 lints 2022-11-03 12:29:13 -07:00
H. Shay
5ef29eee8a update to pass in current state map when checking batched events 2022-11-03 12:26:35 -07:00
H. Shay
bdfe7fb2e2 update create room code to store state groups after all events/context to be batched have been created 2022-11-03 12:25:34 -07:00
H. Shay
badf303eb2 update functions to no longer take a current state event as parameter 2022-11-03 12:24:49 -07:00
H. Shay
84be413b2f update function 2022-11-03 12:24:19 -07:00
H. Shay
beebb6db4a add a function to store state groups for batched events/contexts 2022-11-03 12:23:50 -07:00
H. Shay
5a1decd5be modify compute_event_context_for_batched to no longer store state groups 2022-11-03 12:22:49 -07:00
H. Shay
63f8ee4007 Merge branch 'develop' into shay/batch_state_groups 2022-10-20 14:22:21 -07:00
H. Shay
f07e9af194 add version of eventcontext without state group 2022-10-11 15:42:49 -07:00
8 changed files with 189 additions and 91 deletions

1
changelog.d/14371.misc Normal file
View File

@@ -0,0 +1 @@
Batch up state groups to store when creating initial room events.

View File

@@ -231,7 +231,11 @@ class ThirdPartyEventRules:
self._on_threepid_bind_callbacks.append(on_threepid_bind)
async def check_event_allowed(
self, event: EventBase, context: EventContext
self,
event: EventBase,
context: EventContext,
for_batch: bool = False,
state_map: Optional[StateMap[str]] = None,
) -> Tuple[bool, Optional[dict]]:
"""Check if a provided event should be allowed in the given context.
@@ -253,7 +257,11 @@ class ThirdPartyEventRules:
if len(self._check_event_allowed_callbacks) == 0:
return True, None
prev_state_ids = await context.get_prev_state_ids()
if for_batch:
assert state_map is not None
prev_state_ids = state_map
else:
prev_state_ids = await context.get_prev_state_ids()
# Retrieve the state events from the database.
events = await self.store.get_events(prev_state_ids.values())

View File

@@ -573,15 +573,15 @@ class EventCreationHandler:
depth: Optional[int] = None,
state_map: Optional[StateMap[str]] = None,
for_batch: bool = False,
current_state_group: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""
Given a dict from a client, create a new event. If bool for_batch is true, will
create an event using the prev_event_ids, and will create an event context for
the event using the parameters state_map and current_state_group, thus these parameters
must be provided in this case if for_batch is True. The subsequently created event
and context are suitable for being batched up and bulk persisted to the database
with other similarly created events.
create an event using the prev_event_ids, and will create an empty event context for
the event using the parameter state_map, thus this parameter must be provided
if for_batch is True. Please note that the caller is then responsible for updating
the state group info in the event context (by calling compute_event_context_for_batched).
The subsequently created event and context are suitable for being batched up and
bulk persisted to the database with other similarly created events.
Creates an FrozenEvent object, filling out auth_events, prev_events,
etc.
@@ -636,10 +636,8 @@ class EventCreationHandler:
state_map: A state map of previously created events, used only when creating events
for batch persisting
for_batch: whether the event is being created for batch persisting to the db
current_state_group: the current state group, used only for creating events for
batch persisting
for_batch: whether the event is being created for batch persisting to the db.
If true, both prev_event_ids and state_map must be provided
Raises:
ResourceLimitError if server is blocked to some resource being
@@ -718,7 +716,6 @@ class EventCreationHandler:
depth=depth,
state_map=state_map,
for_batch=for_batch,
current_state_group=current_state_group,
)
# In an ideal world we wouldn't need the second part of this condition. However,
@@ -1069,14 +1066,13 @@ class EventCreationHandler:
depth: Optional[int] = None,
state_map: Optional[StateMap[str]] = None,
for_batch: bool = False,
current_state_group: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client. If bool for_batch is true, will
create an event using the prev_event_ids, and will create an event context for
the event using the parameters state_map and current_state_group, thus these parameters
must be provided in this case if for_batch is True. The subsequently created event
and context are suitable for being batched up and bulk persisted to the database
with other similarly created events.
create an event using the prev_event_ids, and will create an empty event context.
Please note that the caller is then responsible for updating the event context
with state group information (by calling compute_event_context_for_batched).
The subsequently created event and context are suitable for being batched up
and bulk persisted to the database with other similarly created events.
Args:
builder:
@@ -1112,10 +1108,8 @@ class EventCreationHandler:
state_map: A state map of previously created events, used only when creating events
for batch persisting
for_batch: whether the event is being created for batch persisting to the db
current_state_group: the current state group, used only for creating events for
batch persisting
for_batch: whether the event is being created for batch persisting to the db.
If for_batch is true, both prev_event_ids and state_map must be provided
Returns:
Tuple of created event, context
@@ -1172,14 +1166,11 @@ class EventCreationHandler:
if for_batch:
assert prev_event_ids is not None
assert state_map is not None
assert current_state_group is not None
auth_ids = self._event_auth_handler.compute_auth_events(builder, state_map)
event = await builder.build(
prev_event_ids=prev_event_ids, auth_event_ids=auth_ids, depth=depth
)
context = await self.state.compute_event_context_for_batched(
event, state_map, current_state_group
)
context = EventContext(self._storage_controllers)
else:
event = await builder.build(
prev_event_ids=prev_event_ids,
@@ -1244,7 +1235,7 @@ class EventCreationHandler:
context.app_service = requester.app_service
res, new_content = await self.third_party_event_rules.check_event_allowed(
event, context
event, context, for_batch=for_batch, state_map=state_map
)
if res is False:
logger.info(

View File

@@ -1062,9 +1062,6 @@ class RoomCreationHandler:
# created (but not persisted to the db) to determine state for future created events
# (as this info can't be pulled from the db)
state_map: MutableStateMap[str] = {}
# current_state_group of last event created. Used for computing event context of
# events to be batched
current_state_group = None
def create_event_dict(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
e = {"type": etype, "content": content}
@@ -1105,7 +1102,6 @@ class RoomCreationHandler:
depth=depth,
state_map=state_map,
for_batch=for_batch,
current_state_group=current_state_group,
)
depth += 1
prev_event = [new_event.event_id]
@@ -1168,7 +1164,6 @@ class RoomCreationHandler:
power_event, power_context = await create_event(
EventTypes.PowerLevels, pl_content, True
)
current_state_group = power_context._state_group
events_to_send.append((power_event, power_context))
else:
power_level_content: JsonDict = {
@@ -1217,14 +1212,12 @@ class RoomCreationHandler:
power_level_content,
True,
)
current_state_group = pl_context._state_group
events_to_send.append((pl_event, pl_context))
if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
room_alias_event, room_alias_context = await create_event(
EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True
)
current_state_group = room_alias_context._state_group
events_to_send.append((room_alias_event, room_alias_context))
if (EventTypes.JoinRules, "") not in initial_state:
@@ -1233,7 +1226,6 @@ class RoomCreationHandler:
{"join_rule": config["join_rules"]},
True,
)
current_state_group = join_rules_context._state_group
events_to_send.append((join_rules_event, join_rules_context))
if (EventTypes.RoomHistoryVisibility, "") not in initial_state:
@@ -1242,7 +1234,6 @@ class RoomCreationHandler:
{"history_visibility": config["history_visibility"]},
True,
)
current_state_group = visibility_context._state_group
events_to_send.append((visibility_event, visibility_context))
if config["guest_can_join"]:
@@ -1252,14 +1243,12 @@ class RoomCreationHandler:
{EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
True,
)
current_state_group = guest_access_context._state_group
events_to_send.append((guest_access_event, guest_access_context))
for (etype, state_key), content in initial_state.items():
event, context = await create_event(
etype, content, True, state_key=state_key
)
current_state_group = context._state_group
events_to_send.append((event, context))
if config["encrypted"]:
@@ -1271,6 +1260,12 @@ class RoomCreationHandler:
)
events_to_send.append((encryption_event, encryption_context))
# update event contexts with state group information
state = self.hs.get_state_handler()
events_to_send = await state.compute_event_context_for_batched(
events_to_send, current_state_group, state_map
)
last_event = await self.event_creation_handler.handle_new_client_event(
creator,
events_to_send,

View File

@@ -422,66 +422,57 @@ class StateHandler:
async def compute_event_context_for_batched(
self,
event: EventBase,
state_ids_before_event: StateMap[str],
current_state_group: int,
) -> EventContext:
events_and_context: List[Tuple[EventBase, EventContext]],
prev_group: int,
state_ids_before_event: StateMap,
) -> List[Tuple[EventBase, EventContext]]:
"""
Generate an event context for an event that has not yet been persisted to the
database. Intended for use with events that are created to be persisted in a batch.
Args:
event: the event the context is being computed for
state_ids_before_event: a state map consisting of the state ids of the events
created prior to this event.
current_state_group: the current state group before the event.
events_and_context: a list of events and their associated contexts
prev_group: the state group of the last event persisted before the batched events
were created
state_ids_before_event: a state map consisting of current state ids
"""
state_group_before_event_prev_group = None
deltas_to_state_group_before_event = None
# separate out state and non-state contexts
state_events = []
for event, context in events_and_context:
if event.is_state():
state_events.append((event, context))
state_group_before_event = current_state_group
# get state groups for state events
room_id = events_and_context[0][0].room_id
assert self.hs.datastores is not None
await self.hs.datastores.state.store_state_deltas_for_batched(
state_events, room_id, prev_group=prev_group
)
# if the event is not state, we are set
if not event.is_state():
return EventContext.with_state(
storage=self._storage_controllers,
state_group_before_event=state_group_before_event,
state_group=state_group_before_event,
state_delta_due_to_event={},
prev_group=state_group_before_event_prev_group,
delta_ids=deltas_to_state_group_before_event,
partial_state=False,
)
# iterate through all contexts and update everything
current_state_group = prev_group
for event, context in events_and_context:
# otherwise, we'll need to create a new state group for after the event
key = (event.type, event.state_key)
# if the event is not state, we need to update it
if not event.is_state():
context._state_group = current_state_group
context.state_group_before_event = current_state_group
context._state_delta_due_to_event = {}
context.prev_group = None
context.delta_ids = None
context.partial_state = False
if state_ids_before_event is not None:
# the context should have been updated when storing the state groups but let's
# be sure - if it does not have a state group there is a problem
if context._state_group is None:
raise RuntimeError(f"Event {event.event_id} is missing a state group.")
current_state_group = context._state_group
key = (event.type, event.state_key)
replaces = state_ids_before_event.get(key)
if replaces and replaces != event.event_id:
event.unsigned["replaces_state"] = replaces
if replaces and replaces != event.event_id:
event.unsigned["replaces_state"] = replaces
delta_ids = {key: event.event_id}
state_group_after_event = (
await self._state_storage_controller.store_state_group(
event.event_id,
event.room_id,
prev_group=state_group_before_event,
delta_ids=delta_ids,
current_state_ids=None,
)
)
return EventContext.with_state(
storage=self._storage_controllers,
state_group=state_group_after_event,
state_group_before_event=state_group_before_event,
state_delta_due_to_event=delta_ids,
prev_group=state_group_before_event,
delta_ids=delta_ids,
partial_state=False,
)
return events_and_context
@measure_func()
async def resolve_state_groups_for_events(

View File

@@ -399,7 +399,11 @@ class StateStorageController:
The state group ID
"""
return await self.stores.state.store_state_group(
event_id, room_id, prev_group, delta_ids, current_state_ids
event_id,
room_id,
prev_group,
delta_ids,
current_state_ids,
)
@cancellable

View File

@@ -18,6 +18,8 @@ from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Se
import attr
from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -404,6 +406,112 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
fetched_keys=non_member_types,
)
async def store_state_deltas_for_batched(
    self,
    events_and_context: List[Tuple[EventBase, EventContext]],
    room_id: str,
    prev_group: int,
) -> List[int]:
    """Generate and store state deltas for a group of events and contexts created to be
    batch persisted.

    Every event is assigned its own newly allocated state group. The groups form a
    chain: the first one points back at `prev_group`, and each subsequent one points
    back at the group assigned to the event before it. The delta stored for each
    group is the single entry `(event.type, event.state_key) -> event.event_id`, so
    the events passed in are expected to be state events.

    Note that the contexts in `events_and_context` are updated in place: their
    `_state_group`, `prev_group`, `state_group_before_event`, `delta_ids` and
    `_state_delta_due_to_event` attributes are overwritten.

    Args:
        events_and_context: the events to generate and store state groups for,
            and their associated contexts
        room_id: the id of the room the events were created for
        prev_group: the state group of the last event persisted before the batched
            events were created

    Returns:
        A list of state groups that correspond, in order, to the events in
        `events_and_context`

    Raises:
        Exception: if `prev_group` is not present in the `state_groups` table.
    """

    def insert_deltas_group_txn(
        txn: LoggingTransaction,
        events_and_context: List[Tuple[EventBase, EventContext]],
        prev_group: int,
    ) -> List[int]:
        """Generate and store state groups for the provided events and contexts.

        Requires that we have the state as a delta from the last persisted state
        group.

        Returns:
            A list of state groups
        """
        # The chain of new groups hangs off `prev_group`, so refuse to write
        # anything if that group was never persisted.
        is_in_db = self.db_pool.simple_select_one_onecol_txn(
            txn,
            table="state_groups",
            keyvalues={"id": prev_group},
            retcol="id",
            allow_none=True,
        )
        if not is_in_db:
            raise Exception(
                "Trying to persist state with unpersisted prev_group: %r"
                % (prev_group,)
            )

        # Reserve one state group id per event in a single call.
        state_groups = self._state_group_seq_gen.get_next_mult_txn(
            txn, len(events_and_context)
        )

        # Rows for `state_groups_state`, collected while the contexts are filled
        # in so that they can be written with one batched insert below.
        state_rows = []

        # The first prev_group is the last persisted state group (passed in);
        # after that it is the group most recently assigned.
        previous_group = prev_group
        for index, (event, context) in enumerate(events_and_context):
            state_group = state_groups[index]
            key = (event.type, event.state_key)

            context._state_group = state_group
            context.prev_group = previous_group
            context.state_group_before_event = previous_group
            # Two distinct dict objects, so mutating one attribute later cannot
            # silently change the other.
            context.delta_ids = {key: event.event_id}
            context._state_delta_due_to_event = {key: event.event_id}

            state_rows.append(
                (state_group, room_id, key[0], key[1], event.event_id)
            )
            previous_group = state_group

        self.db_pool.simple_insert_many_txn(
            txn,
            table="state_groups",
            keys=("id", "room_id", "event_id"),
            values=[
                (context._state_group, room_id, event.event_id)
                for event, context in events_and_context
            ],
        )

        self.db_pool.simple_insert_many_txn(
            txn,
            table="state_group_edges",
            keys=("state_group", "prev_state_group"),
            values=[
                (context._state_group, context.prev_group)
                for _, context in events_and_context
            ],
        )

        # One batched insert for every delta row instead of one insert per event.
        self.db_pool.simple_insert_many_txn(
            txn,
            table="state_groups_state",
            keys=("state_group", "room_id", "type", "state_key", "event_id"),
            values=state_rows,
        )

        return state_groups

    return await self.db_pool.runInteraction(
        "store_state_deltas_for_batched.insert_deltas_group",
        insert_deltas_group_txn,
        events_and_context,
        prev_group,
    )
async def store_state_group(
self,
event_id: str,

View File

@@ -715,7 +715,7 @@ class RoomsCreateTestCase(RoomBase):
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
self.assertTrue("room_id" in channel.json_body)
assert channel.resource_usage is not None
self.assertEqual(33, channel.resource_usage.db_txn_count)
self.assertEqual(30, channel.resource_usage.db_txn_count)
def test_post_room_initial_state(self) -> None:
# POST with initial_state config key, expect new room id
@@ -728,7 +728,7 @@ class RoomsCreateTestCase(RoomBase):
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
self.assertTrue("room_id" in channel.json_body)
assert channel.resource_usage is not None
self.assertEqual(36, channel.resource_usage.db_txn_count)
self.assertEqual(32, channel.resource_usage.db_txn_count)
def test_post_room_visibility_key(self) -> None:
# POST with visibility config key, expect new room id