Merge branch 'database-refactor' into quote-standardise

This commit is contained in:
Ryan Zhao 2022-08-08 09:22:15 +10:00
commit d4237828a8
27 changed files with 627 additions and 273 deletions

View File

@ -27,7 +27,7 @@ PODS:
- DifferenceKit/Core (1.2.0)
- DifferenceKit/UIKitExtension (1.2.0):
- DifferenceKit/Core
- GRDB.swift/SQLCipher (5.24.1):
- GRDB.swift/SQLCipher (5.26.0):
- SQLCipher (>= 3.4.0)
- libwebp (1.2.1):
- libwebp/demux (= 1.2.1)
@ -222,7 +222,7 @@ SPEC CHECKSUMS:
CryptoSwift: a532e74ed010f8c95f611d00b8bbae42e9fe7c17
Curve25519Kit: e63f9859ede02438ae3defc5e1a87e09d1ec7ee6
DifferenceKit: 5659c430bb7fe45876fa32ce5cba5d6167f0c805
GRDB.swift: b3180ce2135fc06a453297889b746b1478c4d8c7
GRDB.swift: 1395cb3556df6b16ed69dfc74c3886abc75d2825
libwebp: 98a37e597e40bfdb4c911fc98f2c53d0b12d05fc
Nimble: 5316ef81a170ce87baf72dd961f22f89a602ff84
NVActivityIndicatorView: 1f6c5687f1171810aa27a3296814dc2d7dec3667

View File

@ -6818,7 +6818,7 @@
CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements;
CODE_SIGN_IDENTITY = "iPhone Developer";
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CURRENT_PROJECT_VERSION = 363;
CURRENT_PROJECT_VERSION = 365;
DEVELOPMENT_TEAM = SUQ8J2PCT7;
FRAMEWORK_SEARCH_PATHS = (
"$(inherited)",
@ -6890,7 +6890,7 @@
CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements;
CODE_SIGN_IDENTITY = "iPhone Developer";
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CURRENT_PROJECT_VERSION = 363;
CURRENT_PROJECT_VERSION = 365;
DEVELOPMENT_TEAM = SUQ8J2PCT7;
FRAMEWORK_SEARCH_PATHS = (
"$(inherited)",

View File

@ -520,16 +520,18 @@ extension ConversationVC:
let threadId: String = self.viewModel.threadData.threadId
let threadVariant: SessionThread.Variant = self.viewModel.threadData.threadVariant
let threadIsMessageRequest: Bool = (self.viewModel.threadData.threadIsMessageRequest == true)
let needsToStartTypingIndicator: Bool = TypingIndicators.didStartTypingNeedsToStart(
threadId: threadId,
threadVariant: threadVariant,
threadIsMessageRequest: threadIsMessageRequest,
direction: .outgoing,
timestampMs: Int64(floor(Date().timeIntervalSince1970 * 1000))
)
Storage.shared.writeAsync { db in
TypingIndicators.didStartTyping(
db,
threadId: threadId,
threadVariant: threadVariant,
threadIsMessageRequest: threadIsMessageRequest,
direction: .outgoing,
timestampMs: Int64(floor(Date().timeIntervalSince1970 * 1000))
)
if needsToStartTypingIndicator {
Storage.shared.writeAsync { db in
TypingIndicators.start(db, threadId: threadId, direction: .outgoing)
}
}
}

View File

@ -428,15 +428,34 @@ public class ConversationViewModel: OWSAudioPlayerDelegate {
// MARK: - Functions
public func updateDraft(to draft: String) {
let threadId: String = self.threadId
let currentDraft: String = Storage.shared
.read { db in
try SessionThread
.select(.messageDraft)
.filter(id: threadId)
.asRequest(of: String.self)
.fetchOne(db)
}
.defaulting(to: "")
// Only write the updated draft to the database if it's changed (avoid unnecessary writes)
guard draft != currentDraft else { return }
Storage.shared.writeAsync { db in
try SessionThread
.filter(id: self.threadId)
.filter(id: threadId)
.updateAll(db, SessionThread.Columns.messageDraft.set(to: draft))
}
}
public func markAllAsRead() {
guard let lastInteractionId: Int64 = self.threadData.interactionId else { return }
// Don't bother marking anything as read if there are no unread interactions (we can rely
// on the 'threadData.threadUnreadCount' to always be accurate)
guard
(self.threadData.threadUnreadCount ?? 0) > 0,
let lastInteractionId: Int64 = self.threadData.interactionId
else { return }
let threadId: String = self.threadData.threadId
let trySendReadReceipt: Bool = (self.threadData.threadIsMessageRequest == false)

View File

@ -59,8 +59,8 @@ final class InputViewButton : UIView {
isUserInteractionEnabled = true
widthConstraint.isActive = true
heightConstraint.isActive = true
let tint = isSendButton ? UIColor.black : Colors.text
let iconImageView = UIImageView(image: icon.withTint(tint))
let iconImageView = UIImageView(image: icon.withRenderingMode(.alwaysTemplate))
iconImageView.tintColor = (isSendButton ? UIColor.black : Colors.text)
iconImageView.contentMode = .scaleAspectFit
let iconSize = InputViewButton.iconSize
iconImageView.set(.width, to: iconSize)

View File

@ -28,8 +28,8 @@ final class CallMessageView: UIView {
// Image view
let imageView: UIImageView = UIImageView(
image: UIImage(named: "Phone")?
.resizedImage(to: CGSize(width: CallMessageView.iconSize, height: CallMessageView.iconSize))?
.withRenderingMode(.alwaysTemplate)
.resizedImage(to: CGSize(width: CallMessageView.iconSize, height: CallMessageView.iconSize))
)
imageView.tintColor = textColor
imageView.contentMode = .center

View File

@ -27,11 +27,11 @@ final class DeletedMessageView: UIView {
private func setUpViewHierarchy(textColor: UIColor) {
// Image view
let icon = UIImage(named: "ic_trash")?
.withRenderingMode(.alwaysTemplate)
.resizedImage(to: CGSize(
width: DeletedMessageView.iconSize,
height: DeletedMessageView.iconSize
))
))?
.withRenderingMode(.alwaysTemplate)
let imageView = UIImageView(image: icon)
imageView.tintColor = textColor

View File

@ -44,13 +44,13 @@ final class MediaPlaceholderView: UIView {
// Image view
let imageView = UIImageView(
image: UIImage(named: iconName)?
.withRenderingMode(.alwaysTemplate)
.resizedImage(
to: CGSize(
width: MediaPlaceholderView.iconSize,
height: MediaPlaceholderView.iconSize
)
)
)?
.withRenderingMode(.alwaysTemplate)
)
imageView.tintColor = textColor
imageView.contentMode = .center

View File

@ -68,8 +68,8 @@ final class OpenGroupInvitationView: UIView {
let iconImageViewSize = OpenGroupInvitationView.iconImageViewSize
let iconImageView = UIImageView(
image: UIImage(named: iconName)?
.resizedImage(to: CGSize(width: iconSize, height: iconSize))?
.withRenderingMode(.alwaysTemplate)
.resizedImage(to: CGSize(width: iconSize, height: iconSize))
)
iconImageView.tintColor = .white
iconImageView.contentMode = .center

View File

@ -122,6 +122,9 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD
/// `appDidFinishLaunching` seems to fix this odd behaviour (even though it doesn't match
/// Apple's documentation on the matter)
UNUserNotificationCenter.current().delegate = self
// Resume database
NotificationCenter.default.post(name: Database.resumeNotification, object: self)
}
func applicationDidEnterBackground(_ application: UIApplication) {
@ -130,6 +133,10 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD
// NOTE: Fix an edge case where user taps on the callkit notification
// but answers the call on another device
stopPollers(shouldStopUserPoller: !self.hasIncomingCallWaiting())
JobRunner.stopAndClearPendingJobs()
// Suspend database
NotificationCenter.default.post(name: Database.suspendNotification, object: self)
}
func applicationDidReceiveMemoryWarning(_ application: UIApplication) {
@ -185,8 +192,16 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD
// MARK: - Background Fetching
func application(_ application: UIApplication, performFetchWithCompletionHandler completionHandler: @escaping (UIBackgroundFetchResult) -> Void) {
// Resume database
NotificationCenter.default.post(name: Database.resumeNotification, object: self)
AppReadiness.runNowOrWhenAppDidBecomeReady {
BackgroundPoller.poll(completionHandler: completionHandler)
BackgroundPoller.poll { result in
// Suspend database
NotificationCenter.default.post(name: Database.suspendNotification, object: self)
completionHandler(result)
}
}
}

View File

@ -137,9 +137,6 @@ class HighlightMentionBackgroundView: UIView {
extraYOffset
)
// We don't want to draw too far to the right
runBounds.size.width = (runBounds.width > lineWidth ? lineWidth : runBounds.width)
let path = UIBezierPath(roundedRect: runBounds, cornerRadius: cornerRadius)
mentionBackgroundColor.setFill()
path.fill()

View File

@ -9,8 +9,11 @@ import SessionUtilitiesKit
public final class BackgroundPoller {
private static var promises: [Promise<Void>] = []
private static var isValid: Bool = false
public static func poll(completionHandler: @escaping (UIBackgroundFetchResult) -> Void) {
BackgroundPoller.isValid = true
promises = []
.appending(pollForMessages())
.appending(contentsOf: pollForClosedGroupMessages())
@ -32,7 +35,11 @@ public final class BackgroundPoller {
let poller: OpenGroupAPI.Poller = OpenGroupAPI.Poller(for: server)
poller.stop()
return poller.poll(isBackgroundPoll: true, isPostCapabilitiesRetry: false)
return poller.poll(
isBackgroundPoll: true,
isBackgroundPollerValid: { BackgroundPoller.isValid },
isPostCapabilitiesRetry: false
)
}
)
@ -41,6 +48,7 @@ public final class BackgroundPoller {
// after 25 seconds allowing us to cancel all pending promises
let cancelTimer: Timer = Timer.scheduledTimerOnMainThread(withTimeInterval: 25, repeats: false) { timer in
timer.invalidate()
BackgroundPoller.isValid = false
guard promises.contains(where: { !$0.isResolved }) else { return }
@ -50,6 +58,9 @@ public final class BackgroundPoller {
when(resolved: promises)
.done { _ in
// If we have already invalidated the timer then do nothing (we essentially timed out)
guard cancelTimer.isValid else { return }
cancelTimer.invalidate()
completionHandler(.newData)
}
@ -88,7 +99,8 @@ public final class BackgroundPoller {
groupPublicKey,
on: DispatchQueue.main,
maxRetryCount: 0,
isBackgroundPoll: true
isBackgroundPoll: true,
isBackgroundPollValid: { BackgroundPoller.isValid }
)
}
}
@ -100,44 +112,45 @@ public final class BackgroundPoller {
return SnodeAPI.getMessages(from: snode, associatedWith: publicKey)
.then(on: DispatchQueue.main) { messages -> Promise<Void> in
guard !messages.isEmpty else { return Promise.value(()) }
guard !messages.isEmpty, BackgroundPoller.isValid else { return Promise.value(()) }
var jobsToRun: [Job] = []
Storage.shared.write { db in
var threadMessages: [String: [MessageReceiveJob.Details.MessageInfo]] = [:]
messages.forEach { message in
do {
let processedMessage: ProcessedMessage? = try Message.processRawReceivedMessage(db, rawMessage: message)
let key: String = (processedMessage?.threadId ?? Message.nonThreadMessageId)
threadMessages[key] = (threadMessages[key] ?? [])
.appending(processedMessage?.messageInfo)
}
catch {
switch error {
// Ignore duplicate & selfSend message errors (and don't bother logging
// them as there will be a lot since we each service node duplicates messages)
case DatabaseError.SQLITE_CONSTRAINT_UNIQUE,
MessageReceiverError.duplicateMessage,
MessageReceiverError.duplicateControlMessage,
MessageReceiverError.selfSend:
break
messages
.compactMap { message -> ProcessedMessage? in
do {
return try Message.processRawReceivedMessage(db, rawMessage: message)
}
catch {
switch error {
// Ignore duplicate & selfSend message errors (and don't bother
// logging them as there will be a lot since we each service node
// duplicates messages)
case DatabaseError.SQLITE_CONSTRAINT_UNIQUE,
MessageReceiverError.duplicateMessage,
MessageReceiverError.duplicateControlMessage,
MessageReceiverError.selfSend:
break
// In the background ignore 'SQLITE_ABORT' (it generally means
// the BackgroundPoller has timed out
case DatabaseError.SQLITE_ABORT: break
default: SNLog("Failed to deserialize envelope due to error: \(error).")
}
default: SNLog("Failed to deserialize envelope due to error: \(error).")
return nil
}
}
}
threadMessages
.grouped { threadId, _, _ in (threadId ?? Message.nonThreadMessageId) }
.forEach { threadId, threadMessages in
let maybeJob: Job? = Job(
variant: .messageReceive,
behaviour: .runOnce,
threadId: threadId,
details: MessageReceiveJob.Details(
messages: threadMessages,
messages: threadMessages.map { $0.messageInfo },
isBackgroundPoll: true
)
)

View File

@ -59,3 +59,37 @@ public struct Capability: Codable, FetchableRecord, PersistableRecord, TableReco
self.isMissing = isMissing
}
}
extension Capability.Variant {
// MARK: - Codable
public init(from decoder: Decoder) throws {
let container: SingleValueDecodingContainer = try decoder.singleValueContainer()
let valueString: String = try container.decode(String.self)
// FIXME: Remove this code
// There was a point where we didn't have custom Codable handling for the Capability.Variant
// which resulted in the data being encoded into the database as a JSON dict - this code catches
// that case and extracts the standard string value so it can be processed the same as the
// "proper" custom Codable logic)
if valueString.starts(with: "{") {
self = Capability.Variant(
from: valueString
.replacingOccurrences(of: "\":{}}", with: "")
.replacingOccurrences(of: "\"}}", with: "")
.replacingOccurrences(of: "{\"unsupported\":{\"_0\":\"", with: "")
.replacingOccurrences(of: "{\"", with: "")
)
return
}
// FIXME: Remove this code ^^^
self = Capability.Variant(from: valueString)
}
public func encode(to encoder: Encoder) throws {
var container: SingleValueEncodingContainer = encoder.singleValueContainer()
try container.encode(rawValue)
}
}

View File

@ -4,60 +4,14 @@ import Foundation
extension OpenGroupAPI {
public struct Capabilities: Codable, Equatable {
public enum Capability: Equatable, CaseIterable, Codable {
public static var allCases: [Capability] {
[.sogs, .blind]
}
case sogs
case blind
/// Fallback case if the capability isn't supported by this version of the app
case unsupported(String)
// MARK: - Convenience
public var rawValue: String {
switch self {
case .unsupported(let originalValue): return originalValue
default: return "\(self)"
}
}
// MARK: - Initialization
public init(from valueString: String) {
let maybeValue: Capability? = Capability.allCases.first { $0.rawValue == valueString }
self = (maybeValue ?? .unsupported(valueString))
}
}
public let capabilities: [Capability]
public let missing: [Capability]?
public let capabilities: [Capability.Variant]
public let missing: [Capability.Variant]?
// MARK: - Initialization
public init(capabilities: [Capability], missing: [Capability]? = nil) {
public init(capabilities: [Capability.Variant], missing: [Capability.Variant]? = nil) {
self.capabilities = capabilities
self.missing = missing
}
}
}
extension OpenGroupAPI.Capabilities.Capability {
// MARK: - Codable
public init(from decoder: Decoder) throws {
let container: SingleValueDecodingContainer = try decoder.singleValueContainer()
let valueString: String = try container.decode(String.self)
self = OpenGroupAPI.Capabilities.Capability(from: valueString)
}
public func encode(to encoder: Encoder) throws {
var container: SingleValueEncodingContainer = encoder.singleValueContainer()
try container.encode(rawValue)
}
}

View File

@ -10,6 +10,7 @@ extension OpenGroupAPI {
case sender = "session_id"
case posted
case edited
case deleted
case seqNo = "seqno"
case whisper
case whisperMods = "whisper_mods"
@ -23,6 +24,7 @@ extension OpenGroupAPI {
public let sender: String?
public let posted: TimeInterval
public let edited: TimeInterval?
public let deleted: Bool?
public let seqNo: Int64
public let whisper: Bool
public let whisperMods: Bool
@ -79,6 +81,7 @@ extension OpenGroupAPI.Message {
sender: try? container.decode(String.self, forKey: .sender),
posted: try container.decode(TimeInterval.self, forKey: .posted),
edited: try? container.decode(TimeInterval.self, forKey: .edited),
deleted: try? container.decode(Bool.self, forKey: .deleted),
seqNo: try container.decode(Int64.self, forKey: .seqNo),
whisper: ((try? container.decode(Bool.self, forKey: .whisper)) ?? false),
whisperMods: ((try? container.decode(Bool.self, forKey: .whisperMods)) ?? false),

View File

@ -348,7 +348,7 @@ public final class OpenGroupManager: NSObject {
capabilities.capabilities.forEach { capability in
_ = try? Capability(
openGroupServer: server.lowercased(),
variant: Capability.Variant(from: capability.rawValue),
variant: capability,
isMissing: false
)
.saved(db)
@ -356,7 +356,7 @@ public final class OpenGroupManager: NSObject {
capabilities.missing?.forEach { capability in
_ = try? Capability(
openGroupServer: server.lowercased(),
variant: Capability.Variant(from: capability.rawValue),
variant: capability,
isMissing: true
)
.saved(db)
@ -499,9 +499,12 @@ public final class OpenGroupManager: NSObject {
}
let sortedMessages: [OpenGroupAPI.Message] = messages
.filter { $0.deleted != true }
.sorted { lhs, rhs in lhs.id < rhs.id }
let messageServerIdsToRemove: [Int64] = messages
.filter { $0.deleted == true }
.map { $0.id }
let seqNo: Int64? = sortedMessages.map { $0.seqNo }.max()
var messageServerIdsToRemove: [UInt64] = []
// Update the 'openGroupSequenceNumber' value (Note: SOGS V4 uses the 'seqNo' instead of the 'serverId')
if let seqNo: Int64 = seqNo {
@ -515,11 +518,7 @@ public final class OpenGroupManager: NSObject {
guard
let base64EncodedString: String = message.base64EncodedData,
let data = Data(base64Encoded: base64EncodedString)
else {
// A message with no data has been deleted so add it to the list to remove
messageServerIdsToRemove.append(UInt64(message.id))
return
}
else { return }
do {
let processedMessage: ProcessedMessage? = try Message.processReceivedOpenGroupMessage(

View File

@ -13,8 +13,7 @@ extension MessageReceiver {
switch message.kind {
case .started:
TypingIndicators.didStartTyping(
db,
let needsToStartTypingIndicator: Bool = TypingIndicators.didStartTypingNeedsToStart(
threadId: thread.id,
threadVariant: thread.variant,
threadIsMessageRequest: thread.isMessageRequest(db),
@ -22,6 +21,10 @@ extension MessageReceiver {
timestampMs: message.sentTimestamp.map { Int64($0) }
)
if needsToStartTypingIndicator {
TypingIndicators.start(db, threadId: thread.id, direction: .incoming)
}
case .stopped:
TypingIndicators.didStopTyping(db, threadId: thread.id, direction: .incoming)

View File

@ -291,7 +291,7 @@ public final class MessageSender {
errorCount += 1
guard errorCount == promiseCount else { return } // Only error out if all promises failed
Storage.shared.write { db in
Storage.shared.read { db in
handleFailure(db, with: .other(error))
}
}
@ -300,7 +300,7 @@ public final class MessageSender {
.catch(on: DispatchQueue.global(qos: .default)) { error in
SNLog("Couldn't send message due to error: \(error).")
Storage.shared.write { db in
Storage.shared.read { db in
handleFailure(db, with: .other(error))
}
}
@ -447,7 +447,7 @@ public final class MessageSender {
}
}
.catch(on: DispatchQueue.global(qos: .default)) { error in
dependencies.storage.write { db in
dependencies.storage.read { db in
handleFailure(db, with: .other(error))
}
}
@ -557,7 +557,7 @@ public final class MessageSender {
}
}
.catch(on: DispatchQueue.global(qos: .default)) { error in
dependencies.storage.write { db in
dependencies.storage.read { db in
handleFailure(db, with: .other(error))
}
}
@ -652,15 +652,34 @@ public final class MessageSender {
with error: MessageSenderError,
interactionId: Int64?
) {
// Mark any "sending" recipients as "failed"
_ = try? RecipientState
// Check if we need to mark any "sending" recipients as "failed"
//
// Note: The 'db' could be either read-only or writeable so we determine
// if a change is required, and if so dispatch to a separate queue for the
// actual write
let rowIds: [Int64] = (try? RecipientState
.select(Column.rowID)
.filter(RecipientState.Columns.interactionId == interactionId)
.filter(RecipientState.Columns.state == RecipientState.State.sending)
.updateAll(
db,
RecipientState.Columns.state.set(to: RecipientState.State.failed),
RecipientState.Columns.mostRecentFailureText.set(to: error.localizedDescription)
)
.asRequest(of: Int64.self)
.fetchAll(db))
.defaulting(to: [])
guard !rowIds.isEmpty else { return }
// Need to dispatch to a different thread to prevent a potential db re-entrancy
// issue from occuring in some cases
DispatchQueue.global(qos: .background).async {
Storage.shared.write { db in
try RecipientState
.filter(rowIds.contains(Column.rowID))
.updateAll(
db,
RecipientState.Columns.state.set(to: RecipientState.State.failed),
RecipientState.Columns.mostRecentFailureText.set(to: error.localizedDescription)
)
}
}
}
// MARK: - Convenience

View File

@ -152,6 +152,7 @@ public final class ClosedGroupPoller {
on queue: DispatchQueue = SessionSnodeKit.Threading.workQueue,
maxRetryCount: UInt = 0,
isBackgroundPoll: Bool = false,
isBackgroundPollValid: @escaping (() -> Bool) = { true },
poller: ClosedGroupPoller? = nil
) -> Promise<Void> {
let promise: Promise<Void> = SnodeAPI.getSwarm(for: groupPublicKey)
@ -160,9 +161,10 @@ public final class ClosedGroupPoller {
guard let snode = swarm.randomElement() else { return Promise(error: Error.insufficientSnodes) }
return attempt(maxRetryCount: maxRetryCount, recoveringOn: queue) {
guard isBackgroundPoll || poller?.isPolling.wrappedValue[groupPublicKey] == true else {
return Promise(error: Error.pollingCanceled)
}
guard
(isBackgroundPoll && isBackgroundPollValid()) ||
poller?.isPolling.wrappedValue[groupPublicKey] == true
else { return Promise(error: Error.pollingCanceled) }
let promises: [Promise<[SnodeReceivedMessage]>] = {
if SnodeAPI.hardfork >= 19 && SnodeAPI.softfork >= 1 {
@ -181,9 +183,13 @@ public final class ClosedGroupPoller {
return when(resolved: promises)
.then(on: queue) { messageResults -> Promise<Void> in
guard isBackgroundPoll || poller?.isPolling.wrappedValue[groupPublicKey] == true else { return Promise.value(()) }
guard
(isBackgroundPoll && isBackgroundPollValid()) ||
poller?.isPolling.wrappedValue[groupPublicKey] == true
else { return Promise.value(()) }
var promises: [Promise<Void>] = []
var jobToRun: Job? = nil
let allMessages: [SnodeReceivedMessage] = messageResults
.reduce([]) { result, next in
switch next {
@ -192,8 +198,16 @@ public final class ClosedGroupPoller {
}
}
var messageCount: Int = 0
let totalMessagesCount: Int = allMessages.count
// No need to do anything if there are no messages
guard !allMessages.isEmpty else {
if !isBackgroundPoll {
SNLog("Received no new messages in closed group with public key: \(groupPublicKey)")
}
return Promise.value(())
}
// Otherwise process the messages and add them to the queue for handling
Storage.shared.write { db in
let processedMessages: [ProcessedMessage] = allMessages
.compactMap { message -> ProcessedMessage? in
@ -209,6 +223,14 @@ public final class ClosedGroupPoller {
MessageReceiverError.duplicateControlMessage,
MessageReceiverError.selfSend:
break
// In the background ignore 'SQLITE_ABORT' (it generally means
// the BackgroundPoller has timed out
case DatabaseError.SQLITE_ABORT:
guard !isBackgroundPoll else { break }
SNLog("Failed to the database being suspended (running in background with no background task).")
break
default: SNLog("Failed to deserialize envelope due to error: \(error).")
}
@ -219,7 +241,7 @@ public final class ClosedGroupPoller {
messageCount = processedMessages.count
let jobToRun: Job? = Job(
jobToRun = Job(
variant: .messageReceive,
behaviour: .runOnce,
threadId: groupPublicKey,
@ -232,35 +254,29 @@ public final class ClosedGroupPoller {
// If we are force-polling then add to the JobRunner so they are persistent and will retry on
// the next app run if they fail but don't let them auto-start
JobRunner.add(db, job: jobToRun, canStartJob: !isBackgroundPoll)
// We want to try to handle the receive jobs immediately in the background
if isBackgroundPoll {
promises = promises.appending(
jobToRun.map { job -> Promise<Void> in
let (promise, seal) = Promise<Void>.pending()
// Note: In the background we just want jobs to fail silently
MessageReceiveJob.run(
job,
queue: queue,
success: { _, _ in seal.fulfill(()) },
failure: { _, _, _ in seal.fulfill(()) },
deferred: { _ in seal.fulfill(()) }
)
return promise
}
)
}
}
if !isBackgroundPoll {
if totalMessagesCount > 0 {
SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") in closed group with public key: \(groupPublicKey) (duplicates: \(totalMessagesCount - messageCount))")
}
else {
SNLog("Received no new messages in closed group with public key: \(groupPublicKey)")
}
if isBackgroundPoll {
// We want to try to handle the receive jobs immediately in the background
promises = promises.appending(
jobToRun.map { job -> Promise<Void> in
let (promise, seal) = Promise<Void>.pending()
// Note: In the background we just want jobs to fail silently
MessageReceiveJob.run(
job,
queue: queue,
success: { _, _ in seal.fulfill(()) },
failure: { _, _, _ in seal.fulfill(()) },
deferred: { _ in seal.fulfill(()) }
)
return promise
}
)
}
else {
SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") in closed group with public key: \(groupPublicKey) (duplicates: \(allMessages.count - messageCount))")
}
return when(fulfilled: promises)

View File

@ -8,6 +8,8 @@ import SessionUtilitiesKit
extension OpenGroupAPI {
public final class Poller {
typealias PollResponse = [OpenGroupAPI.Endpoint: (info: OnionRequestResponseInfoType, data: Codable?)]
private let server: String
private var timer: Timer? = nil
private var hasStarted = false
@ -71,6 +73,7 @@ extension OpenGroupAPI {
@discardableResult
public func poll(
isBackgroundPoll: Bool,
isBackgroundPollerValid: @escaping (() -> Bool) = { true },
isPostCapabilitiesRetry: Bool,
using dependencies: OpenGroupManager.OGMDependencies = OpenGroupManager.OGMDependencies()
) -> Promise<Void> {
@ -83,8 +86,14 @@ extension OpenGroupAPI {
Threading.pollerQueue.async {
dependencies.storage
.read { db in
OpenGroupAPI
.read { db -> Promise<(Int64, PollResponse)> in
let failureCount: Int64 = (try? OpenGroup
.select(max(OpenGroup.Columns.pollFailureCount))
.asRequest(of: Int64.self)
.fetchOne(db))
.defaulting(to: 0)
return OpenGroupAPI
.poll(
db,
server: server,
@ -95,10 +104,24 @@ extension OpenGroupAPI {
),
using: dependencies
)
.map(on: OpenGroupAPI.workQueue) { (failureCount, $0) }
}
.done(on: OpenGroupAPI.workQueue) { [weak self] response in
.done(on: OpenGroupAPI.workQueue) { [weak self] failureCount, response in
guard !isBackgroundPoll || isBackgroundPollerValid() else {
// If this was a background poll and the background poll is no longer valid
// then just stop
self?.isPolling = false
seal.fulfill(())
return
}
self?.isPolling = false
self?.handlePollResponse(response, isBackgroundPoll: isBackgroundPoll, using: dependencies)
self?.handlePollResponse(
response,
failureCount: failureCount,
isBackgroundPoll: isBackgroundPoll,
using: dependencies
)
dependencies.mutableCache.mutate { cache in
cache.hasPerformedInitialPoll[server] = true
@ -106,17 +129,18 @@ extension OpenGroupAPI {
UserDefaults.standard[.lastOpen] = Date()
}
// Reset the failure count
Storage.shared.writeAsync { db in
try OpenGroup
.filter(OpenGroup.Columns.server == server)
.updateAll(db, OpenGroup.Columns.pollFailureCount.set(to: 0))
}
SNLog("Open group polling finished for \(server).")
seal.fulfill(())
}
.catch(on: OpenGroupAPI.workQueue) { [weak self] error in
guard !isBackgroundPoll || isBackgroundPollerValid() else {
// If this was a background poll and the background poll is no longer valid
// then just stop
self?.isPolling = false
seal.fulfill(())
return
}
// If we are retrying then the error is being handled so no need to continue (this
// method will always resolve)
self?.updateCapabilitiesAndRetryIfNeeded(
@ -141,7 +165,10 @@ extension OpenGroupAPI {
Storage.shared.writeAsync { db in
try OpenGroup
.filter(OpenGroup.Columns.server == server)
.updateAll(db, OpenGroup.Columns.pollFailureCount.set(to: (pollFailureCount + 1)))
.updateAll(
db,
OpenGroup.Columns.pollFailureCount.set(to: (pollFailureCount + 1))
)
}
SNLog("Open group polling failed due to error: \(error). Setting failure count to \(pollFailureCount).")
@ -221,18 +248,166 @@ extension OpenGroupAPI {
return promise
}
private func handlePollResponse(_ response: [OpenGroupAPI.Endpoint: (info: OnionRequestResponseInfoType, data: Codable?)], isBackgroundPoll: Bool, using dependencies: OpenGroupManager.OGMDependencies = OpenGroupManager.OGMDependencies()) {
private func handlePollResponse(
_ response: PollResponse,
failureCount: Int64,
isBackgroundPoll: Bool,
using dependencies: OpenGroupManager.OGMDependencies = OpenGroupManager.OGMDependencies()
) {
let server: String = self.server
dependencies.storage.write { db in
try response.forEach { endpoint, endpointResponse in
let validResponses: PollResponse = response
.filter { endpoint, endpointResponse in
switch endpoint {
case .capabilities:
guard let responseData: BatchSubResponse<Capabilities> = endpointResponse.data as? BatchSubResponse<Capabilities>, let responseBody: Capabilities = responseData.body else {
guard (endpointResponse.data as? BatchSubResponse<Capabilities>)?.body != nil else {
SNLog("Open group polling failed due to invalid capability data.")
return
return false
}
return true
case .roomPollInfo(let roomToken, _):
guard (endpointResponse.data as? BatchSubResponse<RoomPollInfo>)?.body != nil else {
switch (endpointResponse.data as? BatchSubResponse<RoomPollInfo>)?.code {
case 404: SNLog("Open group polling failed to retrieve info for unknown room '\(roomToken)'.")
default: SNLog("Open group polling failed due to invalid room info data.")
}
return false
}
return true
case .roomMessagesRecent(let roomToken), .roomMessagesBefore(let roomToken, _), .roomMessagesSince(let roomToken, _):
guard
let responseData: BatchSubResponse<[Failable<Message>]> = endpointResponse.data as? BatchSubResponse<[Failable<Message>]>,
let responseBody: [Failable<Message>] = responseData.body
else {
switch (endpointResponse.data as? BatchSubResponse<[Failable<Message>]>)?.code {
case 404: SNLog("Open group polling failed to retrieve messages for unknown room '\(roomToken)'.")
default: SNLog("Open group polling failed due to invalid messages data.")
}
return false
}
let successfulMessages: [Message] = responseBody.compactMap { $0.value }
if successfulMessages.count != responseBody.count {
let droppedCount: Int = (responseBody.count - successfulMessages.count)
SNLog("Dropped \(droppedCount) invalid open group message\(droppedCount == 1 ? "" : "s").")
}
return !successfulMessages.isEmpty
case .inbox, .inboxSince, .outbox, .outboxSince:
guard
let responseData: BatchSubResponse<[DirectMessage]?> = endpointResponse.data as? BatchSubResponse<[DirectMessage]?>,
!responseData.failedToParseBody
else {
SNLog("Open group polling failed due to invalid inbox/outbox data.")
return false
}
// Double optional because the server can return a `304` with an empty body
let messages: [OpenGroupAPI.DirectMessage] = ((responseData.body ?? []) ?? [])
return !messages.isEmpty
default: return false // No custom handling needed
}
}
// If there are no remaining 'validResponses' and there hasn't been a failure then there is
// no need to do anything else
guard !validResponses.isEmpty || failureCount != 0 else { return }
// Retrieve the current capability & group info to check if anything changed
let rooms: [String] = validResponses
.keys
.compactMap { endpoint -> String? in
switch endpoint {
case .roomPollInfo(let roomToken, _): return roomToken
default: return nil
}
}
let currentInfo: (capabilities: Capabilities, groups: [OpenGroup])? = dependencies.storage.read { db in
let allCapabilities: [Capability] = try Capability
.filter(Capability.Columns.openGroupServer == server)
.fetchAll(db)
let capabilities: Capabilities = Capabilities(
capabilities: allCapabilities
.filter { !$0.isMissing }
.map { $0.variant },
missing: {
let missingCapabilities: [Capability.Variant] = allCapabilities
.filter { $0.isMissing }
.map { $0.variant }
return (missingCapabilities.isEmpty ? nil : missingCapabilities)
}()
)
let openGroupIds: [String] = rooms
.map { OpenGroup.idFor(roomToken: $0, server: server) }
let groups: [OpenGroup] = try OpenGroup
.filter(ids: openGroupIds)
.fetchAll(db)
return (capabilities, groups)
}
let changedResponses: PollResponse = validResponses
.filter { endpoint, endpointResponse in
switch endpoint {
case .capabilities:
guard
let responseData: BatchSubResponse<Capabilities> = endpointResponse.data as? BatchSubResponse<Capabilities>,
let responseBody: Capabilities = responseData.body
else { return false }
return (responseBody != currentInfo?.capabilities)
case .roomPollInfo(let roomToken, _):
guard
let responseData: BatchSubResponse<RoomPollInfo> = endpointResponse.data as? BatchSubResponse<RoomPollInfo>,
let responseBody: RoomPollInfo = responseData.body
else { return false }
guard let existingOpenGroup: OpenGroup = currentInfo?.groups.first(where: { $0.roomToken == roomToken }) else {
return true
}
// Note: This might need to be updated in the future when we start tracking
// user permissions if changes to permissions don't trigger a change to
// the 'infoUpdates'
return (
responseBody.activeUsers != existingOpenGroup.userCount || (
responseBody.details != nil &&
responseBody.details?.infoUpdates != existingOpenGroup.infoUpdates
)
)
default: return true
}
}
// If there are no 'changedResponses' and there hasn't been a failure then there is
// no need to do anything else
guard !changedResponses.isEmpty || failureCount != 0 else { return }
dependencies.storage.write { db in
// Reset the failure count
if failureCount > 0 {
try OpenGroup
.filter(OpenGroup.Columns.server == server)
.updateAll(db, OpenGroup.Columns.pollFailureCount.set(to: 0))
}
try changedResponses.forEach { endpoint, endpointResponse in
switch endpoint {
case .capabilities:
guard
let responseData: BatchSubResponse<Capabilities> = endpointResponse.data as? BatchSubResponse<Capabilities>,
let responseBody: Capabilities = responseData.body
else { return }
OpenGroupManager.handleCapabilities(
db,
capabilities: responseBody,
@ -240,13 +415,10 @@ extension OpenGroupAPI {
)
case .roomPollInfo(let roomToken, _):
guard let responseData: BatchSubResponse<RoomPollInfo> = endpointResponse.data as? BatchSubResponse<RoomPollInfo>, let responseBody: RoomPollInfo = responseData.body else {
switch (endpointResponse.data as? BatchSubResponse<RoomPollInfo>)?.code {
case 404: SNLog("Open group polling failed to retrieve info for unknown room '\(roomToken)'.")
default: SNLog("Open group polling failed due to invalid room info data.")
}
return
}
guard
let responseData: BatchSubResponse<RoomPollInfo> = endpointResponse.data as? BatchSubResponse<RoomPollInfo>,
let responseBody: RoomPollInfo = responseData.body
else { return }
try OpenGroupManager.handlePollInfo(
db,
@ -258,24 +430,14 @@ extension OpenGroupAPI {
)
case .roomMessagesRecent(let roomToken), .roomMessagesBefore(let roomToken, _), .roomMessagesSince(let roomToken, _):
guard let responseData: BatchSubResponse<[Failable<Message>]> = endpointResponse.data as? BatchSubResponse<[Failable<Message>]>, let responseBody: [Failable<Message>] = responseData.body else {
switch (endpointResponse.data as? BatchSubResponse<[Failable<Message>]>)?.code {
case 404: SNLog("Open group polling failed to retrieve messages for unknown room '\(roomToken)'.")
default: SNLog("Open group polling failed due to invalid messages data.")
}
return
}
let successfulMessages: [Message] = responseBody.compactMap { $0.value }
if successfulMessages.count != responseBody.count {
let droppedCount: Int = (responseBody.count - successfulMessages.count)
SNLog("Dropped \(droppedCount) invalid open group message\(droppedCount == 1 ? "" : "s").")
}
guard
let responseData: BatchSubResponse<[Failable<Message>]> = endpointResponse.data as? BatchSubResponse<[Failable<Message>]>,
let responseBody: [Failable<Message>] = responseData.body
else { return }
OpenGroupManager.handleMessages(
db,
messages: successfulMessages,
messages: responseBody.compactMap { $0.value },
for: roomToken,
on: server,
isBackgroundPoll: isBackgroundPoll,
@ -283,10 +445,10 @@ extension OpenGroupAPI {
)
case .inbox, .inboxSince, .outbox, .outboxSince:
guard let responseData: BatchSubResponse<[DirectMessage]?> = endpointResponse.data as? BatchSubResponse<[DirectMessage]?>, !responseData.failedToParseBody else {
SNLog("Open group polling failed due to invalid inbox/outbox data.")
return
}
guard
let responseData: BatchSubResponse<[DirectMessage]?> = endpointResponse.data as? BatchSubResponse<[DirectMessage]?>,
!responseData.failedToParseBody
else { return }
// Double optional because the server can return a `304` with an empty body
let messages: [OpenGroupAPI.DirectMessage] = ((responseData.body ?? []) ?? [])

View File

@ -150,6 +150,10 @@ public final class Poller {
MessageReceiverError.duplicateControlMessage,
MessageReceiverError.selfSend:
break
case DatabaseError.SQLITE_ABORT:
SNLog("Failed to the database being suspended (running in background with no background task).")
break
default: SNLog("Failed to deserialize envelope due to error: \(error).")
}

View File

@ -44,10 +44,7 @@ public class TypingIndicators {
self.timestampMs = (timestampMs ?? Int64(floor(Date().timeIntervalSince1970 * 1000)))
}
fileprivate func starting(_ db: Database) -> Indicator {
let direction: Direction = self.direction
let timestampMs: Int64 = self.timestampMs
fileprivate func start(_ db: Database) {
// Start the typing indicator
switch direction {
case .outgoing:
@ -55,27 +52,17 @@ public class TypingIndicators {
case .incoming:
try? ThreadTypingIndicator(
threadId: self.threadId,
threadId: threadId,
timestampMs: timestampMs
)
.save(db)
}
// Schedule the 'stopCallback' to cancel the typing indicator
stopTimer?.invalidate()
stopTimer = Timer.scheduledTimerOnMainThread(
withTimeInterval: (direction == .outgoing ? 3 : 5),
repeats: false
) { [weak self] _ in
Storage.shared.write { db in
self?.stoping(db)
}
}
return self
// Refresh the timeout since we just started
refreshTimeout()
}
@discardableResult fileprivate func stoping(_ db: Database) -> Indicator? {
fileprivate func stop(_ db: Database) {
self.refreshTimer?.invalidate()
self.refreshTimer = nil
self.stopTimer?.invalidate()
@ -84,7 +71,7 @@ public class TypingIndicators {
switch direction {
case .outgoing:
guard let thread: SessionThread = try? SessionThread.fetchOne(db, id: self.threadId) else {
return nil
return
}
try? MessageSender.send(
@ -99,8 +86,22 @@ public class TypingIndicators {
.filter(ThreadTypingIndicator.Columns.threadId == self.threadId)
.deleteAll(db)
}
}
fileprivate func refreshTimeout() {
let threadId: String = self.threadId
let direction: Direction = self.direction
return nil
// Schedule the 'stopCallback' to cancel the typing indicator
stopTimer?.invalidate()
stopTimer = Timer.scheduledTimerOnMainThread(
withTimeInterval: (direction == .outgoing ? 3 : 5),
repeats: false
) { _ in
Storage.shared.write { db in
TypingIndicators.didStopTyping(db, threadId: threadId, direction: direction)
}
}
}
private func scheduleRefreshCallback(_ db: Database, shouldSend: Bool = true) {
@ -138,56 +139,76 @@ public class TypingIndicators {
// MARK: - Functions
public static func didStartTyping(
_ db: Database,
public static func didStartTypingNeedsToStart(
threadId: String,
threadVariant: SessionThread.Variant,
threadIsMessageRequest: Bool,
direction: Direction,
timestampMs: Int64?
) {
) -> Bool {
switch direction {
case .outgoing:
let updatedIndicator: Indicator? = (
outgoing.wrappedValue[threadId] ??
Indicator(
threadId: threadId,
threadVariant: threadVariant,
threadIsMessageRequest: threadIsMessageRequest,
direction: direction,
timestampMs: timestampMs
)
)?.starting(db)
// If we already have an existing typing indicator for this thread then just
// refresh it's timeout (no need to do anything else)
if let existingIndicator: Indicator = outgoing.wrappedValue[threadId] {
existingIndicator.refreshTimeout()
return false
}
outgoing.mutate { $0[threadId] = updatedIndicator }
let newIndicator: Indicator? = Indicator(
threadId: threadId,
threadVariant: threadVariant,
threadIsMessageRequest: threadIsMessageRequest,
direction: direction,
timestampMs: timestampMs
)
newIndicator?.refreshTimeout()
outgoing.mutate { $0[threadId] = newIndicator }
return true
case .incoming:
let updatedIndicator: Indicator? = (
incoming.wrappedValue[threadId] ??
Indicator(
threadId: threadId,
threadVariant: threadVariant,
threadIsMessageRequest: threadIsMessageRequest,
direction: direction,
timestampMs: timestampMs
)
)?.starting(db)
// If we already have an existing typing indicator for this thread then just
// refresh it's timeout (no need to do anything else)
if let existingIndicator: Indicator = incoming.wrappedValue[threadId] {
existingIndicator.refreshTimeout()
return false
}
incoming.mutate { $0[threadId] = updatedIndicator }
let newIndicator: Indicator? = Indicator(
threadId: threadId,
threadVariant: threadVariant,
threadIsMessageRequest: threadIsMessageRequest,
direction: direction,
timestampMs: timestampMs
)
newIndicator?.refreshTimeout()
incoming.mutate { $0[threadId] = newIndicator }
return true
}
}
public static func start(_ db: Database, threadId: String, direction: Direction) {
switch direction {
case .outgoing: outgoing.wrappedValue[threadId]?.start(db)
case .incoming: incoming.wrappedValue[threadId]?.start(db)
}
}
public static func didStopTyping(_ db: Database, threadId: String, direction: Direction) {
switch direction {
case .outgoing:
let updatedIndicator: Indicator? = outgoing.wrappedValue[threadId]?.stoping(db)
outgoing.mutate { $0[threadId] = updatedIndicator }
if let indicator: Indicator = outgoing.wrappedValue[threadId] {
indicator.stop(db)
outgoing.mutate { $0[threadId] = nil }
}
case .incoming:
let updatedIndicator: Indicator? = incoming.wrappedValue[threadId]?.stoping(db)
incoming.mutate { $0[threadId] = updatedIndicator }
if let indicator: Indicator = incoming.wrappedValue[threadId] {
indicator.stop(db)
incoming.mutate { $0[threadId] = nil }
}
}
}
}

View File

@ -68,19 +68,34 @@ public extension SnodeReceivedMessageInfo {
public extension SnodeReceivedMessageInfo {
static func pruneExpiredMessageHashInfo(for snode: Snode, namespace: Int, associatedWith publicKey: String) {
// Delete any expired SnodeReceivedMessageInfo values associated to a specific node
// Delete any expired SnodeReceivedMessageInfo values associated to a specific node (even though
// this runs very quickly we fetch the rowIds we want to delete from a 'read' call to avoid
// blocking the write queue since this method is called very frequently)
let rowIds: [Int64] = Storage.shared
.read { db in
// Only prune the hashes if new hashes exist for this Snode (if they don't then we don't want
// to clear out the legacy hashes)
let hasNonLegacyHash: Bool = try SnodeReceivedMessageInfo
.filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace))
.isNotEmpty(db)
guard hasNonLegacyHash else { return [] }
return try SnodeReceivedMessageInfo
.select(Column.rowID)
.filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace))
.filter(SnodeReceivedMessageInfo.Columns.expirationDateMs <= (Date().timeIntervalSince1970 * 1000))
.asRequest(of: Int64.self)
.fetchAll(db)
}
.defaulting(to: [])
// If there are no rowIds to delete then do nothing
guard !rowIds.isEmpty else { return }
Storage.shared.write { db in
// Only prune the hashes if new hashes exist for this Snode (if they don't then we don't want
// to clear out the legacy hashes)
let hasNonLegacyHash: Bool = try SnodeReceivedMessageInfo
.filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace))
.isNotEmpty(db)
guard hasNonLegacyHash else { return }
try SnodeReceivedMessageInfo
.filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace))
.filter(SnodeReceivedMessageInfo.Columns.expirationDateMs <= (Date().timeIntervalSince1970 * 1000))
.filter(rowIds.contains(Column.rowID))
.deleteAll(db)
}
}

View File

@ -60,6 +60,7 @@ public final class Storage {
// Configure the database and create the DatabasePool for interacting with the database
var config = Configuration()
config.maximumReaderCount = 10 // Increase the max read connection limit - Default is 5
config.observesSuspensionNotifications = true // Minimise `0xDEAD10CC` exceptions
config.prepareDatabase { db in
var keySpec: Data = Storage.getOrGenerateDatabaseKeySpec()
defer { keySpec.resetBytes(in: 0..<keySpec.count) } // Reset content immediately after use
@ -180,7 +181,6 @@ public final class Storage {
self?.hasCompletedMigrations = true
self?.migrationProgressUpdater = nil
SUKLegacy.clearLegacyDatabaseInstance()
// SUKLegacy.deleteLegacyDatabaseFilesAndKey() // TODO: Add a "Delete legacy database" migration to run after the '003' migrations
if let error = error {
SNLog("[Migration Error] Migration failed with error: \(error)")

View File

@ -28,6 +28,12 @@ public extension Dictionary.Values {
// MARK: - Functional Convenience
public extension Dictionary {
public subscript(_ key: Key?) -> Value? {
guard let key: Key = key else { return nil }
return self[key]
}
func setting(_ key: Key?, _ value: Value?) -> [Key: Value] {
guard let key: Key = key else { return self }

View File

@ -126,6 +126,9 @@ public final class JobRunner {
queues.mutate { $0[updatedJob.variant]?.add(updatedJob, canStartJob: canStartJob) }
// Don't start the queue if the job can't be started
guard canStartJob else { return }
// Start the job runner if needed
db.afterNextTransactionCommit { _ in
queues.wrappedValue[updatedJob.variant]?.start()
@ -253,6 +256,15 @@ public final class JobRunner {
JobRunner.hasCompletedInitialBecomeActive.mutate { $0 = true }
}
/// Calling this will clear the JobRunner queues and stop it from running new jobs, any currently executing jobs will continue to run
/// though (this means if we suspend the database it's likely that any currently running jobs will fail to complete and fail to record their
/// failure - they _should_ be picked up again the next time the app is launched)
public static func stopAndClearPendingJobs() {
queues.wrappedValue.values.forEach { queue in
queue.stopAndClearPendingJobs()
}
}
public static func isCurrentlyRunning(_ job: Job?) -> Bool {
guard let job: Job = job, let jobId: Int64 = job.id else { return false }
@ -347,6 +359,8 @@ private final class JobQueue {
}
}
private static let deferralLoopThreshold: Int = 3
private let type: QueueType
private let executionType: ExecutionType
private let qosClass: DispatchQoS
@ -376,6 +390,7 @@ private final class JobQueue {
private var queue: Atomic<[Job]> = Atomic([])
private var jobsCurrentlyRunning: Atomic<Set<Int64>> = Atomic([])
private var detailsForCurrentlyRunningJobs: Atomic<[Int64: Data?]> = Atomic([:])
private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:])
fileprivate var hasPendingJobs: Bool { !queue.wrappedValue.isEmpty }
@ -555,7 +570,16 @@ private final class JobQueue {
runNextJob()
}
fileprivate func stopAndClearPendingJobs() {
isRunning.mutate { $0 = false }
queue.mutate { $0 = [] }
deferLoopTracker.mutate { $0 = [:] }
}
private func runNextJob() {
// Ensure the queue is running (if we've stopped the queue then we shouldn't start the next job)
guard isRunning.wrappedValue else { return }
// Ensure this is running on the correct queue
guard DispatchQueue.getSpecific(key: queueKey) == queueContext else {
internalQueue.async { [weak self] in
@ -652,7 +676,7 @@ private final class JobQueue {
return
}
// Update the state to indicate it's running
// Update the state to indicate the particular job is running
//
// Note: We need to store 'numJobsRemaining' in it's own variable because
// the 'SNLog' seems to dispatch to it's own queue which ends up getting
@ -662,7 +686,6 @@ private final class JobQueue {
trigger?.invalidate() // Need to invalidate to prevent a memory leak
trigger = nil
}
isRunning.mutate { $0 = true }
jobsCurrentlyRunning.mutate { jobsCurrentlyRunning in
jobsCurrentlyRunning = jobsCurrentlyRunning.inserting(nextJob.id)
numJobsRunning = jobsCurrentlyRunning.count
@ -779,13 +802,20 @@ private final class JobQueue {
// `failureCount` and `nextRunTimestamp` to prevent them from endlessly running over
// and over and reset their retry backoff in case they fail next time
case .recurringOnLaunch, .recurringOnActive:
Storage.shared.write { db in
_ = try job
.with(
failureCount: 0,
nextRunTimestamp: 0
)
.saved(db)
if
let jobId: Int64 = job.id,
job.failureCount != 0 &&
job.nextRunTimestamp > TimeInterval.leastNonzeroMagnitude
{
Storage.shared.write { db in
_ = try Job
.filter(id: jobId)
.updateAll(
db,
Job.Columns.failureCount.set(to: 0),
Job.Columns.nextRunTimestamp.set(to: 0)
)
}
}
default: break
@ -927,8 +957,48 @@ private final class JobQueue {
/// This function is called when a job neither succeeds or fails (this should only occur if the job has specific logic that makes it dependant
/// on other jobs, and it should automatically manage those dependencies)
private func handleJobDeferred(_ job: Job) {
var stuckInDeferLoop: Bool = false
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
deferLoopTracker.mutate {
guard let lastRecord: (count: Int, times: [TimeInterval]) = $0[job.id] else {
$0 = $0.setting(
job.id,
(1, [Date().timeIntervalSince1970])
)
return
}
let timeNow: TimeInterval = Date().timeIntervalSince1970
stuckInDeferLoop = (
lastRecord.count >= JobQueue.deferralLoopThreshold &&
(timeNow - lastRecord.times[0]) < CGFloat(lastRecord.count)
)
$0 = $0.setting(
job.id,
(
lastRecord.count + 1,
// Only store the last 'deferralLoopThreshold' times to ensure we aren't running faster
// than one loop per second
lastRecord.times.suffix(JobQueue.deferralLoopThreshold - 1) + [timeNow]
)
)
}
// It's possible (by introducing bugs) to create a loop where a Job tries to run and immediately
// defers itself but then attempts to run again (resulting in an infinite loop); this won't block
// the app since it's on a background thread but can result in 100% of a CPU being used (and a
// battery drain)
//
// This code will maintain an in-memory store for any jobs which are deferred too quickly (ie.
// more than 'deferralLoopThreshold' times within 'deferralLoopThreshold' seconds)
guard !stuckInDeferLoop else {
deferLoopTracker.mutate { $0 = $0.removingValue(forKey: job.id) }
handleJobFailed(job, error: JobRunnerError.possibleDeferralLoop, permanentFailure: false)
return
}
internalQueue.async { [weak self] in
self?.runNextJob()
}

View File

@ -11,4 +11,6 @@ public enum JobRunnerError: Error {
case missingRequiredDetails
case missingDependencies
case possibleDeferralLoop
}