mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Compare commits
4 Commits
patch-1
...
erikj/even
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
34bc0bec98 | ||
|
|
248257c130 | ||
|
|
ba93cda363 | ||
|
|
07d8afc56c |
@@ -14,6 +14,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import synapse.state
|
||||
import synapse.storage
|
||||
@@ -22,6 +23,9 @@ from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.types import UserID
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -30,7 +34,7 @@ class BaseHandler(object):
|
||||
Common base class for the event handlers.
|
||||
"""
|
||||
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
"""
|
||||
Args:
|
||||
hs (synapse.server.HomeServer):
|
||||
|
||||
@@ -18,7 +18,7 @@ from typing import List
|
||||
|
||||
from synapse.api.constants import Membership
|
||||
from synapse.events import FrozenEvent
|
||||
from synapse.types import RoomStreamToken, StateMap
|
||||
from synapse.types import EventStreamToken, StateMap
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
from ._base import BaseHandler
|
||||
@@ -125,8 +125,8 @@ class AdminHandler(BaseHandler):
|
||||
else:
|
||||
stream_ordering = room.stream_ordering
|
||||
|
||||
from_key = str(RoomStreamToken(0, 0))
|
||||
to_key = str(RoomStreamToken(None, stream_ordering))
|
||||
from_key = str(EventStreamToken(stream=0, topological=0))
|
||||
to_key = str(EventStreamToken(stream_ordering))
|
||||
|
||||
written_events = set() # Events that we've processed in this room
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ from synapse.api.errors import (
|
||||
from synapse.logging.opentracing import log_kv, set_tag, trace
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.types import (
|
||||
RoomStreamToken,
|
||||
StreamToken,
|
||||
get_domain_from_id,
|
||||
get_verify_key_from_cross_signing_key,
|
||||
)
|
||||
@@ -104,7 +104,7 @@ class DeviceWorkerHandler(BaseHandler):
|
||||
|
||||
@trace
|
||||
@measure_func("device.get_user_ids_changed")
|
||||
async def get_user_ids_changed(self, user_id, from_token):
|
||||
async def get_user_ids_changed(self, user_id: str, from_token: StreamToken):
|
||||
"""Get list of users that have had the devices updated, or have newly
|
||||
joined a room, that `user_id` may be interested in.
|
||||
|
||||
@@ -115,7 +115,7 @@ class DeviceWorkerHandler(BaseHandler):
|
||||
|
||||
set_tag("user_id", user_id)
|
||||
set_tag("from_token", from_token)
|
||||
now_room_key = await self.store.get_room_events_max_id()
|
||||
now_room_key = self.store.get_room_events_max_id()
|
||||
|
||||
room_ids = await self.store.get_rooms_for_user(user_id)
|
||||
|
||||
@@ -142,7 +142,7 @@ class DeviceWorkerHandler(BaseHandler):
|
||||
)
|
||||
rooms_changed.update(event.room_id for event in member_events)
|
||||
|
||||
stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key).stream
|
||||
stream_ordering = from_token.room_key.stream
|
||||
|
||||
possibly_changed = set(changed)
|
||||
possibly_left = set()
|
||||
|
||||
@@ -73,6 +73,7 @@ from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRes
|
||||
from synapse.state import StateResolutionStore, resolve_events_with_store
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.types import (
|
||||
EventStreamToken,
|
||||
JsonDict,
|
||||
MutableStateMap,
|
||||
StateMap,
|
||||
@@ -1275,7 +1276,7 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
async def do_invite_join(
|
||||
self, target_hosts: Iterable[str], room_id: str, joinee: str, content: JsonDict
|
||||
) -> Tuple[str, int]:
|
||||
) -> Tuple[str, EventStreamToken]:
|
||||
""" Attempts to join the `joinee` to the room `room_id` via the
|
||||
servers contained in `target_hosts`.
|
||||
|
||||
@@ -1372,7 +1373,7 @@ class FederationHandler(BaseHandler):
|
||||
await self._replication.wait_for_stream_position(
|
||||
self.config.worker.events_shard_config.get_instance(room_id),
|
||||
"events",
|
||||
max_stream_id,
|
||||
max_stream_id.stream,
|
||||
)
|
||||
|
||||
# Check whether this room is the result of an upgrade of a room we already know
|
||||
@@ -1632,7 +1633,7 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
async def do_remotely_reject_invite(
|
||||
self, target_hosts: Iterable[str], room_id: str, user_id: str, content: JsonDict
|
||||
) -> Tuple[EventBase, int]:
|
||||
) -> Tuple[EventBase, EventStreamToken]:
|
||||
origin, event, room_version = await self._make_and_verify_event(
|
||||
target_hosts, room_id, user_id, "leave", content=content
|
||||
)
|
||||
@@ -1653,11 +1654,11 @@ class FederationHandler(BaseHandler):
|
||||
await self.federation_client.send_leave(host_list, event)
|
||||
|
||||
context = await self.state_handler.compute_event_context(event)
|
||||
stream_id = await self.persist_events_and_notify(
|
||||
stream_token = await self.persist_events_and_notify(
|
||||
event.room_id, [(event, context)]
|
||||
)
|
||||
|
||||
return event, stream_id
|
||||
return event, stream_token
|
||||
|
||||
async def _make_and_verify_event(
|
||||
self,
|
||||
@@ -1964,7 +1965,7 @@ class FederationHandler(BaseHandler):
|
||||
state: List[EventBase],
|
||||
event: EventBase,
|
||||
room_version: RoomVersion,
|
||||
) -> int:
|
||||
) -> EventStreamToken:
|
||||
"""Checks the auth chain is valid (and passes auth checks) for the
|
||||
state and event. Then persists the auth chain and state atomically.
|
||||
Persists the event separately. Notifies about the persisted events
|
||||
@@ -2916,7 +2917,7 @@ class FederationHandler(BaseHandler):
|
||||
room_id: str,
|
||||
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
|
||||
backfilled: bool = False,
|
||||
) -> int:
|
||||
) -> EventStreamToken:
|
||||
"""Persists events and tells the notifier/pushers about them, if
|
||||
necessary.
|
||||
|
||||
@@ -2937,9 +2938,9 @@ class FederationHandler(BaseHandler):
|
||||
event_and_contexts=event_and_contexts,
|
||||
backfilled=backfilled,
|
||||
)
|
||||
return result["max_stream_id"]
|
||||
return EventStreamToken.parse(result["max_stream_id"])
|
||||
else:
|
||||
max_stream_id = await self.storage.persistence.persist_events(
|
||||
max_stream_token = await self.storage.persistence.persist_events(
|
||||
event_and_contexts, backfilled=backfilled
|
||||
)
|
||||
|
||||
@@ -2950,12 +2951,12 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
if not backfilled: # Never notify for backfilled events
|
||||
for event, _ in event_and_contexts:
|
||||
await self._notify_persisted_event(event, max_stream_id)
|
||||
await self._notify_persisted_event(event, max_stream_token)
|
||||
|
||||
return max_stream_id
|
||||
return max_stream_token
|
||||
|
||||
async def _notify_persisted_event(
|
||||
self, event: EventBase, max_stream_id: int
|
||||
self, event: EventBase, max_stream_token: EventStreamToken,
|
||||
) -> None:
|
||||
"""Checks to see if notifier/pushers should be notified about the
|
||||
event or not.
|
||||
@@ -2981,12 +2982,14 @@ class FederationHandler(BaseHandler):
|
||||
elif event.internal_metadata.is_outlier():
|
||||
return
|
||||
|
||||
event_stream_id = event.internal_metadata.stream_ordering
|
||||
event_stream_token = EventStreamToken(event.internal_metadata.stream_ordering)
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id, extra_users=extra_users
|
||||
event, event_stream_token, max_stream_token, extra_users=extra_users
|
||||
)
|
||||
|
||||
await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
|
||||
await self.pusher_pool.on_new_notifications(
|
||||
event_stream_token.stream, max_stream_token.stream
|
||||
)
|
||||
|
||||
async def _clean_room_for_join(self, room_id: str) -> None:
|
||||
"""Called to clean up any data in DB for a given room, ready for the
|
||||
|
||||
@@ -23,7 +23,7 @@ from synapse.events.validator import EventValidator
|
||||
from synapse.handlers.presence import format_user_presence_state
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import StreamToken, UserID
|
||||
from synapse.types import EventStreamToken, StreamToken, UserID
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
@@ -164,7 +164,7 @@ class InitialSyncHandler(BaseHandler):
|
||||
self.state_handler.get_current_state, event.room_id
|
||||
)
|
||||
elif event.membership == Membership.LEAVE:
|
||||
room_end_token = "s%d" % (event.stream_ordering,)
|
||||
room_end_token = EventStreamToken(event.stream_ordering)
|
||||
deferred_room_state = run_in_background(
|
||||
self.state_store.get_state_for_events, [event.event_id]
|
||||
)
|
||||
|
||||
@@ -49,7 +49,14 @@ from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
|
||||
from synapse.types import (
|
||||
EventStreamToken,
|
||||
Requester,
|
||||
RoomAlias,
|
||||
StreamToken,
|
||||
UserID,
|
||||
create_requester,
|
||||
)
|
||||
from synapse.util import json_decoder
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.frozenutils import frozendict_json_encoder
|
||||
@@ -644,7 +651,7 @@ class EventCreationHandler(object):
|
||||
context: EventContext,
|
||||
ratelimit: bool = True,
|
||||
ignore_shadow_ban: bool = False,
|
||||
) -> int:
|
||||
) -> EventStreamToken:
|
||||
"""
|
||||
Persists and notifies local clients and federation of an event.
|
||||
|
||||
@@ -726,7 +733,7 @@ class EventCreationHandler(object):
|
||||
ratelimit: bool = True,
|
||||
txn_id: Optional[str] = None,
|
||||
ignore_shadow_ban: bool = False,
|
||||
) -> Tuple[EventBase, int]:
|
||||
) -> Tuple[EventBase, EventStreamToken]:
|
||||
"""
|
||||
Creates an event, then sends it.
|
||||
|
||||
@@ -764,14 +771,14 @@ class EventCreationHandler(object):
|
||||
spam_error = "Spam is not permitted here"
|
||||
raise SynapseError(403, spam_error, Codes.FORBIDDEN)
|
||||
|
||||
stream_id = await self.send_nonmember_event(
|
||||
stream_token = await self.send_nonmember_event(
|
||||
requester,
|
||||
event,
|
||||
context,
|
||||
ratelimit=ratelimit,
|
||||
ignore_shadow_ban=ignore_shadow_ban,
|
||||
)
|
||||
return event, stream_id
|
||||
return event, stream_token
|
||||
|
||||
@measure_func("create_new_client_event")
|
||||
async def create_new_client_event(
|
||||
@@ -845,7 +852,7 @@ class EventCreationHandler(object):
|
||||
context: EventContext,
|
||||
ratelimit: bool = True,
|
||||
extra_users: List[UserID] = [],
|
||||
) -> int:
|
||||
) -> EventStreamToken:
|
||||
"""Processes a new event. This includes checking auth, persisting it,
|
||||
notifying users, sending to remote servers, etc.
|
||||
|
||||
@@ -915,15 +922,15 @@ class EventCreationHandler(object):
|
||||
ratelimit=ratelimit,
|
||||
extra_users=extra_users,
|
||||
)
|
||||
stream_id = result["stream_id"]
|
||||
event.internal_metadata.stream_ordering = stream_id
|
||||
return stream_id
|
||||
stream_token = EventStreamToken.parse(result["stream_token"])
|
||||
event.internal_metadata.stream_ordering = stream_token.stream
|
||||
return stream_token
|
||||
|
||||
stream_id = await self.persist_and_notify_client_event(
|
||||
stream_token = await self.persist_and_notify_client_event(
|
||||
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
|
||||
)
|
||||
|
||||
return stream_id
|
||||
return stream_token
|
||||
except Exception:
|
||||
# Ensure that we actually remove the entries in the push actions
|
||||
# staging area, if we calculated them.
|
||||
@@ -968,7 +975,7 @@ class EventCreationHandler(object):
|
||||
context: EventContext,
|
||||
ratelimit: bool = True,
|
||||
extra_users: List[UserID] = [],
|
||||
) -> int:
|
||||
) -> EventStreamToken:
|
||||
"""Called when we have fully built the event, have already
|
||||
calculated the push actions for the event, and checked auth.
|
||||
|
||||
@@ -1139,20 +1146,23 @@ class EventCreationHandler(object):
|
||||
if prev_state_ids:
|
||||
raise AuthError(403, "Changing the room create event is forbidden")
|
||||
|
||||
event_stream_id, max_stream_id = await self.storage.persistence.persist_event(
|
||||
event, context=context
|
||||
)
|
||||
(
|
||||
event_stream_token,
|
||||
max_stream_token,
|
||||
) = await self.storage.persistence.persist_event(event, context=context)
|
||||
|
||||
if self._ephemeral_events_enabled:
|
||||
# If there's an expiry timestamp on the event, schedule its expiry.
|
||||
self._message_handler.maybe_schedule_expiry(event)
|
||||
|
||||
await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
|
||||
await self.pusher_pool.on_new_notifications(
|
||||
event_stream_token.stream, max_stream_token.stream
|
||||
)
|
||||
|
||||
def _notify():
|
||||
try:
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id, extra_users=extra_users
|
||||
event, event_stream_token, max_stream_token, extra_users=extra_users
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error notifying about new room event")
|
||||
@@ -1164,7 +1174,7 @@ class EventCreationHandler(object):
|
||||
# matters as sometimes presence code can take a while.
|
||||
run_in_background(self._bump_active_time, requester.user)
|
||||
|
||||
return event_stream_id
|
||||
return event_stream_token
|
||||
|
||||
async def _bump_active_time(self, user: UserID) -> None:
|
||||
try:
|
||||
|
||||
@@ -25,7 +25,7 @@ from synapse.logging.context import run_in_background
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import Requester, RoomStreamToken
|
||||
from synapse.types import EventStreamToken, Requester
|
||||
from synapse.util.async_helpers import ReadWriteLock
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.visibility import filter_events_for_client
|
||||
@@ -338,8 +338,6 @@ class PaginationHandler(object):
|
||||
)
|
||||
room_token = pagin_config.from_token.room_key
|
||||
|
||||
room_token = RoomStreamToken.parse(room_token)
|
||||
|
||||
pagin_config.from_token = pagin_config.from_token.copy_and_replace(
|
||||
"room_key", str(room_token)
|
||||
)
|
||||
@@ -371,7 +369,7 @@ class PaginationHandler(object):
|
||||
leave_token = await self.store.get_topological_token_for_event(
|
||||
member_event_id
|
||||
)
|
||||
leave_token = RoomStreamToken.parse(leave_token)
|
||||
leave_token = EventStreamToken.parse(leave_token)
|
||||
if leave_token.topological < max_topo:
|
||||
source_config.from_key = str(leave_token)
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ import math
|
||||
import random
|
||||
import string
|
||||
from collections import OrderedDict
|
||||
from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
|
||||
|
||||
from synapse.api.constants import (
|
||||
EventTypes,
|
||||
@@ -40,12 +40,12 @@ from synapse.events.utils import copy_power_levels_contents
|
||||
from synapse.http.endpoint import parse_and_validate_server_name
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import (
|
||||
EventStreamToken,
|
||||
JsonDict,
|
||||
MutableStateMap,
|
||||
Requester,
|
||||
RoomAlias,
|
||||
RoomID,
|
||||
RoomStreamToken,
|
||||
StateMap,
|
||||
StreamToken,
|
||||
UserID,
|
||||
@@ -559,7 +559,7 @@ class RoomCreationHandler(BaseHandler):
|
||||
config: JsonDict,
|
||||
ratelimit: bool = True,
|
||||
creator_join_profile: Optional[JsonDict] = None,
|
||||
) -> Tuple[dict, int]:
|
||||
) -> Tuple[dict, EventStreamToken]:
|
||||
""" Creates a new room.
|
||||
|
||||
Args:
|
||||
@@ -806,7 +806,7 @@ class RoomCreationHandler(BaseHandler):
|
||||
await self._replication.wait_for_stream_position(
|
||||
self.hs.config.worker.events_shard_config.get_instance(room_id),
|
||||
"events",
|
||||
last_stream_id,
|
||||
last_stream_id.stream,
|
||||
)
|
||||
|
||||
return result, last_stream_id
|
||||
@@ -822,7 +822,7 @@ class RoomCreationHandler(BaseHandler):
|
||||
room_alias: Optional[RoomAlias] = None,
|
||||
power_level_content_override: Optional[JsonDict] = None,
|
||||
creator_join_profile: Optional[JsonDict] = None,
|
||||
) -> int:
|
||||
) -> EventStreamToken:
|
||||
"""Sends the initial events into a new room.
|
||||
|
||||
`power_level_content_override` doesn't apply when initial state has
|
||||
@@ -844,7 +844,7 @@ class RoomCreationHandler(BaseHandler):
|
||||
|
||||
return e
|
||||
|
||||
async def send(etype: str, content: JsonDict, **kwargs) -> int:
|
||||
async def send(etype: str, content: JsonDict, **kwargs) -> EventStreamToken:
|
||||
event = create(etype, content, **kwargs)
|
||||
logger.debug("Sending %s in new room", etype)
|
||||
# Allow these events to be sent even if the user is shadow-banned to
|
||||
@@ -1093,20 +1093,19 @@ class RoomEventSource(object):
|
||||
async def get_new_events(
|
||||
self,
|
||||
user: UserID,
|
||||
from_key: str,
|
||||
from_key: EventStreamToken,
|
||||
limit: int,
|
||||
room_ids: List[str],
|
||||
is_guest: bool,
|
||||
explicit_room_id: Optional[str] = None,
|
||||
) -> Tuple[List[EventBase], str]:
|
||||
explicit_room_id: str = None,
|
||||
) -> Tuple[List[EventBase], EventStreamToken]:
|
||||
# We just ignore the key for now.
|
||||
|
||||
to_key = self.get_current_key()
|
||||
|
||||
from_token = RoomStreamToken.parse(from_key)
|
||||
if from_token.topological:
|
||||
if from_key.topological:
|
||||
logger.warning("Stream has topological part!!!! %r", from_key)
|
||||
from_key = "s%s" % (from_token.stream,)
|
||||
from_key = EventStreamToken(from_key.stream)
|
||||
|
||||
app_service = self.store.get_app_service_by_user_id(user.to_string())
|
||||
if app_service:
|
||||
@@ -1135,17 +1134,14 @@ class RoomEventSource(object):
|
||||
events[:] = events[:limit]
|
||||
|
||||
if events:
|
||||
end_key = events[-1].internal_metadata.after
|
||||
end_key = EventStreamToken.parse(events[-1].internal_metadata.after)
|
||||
else:
|
||||
end_key = to_key
|
||||
|
||||
return (events, end_key)
|
||||
|
||||
def get_current_key(self) -> str:
|
||||
return "s%d" % (self.store.get_room_max_stream_ordering(),)
|
||||
|
||||
def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
|
||||
return self.store.get_room_events_max_id(room_id)
|
||||
def get_current_key(self) -> EventStreamToken:
|
||||
return EventStreamToken(self.store.get_room_max_stream_ordering(),)
|
||||
|
||||
|
||||
class RoomShutdownHandler(object):
|
||||
@@ -1244,7 +1240,7 @@ class RoomShutdownHandler(object):
|
||||
|
||||
room_creator_requester = create_requester(new_room_user_id)
|
||||
|
||||
info, stream_id = await self._room_creation_handler.create_room(
|
||||
info, stream_token = await self._room_creation_handler.create_room(
|
||||
room_creator_requester,
|
||||
config={
|
||||
"preset": RoomCreationPreset.PUBLIC_CHAT,
|
||||
@@ -1265,7 +1261,7 @@ class RoomShutdownHandler(object):
|
||||
await self._replication.wait_for_stream_position(
|
||||
self.hs.config.worker.events_shard_config.get_instance(new_room_id),
|
||||
"events",
|
||||
stream_id,
|
||||
stream_token.stream,
|
||||
)
|
||||
else:
|
||||
new_room_id = None
|
||||
@@ -1283,7 +1279,7 @@ class RoomShutdownHandler(object):
|
||||
try:
|
||||
# Kick users from room
|
||||
target_requester = create_requester(user_id)
|
||||
_, stream_id = await self.room_member_handler.update_membership(
|
||||
_, stream_token = await self.room_member_handler.update_membership(
|
||||
requester=target_requester,
|
||||
target=target_requester.user,
|
||||
room_id=room_id,
|
||||
@@ -1297,7 +1293,7 @@ class RoomShutdownHandler(object):
|
||||
await self._replication.wait_for_stream_position(
|
||||
self.hs.config.worker.events_shard_config.get_instance(room_id),
|
||||
"events",
|
||||
stream_id,
|
||||
stream_token.stream,
|
||||
)
|
||||
|
||||
await self.room_member_handler.forget(target_requester.user, room_id)
|
||||
|
||||
@@ -38,7 +38,15 @@ from synapse.events.builder import create_local_event_from_event_dict
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.events.validator import EventValidator
|
||||
from synapse.storage.roommember import RoomsForUser
|
||||
from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
|
||||
from synapse.types import (
|
||||
EventStreamToken,
|
||||
JsonDict,
|
||||
Requester,
|
||||
RoomAlias,
|
||||
RoomID,
|
||||
StateMap,
|
||||
UserID,
|
||||
)
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.distributor import user_joined_room, user_left_room
|
||||
|
||||
@@ -106,7 +114,7 @@ class RoomMemberHandler(object):
|
||||
room_id: str,
|
||||
user: UserID,
|
||||
content: dict,
|
||||
) -> Tuple[str, int]:
|
||||
) -> Tuple[str, EventStreamToken]:
|
||||
"""Try and join a room that this server is not in
|
||||
|
||||
Args:
|
||||
@@ -125,7 +133,7 @@ class RoomMemberHandler(object):
|
||||
txn_id: Optional[str],
|
||||
requester: Requester,
|
||||
content: JsonDict,
|
||||
) -> Tuple[str, int]:
|
||||
) -> Tuple[str, EventStreamToken]:
|
||||
"""
|
||||
Rejects an out-of-band invite we have received from a remote server
|
||||
|
||||
@@ -137,7 +145,7 @@ class RoomMemberHandler(object):
|
||||
Normally an empty dict.
|
||||
|
||||
Returns:
|
||||
event id, stream_id of the leave event
|
||||
event id, stream token of the leave event
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@@ -174,7 +182,7 @@ class RoomMemberHandler(object):
|
||||
ratelimit: bool = True,
|
||||
content: Optional[dict] = None,
|
||||
require_consent: bool = True,
|
||||
) -> Tuple[str, int]:
|
||||
) -> Tuple[str, EventStreamToken]:
|
||||
user_id = target.to_string()
|
||||
|
||||
if content is None:
|
||||
@@ -208,7 +216,7 @@ class RoomMemberHandler(object):
|
||||
if duplicate is not None:
|
||||
# Discard the new event since this membership change is a no-op.
|
||||
_, stream_id = await self.store.get_event_ordering(duplicate.event_id)
|
||||
return duplicate.event_id, stream_id
|
||||
return duplicate.event_id, EventStreamToken(stream_id)
|
||||
|
||||
prev_state_ids = await context.get_prev_state_ids()
|
||||
|
||||
@@ -235,7 +243,7 @@ class RoomMemberHandler(object):
|
||||
retry_after_ms=int(1000 * (time_allowed - time_now_s))
|
||||
)
|
||||
|
||||
stream_id = await self.event_creation_handler.handle_new_client_event(
|
||||
stream_token = await self.event_creation_handler.handle_new_client_event(
|
||||
requester, event, context, extra_users=[target], ratelimit=ratelimit,
|
||||
)
|
||||
|
||||
@@ -250,7 +258,7 @@ class RoomMemberHandler(object):
|
||||
if prev_member_event.membership == Membership.JOIN:
|
||||
await self._user_left_room(target, room_id)
|
||||
|
||||
return event.event_id, stream_id
|
||||
return event.event_id, stream_token
|
||||
|
||||
async def copy_room_tags_and_direct_to_room(
|
||||
self, old_room_id, new_room_id, user_id
|
||||
@@ -300,7 +308,7 @@ class RoomMemberHandler(object):
|
||||
ratelimit: bool = True,
|
||||
content: Optional[dict] = None,
|
||||
require_consent: bool = True,
|
||||
) -> Tuple[str, int]:
|
||||
) -> Tuple[str, EventStreamToken]:
|
||||
"""Update a user's membership in a room.
|
||||
|
||||
Params:
|
||||
@@ -356,7 +364,7 @@ class RoomMemberHandler(object):
|
||||
ratelimit: bool = True,
|
||||
content: Optional[dict] = None,
|
||||
require_consent: bool = True,
|
||||
) -> Tuple[str, int]:
|
||||
) -> Tuple[str, EventStreamToken]:
|
||||
content_specified = bool(content)
|
||||
if content is None:
|
||||
content = {}
|
||||
@@ -465,7 +473,7 @@ class RoomMemberHandler(object):
|
||||
)
|
||||
return (
|
||||
old_state.event_id,
|
||||
stream_id,
|
||||
EventStreamToken(stream_id),
|
||||
)
|
||||
|
||||
if old_membership in ["ban", "leave"] and action == "kick":
|
||||
@@ -524,11 +532,11 @@ class RoomMemberHandler(object):
|
||||
if requester.is_guest:
|
||||
content["kind"] = "guest"
|
||||
|
||||
remote_join_response = await self._remote_join(
|
||||
event_id, stream_token = await self._remote_join(
|
||||
requester, remote_room_hosts, room_id, target, content
|
||||
)
|
||||
|
||||
return remote_join_response
|
||||
return event_id, stream_token
|
||||
|
||||
elif effective_membership_state == Membership.LEAVE:
|
||||
if not is_host_in_room:
|
||||
@@ -801,7 +809,7 @@ class RoomMemberHandler(object):
|
||||
requester: Requester,
|
||||
txn_id: Optional[str],
|
||||
id_access_token: Optional[str] = None,
|
||||
) -> int:
|
||||
) -> EventStreamToken:
|
||||
"""Invite a 3PID to a room.
|
||||
|
||||
Args:
|
||||
@@ -859,11 +867,11 @@ class RoomMemberHandler(object):
|
||||
if invitee:
|
||||
# Note that update_membership with an action of "invite" can raise
|
||||
# a ShadowBanError, but this was done above already.
|
||||
_, stream_id = await self.update_membership(
|
||||
_, stream_token = await self.update_membership(
|
||||
requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id
|
||||
)
|
||||
else:
|
||||
stream_id = await self._make_and_store_3pid_invite(
|
||||
stream_token = await self._make_and_store_3pid_invite(
|
||||
requester,
|
||||
id_server,
|
||||
medium,
|
||||
@@ -874,7 +882,7 @@ class RoomMemberHandler(object):
|
||||
id_access_token=id_access_token,
|
||||
)
|
||||
|
||||
return stream_id
|
||||
return stream_token
|
||||
|
||||
async def _make_and_store_3pid_invite(
|
||||
self,
|
||||
@@ -886,7 +894,7 @@ class RoomMemberHandler(object):
|
||||
user: UserID,
|
||||
txn_id: Optional[str],
|
||||
id_access_token: Optional[str] = None,
|
||||
) -> int:
|
||||
) -> EventStreamToken:
|
||||
room_state = await self.state_handler.get_current_state(room_id)
|
||||
|
||||
inviter_display_name = ""
|
||||
@@ -1042,7 +1050,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
|
||||
room_id: str,
|
||||
user: UserID,
|
||||
content: dict,
|
||||
) -> Tuple[str, int]:
|
||||
) -> Tuple[str, EventStreamToken]:
|
||||
"""Implements RoomMemberHandler._remote_join
|
||||
"""
|
||||
# filter ourselves out of remote_room_hosts: do_invite_join ignores it
|
||||
@@ -1113,7 +1121,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
|
||||
txn_id: Optional[str],
|
||||
requester: Requester,
|
||||
content: JsonDict,
|
||||
) -> Tuple[str, int]:
|
||||
) -> Tuple[str, EventStreamToken]:
|
||||
"""
|
||||
Rejects an out-of-band invite received from a remote user
|
||||
|
||||
@@ -1127,10 +1135,10 @@ class RoomMemberMasterHandler(RoomMemberHandler):
|
||||
fed_handler = self.federation_handler
|
||||
try:
|
||||
inviter_id = UserID.from_string(invite_event.sender)
|
||||
event, stream_id = await fed_handler.do_remotely_reject_invite(
|
||||
event, stream_token = await fed_handler.do_remotely_reject_invite(
|
||||
[inviter_id.domain], room_id, target_user, content=content
|
||||
)
|
||||
return event.event_id, stream_id
|
||||
return event.event_id, stream_token
|
||||
except Exception as e:
|
||||
# if we were unable to reject the invite, we will generate our own
|
||||
# leave event.
|
||||
@@ -1150,7 +1158,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
|
||||
txn_id: Optional[str],
|
||||
requester: Requester,
|
||||
content: JsonDict,
|
||||
) -> Tuple[str, int]:
|
||||
) -> Tuple[str, EventStreamToken]:
|
||||
"""Generate a local invite rejection
|
||||
|
||||
This is called after we fail to reject an invite via a remote server. It
|
||||
@@ -1216,10 +1224,10 @@ class RoomMemberMasterHandler(RoomMemberHandler):
|
||||
|
||||
context = await self.state_handler.compute_event_context(event)
|
||||
context.app_service = requester.app_service
|
||||
stream_id = await self.event_creation_handler.handle_new_client_event(
|
||||
stream_token = await self.event_creation_handler.handle_new_client_event(
|
||||
requester, event, context, extra_users=[UserID.from_string(target_user)],
|
||||
)
|
||||
return event.event_id, stream_id
|
||||
return event.event_id, stream_token
|
||||
|
||||
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
|
||||
"""Implements RoomMemberHandler._user_joined_room
|
||||
|
||||
@@ -23,7 +23,7 @@ from synapse.replication.http.membership import (
|
||||
ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite,
|
||||
ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft,
|
||||
)
|
||||
from synapse.types import Requester, UserID
|
||||
from synapse.types import EventStreamToken, Requester, UserID
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -43,7 +43,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
|
||||
room_id: str,
|
||||
user: UserID,
|
||||
content: dict,
|
||||
) -> Tuple[str, int]:
|
||||
) -> Tuple[str, EventStreamToken]:
|
||||
"""Implements RoomMemberHandler._remote_join
|
||||
"""
|
||||
if len(remote_room_hosts) == 0:
|
||||
@@ -59,7 +59,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
|
||||
|
||||
await self._user_joined_room(user, room_id)
|
||||
|
||||
return ret["event_id"], ret["stream_id"]
|
||||
return ret["event_id"], EventStreamToken.parse(ret["stream_id"])
|
||||
|
||||
async def remote_reject_invite(
|
||||
self,
|
||||
@@ -67,7 +67,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
|
||||
txn_id: Optional[str],
|
||||
requester: Requester,
|
||||
content: dict,
|
||||
) -> Tuple[str, int]:
|
||||
) -> Tuple[str, EventStreamToken]:
|
||||
"""
|
||||
Rejects an out-of-band invite received from a remote user
|
||||
|
||||
@@ -79,7 +79,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
|
||||
requester=requester,
|
||||
content=content,
|
||||
)
|
||||
return ret["event_id"], ret["stream_id"]
|
||||
return ret["event_id"], EventStreamToken.parse(ret["stream_id"])
|
||||
|
||||
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
|
||||
"""Implements RoomMemberHandler._user_joined_room
|
||||
|
||||
@@ -30,9 +30,9 @@ from synapse.storage.roommember import MemberSummary
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import (
|
||||
Collection,
|
||||
EventStreamToken,
|
||||
JsonDict,
|
||||
MutableStateMap,
|
||||
RoomStreamToken,
|
||||
StateMap,
|
||||
StreamToken,
|
||||
UserID,
|
||||
@@ -1482,7 +1482,7 @@ class SyncHandler(object):
|
||||
if rooms_changed:
|
||||
return True
|
||||
|
||||
stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
|
||||
stream_id = since_token.room_key.stream
|
||||
for room_id in sync_result_builder.joined_room_ids:
|
||||
if self.store.has_room_changed_since(room_id, stream_id):
|
||||
return True
|
||||
@@ -1748,7 +1748,7 @@ class SyncHandler(object):
|
||||
continue
|
||||
|
||||
leave_token = now_token.copy_and_replace(
|
||||
"room_key", "s%d" % (event.stream_ordering,)
|
||||
"room_key", EventStreamToken(event.stream_ordering),
|
||||
)
|
||||
room_entries.append(
|
||||
RoomSyncResultBuilder(
|
||||
|
||||
@@ -42,7 +42,7 @@ from synapse.logging.utils import log_function
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import Collection, StreamToken, UserID
|
||||
from synapse.types import Collection, EventStreamToken, StreamToken, UserID
|
||||
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.visibility import filter_events_for_client
|
||||
@@ -112,7 +112,9 @@ class _NotifierUserStream(object):
|
||||
with PreserveLoggingContext():
|
||||
self.notify_deferred = ObservableDeferred(defer.Deferred())
|
||||
|
||||
def notify(self, stream_key: str, stream_id: int, time_now_ms: int):
|
||||
def notify(
|
||||
self, stream_key: str, stream_id: Union[int, EventStreamToken], time_now_ms: int
|
||||
):
|
||||
"""Notify any listeners for this user of a new event from an
|
||||
event source.
|
||||
Args:
|
||||
@@ -187,7 +189,7 @@ class Notifier(object):
|
||||
self.store = hs.get_datastore()
|
||||
self.pending_new_room_events = (
|
||||
[]
|
||||
) # type: List[Tuple[int, EventBase, Collection[Union[str, UserID]]]]
|
||||
) # type: List[Tuple[EventStreamToken, EventBase, Collection[Union[str, UserID]]]]
|
||||
|
||||
# Called when there are new things to stream over replication
|
||||
self.replication_callbacks = [] # type: List[Callable[[], None]]
|
||||
@@ -245,8 +247,8 @@ class Notifier(object):
|
||||
def on_new_room_event(
|
||||
self,
|
||||
event: EventBase,
|
||||
room_stream_id: int,
|
||||
max_room_stream_id: int,
|
||||
room_stream_id: EventStreamToken,
|
||||
max_room_stream_id: EventStreamToken,
|
||||
extra_users: Collection[Union[str, UserID]] = [],
|
||||
):
|
||||
""" Used by handlers to inform the notifier something has happened
|
||||
@@ -265,7 +267,7 @@ class Notifier(object):
|
||||
|
||||
self.notify_replication()
|
||||
|
||||
def _notify_pending_new_room_events(self, max_room_stream_id: int):
|
||||
def _notify_pending_new_room_events(self, max_room_stream_token: EventStreamToken):
|
||||
"""Notify for the room events that were queued waiting for a previous
|
||||
event to be persisted.
|
||||
Args:
|
||||
@@ -274,34 +276,34 @@ class Notifier(object):
|
||||
"""
|
||||
pending = self.pending_new_room_events
|
||||
self.pending_new_room_events = []
|
||||
for room_stream_id, event, extra_users in pending:
|
||||
if room_stream_id > max_room_stream_id:
|
||||
for room_stream_token, event, extra_users in pending:
|
||||
if room_stream_token > max_room_stream_token:
|
||||
self.pending_new_room_events.append(
|
||||
(room_stream_id, event, extra_users)
|
||||
(room_stream_token, event, extra_users)
|
||||
)
|
||||
else:
|
||||
self._on_new_room_event(event, room_stream_id, extra_users)
|
||||
self._on_new_room_event(event, max_room_stream_token, extra_users)
|
||||
|
||||
def _on_new_room_event(
|
||||
self,
|
||||
event: EventBase,
|
||||
room_stream_id: int,
|
||||
event_stream_token: EventStreamToken,
|
||||
extra_users: Collection[Union[str, UserID]] = [],
|
||||
):
|
||||
"""Notify any user streams that are interested in this room event"""
|
||||
# poke any interested application service.
|
||||
run_as_background_process(
|
||||
"notify_app_services", self._notify_app_services, room_stream_id
|
||||
"notify_app_services", self._notify_app_services, event_stream_token.stream
|
||||
)
|
||||
|
||||
if self.federation_sender:
|
||||
self.federation_sender.notify_new_events(room_stream_id)
|
||||
self.federation_sender.notify_new_events(event_stream_token.stream)
|
||||
|
||||
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
|
||||
self._user_joined_room(event.state_key, event.room_id)
|
||||
|
||||
self.on_new_event(
|
||||
"room_key", room_stream_id, users=extra_users, rooms=[event.room_id]
|
||||
"room_key", event_stream_token, users=extra_users, rooms=[event.room_id],
|
||||
)
|
||||
|
||||
async def _notify_app_services(self, room_stream_id: int):
|
||||
@@ -313,7 +315,7 @@ class Notifier(object):
|
||||
def on_new_event(
|
||||
self,
|
||||
stream_key: str,
|
||||
new_token: int,
|
||||
new_token: Union[int, EventStreamToken],
|
||||
users: Collection[Union[str, UserID]] = [],
|
||||
rooms: Collection[str] = [],
|
||||
):
|
||||
|
||||
@@ -178,7 +178,7 @@ class PusherPool:
|
||||
)
|
||||
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
|
||||
|
||||
async def on_new_notifications(self, min_stream_id, max_stream_id):
|
||||
async def on_new_notifications(self, min_stream_id: int, max_stream_id: int):
|
||||
if not self.pushers:
|
||||
# nothing to do here.
|
||||
return
|
||||
|
||||
@@ -125,11 +125,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||
|
||||
logger.info("Got %d events from federation", len(event_and_contexts))
|
||||
|
||||
max_stream_id = await self.federation_handler.persist_events_and_notify(
|
||||
max_stream_token = await self.federation_handler.persist_events_and_notify(
|
||||
room_id, event_and_contexts, backfilled
|
||||
)
|
||||
|
||||
return 200, {"max_stream_id": max_stream_id}
|
||||
return 200, {"max_stream_id": str(max_stream_token)}
|
||||
|
||||
|
||||
class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
|
||||
|
||||
@@ -86,7 +86,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
|
||||
remote_room_hosts, room_id, user_id, event_content
|
||||
)
|
||||
|
||||
return 200, {"event_id": event_id, "stream_id": stream_id}
|
||||
return 200, {"event_id": event_id, "stream_id": str(stream_id)}
|
||||
|
||||
|
||||
class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
|
||||
@@ -146,11 +146,11 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
|
||||
request.authenticated_entity = requester.user.to_string()
|
||||
|
||||
# hopefully we're now on the master, so this won't recurse!
|
||||
event_id, stream_id = await self.member_handler.remote_reject_invite(
|
||||
event_id, stream_token = await self.member_handler.remote_reject_invite(
|
||||
invite_event_id, txn_id, requester, event_content,
|
||||
)
|
||||
|
||||
return 200, {"event_id": event_id, "stream_id": stream_id}
|
||||
return 200, {"event_id": event_id, "stream_id": str(stream_token)}
|
||||
|
||||
|
||||
class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
|
||||
|
||||
@@ -116,11 +116,11 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
|
||||
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
|
||||
)
|
||||
|
||||
stream_id = await self.event_creation_handler.persist_and_notify_client_event(
|
||||
stream_token = await self.event_creation_handler.persist_and_notify_client_event(
|
||||
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
|
||||
)
|
||||
|
||||
return 200, {"stream_id": stream_id}
|
||||
return 200, {"stream_token": str(stream_token)}
|
||||
|
||||
|
||||
def register_servlets(hs, http_server):
|
||||
|
||||
@@ -29,6 +29,7 @@ from synapse.replication.tcp.streams.events import (
|
||||
EventsStreamEventRow,
|
||||
EventsStreamRow,
|
||||
)
|
||||
from synapse.types import EventStreamToken
|
||||
from synapse.util.async_helpers import timeout_deferred
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
@@ -152,7 +153,12 @@ class ReplicationDataHandler:
|
||||
if event.type == EventTypes.Member:
|
||||
extra_users = (event.state_key,)
|
||||
max_token = self.store.get_room_max_stream_ordering()
|
||||
self.notifier.on_new_room_event(event, token, max_token, extra_users)
|
||||
self.notifier.on_new_room_event(
|
||||
event,
|
||||
EventStreamToken(token),
|
||||
EventStreamToken(max_token),
|
||||
extra_users,
|
||||
)
|
||||
|
||||
await self.pusher_pool.on_new_notifications(token, token)
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ from typing import Any, List, Set, Tuple
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.databases.main.state import StateGroupWorkerStore
|
||||
from synapse.types import RoomStreamToken
|
||||
from synapse.types import EventStreamToken
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -51,7 +51,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
|
||||
)
|
||||
|
||||
def _purge_history_txn(self, txn, room_id, token_str, delete_local_events):
|
||||
token = RoomStreamToken.parse(token_str)
|
||||
token = EventStreamToken.parse(token_str)
|
||||
|
||||
# Tables that should be pruned:
|
||||
# event_auth
|
||||
|
||||
@@ -39,20 +39,22 @@ what sort order was used:
|
||||
import abc
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
from typing import Dict, Iterable, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.filtering import Filter
|
||||
from synapse.events import EventBase
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import DatabasePool, make_in_list_sql_clause
|
||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
|
||||
from synapse.types import RoomStreamToken
|
||||
from synapse.types import EventStreamToken
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.api.filtering import Filter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -303,11 +305,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
async def get_room_events_stream_for_rooms(
|
||||
self,
|
||||
room_ids: Iterable[str],
|
||||
from_key: str,
|
||||
to_key: str,
|
||||
from_key: EventStreamToken,
|
||||
to_key: EventStreamToken,
|
||||
limit: int = 0,
|
||||
order: str = "DESC",
|
||||
) -> Dict[str, Tuple[List[EventBase], str]]:
|
||||
) -> Dict[str, Tuple[List[EventBase], EventStreamToken]]:
|
||||
"""Get new room events in stream ordering since `from_key`.
|
||||
|
||||
Args:
|
||||
@@ -326,7 +328,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
- list of recent events in the room
|
||||
- stream ordering key for the start of the chunk of events returned.
|
||||
"""
|
||||
from_id = RoomStreamToken.parse_stream_token(from_key).stream
|
||||
from_id = from_key.stream
|
||||
|
||||
room_ids = self._events_stream_cache.get_entities_changed(room_ids, from_id)
|
||||
|
||||
@@ -356,15 +358,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
|
||||
return results
|
||||
|
||||
def get_rooms_that_changed(self, room_ids, from_key):
|
||||
def get_rooms_that_changed(
|
||||
self, room_ids: Iterable[str], from_key: EventStreamToken,
|
||||
):
|
||||
"""Given a list of rooms and a token, return rooms where there may have
|
||||
been changes.
|
||||
|
||||
Args:
|
||||
room_ids (list)
|
||||
from_key (str): The room_key portion of a StreamToken
|
||||
room_ids
|
||||
from_key: The room_key portion of a StreamToken
|
||||
"""
|
||||
from_key = RoomStreamToken.parse_stream_token(from_key).stream
|
||||
|
||||
return {
|
||||
room_id
|
||||
for room_id in room_ids
|
||||
@@ -374,11 +378,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
async def get_room_events_stream_for_room(
|
||||
self,
|
||||
room_id: str,
|
||||
from_key: str,
|
||||
to_key: str,
|
||||
from_key: EventStreamToken,
|
||||
to_key: EventStreamToken,
|
||||
limit: int = 0,
|
||||
order: str = "DESC",
|
||||
) -> Tuple[List[EventBase], str]:
|
||||
) -> Tuple[List[EventBase], EventStreamToken]:
|
||||
|
||||
"""Get new room events in stream ordering since `from_key`.
|
||||
|
||||
Args:
|
||||
@@ -399,8 +404,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
if from_key == to_key:
|
||||
return [], from_key
|
||||
|
||||
from_id = RoomStreamToken.parse_stream_token(from_key).stream
|
||||
to_id = RoomStreamToken.parse_stream_token(to_key).stream
|
||||
from_id = from_key.stream
|
||||
to_id = to_key.stream
|
||||
|
||||
has_changed = self._events_stream_cache.has_entity_changed(room_id, from_id)
|
||||
|
||||
@@ -432,7 +437,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
ret.reverse()
|
||||
|
||||
if rows:
|
||||
key = "s%d" % min(r.stream_ordering for r in rows)
|
||||
key = EventStreamToken(min(r.stream_ordering for r in rows))
|
||||
else:
|
||||
# Assume we didn't get anything because there was nothing to
|
||||
# get.
|
||||
@@ -440,9 +445,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
|
||||
return ret, key
|
||||
|
||||
async def get_membership_changes_for_user(self, user_id, from_key, to_key):
|
||||
from_id = RoomStreamToken.parse_stream_token(from_key).stream
|
||||
to_id = RoomStreamToken.parse_stream_token(to_key).stream
|
||||
async def get_membership_changes_for_user(
|
||||
self, user_id: str, from_key: EventStreamToken, to_key: EventStreamToken
|
||||
) -> List[EventBase]:
|
||||
from_id = from_key.stream
|
||||
to_id = to_key.stream
|
||||
|
||||
if from_key == to_key:
|
||||
return []
|
||||
@@ -480,8 +487,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
return ret
|
||||
|
||||
async def get_recent_events_for_room(
|
||||
self, room_id: str, limit: int, end_token: str
|
||||
) -> Tuple[List[EventBase], str]:
|
||||
self, room_id: str, limit: int, end_token: EventStreamToken
|
||||
) -> Tuple[List[EventBase], EventStreamToken]:
|
||||
"""Get the most recent events in the room in topological ordering.
|
||||
|
||||
Args:
|
||||
@@ -507,8 +514,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
return (events, token)
|
||||
|
||||
async def get_recent_event_ids_for_room(
|
||||
self, room_id: str, limit: int, end_token: str
|
||||
) -> Tuple[List[_EventDictReturn], str]:
|
||||
self, room_id: str, limit: int, end_token: EventStreamToken
|
||||
) -> Tuple[List[_EventDictReturn], EventStreamToken]:
|
||||
"""Get the most recent events in the room in topological ordering.
|
||||
|
||||
Args:
|
||||
@@ -524,8 +531,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
if limit == 0:
|
||||
return [], end_token
|
||||
|
||||
end_token = RoomStreamToken.parse(end_token)
|
||||
|
||||
rows, token = await self.db_pool.runInteraction(
|
||||
"get_recent_event_ids_for_room",
|
||||
self._paginate_room_events_txn,
|
||||
@@ -568,7 +573,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
"get_room_event_before_stream_ordering", _f
|
||||
)
|
||||
|
||||
async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str:
|
||||
def get_room_events_max_id(self) -> EventStreamToken:
|
||||
"""Returns the current token for rooms stream.
|
||||
|
||||
By default, it returns the current global stream token. Specifying a
|
||||
@@ -576,13 +581,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
token.
|
||||
"""
|
||||
token = self.get_room_max_stream_ordering()
|
||||
if room_id is None:
|
||||
return "s%d" % (token,)
|
||||
else:
|
||||
topo = await self.db_pool.runInteraction(
|
||||
"_get_max_topological_txn", self._get_max_topological_txn, room_id
|
||||
)
|
||||
return "t%d-%d" % (topo, token)
|
||||
return EventStreamToken(token)
|
||||
|
||||
async def get_stream_id_for_event(self, event_id: str) -> int:
|
||||
"""The stream ID for an event
|
||||
@@ -597,7 +596,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
table="events", keyvalues={"event_id": event_id}, retcol="stream_ordering"
|
||||
)
|
||||
|
||||
async def get_stream_token_for_event(self, event_id: str) -> str:
|
||||
async def get_stream_token_for_event(self, event_id: str) -> EventStreamToken:
|
||||
"""The stream token for an event
|
||||
Args:
|
||||
event_id: The id of the event to look up a stream token for.
|
||||
@@ -607,7 +606,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
A "s%d" stream token.
|
||||
"""
|
||||
stream_id = await self.get_stream_id_for_event(event_id)
|
||||
return "s%d" % (stream_id,)
|
||||
return EventStreamToken(stream_id)
|
||||
|
||||
async def get_topological_token_for_event(self, event_id: str) -> str:
|
||||
"""The stream token for an event
|
||||
@@ -676,8 +675,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
else:
|
||||
topo = None
|
||||
internal = event.internal_metadata
|
||||
internal.before = str(RoomStreamToken(topo, stream - 1))
|
||||
internal.after = str(RoomStreamToken(topo, stream))
|
||||
internal.before = str(EventStreamToken(topological=topo, stream=stream - 1))
|
||||
internal.after = str(EventStreamToken(topological=topo, stream=stream))
|
||||
internal.order = (int(topo) if topo else 0, int(stream))
|
||||
|
||||
async def get_events_around(
|
||||
@@ -749,12 +748,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
|
||||
# Paginating backwards includes the event at the token, but paginating
|
||||
# forward doesn't.
|
||||
before_token = RoomStreamToken(
|
||||
results["topological_ordering"] - 1, results["stream_ordering"]
|
||||
before_token = EventStreamToken(
|
||||
topological=results["topological_ordering"] - 1,
|
||||
stream=results["stream_ordering"],
|
||||
)
|
||||
|
||||
after_token = RoomStreamToken(
|
||||
results["topological_ordering"], results["stream_ordering"]
|
||||
after_token = EventStreamToken(
|
||||
topological=results["topological_ordering"],
|
||||
stream=results["stream_ordering"],
|
||||
)
|
||||
|
||||
rows, start_token = self._paginate_room_events_txn(
|
||||
@@ -924,12 +925,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
self,
|
||||
txn,
|
||||
room_id: str,
|
||||
from_token: RoomStreamToken,
|
||||
to_token: Optional[RoomStreamToken] = None,
|
||||
from_token: EventStreamToken,
|
||||
to_token: Optional[EventStreamToken] = None,
|
||||
direction: str = "b",
|
||||
limit: int = -1,
|
||||
event_filter: Optional[Filter] = None,
|
||||
) -> Tuple[List[_EventDictReturn], str]:
|
||||
event_filter: Optional["Filter"] = None,
|
||||
) -> Tuple[List[_EventDictReturn], EventStreamToken]:
|
||||
"""Returns list of events before or after a given token.
|
||||
|
||||
Args:
|
||||
@@ -964,8 +965,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
bounds = generate_pagination_where_clause(
|
||||
direction=direction,
|
||||
column_names=("topological_ordering", "stream_ordering"),
|
||||
from_token=from_token,
|
||||
to_token=to_token,
|
||||
from_token=(from_token.topological, from_token.stream),
|
||||
to_token=(to_token.topological, to_token.stream) if to_token else None,
|
||||
engine=self.database_engine,
|
||||
)
|
||||
|
||||
@@ -1024,12 +1025,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
# when we are going backwards so we subtract one from the
|
||||
# stream part.
|
||||
toke -= 1
|
||||
next_token = RoomStreamToken(topo, toke)
|
||||
next_token = EventStreamToken(topological=topo, stream=toke)
|
||||
else:
|
||||
# TODO (erikj): We should work out what to do here instead.
|
||||
next_token = to_token if to_token else from_token
|
||||
|
||||
return rows, str(next_token)
|
||||
return rows, next_token
|
||||
|
||||
async def paginate_room_events(
|
||||
self,
|
||||
@@ -1058,9 +1059,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
and `to_key`).
|
||||
"""
|
||||
|
||||
from_key = RoomStreamToken.parse(from_key)
|
||||
from_key = EventStreamToken.parse(from_key)
|
||||
if to_key:
|
||||
to_key = RoomStreamToken.parse(to_key)
|
||||
to_key = EventStreamToken.parse(to_key)
|
||||
|
||||
rows, token = await self.db_pool.runInteraction(
|
||||
"paginate_room_events",
|
||||
|
||||
@@ -31,7 +31,7 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yielda
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage.databases import Databases
|
||||
from synapse.storage.databases.main.events import DeltaState
|
||||
from synapse.types import StateMap
|
||||
from synapse.types import EventStreamToken, StateMap
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
@@ -196,7 +196,7 @@ class EventsPersistenceStorage(object):
|
||||
self,
|
||||
events_and_contexts: List[Tuple[EventBase, EventContext]],
|
||||
backfilled: bool = False,
|
||||
) -> int:
|
||||
) -> EventStreamToken:
|
||||
"""
|
||||
Write events to the database
|
||||
Args:
|
||||
@@ -226,11 +226,11 @@ class EventsPersistenceStorage(object):
|
||||
defer.gatherResults(deferreds, consumeErrors=True)
|
||||
)
|
||||
|
||||
return self.main_store.get_current_events_token()
|
||||
return EventStreamToken(self.main_store.get_current_events_token())
|
||||
|
||||
async def persist_event(
|
||||
self, event: EventBase, context: EventContext, backfilled: bool = False
|
||||
) -> Tuple[int, int]:
|
||||
) -> Tuple[EventStreamToken, EventStreamToken]:
|
||||
"""
|
||||
Returns:
|
||||
The stream ordering of `event`, and the stream ordering of the
|
||||
@@ -245,7 +245,10 @@ class EventsPersistenceStorage(object):
|
||||
await make_deferred_yieldable(deferred)
|
||||
|
||||
max_persisted_id = self.main_store.get_current_events_token()
|
||||
return (event.internal_metadata.stream_ordering, max_persisted_id)
|
||||
return (
|
||||
EventStreamToken(event.internal_metadata.stream_ordering),
|
||||
EventStreamToken(max_persisted_id),
|
||||
)
|
||||
|
||||
def _maybe_start_persisting(self, room_id: str):
|
||||
async def persisting_queue(item):
|
||||
|
||||
@@ -18,7 +18,7 @@ import re
|
||||
import string
|
||||
import sys
|
||||
from collections import namedtuple
|
||||
from typing import Any, Dict, Mapping, MutableMapping, Tuple, Type, TypeVar
|
||||
from typing import Any, Dict, Mapping, MutableMapping, Optional, Tuple, Type, TypeVar
|
||||
|
||||
import attr
|
||||
from signedjson.key import decode_verify_key_bytes
|
||||
@@ -388,7 +388,7 @@ class StreamToken(
|
||||
while len(keys) < len(cls._fields):
|
||||
# i.e. old token from before receipt_key
|
||||
keys.append("0")
|
||||
return cls(*keys)
|
||||
return cls(EventStreamToken.parse(keys[0]), *keys[1:])
|
||||
except Exception:
|
||||
raise SynapseError(400, "Invalid Token")
|
||||
|
||||
@@ -399,10 +399,7 @@ class StreamToken(
|
||||
def room_stream_id(self):
|
||||
# TODO(markjh): Awful hack to work around hacks in the presence tests
|
||||
# which assume that the keys are integers.
|
||||
if type(self.room_key) is int:
|
||||
return self.room_key
|
||||
else:
|
||||
return int(self.room_key[1:].split("-")[-1])
|
||||
return self.room_key.stream
|
||||
|
||||
def is_after(self, other):
|
||||
"""Does this token contain events that the other doesn't?"""
|
||||
@@ -438,36 +435,18 @@ class StreamToken(
|
||||
return self._replace(**{key: new_value})
|
||||
|
||||
|
||||
StreamToken.START = StreamToken(*(["s0"] + ["0"] * (len(StreamToken._fields) - 1)))
|
||||
|
||||
|
||||
class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
|
||||
"""Tokens are positions between events. The token "s1" comes after event 1.
|
||||
|
||||
s0 s1
|
||||
| |
|
||||
[0] V [1] V [2]
|
||||
|
||||
Tokens can either be a point in the live event stream or a cursor going
|
||||
through historic events.
|
||||
|
||||
When traversing the live event stream events are ordered by when they
|
||||
arrived at the homeserver.
|
||||
|
||||
When traversing historic events the events are ordered by their depth in
|
||||
the event graph "topological_ordering" and then by when they arrived at the
|
||||
homeserver "stream_ordering".
|
||||
|
||||
Live tokens start with an "s" followed by the "stream_ordering" id of the
|
||||
event it comes after. Historic tokens start with a "t" followed by the
|
||||
"topological_ordering" id of the event it comes after, followed by "-",
|
||||
followed by the "stream_ordering" id of the event it comes after.
|
||||
"""
|
||||
|
||||
__slots__ = [] # type: list
|
||||
@attr.s(eq=True, order=True, frozen=True, slots=True)
|
||||
class EventStreamToken:
|
||||
topological = attr.ib(
|
||||
type=Optional[int],
|
||||
kw_only=True,
|
||||
default=None,
|
||||
validator=attr.validators.optional(attr.validators.instance_of(int)),
|
||||
)
|
||||
stream = attr.ib(type=int, validator=attr.validators.instance_of(int))
|
||||
|
||||
@classmethod
|
||||
def parse(cls, string):
|
||||
def parse(cls, string: str) -> "EventStreamToken":
|
||||
try:
|
||||
if string[0] == "s":
|
||||
return cls(topological=None, stream=int(string[1:]))
|
||||
@@ -479,7 +458,7 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
|
||||
raise SynapseError(400, "Invalid token %r" % (string,))
|
||||
|
||||
@classmethod
|
||||
def parse_stream_token(cls, string):
|
||||
def parse_stream_token(cls, string: str) -> "EventStreamToken":
|
||||
try:
|
||||
if string[0] == "s":
|
||||
return cls(topological=None, stream=int(string[1:]))
|
||||
@@ -487,13 +466,16 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
|
||||
pass
|
||||
raise SynapseError(400, "Invalid token %r" % (string,))
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
if self.topological is not None:
|
||||
return "t%d-%d" % (self.topological, self.stream)
|
||||
else:
|
||||
return "s%d" % (self.stream,)
|
||||
|
||||
|
||||
StreamToken.START = StreamToken.from_string("s0_0")
|
||||
|
||||
|
||||
class ThirdPartyInstanceID(
|
||||
namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id"))
|
||||
):
|
||||
|
||||
Reference in New Issue
Block a user