Compare commits

...

4 Commits

Author SHA1 Message Date
Erik Johnston
34bc0bec98 Change return type of persist_events 2020-09-02 16:15:23 +01:00
Erik Johnston
248257c130 Convert stuff 2020-09-02 16:11:45 +01:00
Erik Johnston
ba93cda363 Merge EventStreamToken and RoomStreamToken 2020-09-02 16:07:51 +01:00
Erik Johnston
07d8afc56c Add EventStreamToken class 2020-09-02 15:56:12 +01:00
21 changed files with 234 additions and 221 deletions

View File

@@ -14,6 +14,7 @@
# limitations under the License.
import logging
from typing import TYPE_CHECKING
import synapse.state
import synapse.storage
@@ -22,6 +23,9 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.ratelimiting import Ratelimiter
from synapse.types import UserID
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -30,7 +34,7 @@ class BaseHandler(object):
Common base class for the event handlers.
"""
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
"""
Args:
hs (synapse.server.HomeServer):

View File

@@ -18,7 +18,7 @@ from typing import List
from synapse.api.constants import Membership
from synapse.events import FrozenEvent
from synapse.types import RoomStreamToken, StateMap
from synapse.types import EventStreamToken, StateMap
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -125,8 +125,8 @@ class AdminHandler(BaseHandler):
else:
stream_ordering = room.stream_ordering
from_key = str(RoomStreamToken(0, 0))
to_key = str(RoomStreamToken(None, stream_ordering))
from_key = str(EventStreamToken(stream=0, topological=0))
to_key = str(EventStreamToken(stream_ordering))
written_events = set() # Events that we've processed in this room

View File

@@ -28,7 +28,7 @@ from synapse.api.errors import (
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import (
RoomStreamToken,
StreamToken,
get_domain_from_id,
get_verify_key_from_cross_signing_key,
)
@@ -104,7 +104,7 @@ class DeviceWorkerHandler(BaseHandler):
@trace
@measure_func("device.get_user_ids_changed")
async def get_user_ids_changed(self, user_id, from_token):
async def get_user_ids_changed(self, user_id: str, from_token: StreamToken):
"""Get list of users that have had the devices updated, or have newly
joined a room, that `user_id` may be interested in.
@@ -115,7 +115,7 @@ class DeviceWorkerHandler(BaseHandler):
set_tag("user_id", user_id)
set_tag("from_token", from_token)
now_room_key = await self.store.get_room_events_max_id()
now_room_key = self.store.get_room_events_max_id()
room_ids = await self.store.get_rooms_for_user(user_id)
@@ -142,7 +142,7 @@ class DeviceWorkerHandler(BaseHandler):
)
rooms_changed.update(event.room_id for event in member_events)
stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key).stream
stream_ordering = from_token.room_key.stream
possibly_changed = set(changed)
possibly_left = set()

View File

@@ -73,6 +73,7 @@ from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRes
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
EventStreamToken,
JsonDict,
MutableStateMap,
StateMap,
@@ -1275,7 +1276,7 @@ class FederationHandler(BaseHandler):
async def do_invite_join(
self, target_hosts: Iterable[str], room_id: str, joinee: str, content: JsonDict
) -> Tuple[str, int]:
) -> Tuple[str, EventStreamToken]:
""" Attempts to join the `joinee` to the room `room_id` via the
servers contained in `target_hosts`.
@@ -1372,7 +1373,7 @@ class FederationHandler(BaseHandler):
await self._replication.wait_for_stream_position(
self.config.worker.events_shard_config.get_instance(room_id),
"events",
max_stream_id,
max_stream_id.stream,
)
# Check whether this room is the result of an upgrade of a room we already know
@@ -1632,7 +1633,7 @@ class FederationHandler(BaseHandler):
async def do_remotely_reject_invite(
self, target_hosts: Iterable[str], room_id: str, user_id: str, content: JsonDict
) -> Tuple[EventBase, int]:
) -> Tuple[EventBase, EventStreamToken]:
origin, event, room_version = await self._make_and_verify_event(
target_hosts, room_id, user_id, "leave", content=content
)
@@ -1653,11 +1654,11 @@ class FederationHandler(BaseHandler):
await self.federation_client.send_leave(host_list, event)
context = await self.state_handler.compute_event_context(event)
stream_id = await self.persist_events_and_notify(
stream_token = await self.persist_events_and_notify(
event.room_id, [(event, context)]
)
return event, stream_id
return event, stream_token
async def _make_and_verify_event(
self,
@@ -1964,7 +1965,7 @@ class FederationHandler(BaseHandler):
state: List[EventBase],
event: EventBase,
room_version: RoomVersion,
) -> int:
) -> EventStreamToken:
"""Checks the auth chain is valid (and passes auth checks) for the
state and event. Then persists the auth chain and state atomically.
Persists the event separately. Notifies about the persisted events
@@ -2916,7 +2917,7 @@ class FederationHandler(BaseHandler):
room_id: str,
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> int:
) -> EventStreamToken:
"""Persists events and tells the notifier/pushers about them, if
necessary.
@@ -2937,9 +2938,9 @@ class FederationHandler(BaseHandler):
event_and_contexts=event_and_contexts,
backfilled=backfilled,
)
return result["max_stream_id"]
return EventStreamToken.parse(result["max_stream_id"])
else:
max_stream_id = await self.storage.persistence.persist_events(
max_stream_token = await self.storage.persistence.persist_events(
event_and_contexts, backfilled=backfilled
)
@@ -2950,12 +2951,12 @@ class FederationHandler(BaseHandler):
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
await self._notify_persisted_event(event, max_stream_id)
await self._notify_persisted_event(event, max_stream_token)
return max_stream_id
return max_stream_token
async def _notify_persisted_event(
self, event: EventBase, max_stream_id: int
self, event: EventBase, max_stream_token: EventStreamToken,
) -> None:
"""Checks to see if notifier/pushers should be notified about the
event or not.
@@ -2981,12 +2982,14 @@ class FederationHandler(BaseHandler):
elif event.internal_metadata.is_outlier():
return
event_stream_id = event.internal_metadata.stream_ordering
event_stream_token = EventStreamToken(event.internal_metadata.stream_ordering)
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id, extra_users=extra_users
event, event_stream_token, max_stream_token, extra_users=extra_users
)
await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
await self.pusher_pool.on_new_notifications(
event_stream_token.stream, max_stream_token.stream
)
async def _clean_room_for_join(self, room_id: str) -> None:
"""Called to clean up any data in DB for a given room, ready for the

View File

@@ -23,7 +23,7 @@ from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.streams.config import PaginationConfig
from synapse.types import StreamToken, UserID
from synapse.types import EventStreamToken, StreamToken, UserID
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
@@ -164,7 +164,7 @@ class InitialSyncHandler(BaseHandler):
self.state_handler.get_current_state, event.room_id
)
elif event.membership == Membership.LEAVE:
room_end_token = "s%d" % (event.stream_ordering,)
room_end_token = EventStreamToken(event.stream_ordering)
deferred_room_state = run_in_background(
self.state_store.get_state_for_events, [event.event_id]
)

View File

@@ -49,7 +49,14 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
from synapse.types import (
EventStreamToken,
Requester,
RoomAlias,
StreamToken,
UserID,
create_requester,
)
from synapse.util import json_decoder
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
@@ -644,7 +651,7 @@ class EventCreationHandler(object):
context: EventContext,
ratelimit: bool = True,
ignore_shadow_ban: bool = False,
) -> int:
) -> EventStreamToken:
"""
Persists and notifies local clients and federation of an event.
@@ -726,7 +733,7 @@ class EventCreationHandler(object):
ratelimit: bool = True,
txn_id: Optional[str] = None,
ignore_shadow_ban: bool = False,
) -> Tuple[EventBase, int]:
) -> Tuple[EventBase, EventStreamToken]:
"""
Creates an event, then sends it.
@@ -764,14 +771,14 @@ class EventCreationHandler(object):
spam_error = "Spam is not permitted here"
raise SynapseError(403, spam_error, Codes.FORBIDDEN)
stream_id = await self.send_nonmember_event(
stream_token = await self.send_nonmember_event(
requester,
event,
context,
ratelimit=ratelimit,
ignore_shadow_ban=ignore_shadow_ban,
)
return event, stream_id
return event, stream_token
@measure_func("create_new_client_event")
async def create_new_client_event(
@@ -845,7 +852,7 @@ class EventCreationHandler(object):
context: EventContext,
ratelimit: bool = True,
extra_users: List[UserID] = [],
) -> int:
) -> EventStreamToken:
"""Processes a new event. This includes checking auth, persisting it,
notifying users, sending to remote servers, etc.
@@ -915,15 +922,15 @@ class EventCreationHandler(object):
ratelimit=ratelimit,
extra_users=extra_users,
)
stream_id = result["stream_id"]
event.internal_metadata.stream_ordering = stream_id
return stream_id
stream_token = EventStreamToken.parse(result["stream_token"])
event.internal_metadata.stream_ordering = stream_token.stream
return stream_token
stream_id = await self.persist_and_notify_client_event(
stream_token = await self.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)
return stream_id
return stream_token
except Exception:
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
@@ -968,7 +975,7 @@ class EventCreationHandler(object):
context: EventContext,
ratelimit: bool = True,
extra_users: List[UserID] = [],
) -> int:
) -> EventStreamToken:
"""Called when we have fully built the event, have already
calculated the push actions for the event, and checked auth.
@@ -1139,20 +1146,23 @@ class EventCreationHandler(object):
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")
event_stream_id, max_stream_id = await self.storage.persistence.persist_event(
event, context=context
)
(
event_stream_token,
max_stream_token,
) = await self.storage.persistence.persist_event(event, context=context)
if self._ephemeral_events_enabled:
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)
await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
await self.pusher_pool.on_new_notifications(
event_stream_token.stream, max_stream_token.stream
)
def _notify():
try:
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id, extra_users=extra_users
event, event_stream_token, max_stream_token, extra_users=extra_users
)
except Exception:
logger.exception("Error notifying about new room event")
@@ -1164,7 +1174,7 @@ class EventCreationHandler(object):
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)
return event_stream_id
return event_stream_token
async def _bump_active_time(self, user: UserID) -> None:
try:

View File

@@ -25,7 +25,7 @@ from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import Requester, RoomStreamToken
from synapse.types import EventStreamToken, Requester
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
@@ -338,8 +338,6 @@ class PaginationHandler(object):
)
room_token = pagin_config.from_token.room_key
room_token = RoomStreamToken.parse(room_token)
pagin_config.from_token = pagin_config.from_token.copy_and_replace(
"room_key", str(room_token)
)
@@ -371,7 +369,7 @@ class PaginationHandler(object):
leave_token = await self.store.get_topological_token_for_event(
member_event_id
)
leave_token = RoomStreamToken.parse(leave_token)
leave_token = EventStreamToken.parse(leave_token)
if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)

View File

@@ -23,7 +23,7 @@ import math
import random
import string
from collections import OrderedDict
from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from synapse.api.constants import (
EventTypes,
@@ -40,12 +40,12 @@ from synapse.events.utils import copy_power_levels_contents
from synapse.http.endpoint import parse_and_validate_server_name
from synapse.storage.state import StateFilter
from synapse.types import (
EventStreamToken,
JsonDict,
MutableStateMap,
Requester,
RoomAlias,
RoomID,
RoomStreamToken,
StateMap,
StreamToken,
UserID,
@@ -559,7 +559,7 @@ class RoomCreationHandler(BaseHandler):
config: JsonDict,
ratelimit: bool = True,
creator_join_profile: Optional[JsonDict] = None,
) -> Tuple[dict, int]:
) -> Tuple[dict, EventStreamToken]:
""" Creates a new room.
Args:
@@ -806,7 +806,7 @@ class RoomCreationHandler(BaseHandler):
await self._replication.wait_for_stream_position(
self.hs.config.worker.events_shard_config.get_instance(room_id),
"events",
last_stream_id,
last_stream_id.stream,
)
return result, last_stream_id
@@ -822,7 +822,7 @@ class RoomCreationHandler(BaseHandler):
room_alias: Optional[RoomAlias] = None,
power_level_content_override: Optional[JsonDict] = None,
creator_join_profile: Optional[JsonDict] = None,
) -> int:
) -> EventStreamToken:
"""Sends the initial events into a new room.
`power_level_content_override` doesn't apply when initial state has
@@ -844,7 +844,7 @@ class RoomCreationHandler(BaseHandler):
return e
async def send(etype: str, content: JsonDict, **kwargs) -> int:
async def send(etype: str, content: JsonDict, **kwargs) -> EventStreamToken:
event = create(etype, content, **kwargs)
logger.debug("Sending %s in new room", etype)
# Allow these events to be sent even if the user is shadow-banned to
@@ -1093,20 +1093,19 @@ class RoomEventSource(object):
async def get_new_events(
self,
user: UserID,
from_key: str,
from_key: EventStreamToken,
limit: int,
room_ids: List[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
) -> Tuple[List[EventBase], str]:
explicit_room_id: str = None,
) -> Tuple[List[EventBase], EventStreamToken]:
# We just ignore the key for now.
to_key = self.get_current_key()
from_token = RoomStreamToken.parse(from_key)
if from_token.topological:
if from_key.topological:
logger.warning("Stream has topological part!!!! %r", from_key)
from_key = "s%s" % (from_token.stream,)
from_key = EventStreamToken(from_key.stream)
app_service = self.store.get_app_service_by_user_id(user.to_string())
if app_service:
@@ -1135,17 +1134,14 @@ class RoomEventSource(object):
events[:] = events[:limit]
if events:
end_key = events[-1].internal_metadata.after
end_key = EventStreamToken.parse(events[-1].internal_metadata.after)
else:
end_key = to_key
return (events, end_key)
def get_current_key(self) -> str:
return "s%d" % (self.store.get_room_max_stream_ordering(),)
def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
return self.store.get_room_events_max_id(room_id)
def get_current_key(self) -> EventStreamToken:
return EventStreamToken(self.store.get_room_max_stream_ordering(),)
class RoomShutdownHandler(object):
@@ -1244,7 +1240,7 @@ class RoomShutdownHandler(object):
room_creator_requester = create_requester(new_room_user_id)
info, stream_id = await self._room_creation_handler.create_room(
info, stream_token = await self._room_creation_handler.create_room(
room_creator_requester,
config={
"preset": RoomCreationPreset.PUBLIC_CHAT,
@@ -1265,7 +1261,7 @@ class RoomShutdownHandler(object):
await self._replication.wait_for_stream_position(
self.hs.config.worker.events_shard_config.get_instance(new_room_id),
"events",
stream_id,
stream_token.stream,
)
else:
new_room_id = None
@@ -1283,7 +1279,7 @@ class RoomShutdownHandler(object):
try:
# Kick users from room
target_requester = create_requester(user_id)
_, stream_id = await self.room_member_handler.update_membership(
_, stream_token = await self.room_member_handler.update_membership(
requester=target_requester,
target=target_requester.user,
room_id=room_id,
@@ -1297,7 +1293,7 @@ class RoomShutdownHandler(object):
await self._replication.wait_for_stream_position(
self.hs.config.worker.events_shard_config.get_instance(room_id),
"events",
stream_id,
stream_token.stream,
)
await self.room_member_handler.forget(target_requester.user, room_id)

View File

@@ -38,7 +38,15 @@ from synapse.events.builder import create_local_event_from_event_dict
from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
from synapse.storage.roommember import RoomsForUser
from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
from synapse.types import (
EventStreamToken,
JsonDict,
Requester,
RoomAlias,
RoomID,
StateMap,
UserID,
)
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
@@ -106,7 +114,7 @@ class RoomMemberHandler(object):
room_id: str,
user: UserID,
content: dict,
) -> Tuple[str, int]:
) -> Tuple[str, EventStreamToken]:
"""Try and join a room that this server is not in
Args:
@@ -125,7 +133,7 @@ class RoomMemberHandler(object):
txn_id: Optional[str],
requester: Requester,
content: JsonDict,
) -> Tuple[str, int]:
) -> Tuple[str, EventStreamToken]:
"""
Rejects an out-of-band invite we have received from a remote server
@@ -137,7 +145,7 @@ class RoomMemberHandler(object):
Normally an empty dict.
Returns:
event id, stream_id of the leave event
event id, stream token of the leave event
"""
raise NotImplementedError()
@@ -174,7 +182,7 @@ class RoomMemberHandler(object):
ratelimit: bool = True,
content: Optional[dict] = None,
require_consent: bool = True,
) -> Tuple[str, int]:
) -> Tuple[str, EventStreamToken]:
user_id = target.to_string()
if content is None:
@@ -208,7 +216,7 @@ class RoomMemberHandler(object):
if duplicate is not None:
# Discard the new event since this membership change is a no-op.
_, stream_id = await self.store.get_event_ordering(duplicate.event_id)
return duplicate.event_id, stream_id
return duplicate.event_id, EventStreamToken(stream_id)
prev_state_ids = await context.get_prev_state_ids()
@@ -235,7 +243,7 @@ class RoomMemberHandler(object):
retry_after_ms=int(1000 * (time_allowed - time_now_s))
)
stream_id = await self.event_creation_handler.handle_new_client_event(
stream_token = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[target], ratelimit=ratelimit,
)
@@ -250,7 +258,7 @@ class RoomMemberHandler(object):
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target, room_id)
return event.event_id, stream_id
return event.event_id, stream_token
async def copy_room_tags_and_direct_to_room(
self, old_room_id, new_room_id, user_id
@@ -300,7 +308,7 @@ class RoomMemberHandler(object):
ratelimit: bool = True,
content: Optional[dict] = None,
require_consent: bool = True,
) -> Tuple[str, int]:
) -> Tuple[str, EventStreamToken]:
"""Update a user's membership in a room.
Params:
@@ -356,7 +364,7 @@ class RoomMemberHandler(object):
ratelimit: bool = True,
content: Optional[dict] = None,
require_consent: bool = True,
) -> Tuple[str, int]:
) -> Tuple[str, EventStreamToken]:
content_specified = bool(content)
if content is None:
content = {}
@@ -465,7 +473,7 @@ class RoomMemberHandler(object):
)
return (
old_state.event_id,
stream_id,
EventStreamToken(stream_id),
)
if old_membership in ["ban", "leave"] and action == "kick":
@@ -524,11 +532,11 @@ class RoomMemberHandler(object):
if requester.is_guest:
content["kind"] = "guest"
remote_join_response = await self._remote_join(
event_id, stream_token = await self._remote_join(
requester, remote_room_hosts, room_id, target, content
)
return remote_join_response
return event_id, stream_token
elif effective_membership_state == Membership.LEAVE:
if not is_host_in_room:
@@ -801,7 +809,7 @@ class RoomMemberHandler(object):
requester: Requester,
txn_id: Optional[str],
id_access_token: Optional[str] = None,
) -> int:
) -> EventStreamToken:
"""Invite a 3PID to a room.
Args:
@@ -859,11 +867,11 @@ class RoomMemberHandler(object):
if invitee:
# Note that update_membership with an action of "invite" can raise
# a ShadowBanError, but this was done above already.
_, stream_id = await self.update_membership(
_, stream_token = await self.update_membership(
requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id
)
else:
stream_id = await self._make_and_store_3pid_invite(
stream_token = await self._make_and_store_3pid_invite(
requester,
id_server,
medium,
@@ -874,7 +882,7 @@ class RoomMemberHandler(object):
id_access_token=id_access_token,
)
return stream_id
return stream_token
async def _make_and_store_3pid_invite(
self,
@@ -886,7 +894,7 @@ class RoomMemberHandler(object):
user: UserID,
txn_id: Optional[str],
id_access_token: Optional[str] = None,
) -> int:
) -> EventStreamToken:
room_state = await self.state_handler.get_current_state(room_id)
inviter_display_name = ""
@@ -1042,7 +1050,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
room_id: str,
user: UserID,
content: dict,
) -> Tuple[str, int]:
) -> Tuple[str, EventStreamToken]:
"""Implements RoomMemberHandler._remote_join
"""
# filter ourselves out of remote_room_hosts: do_invite_join ignores it
@@ -1113,7 +1121,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
txn_id: Optional[str],
requester: Requester,
content: JsonDict,
) -> Tuple[str, int]:
) -> Tuple[str, EventStreamToken]:
"""
Rejects an out-of-band invite received from a remote user
@@ -1127,10 +1135,10 @@ class RoomMemberMasterHandler(RoomMemberHandler):
fed_handler = self.federation_handler
try:
inviter_id = UserID.from_string(invite_event.sender)
event, stream_id = await fed_handler.do_remotely_reject_invite(
event, stream_token = await fed_handler.do_remotely_reject_invite(
[inviter_id.domain], room_id, target_user, content=content
)
return event.event_id, stream_id
return event.event_id, stream_token
except Exception as e:
# if we were unable to reject the invite, we will generate our own
# leave event.
@@ -1150,7 +1158,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
txn_id: Optional[str],
requester: Requester,
content: JsonDict,
) -> Tuple[str, int]:
) -> Tuple[str, EventStreamToken]:
"""Generate a local invite rejection
This is called after we fail to reject an invite via a remote server. It
@@ -1216,10 +1224,10 @@ class RoomMemberMasterHandler(RoomMemberHandler):
context = await self.state_handler.compute_event_context(event)
context.app_service = requester.app_service
stream_id = await self.event_creation_handler.handle_new_client_event(
stream_token = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[UserID.from_string(target_user)],
)
return event.event_id, stream_id
return event.event_id, stream_token
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room

View File

@@ -23,7 +23,7 @@ from synapse.replication.http.membership import (
ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite,
ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft,
)
from synapse.types import Requester, UserID
from synapse.types import EventStreamToken, Requester, UserID
logger = logging.getLogger(__name__)
@@ -43,7 +43,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
room_id: str,
user: UserID,
content: dict,
) -> Tuple[str, int]:
) -> Tuple[str, EventStreamToken]:
"""Implements RoomMemberHandler._remote_join
"""
if len(remote_room_hosts) == 0:
@@ -59,7 +59,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
await self._user_joined_room(user, room_id)
return ret["event_id"], ret["stream_id"]
return ret["event_id"], EventStreamToken.parse(ret["stream_id"])
async def remote_reject_invite(
self,
@@ -67,7 +67,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
txn_id: Optional[str],
requester: Requester,
content: dict,
) -> Tuple[str, int]:
) -> Tuple[str, EventStreamToken]:
"""
Rejects an out-of-band invite received from a remote user
@@ -79,7 +79,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
requester=requester,
content=content,
)
return ret["event_id"], ret["stream_id"]
return ret["event_id"], EventStreamToken.parse(ret["stream_id"])
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room

View File

@@ -30,9 +30,9 @@ from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
from synapse.types import (
Collection,
EventStreamToken,
JsonDict,
MutableStateMap,
RoomStreamToken,
StateMap,
StreamToken,
UserID,
@@ -1482,7 +1482,7 @@ class SyncHandler(object):
if rooms_changed:
return True
stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
stream_id = since_token.room_key.stream
for room_id in sync_result_builder.joined_room_ids:
if self.store.has_room_changed_since(room_id, stream_id):
return True
@@ -1748,7 +1748,7 @@ class SyncHandler(object):
continue
leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,)
"room_key", EventStreamToken(event.stream_ordering),
)
room_entries.append(
RoomSyncResultBuilder(

View File

@@ -42,7 +42,7 @@ from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.streams.config import PaginationConfig
from synapse.types import Collection, StreamToken, UserID
from synapse.types import Collection, EventStreamToken, StreamToken, UserID
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
@@ -112,7 +112,9 @@ class _NotifierUserStream(object):
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
def notify(self, stream_key: str, stream_id: int, time_now_ms: int):
def notify(
self, stream_key: str, stream_id: Union[int, EventStreamToken], time_now_ms: int
):
"""Notify any listeners for this user of a new event from an
event source.
Args:
@@ -187,7 +189,7 @@ class Notifier(object):
self.store = hs.get_datastore()
self.pending_new_room_events = (
[]
) # type: List[Tuple[int, EventBase, Collection[Union[str, UserID]]]]
) # type: List[Tuple[EventStreamToken, EventBase, Collection[Union[str, UserID]]]]
# Called when there are new things to stream over replication
self.replication_callbacks = [] # type: List[Callable[[], None]]
@@ -245,8 +247,8 @@ class Notifier(object):
def on_new_room_event(
self,
event: EventBase,
room_stream_id: int,
max_room_stream_id: int,
room_stream_id: EventStreamToken,
max_room_stream_id: EventStreamToken,
extra_users: Collection[Union[str, UserID]] = [],
):
""" Used by handlers to inform the notifier something has happened
@@ -265,7 +267,7 @@ class Notifier(object):
self.notify_replication()
def _notify_pending_new_room_events(self, max_room_stream_id: int):
def _notify_pending_new_room_events(self, max_room_stream_token: EventStreamToken):
"""Notify for the room events that were queued waiting for a previous
event to be persisted.
Args:
@@ -274,34 +276,34 @@ class Notifier(object):
"""
pending = self.pending_new_room_events
self.pending_new_room_events = []
for room_stream_id, event, extra_users in pending:
if room_stream_id > max_room_stream_id:
for room_stream_token, event, extra_users in pending:
if room_stream_token > max_room_stream_token:
self.pending_new_room_events.append(
(room_stream_id, event, extra_users)
(room_stream_token, event, extra_users)
)
else:
self._on_new_room_event(event, room_stream_id, extra_users)
self._on_new_room_event(event, max_room_stream_token, extra_users)
def _on_new_room_event(
self,
event: EventBase,
room_stream_id: int,
event_stream_token: EventStreamToken,
extra_users: Collection[Union[str, UserID]] = [],
):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
run_as_background_process(
"notify_app_services", self._notify_app_services, room_stream_id
"notify_app_services", self._notify_app_services, event_stream_token.stream
)
if self.federation_sender:
self.federation_sender.notify_new_events(room_stream_id)
self.federation_sender.notify_new_events(event_stream_token.stream)
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
self._user_joined_room(event.state_key, event.room_id)
self.on_new_event(
"room_key", room_stream_id, users=extra_users, rooms=[event.room_id]
"room_key", event_stream_token, users=extra_users, rooms=[event.room_id],
)
async def _notify_app_services(self, room_stream_id: int):
@@ -313,7 +315,7 @@ class Notifier(object):
def on_new_event(
self,
stream_key: str,
new_token: int,
new_token: Union[int, EventStreamToken],
users: Collection[Union[str, UserID]] = [],
rooms: Collection[str] = [],
):

View File

@@ -178,7 +178,7 @@ class PusherPool:
)
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
async def on_new_notifications(self, min_stream_id, max_stream_id):
async def on_new_notifications(self, min_stream_id: int, max_stream_id: int):
if not self.pushers:
# nothing to do here.
return

View File

@@ -125,11 +125,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
logger.info("Got %d events from federation", len(event_and_contexts))
max_stream_id = await self.federation_handler.persist_events_and_notify(
max_stream_token = await self.federation_handler.persist_events_and_notify(
room_id, event_and_contexts, backfilled
)
return 200, {"max_stream_id": max_stream_id}
return 200, {"max_stream_id": str(max_stream_token)}
class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):

View File

@@ -86,7 +86,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
remote_room_hosts, room_id, user_id, event_content
)
return 200, {"event_id": event_id, "stream_id": stream_id}
return 200, {"event_id": event_id, "stream_id": str(stream_id)}
class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
@@ -146,11 +146,11 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
request.authenticated_entity = requester.user.to_string()
# hopefully we're now on the master, so this won't recurse!
event_id, stream_id = await self.member_handler.remote_reject_invite(
event_id, stream_token = await self.member_handler.remote_reject_invite(
invite_event_id, txn_id, requester, event_content,
)
return 200, {"event_id": event_id, "stream_id": stream_id}
return 200, {"event_id": event_id, "stream_id": str(stream_token)}
class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):

View File

@@ -116,11 +116,11 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
)
stream_id = await self.event_creation_handler.persist_and_notify_client_event(
stream_token = await self.event_creation_handler.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)
return 200, {"stream_id": stream_id}
return 200, {"stream_token": str(stream_token)}
def register_servlets(hs, http_server):

View File

@@ -29,6 +29,7 @@ from synapse.replication.tcp.streams.events import (
EventsStreamEventRow,
EventsStreamRow,
)
from synapse.types import EventStreamToken
from synapse.util.async_helpers import timeout_deferred
from synapse.util.metrics import Measure
@@ -152,7 +153,12 @@ class ReplicationDataHandler:
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
max_token = self.store.get_room_max_stream_ordering()
self.notifier.on_new_room_event(event, token, max_token, extra_users)
self.notifier.on_new_room_event(
event,
EventStreamToken(token),
EventStreamToken(max_token),
extra_users,
)
await self.pusher_pool.on_new_notifications(token, token)

View File

@@ -19,7 +19,7 @@ from typing import Any, List, Set, Tuple
from synapse.api.errors import SynapseError
from synapse.storage._base import SQLBaseStore
from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.types import RoomStreamToken
from synapse.types import EventStreamToken
logger = logging.getLogger(__name__)
@@ -51,7 +51,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
)
def _purge_history_txn(self, txn, room_id, token_str, delete_local_events):
token = RoomStreamToken.parse(token_str)
token = EventStreamToken.parse(token_str)
# Tables that should be pruned:
# event_auth

View File

@@ -39,20 +39,22 @@ what sort order was used:
import abc
import logging
from collections import namedtuple
from typing import Dict, Iterable, List, Optional, Tuple
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple
from twisted.internet import defer
from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, make_in_list_sql_clause
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.types import RoomStreamToken
from synapse.types import EventStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
if TYPE_CHECKING:
from synapse.api.filtering import Filter
logger = logging.getLogger(__name__)
@@ -303,11 +305,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
async def get_room_events_stream_for_rooms(
self,
room_ids: Iterable[str],
from_key: str,
to_key: str,
from_key: EventStreamToken,
to_key: EventStreamToken,
limit: int = 0,
order: str = "DESC",
) -> Dict[str, Tuple[List[EventBase], str]]:
) -> Dict[str, Tuple[List[EventBase], EventStreamToken]]:
"""Get new room events in stream ordering since `from_key`.
Args:
@@ -326,7 +328,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
- list of recent events in the room
- stream ordering key for the start of the chunk of events returned.
"""
from_id = RoomStreamToken.parse_stream_token(from_key).stream
from_id = from_key.stream
room_ids = self._events_stream_cache.get_entities_changed(room_ids, from_id)
@@ -356,15 +358,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return results
def get_rooms_that_changed(self, room_ids, from_key):
def get_rooms_that_changed(
self, room_ids: Iterable[str], from_key: EventStreamToken,
):
"""Given a list of rooms and a token, return rooms where there may have
been changes.
Args:
room_ids (list)
from_key (str): The room_key portion of a StreamToken
room_ids
from_key: The room_key portion of a StreamToken
"""
from_key = RoomStreamToken.parse_stream_token(from_key).stream
return {
room_id
for room_id in room_ids
@@ -374,11 +378,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
async def get_room_events_stream_for_room(
self,
room_id: str,
from_key: str,
to_key: str,
from_key: EventStreamToken,
to_key: EventStreamToken,
limit: int = 0,
order: str = "DESC",
) -> Tuple[List[EventBase], str]:
) -> Tuple[List[EventBase], EventStreamToken]:
"""Get new room events in stream ordering since `from_key`.
Args:
@@ -399,8 +404,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if from_key == to_key:
return [], from_key
from_id = RoomStreamToken.parse_stream_token(from_key).stream
to_id = RoomStreamToken.parse_stream_token(to_key).stream
from_id = from_key.stream
to_id = to_key.stream
has_changed = self._events_stream_cache.has_entity_changed(room_id, from_id)
@@ -432,7 +437,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
ret.reverse()
if rows:
key = "s%d" % min(r.stream_ordering for r in rows)
key = EventStreamToken(min(r.stream_ordering for r in rows))
else:
# Assume we didn't get anything because there was nothing to
# get.
@@ -440,9 +445,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return ret, key
async def get_membership_changes_for_user(self, user_id, from_key, to_key):
from_id = RoomStreamToken.parse_stream_token(from_key).stream
to_id = RoomStreamToken.parse_stream_token(to_key).stream
async def get_membership_changes_for_user(
self, user_id: str, from_key: EventStreamToken, to_key: EventStreamToken
) -> List[EventBase]:
from_id = from_key.stream
to_id = to_key.stream
if from_key == to_key:
return []
@@ -480,8 +487,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return ret
async def get_recent_events_for_room(
self, room_id: str, limit: int, end_token: str
) -> Tuple[List[EventBase], str]:
self, room_id: str, limit: int, end_token: EventStreamToken
) -> Tuple[List[EventBase], EventStreamToken]:
"""Get the most recent events in the room in topological ordering.
Args:
@@ -507,8 +514,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return (events, token)
async def get_recent_event_ids_for_room(
self, room_id: str, limit: int, end_token: str
) -> Tuple[List[_EventDictReturn], str]:
self, room_id: str, limit: int, end_token: EventStreamToken
) -> Tuple[List[_EventDictReturn], EventStreamToken]:
"""Get the most recent events in the room in topological ordering.
Args:
@@ -524,8 +531,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if limit == 0:
return [], end_token
end_token = RoomStreamToken.parse(end_token)
rows, token = await self.db_pool.runInteraction(
"get_recent_event_ids_for_room",
self._paginate_room_events_txn,
@@ -568,7 +573,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"get_room_event_before_stream_ordering", _f
)
async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str:
def get_room_events_max_id(self) -> EventStreamToken:
    """Return the current maximum token for the global rooms stream.

    Returns:
        A live-stream ``EventStreamToken`` (no topological part) positioned
        after the most recently persisted event.
    """
    # NOTE(review): this commit dropped the old ``room_id`` parameter (and the
    # per-room "t%d-%d" topological form); the docstring is updated to match.
    token = self.get_room_max_stream_ordering()
    return EventStreamToken(token)
async def get_stream_id_for_event(self, event_id: str) -> int:
"""The stream ID for an event
@@ -597,7 +596,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
table="events", keyvalues={"event_id": event_id}, retcol="stream_ordering"
)
async def get_stream_token_for_event(self, event_id: str) -> str:
async def get_stream_token_for_event(self, event_id: str) -> EventStreamToken:
"""The stream token for an event
Args:
event_id: The id of the event to look up a stream token for.
@@ -607,7 +606,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
An EventStreamToken for the event's live stream position (serialized as "s%d").
"""
stream_id = await self.get_stream_id_for_event(event_id)
return "s%d" % (stream_id,)
return EventStreamToken(stream_id)
async def get_topological_token_for_event(self, event_id: str) -> str:
"""The stream token for an event
@@ -676,8 +675,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
else:
topo = None
internal = event.internal_metadata
internal.before = str(RoomStreamToken(topo, stream - 1))
internal.after = str(RoomStreamToken(topo, stream))
internal.before = str(EventStreamToken(topological=topo, stream=stream - 1))
internal.after = str(EventStreamToken(topological=topo, stream=stream))
internal.order = (int(topo) if topo else 0, int(stream))
async def get_events_around(
@@ -749,12 +748,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# Paginating backwards includes the event at the token, but paginating
# forward doesn't.
before_token = RoomStreamToken(
results["topological_ordering"] - 1, results["stream_ordering"]
before_token = EventStreamToken(
topological=results["topological_ordering"] - 1,
stream=results["stream_ordering"],
)
after_token = RoomStreamToken(
results["topological_ordering"], results["stream_ordering"]
after_token = EventStreamToken(
topological=results["topological_ordering"],
stream=results["stream_ordering"],
)
rows, start_token = self._paginate_room_events_txn(
@@ -924,12 +925,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
self,
txn,
room_id: str,
from_token: RoomStreamToken,
to_token: Optional[RoomStreamToken] = None,
from_token: EventStreamToken,
to_token: Optional[EventStreamToken] = None,
direction: str = "b",
limit: int = -1,
event_filter: Optional[Filter] = None,
) -> Tuple[List[_EventDictReturn], str]:
event_filter: Optional["Filter"] = None,
) -> Tuple[List[_EventDictReturn], EventStreamToken]:
"""Returns list of events before or after a given token.
Args:
@@ -964,8 +965,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
bounds = generate_pagination_where_clause(
direction=direction,
column_names=("topological_ordering", "stream_ordering"),
from_token=from_token,
to_token=to_token,
from_token=(from_token.topological, from_token.stream),
to_token=(to_token.topological, to_token.stream) if to_token else None,
engine=self.database_engine,
)
@@ -1024,12 +1025,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = RoomStreamToken(topo, toke)
next_token = EventStreamToken(topological=topo, stream=toke)
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_token if to_token else from_token
return rows, str(next_token)
return rows, next_token
async def paginate_room_events(
self,
@@ -1058,9 +1059,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
and `to_key`).
"""
from_key = RoomStreamToken.parse(from_key)
from_key = EventStreamToken.parse(from_key)
if to_key:
to_key = RoomStreamToken.parse(to_key)
to_key = EventStreamToken.parse(to_key)
rows, token = await self.db_pool.runInteraction(
"paginate_room_events",

View File

@@ -31,7 +31,7 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yielda
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases import Databases
from synapse.storage.databases.main.events import DeltaState
from synapse.types import StateMap
from synapse.types import EventStreamToken, StateMap
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.metrics import Measure
@@ -196,7 +196,7 @@ class EventsPersistenceStorage(object):
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> int:
) -> EventStreamToken:
"""
Write events to the database
Args:
@@ -226,11 +226,11 @@ class EventsPersistenceStorage(object):
defer.gatherResults(deferreds, consumeErrors=True)
)
return self.main_store.get_current_events_token()
return EventStreamToken(self.main_store.get_current_events_token())
async def persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> Tuple[int, int]:
) -> Tuple[EventStreamToken, EventStreamToken]:
"""
Returns:
The stream token of `event`, and the stream token of the
@@ -245,7 +245,10 @@ class EventsPersistenceStorage(object):
await make_deferred_yieldable(deferred)
max_persisted_id = self.main_store.get_current_events_token()
return (event.internal_metadata.stream_ordering, max_persisted_id)
return (
EventStreamToken(event.internal_metadata.stream_ordering),
EventStreamToken(max_persisted_id),
)
def _maybe_start_persisting(self, room_id: str):
async def persisting_queue(item):

View File

@@ -18,7 +18,7 @@ import re
import string
import sys
from collections import namedtuple
from typing import Any, Dict, Mapping, MutableMapping, Tuple, Type, TypeVar
from typing import Any, Dict, Mapping, MutableMapping, Optional, Tuple, Type, TypeVar
import attr
from signedjson.key import decode_verify_key_bytes
@@ -388,7 +388,7 @@ class StreamToken(
while len(keys) < len(cls._fields):
# i.e. old token from before receipt_key
keys.append("0")
return cls(*keys)
return cls(EventStreamToken.parse(keys[0]), *keys[1:])
except Exception:
raise SynapseError(400, "Invalid Token")
@@ -399,10 +399,7 @@ class StreamToken(
def room_stream_id(self):
# TODO(markjh): Awful hack to work around hacks in the presence tests
# which assume that the keys are integers.
if type(self.room_key) is int:
return self.room_key
else:
return int(self.room_key[1:].split("-")[-1])
return self.room_key.stream
def is_after(self, other):
"""Does this token contain events that the other doesn't?"""
@@ -438,36 +435,18 @@ class StreamToken(
return self._replace(**{key: new_value})
StreamToken.START = StreamToken(*(["s0"] + ["0"] * (len(StreamToken._fields) - 1)))
class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
"""Tokens are positions between events. The token "s1" comes after event 1.
s0 s1
| |
[0] V [1] V [2]
Tokens can either be a point in the live event stream or a cursor going
through historic events.
When traversing the live event stream events are ordered by when they
arrived at the homeserver.
When traversing historic events the events are ordered by their depth in
the event graph "topological_ordering" and then by when they arrived at the
homeserver "stream_ordering".
Live tokens start with an "s" followed by the "stream_ordering" id of the
event it comes after. Historic tokens start with a "t" followed by the
"topological_ordering" id of the event it comes after, followed by "-",
followed by the "stream_ordering" id of the event it comes after.
"""
__slots__ = [] # type: list
@attr.s(eq=True, order=True, frozen=True, slots=True)
class EventStreamToken:
topological = attr.ib(
type=Optional[int],
kw_only=True,
default=None,
validator=attr.validators.optional(attr.validators.instance_of(int)),
)
stream = attr.ib(type=int, validator=attr.validators.instance_of(int))
@classmethod
def parse(cls, string):
def parse(cls, string: str) -> "EventStreamToken":
try:
if string[0] == "s":
return cls(topological=None, stream=int(string[1:]))
@@ -479,7 +458,7 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
raise SynapseError(400, "Invalid token %r" % (string,))
@classmethod
def parse_stream_token(cls, string):
def parse_stream_token(cls, string: str) -> "EventStreamToken":
try:
if string[0] == "s":
return cls(topological=None, stream=int(string[1:]))
@@ -487,13 +466,16 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
pass
raise SynapseError(400, "Invalid token %r" % (string,))
def __str__(self) -> str:
    """Serialize the token.

    Live tokens render as ``"s<stream>"``; historic tokens (those with a
    topological part) render as ``"t<topological>-<stream>"``.
    """
    if self.topological is None:
        return "s%d" % (self.stream,)
    return "t%d-%d" % (self.topological, self.stream)
StreamToken.START = StreamToken.from_string("s0_0")
class ThirdPartyInstanceID(
namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id"))
):