mirror of
https://github.com/signalapp/Signal-iOS.git
synced 2025-12-05 01:10:41 +00:00
Add Cron
This commit is contained in:
@@ -590,6 +590,7 @@
|
||||
50169695291B0627007AD709 /* ContactDiscoveryManagerTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 50169694291B0627007AD709 /* ContactDiscoveryManagerTest.swift */; };
|
||||
5018B9DD2ADF4157001DFB12 /* AuthedDevice.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5018B9DC2ADF4157001DFB12 /* AuthedDevice.swift */; };
|
||||
501AD1C42AF17A16001B796A /* ECKeyPairTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 501AD1C32AF17A16001B796A /* ECKeyPairTest.swift */; };
|
||||
501DAB1C2EBBCB5B00636157 /* Cron.swift in Sources */ = {isa = PBXBuildFile; fileRef = 501DAB1A2EBBB48800636157 /* Cron.swift */; };
|
||||
501E4DAB2D133F4400D883C7 /* CompletionSerializer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 501E4DAA2D133F4400D883C7 /* CompletionSerializer.swift */; };
|
||||
501E4DAE2D13439E00D883C7 /* CompletionSerializerTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 501E4DAC2D13439E00D883C7 /* CompletionSerializerTest.swift */; };
|
||||
501E78622CFE3E2700FD56C4 /* PhoneNumberCountry.swift in Sources */ = {isa = PBXBuildFile; fileRef = 501E78612CFE3E2700FD56C4 /* PhoneNumberCountry.swift */; };
|
||||
@@ -4560,6 +4561,7 @@
|
||||
5018B9DC2ADF4157001DFB12 /* AuthedDevice.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AuthedDevice.swift; sourceTree = "<group>"; };
|
||||
501AD1C32AF17A16001B796A /* ECKeyPairTest.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ECKeyPairTest.swift; sourceTree = "<group>"; };
|
||||
501D64FA28C027BA008D5993 /* OWSPaymentsLock.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = OWSPaymentsLock.swift; sourceTree = "<group>"; };
|
||||
501DAB1A2EBBB48800636157 /* Cron.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Cron.swift; sourceTree = "<group>"; };
|
||||
501E4DAA2D133F4400D883C7 /* CompletionSerializer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CompletionSerializer.swift; sourceTree = "<group>"; };
|
||||
501E4DAC2D13439E00D883C7 /* CompletionSerializerTest.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; name = CompletionSerializerTest.swift; path = SignalServiceKit/tests/Storage/Database/CompletionSerializerTest.swift; sourceTree = SOURCE_ROOT; };
|
||||
501E78612CFE3E2700FD56C4 /* PhoneNumberCountry.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PhoneNumberCountry.swift; sourceTree = "<group>"; };
|
||||
@@ -12638,6 +12640,7 @@
|
||||
F9C5CA85289453B100548EEE /* JobRecords */,
|
||||
D9F9A63A2BFFFCC400EF13EC /* BulkDeleteInteractionJobQueue.swift */,
|
||||
D9DB37F82B72A770007B16C8 /* CallRecordDeleteAllJobQueue.swift */,
|
||||
501DAB1A2EBBB48800636157 /* Cron.swift */,
|
||||
886A58C9276A760600A1099B /* DonationReceiptCredentialRedemptionJobQueue.swift */,
|
||||
4C9D347E23689E06006A4307 /* IncomingContactSyncJobQueue.swift */,
|
||||
5008FEBB2B1811A0004E73FD /* JobQueueRunner.swift */,
|
||||
@@ -17947,6 +17950,7 @@
|
||||
F9C5CDCA289453B400548EEE /* ContentProxy.swift in Sources */,
|
||||
500AF3AF2C58366700CB9F4F /* CooperativeTimeout.swift in Sources */,
|
||||
F962B38A293F9F1F00765BD8 /* CRC32.swift in Sources */,
|
||||
501DAB1C2EBBCB5B00636157 /* Cron.swift in Sources */,
|
||||
668A00D02C2B5E32007B8808 /* Cryptography.swift in Sources */,
|
||||
F9C5CDF4289453B400548EEE /* Currency.swift in Sources */,
|
||||
D97992A32D9E55FB0080A4F5 /* CurrencyFormatter.swift in Sources */,
|
||||
|
||||
@@ -118,7 +118,7 @@ final class AppDelegate: UIResponder, UIApplicationDelegate {
|
||||
}
|
||||
|
||||
appReadiness.runNowOrWhenAppDidBecomeReadySync {
|
||||
self.refreshConnection(isAppActive: false)
|
||||
self.refreshConnection(isAppActive: false, shouldRunCron: false)
|
||||
}
|
||||
|
||||
clearAppropriateNotificationsAndRestoreBadgeCount()
|
||||
@@ -602,6 +602,9 @@ final class AppDelegate: UIResponder, UIApplicationDelegate {
|
||||
owsPrecondition(!CurrentAppContext().isRunningTests)
|
||||
|
||||
let appContext = launchContext.appContext
|
||||
let dependenciesBridge = DependenciesBridge.shared
|
||||
let cron = dependenciesBridge.cron
|
||||
_ = cron
|
||||
|
||||
SignalApp.shared.performInitialSetup(appReadiness: appReadiness)
|
||||
|
||||
@@ -741,7 +744,7 @@ final class AppDelegate: UIResponder, UIApplicationDelegate {
|
||||
// launching from the background, without this, we end up waiting some extra
|
||||
// seconds before receiving an actionable push notification.
|
||||
if !appContext.isMainAppAndActive {
|
||||
self.refreshConnection(isAppActive: false)
|
||||
self.refreshConnection(isAppActive: false, shouldRunCron: false)
|
||||
}
|
||||
|
||||
if tsRegistrationState.isRegistered {
|
||||
@@ -1319,7 +1322,7 @@ final class AppDelegate: UIResponder, UIApplicationDelegate {
|
||||
}
|
||||
}
|
||||
|
||||
refreshConnection(isAppActive: true)
|
||||
refreshConnection(isAppActive: true, shouldRunCron: true)
|
||||
|
||||
// Every time we become active...
|
||||
if tsRegistrationState.isRegistered {
|
||||
@@ -1350,16 +1353,44 @@ final class AppDelegate: UIResponder, UIApplicationDelegate {
|
||||
DispatchQueue.main.asyncAfter(deadline: .now() + 1) { identityManager.tryToSyncQueuedVerificationStates() }
|
||||
}
|
||||
|
||||
// MARK: - Cron
|
||||
|
||||
private func runCron() async {
|
||||
let cron = DependenciesBridge.shared.cron
|
||||
let chatConnectionManager = DependenciesBridge.shared.chatConnectionManager
|
||||
let tsAccountManager = DependenciesBridge.shared.tsAccountManager
|
||||
await cron.runOnce(ctx: CronContext(
|
||||
chatConnectionManager: chatConnectionManager,
|
||||
tsAccountManager: tsAccountManager,
|
||||
))
|
||||
}
|
||||
|
||||
// MARK: - Connections & Fetching
|
||||
|
||||
/// Tokens to keep the web socket open when the app is in the foreground.
|
||||
private var activeConnectionTokens = [OWSChatConnection.ConnectionToken]()
|
||||
|
||||
/// Task that should be continued/waited for/canceled in the background.
|
||||
@MainActor
|
||||
private var cronTask: Task<Void, Never>?
|
||||
|
||||
@MainActor
|
||||
private func startCronTask() {
|
||||
self.cronTask?.cancel()
|
||||
self.cronTask = Task {
|
||||
await self.runCron()
|
||||
if Task.isCancelled {
|
||||
return
|
||||
}
|
||||
self.cronTask = nil
|
||||
}
|
||||
}
|
||||
|
||||
/// A background fetching task that keeps the web socket open while the app
|
||||
/// is in the background.
|
||||
private var backgroundFetchHandle: BackgroundTaskHandle?
|
||||
|
||||
private func refreshConnection(isAppActive: Bool) {
|
||||
private func refreshConnection(isAppActive: Bool, shouldRunCron: Bool) {
|
||||
let chatConnectionManager = DependenciesBridge.shared.chatConnectionManager
|
||||
|
||||
let oldActiveConnectionTokens = self.activeConnectionTokens
|
||||
@@ -1367,7 +1398,9 @@ final class AppDelegate: UIResponder, UIApplicationDelegate {
|
||||
// If we're active, open a connection.
|
||||
self.activeConnectionTokens = chatConnectionManager.requestConnections()
|
||||
oldActiveConnectionTokens.forEach { $0.releaseConnection() }
|
||||
|
||||
if shouldRunCron {
|
||||
self.startCronTask()
|
||||
}
|
||||
// We're back in the foreground. We've passed off connection management to
|
||||
// the foreground logic, so just tear it down without waiting for anything.
|
||||
self.backgroundFetchHandle?.interrupt()
|
||||
@@ -1376,6 +1409,7 @@ final class AppDelegate: UIResponder, UIApplicationDelegate {
|
||||
let backgroundFetcher = DependenciesBridge.shared.backgroundMessageFetcherFactory.buildFetcher()
|
||||
self.activeConnectionTokens = []
|
||||
self.backgroundFetchHandle?.interrupt()
|
||||
let cronTask = self.cronTask.take()
|
||||
let startDate = MonotonicDate()
|
||||
let isPastRegistration = SignalApp.shared.conversationSplitViewController != nil
|
||||
self.backgroundFetchHandle = UIApplication.shared.beginBackgroundTask(
|
||||
@@ -1383,6 +1417,17 @@ final class AppDelegate: UIResponder, UIApplicationDelegate {
|
||||
do {
|
||||
await backgroundFetcher.start()
|
||||
oldActiveConnectionTokens.forEach { $0.releaseConnection() }
|
||||
// If there's a Cron task running that was started in the foreground, wait
|
||||
// for it to finish.
|
||||
await withTaskCancellationHandler(
|
||||
operation: { await cronTask?.value },
|
||||
onCancel: { cronTask?.cancel() },
|
||||
)
|
||||
// If there's a fresh request to run Cron when entering the background,
|
||||
// start a new Cron instance.
|
||||
if shouldRunCron {
|
||||
await self.runCron()
|
||||
}
|
||||
// This will usually be limited to 30 seconds rather than 3 minutes.
|
||||
let waitDeadline = startDate.adding(180)
|
||||
if isPastRegistration {
|
||||
@@ -1717,6 +1762,11 @@ final class AppDelegate: UIResponder, UIApplicationDelegate {
|
||||
Task {
|
||||
try await StickerManager.downloadPendingSickerPacks()
|
||||
}
|
||||
|
||||
// Schedule a Cron run if we're in the foreground.
|
||||
if !self.activeConnectionTokens.isEmpty {
|
||||
self.startCronTask()
|
||||
}
|
||||
}
|
||||
|
||||
Self.updateApplicationShortcutItems(isRegistered: isRegistered)
|
||||
|
||||
@@ -51,6 +51,10 @@ public class AppEnvironment: NSObject {
|
||||
}
|
||||
|
||||
func setUp(appReadiness: AppReadiness, callService: CallService) {
|
||||
let dependenciesBridge = DependenciesBridge.shared
|
||||
let cron = dependenciesBridge.cron
|
||||
_ = cron
|
||||
|
||||
let backupAttachmentUploadEraStore = BackupAttachmentUploadEraStore()
|
||||
let backupNonceStore = BackupNonceMetadataStore()
|
||||
let backupSettingsStore = BackupSettingsStore()
|
||||
|
||||
@@ -183,16 +183,26 @@ class NotificationService: UNNotificationServiceExtension {
|
||||
return UNMutableNotificationContent()
|
||||
}
|
||||
|
||||
let cron = DependenciesBridge.shared.cron
|
||||
let cronCtx = CronContext(
|
||||
chatConnectionManager: DependenciesBridge.shared.chatConnectionManager,
|
||||
tsAccountManager: DependenciesBridge.shared.tsAccountManager,
|
||||
)
|
||||
|
||||
do {
|
||||
try await startProxyIfEnabled()
|
||||
defer { SignalProxy.stopRelayServer() }
|
||||
|
||||
let backgroundMessageFetcher = DependenciesBridge.shared.backgroundMessageFetcherFactory.buildFetcher()
|
||||
|
||||
await backgroundMessageFetcher.start()
|
||||
// Start Cron after we request a socket.
|
||||
async let cronResult: Void = cron.runOnce(ctx: cronCtx)
|
||||
let fetchResult = await Result(catching: {
|
||||
await backgroundMessageFetcher.start()
|
||||
try await backgroundMessageFetcher.waitForFetchingProcessingAndSideEffects()
|
||||
})
|
||||
// Wait for Cron to finish executing before we release the socket.
|
||||
await cronResult
|
||||
await backgroundMessageFetcher.stopAndWaitBeforeSuspending()
|
||||
try fetchResult.get()
|
||||
} catch is CancellationError {
|
||||
|
||||
@@ -19,6 +19,7 @@ public class RegistrationStateChangeManagerImpl: RegistrationStateChangeManager
|
||||
// TODO: Fix circular dependency.
|
||||
return DependenciesBridge.shared.chatConnectionManager
|
||||
}
|
||||
private let cron: Cron
|
||||
private let db: DB
|
||||
private let dmConfigurationStore: DisappearingMessagesConfigurationStore
|
||||
private let identityManager: OWSIdentityManager
|
||||
@@ -40,6 +41,7 @@ public class RegistrationStateChangeManagerImpl: RegistrationStateChangeManager
|
||||
backupCDNCredentialStore: BackupCDNCredentialStore,
|
||||
backupSubscriptionManager: BackupSubscriptionManager,
|
||||
backupTestFlightEntitlementManager: BackupTestFlightEntitlementManager,
|
||||
cron: Cron,
|
||||
db: DB,
|
||||
dmConfigurationStore: DisappearingMessagesConfigurationStore,
|
||||
identityManager: OWSIdentityManager,
|
||||
@@ -60,6 +62,7 @@ public class RegistrationStateChangeManagerImpl: RegistrationStateChangeManager
|
||||
self.backupCDNCredentialStore = backupCDNCredentialStore
|
||||
self.backupSubscriptionManager = backupSubscriptionManager
|
||||
self.backupTestFlightEntitlementManager = backupTestFlightEntitlementManager
|
||||
self.cron = cron
|
||||
self.db = db
|
||||
self.dmConfigurationStore = dmConfigurationStore
|
||||
self.identityManager = identityManager
|
||||
@@ -327,6 +330,7 @@ public class RegistrationStateChangeManagerImpl: RegistrationStateChangeManager
|
||||
versionedProfiles.clearProfileKeyCredentials(tx: tx)
|
||||
authCredentialStore.removeAllGroupAuthCredentials(tx: tx)
|
||||
authCredentialStore.removeAllCallLinkAuthCredentials(tx: tx)
|
||||
cron.resetMostRecentDates(tx: tx)
|
||||
|
||||
storageServiceManager.setLocalIdentifiers(LocalIdentifiers(aci: aci, pni: pni, e164: e164))
|
||||
|
||||
|
||||
@@ -224,6 +224,11 @@ extension AppSetup.GlobalsContinuation {
|
||||
remoteConfig: remoteConfig.netConfig(),
|
||||
)
|
||||
|
||||
let cron = Cron(
|
||||
appVersion: appVersion.currentAppVersion4,
|
||||
db: databaseStorage,
|
||||
)
|
||||
|
||||
let recipientDatabaseTable = RecipientDatabaseTable()
|
||||
let signalAccountStore = SignalAccountStoreImpl()
|
||||
let threadStore = ThreadStoreImpl()
|
||||
@@ -1099,6 +1104,7 @@ extension AppSetup.GlobalsContinuation {
|
||||
backupCDNCredentialStore: backupCDNCredentialStore,
|
||||
backupSubscriptionManager: backupSubscriptionManager,
|
||||
backupTestFlightEntitlementManager: backupTestFlightEntitlementManager,
|
||||
cron: cron,
|
||||
db: db,
|
||||
dmConfigurationStore: disappearingMessagesConfigurationStore,
|
||||
identityManager: identityManager,
|
||||
@@ -1658,6 +1664,7 @@ extension AppSetup.GlobalsContinuation {
|
||||
chatColorSettingStore: chatColorSettingStore,
|
||||
chatConnectionManager: chatConnectionManager,
|
||||
contactShareManager: contactShareManager,
|
||||
cron: cron,
|
||||
currentCallProvider: currentCallProvider,
|
||||
databaseChangeObserver: databaseStorage.databaseChangeObserver,
|
||||
db: db,
|
||||
|
||||
@@ -94,6 +94,7 @@ public class DependenciesBridge {
|
||||
public let chatColorSettingStore: ChatColorSettingStore
|
||||
public let chatConnectionManager: ChatConnectionManager
|
||||
public let contactShareManager: ContactShareManager
|
||||
public let cron: Cron
|
||||
public let currentCallProvider: any CurrentCallProvider
|
||||
public let databaseChangeObserver: DatabaseChangeObserver
|
||||
public let db: any DB
|
||||
@@ -232,6 +233,7 @@ public class DependenciesBridge {
|
||||
chatColorSettingStore: ChatColorSettingStore,
|
||||
chatConnectionManager: ChatConnectionManager,
|
||||
contactShareManager: ContactShareManager,
|
||||
cron: Cron,
|
||||
currentCallProvider: any CurrentCallProvider,
|
||||
databaseChangeObserver: DatabaseChangeObserver,
|
||||
db: any DB,
|
||||
@@ -369,6 +371,7 @@ public class DependenciesBridge {
|
||||
self.chatColorSettingStore = chatColorSettingStore
|
||||
self.chatConnectionManager = chatConnectionManager
|
||||
self.contactShareManager = contactShareManager
|
||||
self.cron = cron
|
||||
self.currentCallProvider = currentCallProvider
|
||||
self.databaseChangeObserver = databaseChangeObserver
|
||||
self.db = db
|
||||
|
||||
337
SignalServiceKit/Jobs/Cron.swift
Normal file
337
SignalServiceKit/Jobs/Cron.swift
Normal file
@@ -0,0 +1,337 @@
|
||||
//
|
||||
// Copyright 2025 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
//
|
||||
|
||||
import Foundation
|
||||
|
||||
public struct CronContext {
|
||||
public var chatConnectionManager: any ChatConnectionManager
|
||||
public var tsAccountManager: any TSAccountManager
|
||||
|
||||
public init(
|
||||
chatConnectionManager: any ChatConnectionManager,
|
||||
tsAccountManager: any TSAccountManager,
|
||||
) {
|
||||
self.chatConnectionManager = chatConnectionManager
|
||||
self.tsAccountManager = tsAccountManager
|
||||
}
|
||||
}
|
||||
|
||||
private let dateStore = NewKeyValueStore(collection: "Cron")
|
||||
|
||||
struct CronStore {
|
||||
private let uniqueKey: Cron.UniqueKey
|
||||
|
||||
init(uniqueKey: Cron.UniqueKey) {
|
||||
self.uniqueKey = uniqueKey
|
||||
}
|
||||
|
||||
/// The most recent completion date (or `.distantPast`).
|
||||
func mostRecentDate(tx: DBReadTransaction) -> Date {
|
||||
return dateStore.fetchValue(
|
||||
Date.self,
|
||||
forKey: self.uniqueKey.rawValue,
|
||||
tx: tx,
|
||||
) ?? .distantPast
|
||||
}
|
||||
|
||||
/// Marks the task as "complete".
|
||||
///
|
||||
/// - Parameter jitter: The maxmimum amount of random jitter added
|
||||
/// to/subtracted from `now`. Helps distribute load/avoid spikes.
|
||||
func setMostRecentDate(_ now: Date, jitter: TimeInterval, tx: DBWriteTransaction) {
|
||||
dateStore.writeValue(
|
||||
now.addingTimeInterval(TimeInterval.random(in: -jitter...jitter)),
|
||||
forKey: self.uniqueKey.rawValue,
|
||||
tx: tx,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
public class Cron {
|
||||
private let appVersion: AppVersionNumber4
|
||||
private let db: any DB
|
||||
private let metadataStore: NewKeyValueStore
|
||||
private let jobs: AtomicValue<[(CronContext) async -> Void]>
|
||||
|
||||
/// Unique keys that identify Cron jobs.
|
||||
///
|
||||
/// All state related to these keys is cleared when the app's version number
|
||||
/// changes. These are therefore safe to add/remove/rename without migrating
|
||||
/// anything that's been written to disk. (This statement is not true for
|
||||
/// local builds, but it's true for all TestFlight/App Store builds.)
|
||||
public enum UniqueKey: String {
|
||||
case none
|
||||
}
|
||||
|
||||
init(
|
||||
appVersion: AppVersionNumber4,
|
||||
db: any DB,
|
||||
) {
|
||||
self.appVersion = appVersion
|
||||
self.db = db
|
||||
self.metadataStore = NewKeyValueStore(collection: "CronM")
|
||||
self.jobs = AtomicValue([], lock: .init())
|
||||
}
|
||||
|
||||
/// Schedules `operation` to run periodically.
|
||||
///
|
||||
/// The `operation` will be run every `approximateInterval` seconds or so.
|
||||
/// It may be run more frequently than `approximateInterval` seconds, and
|
||||
/// therefore `operation`s must be safe to invoke at shorter intervals.
|
||||
///
|
||||
/// Guarantees:
|
||||
///
|
||||
/// - When `mustBe...` values are true, the job "waits" until the conditions
|
||||
/// are met before invoking `operation`. For example, if `mustBeConnected`
|
||||
/// is true, the job will wait until the web socket is connected before
|
||||
/// invoking `operation`.
|
||||
///
|
||||
/// - The `operation` will be re-run whenever the app's version number
|
||||
/// changes. This helps ensure that bugs fixed directly in Cron jobs are
|
||||
/// mitigated quickly in new versions, but it also ensures indirect bug
|
||||
/// fixes are mitigated quickly. (If you fix a bug on purpose, you can trust
|
||||
/// that users who update will apply the fix immediately; if you fix a bug
|
||||
/// without realizing it, users who update will also apply it immediately.)
|
||||
///
|
||||
/// - The `operation` is integrated with the UIBackgroundTask
|
||||
/// infrastructure; a background task assertion will be held whenever
|
||||
/// `operation` is executing, and `operation` will be canceled when
|
||||
/// background execution time expires.
|
||||
///
|
||||
/// - Parameter uniqueKey: The identifier for a job that's used to store the
|
||||
/// time at which the job was most recently executed.
|
||||
///
|
||||
/// - Parameter approximateInterval: The suggested interval between
|
||||
/// invocations of `operation`. It may run more quickly (e.g., when you
|
||||
/// update the app) or more slowly (e.g., you didn't launch the app for a
|
||||
/// week). The Cron system also imposes random jitter of ±5%.
|
||||
///
|
||||
/// - Parameter mustBeRegistered: If true, `operation` won't be invoked
|
||||
/// until the user is registered.
|
||||
///
|
||||
/// - Parameter mustBeConnected: If true, `operation` won't be invoked until
|
||||
/// the user is connected.
|
||||
///
|
||||
/// - Parameter isRetryable: Whether or not an error thrown by `operation`
|
||||
/// should be retried "quickly". If true, `operation` will be invoked
|
||||
/// repeatedly with exponential backoff, up to a maximum average backoff of
|
||||
/// `approximateInterval`. If false, this attempt will be marked complete,
|
||||
/// and the next attempt won't start until after `approximateInterval`.
|
||||
public func schedulePeriodically<E>(
|
||||
uniqueKey: UniqueKey,
|
||||
approximateInterval: TimeInterval,
|
||||
mustBeRegistered: Bool,
|
||||
mustBeConnected: Bool,
|
||||
isRetryable: @escaping (E) -> Bool = { $0.isRetryable },
|
||||
operation: @escaping () async throws(E) -> Void,
|
||||
) {
|
||||
let store = CronStore(uniqueKey: uniqueKey)
|
||||
scheduleFrequently(
|
||||
mustBeRegistered: mustBeRegistered,
|
||||
mustBeConnected: mustBeConnected,
|
||||
maxAverageBackoff: approximateInterval,
|
||||
isRetryable: isRetryable,
|
||||
operation: { [db] () async throws(E) -> Bool in
|
||||
let mostRecentDate = db.read(block: store.mostRecentDate(tx:))
|
||||
let earliestNextDate = mostRecentDate.addingTimeInterval(approximateInterval)
|
||||
if Date() < earliestNextDate {
|
||||
return false
|
||||
}
|
||||
Logger.info("job \(uniqueKey) starting")
|
||||
try await operation()
|
||||
return true
|
||||
},
|
||||
handleResult: { [db] result in
|
||||
switch result {
|
||||
case .failure(is NotRegisteredError), .success(false), .failure(is CancellationError):
|
||||
// A requirement (e.g., mustBeRegistered) wasn't met, it's too early to run
|
||||
// again, or we were canceled while running. Don't set any state so that we
|
||||
// run again at the next opportunity.
|
||||
break
|
||||
case .success(true), .failure(_):
|
||||
// We ran or hit a terminal error while trying to run; mark the job as
|
||||
// completed so that we wait for `approximateInterval` before retrying.
|
||||
Logger.info("job \(uniqueKey) reached terminal result: \(result)")
|
||||
await db.awaitableWrite { tx in
|
||||
store.setMostRecentDate(Date(), jitter: approximateInterval / 20, tx: tx)
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
/// Schedules `operation` to run "frequently".
|
||||
///
|
||||
/// - Warning: Operations scheduled via this mechanism are executed
|
||||
/// extremely frequently and must implement their own logic to check whether
|
||||
/// or not it's necessary to execute. They should turn into no-ops most of
|
||||
/// the time. Most callers should prefer `schedulePeriodically`.
|
||||
///
|
||||
/// "Frequently" means that `operation` is executed every time the app
|
||||
/// launches, every time the app enters the foreground, every time the
|
||||
/// notification service is triggered, and every time the user registers. In
|
||||
/// the future, it may also mean that `operation` is executed during
|
||||
/// background app refresh and "content-available" pushes.
|
||||
///
|
||||
/// This method is similar to `Retry.performWithBackoff` and exposes many of
|
||||
/// the same parameters. However, whereas `Retry.performWithBackoff` stops
|
||||
/// entirely after encountering a non-`isRetryable` error, this method
|
||||
/// restarts automatically after the next "frequent" event.
|
||||
///
|
||||
/// The `operation` is integrated with the UIBackgroundTask infrastructure;
|
||||
/// a background task assertion will be held whenever `operation` is
|
||||
/// executing, and `operation` will be canceled when background execution
|
||||
/// time expires.
|
||||
///
|
||||
/// This method is a generalized version of `schedulePeriodically` that may
|
||||
/// be useful for callers who want to implement more complex triggers.
|
||||
///
|
||||
/// - Parameter mustBeRegistered: If true, `operation` won't be invoked
|
||||
/// until the user is registered.
|
||||
///
|
||||
/// - Parameter mustBeConnected: If true, `operation` won't be invoked until
|
||||
/// the user is connected.
|
||||
///
|
||||
/// - Parameter minAverageBackoff: See `Retry.performWithBackoff`.
|
||||
///
|
||||
/// - Parameter maxAverageBackoff: See `Retry.performWithBackoff`.
|
||||
///
|
||||
/// - Parameter isRetryable: Whether or not an error thrown by `operation`
|
||||
/// should be retried "quickly". If true, `operation` will be invoked
|
||||
/// repeatedly with exponential backoff. If false, this attempt will stop,
|
||||
/// `handleResult` will be invoked, and the next attempt won't start until
|
||||
/// the next "frequent" trigger.
|
||||
///
|
||||
/// - Parameter handleResult: Invoked when an "attempt" (started after a
|
||||
/// "frequent" event) reaches a terminal state. A "terminal state" is any
|
||||
/// outcome other than `operation` throwing an `isRetryable` error (e.g.,
|
||||
/// `operation` running to completion or being canceled while waiting for
|
||||
/// exponential backoff after an `isRetryable` error). The `Result` is `any
|
||||
/// Error` to handle `CancellationError`s (from `Retry` and waiting for the
|
||||
/// network) and `NotRegisteredError`s that may be thrown.
|
||||
public func scheduleFrequently<T, E>(
|
||||
mustBeRegistered: Bool,
|
||||
mustBeConnected: Bool,
|
||||
minAverageBackoff: TimeInterval = 2,
|
||||
maxAverageBackoff: TimeInterval = .infinity,
|
||||
isRetryable: @escaping (E) -> Bool = { $0.isRetryable },
|
||||
operation: @escaping () async throws(E) -> T,
|
||||
handleResult: @escaping (Result<T, any Error>) async -> Void,
|
||||
) {
|
||||
self.jobs.update {
|
||||
$0.append({ (ctx) async -> Void in
|
||||
let attemptResult = await Self.runOuterOperationAttempt(
|
||||
mustBeRegistered: mustBeRegistered,
|
||||
mustBeConnected: mustBeConnected,
|
||||
minAverageBackoff: minAverageBackoff,
|
||||
maxAverageBackoff: maxAverageBackoff,
|
||||
isRetryable: isRetryable,
|
||||
operation: operation,
|
||||
ctx: ctx,
|
||||
)
|
||||
await handleResult(attemptResult)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs an "outer" attempt.
|
||||
///
|
||||
/// An "outer" attempt may invoke `operation` multiple times. It's triggered
|
||||
/// by a "frequent" event (e.g., foregrounding the app). It uses
|
||||
/// Retry.performWithBackoff to execute `operation` until it succeeds or
|
||||
/// throws a non-`isRetryable` error.
|
||||
private static func runOuterOperationAttempt<T, E>(
|
||||
mustBeRegistered: Bool,
|
||||
mustBeConnected: Bool,
|
||||
minAverageBackoff: TimeInterval,
|
||||
maxAverageBackoff: TimeInterval,
|
||||
isRetryable: (E) -> Bool,
|
||||
operation: () async throws(E) -> T,
|
||||
ctx: CronContext,
|
||||
) async -> Result<T, any Error> {
|
||||
do {
|
||||
return try await Retry.performWithBackoff(
|
||||
maxAttempts: .max,
|
||||
minAverageBackoff: minAverageBackoff,
|
||||
maxAverageBackoff: maxAverageBackoff,
|
||||
isRetryable: isRetryable,
|
||||
block: { () throws(E) -> Result<T, any Error> in
|
||||
return try await runInnerOperationAttempt(
|
||||
mustBeRegistered: mustBeRegistered,
|
||||
mustBeConnected: mustBeConnected,
|
||||
operation: operation,
|
||||
ctx: ctx,
|
||||
)
|
||||
},
|
||||
)
|
||||
} catch {
|
||||
// We may have gotten a CancellationError from Retry, or we may have gotten
|
||||
// a non-`isRetryable` error. These are all terminal failures for this
|
||||
// attempt; we pass those to `handleResult` and stop executing until the
|
||||
// next time we're triggered.
|
||||
return .failure(error)
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs an "inner" attempt.
|
||||
///
|
||||
/// An "inner" attempt is a single invocation of `operation`. If "mustBe..."
|
||||
/// preconditions aren't satisfied, this method may throw an error before
|
||||
/// `operation` is invoked. All errors are immediately rethrown.
|
||||
private static func runInnerOperationAttempt<T, E>(
|
||||
mustBeRegistered: Bool,
|
||||
mustBeConnected: Bool,
|
||||
operation: () async throws(E) -> T,
|
||||
ctx: CronContext,
|
||||
) async throws(E) -> Result<T, any Error> {
|
||||
// Before each attempt, wait until the network is available.
|
||||
do throws(CancellationError) {
|
||||
if mustBeConnected {
|
||||
if mustBeRegistered {
|
||||
try await ctx.chatConnectionManager.waitForIdentifiedConnectionToOpen()
|
||||
} else {
|
||||
try await ctx.chatConnectionManager.waitForUnidentifiedConnectionToOpen()
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
return .failure(error)
|
||||
}
|
||||
|
||||
// Before each attempt, check if we're registered.
|
||||
if mustBeRegistered, !ctx.tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegistered {
|
||||
return .failure(NotRegisteredError())
|
||||
}
|
||||
|
||||
return .success(try await operation())
|
||||
}
|
||||
|
||||
private func checkForNewVersion() async {
|
||||
let appVersionKey = "AppVersion"
|
||||
|
||||
let mostRecentAppVersion = self.db.read { tx in
|
||||
return self.metadataStore.fetchValue(String.self, forKey: appVersionKey, tx: tx)
|
||||
}
|
||||
if mostRecentAppVersion != self.appVersion.wrappedValue.rawValue {
|
||||
await self.db.awaitableWrite { tx in
|
||||
self.resetMostRecentDates(tx: tx)
|
||||
self.metadataStore.writeValue(self.appVersion.wrappedValue.rawValue, forKey: appVersionKey, tx: tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public func resetMostRecentDates(tx: DBWriteTransaction) {
|
||||
dateStore.removeAll(tx: tx)
|
||||
}
|
||||
|
||||
public func runOnce(ctx: CronContext) async {
|
||||
await self.checkForNewVersion()
|
||||
await withTaskGroup { taskGroup in
|
||||
for job in self.jobs.get() {
|
||||
taskGroup.addTask { await job(ctx) }
|
||||
}
|
||||
await taskGroup.waitForAll()
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user