mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Compare commits
23 Commits
083cd158fa
...
shay/batch
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fd98493c86 | ||
|
|
9602493c13 | ||
|
|
143211fa26 | ||
|
|
4df01d605e | ||
|
|
060c1a153c | ||
|
|
06a3b9a921 | ||
|
|
8cd1196c4b | ||
|
|
b840328d36 | ||
|
|
64e16fef76 | ||
|
|
0da772fd34 | ||
|
|
1ba3c5e15a | ||
|
|
8b4875423e | ||
|
|
3c90093de3 | ||
|
|
b0aa16ddfb | ||
|
|
d6211b5547 | ||
|
|
5ef29eee8a | ||
|
|
bdfe7fb2e2 | ||
|
|
badf303eb2 | ||
|
|
84be413b2f | ||
|
|
beebb6db4a | ||
|
|
5a1decd5be | ||
|
|
63f8ee4007 | ||
|
|
f07e9af194 |
1
changelog.d/14371.misc
Normal file
1
changelog.d/14371.misc
Normal file
@@ -0,0 +1 @@
|
||||
Batch up state groups to store when creating initial room events.
|
||||
@@ -231,7 +231,11 @@ class ThirdPartyEventRules:
|
||||
self._on_threepid_bind_callbacks.append(on_threepid_bind)
|
||||
|
||||
async def check_event_allowed(
|
||||
self, event: EventBase, context: EventContext
|
||||
self,
|
||||
event: EventBase,
|
||||
context: EventContext,
|
||||
for_batch: bool = False,
|
||||
state_map: Optional[StateMap[str]] = None,
|
||||
) -> Tuple[bool, Optional[dict]]:
|
||||
"""Check if a provided event should be allowed in the given context.
|
||||
|
||||
@@ -253,7 +257,11 @@ class ThirdPartyEventRules:
|
||||
if len(self._check_event_allowed_callbacks) == 0:
|
||||
return True, None
|
||||
|
||||
prev_state_ids = await context.get_prev_state_ids()
|
||||
if for_batch:
|
||||
assert state_map is not None
|
||||
prev_state_ids = state_map
|
||||
else:
|
||||
prev_state_ids = await context.get_prev_state_ids()
|
||||
|
||||
# Retrieve the state events from the database.
|
||||
events = await self.store.get_events(prev_state_ids.values())
|
||||
|
||||
@@ -573,15 +573,15 @@ class EventCreationHandler:
|
||||
depth: Optional[int] = None,
|
||||
state_map: Optional[StateMap[str]] = None,
|
||||
for_batch: bool = False,
|
||||
current_state_group: Optional[int] = None,
|
||||
) -> Tuple[EventBase, EventContext]:
|
||||
"""
|
||||
Given a dict from a client, create a new event. If bool for_batch is true, will
|
||||
create an event using the prev_event_ids, and will create an event context for
|
||||
the event using the parameters state_map and current_state_group, thus these parameters
|
||||
must be provided in this case if for_batch is True. The subsequently created event
|
||||
and context are suitable for being batched up and bulk persisted to the database
|
||||
with other similarly created events.
|
||||
create an event using the prev_event_ids, and will create an empty event context for
|
||||
the event using the parameter state_map, thus this parameter must be provided
|
||||
if for_batch is True. Please note that the caller is then responsible for updating
|
||||
the state group info in the event context (by calling compute_event_context_for_batched).
|
||||
The subsequently created event and context are suitable for being batched up and
|
||||
bulk persisted to the database with other similarly created events.
|
||||
|
||||
Creates a FrozenEvent object, filling out auth_events, prev_events,
|
||||
etc.
|
||||
@@ -636,10 +636,8 @@ class EventCreationHandler:
|
||||
state_map: A state map of previously created events, used only when creating events
|
||||
for batch persisting
|
||||
|
||||
for_batch: whether the event is being created for batch persisting to the db
|
||||
|
||||
current_state_group: the current state group, used only for creating events for
|
||||
batch persisting
|
||||
for_batch: whether the event is being created for batch persisting to the db.
|
||||
If true, both prev_event_ids and state_map must be provided
|
||||
|
||||
Raises:
|
||||
ResourceLimitError if server is blocked to some resource being
|
||||
@@ -718,7 +716,6 @@ class EventCreationHandler:
|
||||
depth=depth,
|
||||
state_map=state_map,
|
||||
for_batch=for_batch,
|
||||
current_state_group=current_state_group,
|
||||
)
|
||||
|
||||
# In an ideal world we wouldn't need the second part of this condition. However,
|
||||
@@ -1069,14 +1066,13 @@ class EventCreationHandler:
|
||||
depth: Optional[int] = None,
|
||||
state_map: Optional[StateMap[str]] = None,
|
||||
for_batch: bool = False,
|
||||
current_state_group: Optional[int] = None,
|
||||
) -> Tuple[EventBase, EventContext]:
|
||||
"""Create a new event for a local client. If bool for_batch is true, will
|
||||
create an event using the prev_event_ids, and will create an event context for
|
||||
the event using the parameters state_map and current_state_group, thus these parameters
|
||||
must be provided in this case if for_batch is True. The subsequently created event
|
||||
and context are suitable for being batched up and bulk persisted to the database
|
||||
with other similarly created events.
|
||||
create an event using the prev_event_ids, and will create an empty event context.
|
||||
Please note that the caller is then responsible for updating the event context
|
||||
with state group information (by calling compute_event_context_for_batched).
|
||||
The subsequently created event and context are suitable for being batched up
|
||||
and bulk persisted to the database with other similarly created events.
|
||||
|
||||
Args:
|
||||
builder:
|
||||
@@ -1112,10 +1108,8 @@ class EventCreationHandler:
|
||||
state_map: A state map of previously created events, used only when creating events
|
||||
for batch persisting
|
||||
|
||||
for_batch: whether the event is being created for batch persisting to the db
|
||||
|
||||
current_state_group: the current state group, used only for creating events for
|
||||
batch persisting
|
||||
for_batch: whether the event is being created for batch persisting to the db.
|
||||
If for_batch is true, both prev_event_ids and state_map must be provided
|
||||
|
||||
Returns:
|
||||
Tuple of created event, context
|
||||
@@ -1172,14 +1166,11 @@ class EventCreationHandler:
|
||||
if for_batch:
|
||||
assert prev_event_ids is not None
|
||||
assert state_map is not None
|
||||
assert current_state_group is not None
|
||||
auth_ids = self._event_auth_handler.compute_auth_events(builder, state_map)
|
||||
event = await builder.build(
|
||||
prev_event_ids=prev_event_ids, auth_event_ids=auth_ids, depth=depth
|
||||
)
|
||||
context = await self.state.compute_event_context_for_batched(
|
||||
event, state_map, current_state_group
|
||||
)
|
||||
context = EventContext(self._storage_controllers)
|
||||
else:
|
||||
event = await builder.build(
|
||||
prev_event_ids=prev_event_ids,
|
||||
@@ -1244,7 +1235,7 @@ class EventCreationHandler:
|
||||
context.app_service = requester.app_service
|
||||
|
||||
res, new_content = await self.third_party_event_rules.check_event_allowed(
|
||||
event, context
|
||||
event, context, for_batch=for_batch, state_map=state_map
|
||||
)
|
||||
if res is False:
|
||||
logger.info(
|
||||
|
||||
@@ -1062,9 +1062,6 @@ class RoomCreationHandler:
|
||||
# created (but not persisted to the db) to determine state for future created events
|
||||
# (as this info can't be pulled from the db)
|
||||
state_map: MutableStateMap[str] = {}
|
||||
# current_state_group of last event created. Used for computing event context of
|
||||
# events to be batched
|
||||
current_state_group = None
|
||||
|
||||
def create_event_dict(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
|
||||
e = {"type": etype, "content": content}
|
||||
@@ -1105,7 +1102,6 @@ class RoomCreationHandler:
|
||||
depth=depth,
|
||||
state_map=state_map,
|
||||
for_batch=for_batch,
|
||||
current_state_group=current_state_group,
|
||||
)
|
||||
depth += 1
|
||||
prev_event = [new_event.event_id]
|
||||
@@ -1168,7 +1164,6 @@ class RoomCreationHandler:
|
||||
power_event, power_context = await create_event(
|
||||
EventTypes.PowerLevels, pl_content, True
|
||||
)
|
||||
current_state_group = power_context._state_group
|
||||
events_to_send.append((power_event, power_context))
|
||||
else:
|
||||
power_level_content: JsonDict = {
|
||||
@@ -1217,14 +1212,12 @@ class RoomCreationHandler:
|
||||
power_level_content,
|
||||
True,
|
||||
)
|
||||
current_state_group = pl_context._state_group
|
||||
events_to_send.append((pl_event, pl_context))
|
||||
|
||||
if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
|
||||
room_alias_event, room_alias_context = await create_event(
|
||||
EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True
|
||||
)
|
||||
current_state_group = room_alias_context._state_group
|
||||
events_to_send.append((room_alias_event, room_alias_context))
|
||||
|
||||
if (EventTypes.JoinRules, "") not in initial_state:
|
||||
@@ -1233,7 +1226,6 @@ class RoomCreationHandler:
|
||||
{"join_rule": config["join_rules"]},
|
||||
True,
|
||||
)
|
||||
current_state_group = join_rules_context._state_group
|
||||
events_to_send.append((join_rules_event, join_rules_context))
|
||||
|
||||
if (EventTypes.RoomHistoryVisibility, "") not in initial_state:
|
||||
@@ -1242,7 +1234,6 @@ class RoomCreationHandler:
|
||||
{"history_visibility": config["history_visibility"]},
|
||||
True,
|
||||
)
|
||||
current_state_group = visibility_context._state_group
|
||||
events_to_send.append((visibility_event, visibility_context))
|
||||
|
||||
if config["guest_can_join"]:
|
||||
@@ -1252,14 +1243,12 @@ class RoomCreationHandler:
|
||||
{EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
|
||||
True,
|
||||
)
|
||||
current_state_group = guest_access_context._state_group
|
||||
events_to_send.append((guest_access_event, guest_access_context))
|
||||
|
||||
for (etype, state_key), content in initial_state.items():
|
||||
event, context = await create_event(
|
||||
etype, content, True, state_key=state_key
|
||||
)
|
||||
current_state_group = context._state_group
|
||||
events_to_send.append((event, context))
|
||||
|
||||
if config["encrypted"]:
|
||||
@@ -1271,6 +1260,12 @@ class RoomCreationHandler:
|
||||
)
|
||||
events_to_send.append((encryption_event, encryption_context))
|
||||
|
||||
# update event contexts with state group information
|
||||
state = self.hs.get_state_handler()
|
||||
events_to_send = await state.compute_event_context_for_batched(
|
||||
events_to_send, current_state_group, state_map
|
||||
)
|
||||
|
||||
last_event = await self.event_creation_handler.handle_new_client_event(
|
||||
creator,
|
||||
events_to_send,
|
||||
|
||||
@@ -422,66 +422,57 @@ class StateHandler:
|
||||
|
||||
async def compute_event_context_for_batched(
|
||||
self,
|
||||
event: EventBase,
|
||||
state_ids_before_event: StateMap[str],
|
||||
current_state_group: int,
|
||||
) -> EventContext:
|
||||
events_and_context: List[Tuple[EventBase, EventContext]],
|
||||
prev_group: int,
|
||||
state_ids_before_event: StateMap,
|
||||
) -> List[Tuple[EventBase, EventContext]]:
|
||||
"""
|
||||
Generate an event context for an event that has not yet been persisted to the
|
||||
database. Intended for use with events that are created to be persisted in a batch.
|
||||
Args:
|
||||
event: the event the context is being computed for
|
||||
state_ids_before_event: a state map consisting of the state ids of the events
|
||||
created prior to this event.
|
||||
current_state_group: the current state group before the event.
|
||||
events_and_context: a list of events and their associated contexts
|
||||
prev_group: the state group of the last event persisted before the batched events
|
||||
were created
|
||||
state_ids_before_event: a state map consisting of current state ids
|
||||
"""
|
||||
state_group_before_event_prev_group = None
|
||||
deltas_to_state_group_before_event = None
|
||||
# separate out state and non-state contexts
|
||||
state_events = []
|
||||
for event, context in events_and_context:
|
||||
if event.is_state():
|
||||
state_events.append((event, context))
|
||||
|
||||
state_group_before_event = current_state_group
|
||||
# get state groups for state events
|
||||
room_id = events_and_context[0][0].room_id
|
||||
assert self.hs.datastores is not None
|
||||
await self.hs.datastores.state.store_state_deltas_for_batched(
|
||||
state_events, room_id, prev_group=prev_group
|
||||
)
|
||||
|
||||
# if the event is not state, we are set
|
||||
if not event.is_state():
|
||||
return EventContext.with_state(
|
||||
storage=self._storage_controllers,
|
||||
state_group_before_event=state_group_before_event,
|
||||
state_group=state_group_before_event,
|
||||
state_delta_due_to_event={},
|
||||
prev_group=state_group_before_event_prev_group,
|
||||
delta_ids=deltas_to_state_group_before_event,
|
||||
partial_state=False,
|
||||
)
|
||||
# iterate through all contexts and update everything
|
||||
current_state_group = prev_group
|
||||
for event, context in events_and_context:
|
||||
|
||||
# otherwise, we'll need to create a new state group for after the event
|
||||
key = (event.type, event.state_key)
|
||||
# if the event is not state, we need to update it
|
||||
if not event.is_state():
|
||||
context._state_group = current_state_group
|
||||
context.state_group_before_event = current_state_group
|
||||
context._state_delta_due_to_event = {}
|
||||
context.prev_group = None
|
||||
context.delta_ids = None
|
||||
context.partial_state = False
|
||||
|
||||
if state_ids_before_event is not None:
|
||||
# the context should have been updated when storing the state groups but let's
|
||||
# be sure - if it does not have a state group there is a problem
|
||||
if context._state_group is None:
|
||||
raise RuntimeError(f"Event {event.event_id} is missing a state group.")
|
||||
current_state_group = context._state_group
|
||||
|
||||
key = (event.type, event.state_key)
|
||||
replaces = state_ids_before_event.get(key)
|
||||
if replaces and replaces != event.event_id:
|
||||
event.unsigned["replaces_state"] = replaces
|
||||
|
||||
if replaces and replaces != event.event_id:
|
||||
event.unsigned["replaces_state"] = replaces
|
||||
|
||||
delta_ids = {key: event.event_id}
|
||||
|
||||
state_group_after_event = (
|
||||
await self._state_storage_controller.store_state_group(
|
||||
event.event_id,
|
||||
event.room_id,
|
||||
prev_group=state_group_before_event,
|
||||
delta_ids=delta_ids,
|
||||
current_state_ids=None,
|
||||
)
|
||||
)
|
||||
|
||||
return EventContext.with_state(
|
||||
storage=self._storage_controllers,
|
||||
state_group=state_group_after_event,
|
||||
state_group_before_event=state_group_before_event,
|
||||
state_delta_due_to_event=delta_ids,
|
||||
prev_group=state_group_before_event,
|
||||
delta_ids=delta_ids,
|
||||
partial_state=False,
|
||||
)
|
||||
return events_and_context
|
||||
|
||||
@measure_func()
|
||||
async def resolve_state_groups_for_events(
|
||||
|
||||
@@ -399,7 +399,11 @@ class StateStorageController:
|
||||
The state group ID
|
||||
"""
|
||||
return await self.stores.state.store_state_group(
|
||||
event_id, room_id, prev_group, delta_ids, current_state_ids
|
||||
event_id,
|
||||
room_id,
|
||||
prev_group,
|
||||
delta_ids,
|
||||
current_state_ids,
|
||||
)
|
||||
|
||||
@cancellable
|
||||
|
||||
@@ -18,6 +18,8 @@ from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Se
|
||||
import attr
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import (
|
||||
DatabasePool,
|
||||
@@ -404,6 +406,112 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
fetched_keys=non_member_types,
|
||||
)
|
||||
|
||||
async def store_state_deltas_for_batched(
    self,
    events_and_context: List[Tuple[EventBase, EventContext]],
    room_id: str,
    prev_group: int,
) -> List[int]:
    """Assign and persist one new state group per event in a batch.

    Each event's state group is stored as a single-entry delta on top of the
    group assigned to the event before it; the first event's group builds on
    `prev_group`. The contexts in `events_and_context` are updated in place
    with the state group information that is written.

    Args:
        events_and_context: the events to generate and store state groups for,
            paired with their associated contexts
        room_id: the id of the room the events were created for
        prev_group: the state group of the last event persisted before the
            batched events were created

    Returns:
        The list of state groups that correspond to the events in
        `events_and_context`.
    """

    def insert_deltas_group_txn(
        txn: LoggingTransaction,
        batch: List[Tuple[EventBase, EventContext]],
        base_group: int,
    ) -> List[int]:
        """Generate and store state groups for the provided events and contexts.

        Requires that we have the state as a delta from the last persisted
        state group.

        Returns:
            A list of state groups
        """
        # Refuse to chain new groups onto a group that is not in the table.
        persisted_id = self.db_pool.simple_select_one_onecol_txn(
            txn,
            table="state_groups",
            keyvalues={"id": base_group},
            retcol="id",
            allow_none=True,
        )
        if not persisted_id:
            raise Exception(
                "Trying to persist state with unpersisted prev_group: %r"
                % (base_group,)
            )

        # Reserve one state group id per event in the batch.
        new_groups = self._state_group_seq_gen.get_next_mult_txn(txn, len(batch))

        for position, (event, context) in enumerate(batch):
            context._state_group = new_groups[position]

            # The first event builds on the last persisted state group (passed
            # in); every later event builds on the group assigned just before.
            parent_group = new_groups[position - 1] if position > 0 else base_group
            context.prev_group = parent_group
            context.state_group_before_event = parent_group

            # The delta for each group is just the event itself. Two separate
            # dicts are built so the context attributes do not share an object.
            state_key = (event.type, event.state_key)
            context.delta_ids = {state_key: event.event_id}
            context._state_delta_due_to_event = {state_key: event.event_id}

        group_rows = [
            (context._state_group, room_id, event.event_id)
            for event, context in batch
        ]
        self.db_pool.simple_insert_many_txn(
            txn,
            table="state_groups",
            keys=("id", "room_id", "event_id"),
            values=group_rows,
        )

        edge_rows = [
            (context._state_group, context.prev_group) for _, context in batch
        ]
        self.db_pool.simple_insert_many_txn(
            txn,
            table="state_group_edges",
            keys=("state_group", "prev_state_group"),
            values=edge_rows,
        )

        # One insert per context, carrying that context's delta entries.
        for _, context in batch:
            assert context.delta_ids is not None
            delta_rows = [
                (context._state_group, room_id, etype, skey, state_id)
                for (etype, skey), state_id in context.delta_ids.items()
            ]
            self.db_pool.simple_insert_many_txn(
                txn,
                table="state_groups_state",
                keys=("state_group", "room_id", "type", "state_key", "event_id"),
                values=delta_rows,
            )

        return new_groups

    return await self.db_pool.runInteraction(
        "store_state_deltas_for_batched.insert_deltas_group",
        insert_deltas_group_txn,
        events_and_context,
        prev_group,
    )
|
||||
|
||||
async def store_state_group(
|
||||
self,
|
||||
event_id: str,
|
||||
|
||||
@@ -715,7 +715,7 @@ class RoomsCreateTestCase(RoomBase):
|
||||
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
|
||||
self.assertTrue("room_id" in channel.json_body)
|
||||
assert channel.resource_usage is not None
|
||||
self.assertEqual(33, channel.resource_usage.db_txn_count)
|
||||
self.assertEqual(30, channel.resource_usage.db_txn_count)
|
||||
|
||||
def test_post_room_initial_state(self) -> None:
|
||||
# POST with initial_state config key, expect new room id
|
||||
@@ -728,7 +728,7 @@ class RoomsCreateTestCase(RoomBase):
|
||||
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
|
||||
self.assertTrue("room_id" in channel.json_body)
|
||||
assert channel.resource_usage is not None
|
||||
self.assertEqual(36, channel.resource_usage.db_txn_count)
|
||||
self.assertEqual(32, channel.resource_usage.db_txn_count)
|
||||
|
||||
def test_post_room_visibility_key(self) -> None:
|
||||
# POST with visibility config key, expect new room id
|
||||
|
||||
Reference in New Issue
Block a user