Allow both fetch mechanisms in a single process

This commit is contained in:
Max Radermacher
2025-06-05 14:59:53 -05:00
committed by GitHub
parent 9fb39dfa8f
commit a2a49ebd5d
5 changed files with 57 additions and 52 deletions

View File

@@ -26,7 +26,6 @@ public class MessageFetcherJob {
owsPrecondition(CurrentAppContext().shouldProcessIncomingMessages)
owsPrecondition(CurrentAppContext().isNSE)
owsPrecondition(self.appReadiness.isAppReady)
owsPrecondition(!self.shouldUseWebSocket)
owsAssertDebug(DependenciesBridge.shared.tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegistered)
await self.startGroupMessageProcessorsIfNeeded()
@@ -37,25 +36,17 @@ public class MessageFetcherJob {
await SSKEnvironment.shared.groupMessageProcessorManagerRef.startAllProcessors()
}
private var shouldUseWebSocket: Bool {
return OWSChatConnection.canAppUseSocketsToMakeRequests
}
public var hasCompletedInitialFetch: Bool {
if shouldUseWebSocket {
return (
DependenciesBridge.shared.chatConnectionManager.identifiedConnectionState == .open &&
DependenciesBridge.shared.chatConnectionManager.hasEmptiedInitialQueue
)
} else {
return self.didFinishFetchingViaREST.get()
get async {
let chatConnectionManager = DependenciesBridge.shared.chatConnectionManager
return await chatConnectionManager.hasEmptiedInitialQueue || self.didFinishFetchingViaREST.get()
}
}
func preconditionForFetchingComplete() -> some Precondition {
return NotificationPrecondition(
notificationName: shouldUseWebSocket ? OWSChatConnection.chatConnectionStateDidChange : Self.didChangeStateNotificationName,
isSatisfied: { self.hasCompletedInitialFetch }
notificationNames: [OWSChatConnection.chatConnectionStateDidChange, Self.didChangeStateNotificationName],
isSatisfied: { await self.hasCompletedInitialFetch }
)
}

View File

@@ -7,7 +7,8 @@ import LibSignalClient
public class OWSMessageDecrypter {
private var senderIdsResetDuringCurrentBatch = NSMutableSet()
private let senderIdsResetDuringCurrentBatch = AtomicValue<Set<String>>(Set(), lock: .init())
private var placeholderCleanupTimer: Timer? {
didSet { oldValue?.invalidate() }
}
@@ -31,15 +32,16 @@ public class OWSMessageDecrypter {
@objc
func messageProcessorDidDrainQueue() {
// We don't want to send additional resets until we
// have received the "empty" response from the WebSocket
// or finished at least one REST fetch.
guard SSKEnvironment.shared.messageFetcherJobRef.hasCompletedInitialFetch else { return }
Task {
// We don't want to send additional resets until we have received the
// "empty" response from the WebSocket or finished at least one REST fetch.
guard await SSKEnvironment.shared.messageFetcherJobRef.hasCompletedInitialFetch else { return }
// We clear all recently reset sender ids any time the
// decryption queue has drained, so that any new messages
// that fail to decrypt will reset the session again.
senderIdsResetDuringCurrentBatch.removeAllObjects()
// We clear all recently reset sender ids any time the decryption queue has
// drained, so that any new messages that fail to decrypt will reset the
// session again.
senderIdsResetDuringCurrentBatch.update { $0.removeAll() }
}
}
private func trySendNullMessage(
@@ -354,9 +356,7 @@ public class OWSMessageDecrypter {
// resets. When the message decrypt queue is drained, the list of recently
// reset IDs is cleared.
let senderId = "\(sourceAci).\(sourceDeviceId)"
if !senderIdsResetDuringCurrentBatch.contains(senderId) {
senderIdsResetDuringCurrentBatch.add(senderId)
if senderIdsResetDuringCurrentBatch.update(block: { $0.insert(senderId).inserted }) {
// We don't reset sessions for messages sent to our PNI because those are
// receive-only & we don't send retries FROM our PNI back to the sender.

View File

@@ -17,7 +17,7 @@ public protocol ChatConnectionManager {
/// all connection tokens are released).
func waitUntilIdentifiedConnectionShouldBeClosed() async throws
var identifiedConnectionState: OWSChatConnectionState { get }
var hasEmptiedInitialQueue: Bool { get }
var hasEmptiedInitialQueue: Bool { get async }
func shouldWaitForSocketToMakeRequest(connectionType: OWSChatConnectionType) -> Bool
func requestConnections(shouldReconnectIfConnectedElsewhere: Bool) -> [OWSChatConnection.ConnectionToken]
@@ -95,7 +95,9 @@ public class ChatConnectionManagerImpl: ChatConnectionManager {
}
public var hasEmptiedInitialQueue: Bool {
connectionIdentified.hasEmptiedInitialQueue
get async {
return await connectionIdentified.hasEmptiedInitialQueue
}
}
}

View File

@@ -63,9 +63,10 @@ public class OWSChatConnection {
return .closed
}
// This var must be thread-safe.
public var hasEmptiedInitialQueue: Bool {
false
get async {
return false
}
}
fileprivate var logPrefix: String {
@@ -884,9 +885,15 @@ internal class OWSUnauthConnectionUsingLibSignal: OWSChatConnectionUsingLibSigna
}
internal class OWSAuthConnectionUsingLibSignal: OWSChatConnectionUsingLibSignal<AuthenticatedChatConnection>, ChatConnectionListener {
private let _hasEmptiedInitialQueue = AtomicBool(false, lock: .sharedGlobal)
private var _hasEmptiedInitialQueue = false
override var hasEmptiedInitialQueue: Bool {
_hasEmptiedInitialQueue.get()
get async {
return await withCheckedContinuation { continuation in
serialQueue.async {
continuation.resume(returning: self._hasEmptiedInitialQueue)
}
}
}
}
private var _keepaliveSenderTask: Task<Void, Never>?
@@ -931,13 +938,8 @@ internal class OWSAuthConnectionUsingLibSignal: OWSChatConnectionUsingLibSignal<
}
keepaliveSenderTask = makeKeepaliveTask(service)
case .closed:
// While _hasEmptiedInitialQueue is atomic, that's not sufficient to guarantee the
// *order* of writes. We do that by making sure we only set it on the serial queue,
// and then make sure libsignal's serialized callbacks result in scheduling on the
// serial queue.
keepaliveSenderTask = nil
_hasEmptiedInitialQueue.set(false)
Logger.debug("Reset _hasEmptiedInitialQueue")
_hasEmptiedInitialQueue = false
}
}
}
@@ -1036,8 +1038,8 @@ internal class OWSAuthConnectionUsingLibSignal: OWSChatConnectionUsingLibSignal<
// We have since disconnected from the chat service instance that reported the empty queue.
return
}
let alreadyEmptied = self._hasEmptiedInitialQueue.swap(true)
Logger.debug("Initial queue emptied")
let alreadyEmptied = self._hasEmptiedInitialQueue
self._hasEmptiedInitialQueue = true
if !alreadyEmptied {
// This notification is used to wake up anything waiting for hasEmptiedInitialQueue.

View File

@@ -8,25 +8,35 @@ import Foundation
/// Waits until `isSatisfied()` returns true. Checks the initial result and
/// then re-checks the result each time notificationName fires.
public struct NotificationPrecondition: Precondition, Sendable {
private let notificationName: Notification.Name
private let isSatisfied: @Sendable () -> Bool
private let notificationNames: [Notification.Name]
private let isSatisfied: @Sendable () async -> Bool
public init(notificationName: Notification.Name, isSatisfied: @escaping @Sendable () -> Bool) {
self.notificationName = notificationName
public init(notificationName: Notification.Name, isSatisfied: @escaping @Sendable () async -> Bool) {
self.init(notificationNames: [notificationName], isSatisfied: isSatisfied)
}
public init(notificationNames: [Notification.Name], isSatisfied: @escaping @Sendable () async -> Bool) {
self.notificationNames = notificationNames
self.isSatisfied = isSatisfied
}
public func waitUntilSatisfied() async -> WaitResult {
let result = CancellableContinuation<Void>()
let observer = NotificationCenter.default.addObserver(forName: notificationName, object: nil, queue: nil, using: { _ in
if self.isSatisfied() {
return result.resume(with: .success(()))
}
})
defer {
NotificationCenter.default.removeObserver(observer)
let observers = self.notificationNames.map {
return NotificationCenter.default.addObserver(forName: $0, object: nil, queue: nil, using: { _ in
Task {
if await self.isSatisfied() {
result.resume(with: .success(()))
}
}
})
}
if isSatisfied() {
defer {
for observer in observers {
NotificationCenter.default.removeObserver(observer)
}
}
if await isSatisfied() {
return .satisfiedImmediately
}
do {