This commit is contained in:
Niels Andriesse 2020-11-23 15:08:01 +11:00
parent d7c71a8c06
commit d2e8f2142e
6 changed files with 27 additions and 8 deletions

View File

@ -11,6 +11,9 @@ public final class AttachmentUploadJob : NSObject, Job, NSCoding { // NSObject/N
public class var collection: String { return "AttachmentUploadJobCollection" }
public static let maxFailureCount: UInt = 20
// MARK: Initialization
public override init() { }
// MARK: Coding
public init?(coder: NSCoder) { }

View File

@ -5,4 +5,5 @@ public protocol JobDelegate {
func handleJobSucceeded(_ job: Job)
func handleJobFailed(_ job: Job, with error: Error)
func handleJobFailedPermanently(_ job: Job, with error: Error)
func postpone(_ job: Job)
}

View File

@ -64,6 +64,10 @@ public final class JobQueue : NSObject, JobDelegate {
})
})
}
public func postpone(_ job: Job) {
Timer.weakScheduledTimer(withTimeInterval: 3, target: self, selector: #selector(self.retry(_:)), userInfo: job, repeats: false)
}
private func getRetryInterval(for job: Job) -> TimeInterval {
// Arbitrary backoff factor...

View File

@ -10,7 +10,7 @@ public final class MessageSendJob : NSObject, Job, NSCoding { // NSObject/NSCodi
// MARK: Settings
public class var collection: String { return "MessageSendJobCollection" }
public static let maxFailureCount: UInt = 20
public static let maxFailureCount: UInt = 10
// MARK: Initialization
@objc public convenience init(message: Message, publicKey: String) { self.init(message: message, destination: .contact(publicKey: publicKey)) }
@ -61,7 +61,23 @@ public final class MessageSendJob : NSObject, Job, NSCoding { // NSObject/NSCodi
// MARK: Running
public func execute() {
Configuration.shared.storage.withAsync({ transaction in // Intentionally capture self
let storage = Configuration.shared.storage
if let message = message as? VisibleMessage {
let attachments = message.attachmentIDs.compactMap { TSAttachmentStream.fetch(uniqueId: $0) }
let attachmentsToUpload = attachments.filter { !$0.isUploaded }
attachmentsToUpload.forEach { attachment in
if storage.getAttachmentUploadJob(for: attachment.uniqueId!) != nil {
// Wait for it to finish
} else {
let job = AttachmentUploadJob()
storage.withAsync({ transaction in
JobQueue.shared.add(job, using: transaction)
}, completion: { })
}
}
if !attachmentsToUpload.isEmpty { delegate?.postpone(self); return } // Wait for all attachments to upload before continuing
}
storage.withAsync({ transaction in // Intentionally capture self
Threading.workQueue.async {
MessageSender.send(self.message, to: self.destination, using: transaction).done(on: Threading.workQueue) {
self.handleSuccess()

View File

@ -1,11 +1,5 @@
import SessionUtilitiesKit
// TODO:
// Threads don't show up on the first message; only on the second.
// Profile pictures aren't showing up.
// Check that message expiration works.
// Open group messages (sync messages).
internal enum MessageReceiver {
internal enum Error : LocalizedError {

View File

@ -29,6 +29,7 @@ public protocol SessionMessagingKitStorageProtocol {
func markJobAsSucceeded(_ job: Job, using transaction: Any)
func markJobAsFailed(_ job: Job, using transaction: Any)
func getAllPendingJobs(of type: Job.Type) -> [Job]
func getAttachmentUploadJob(for attachmentID: String) -> AttachmentUploadJob?
// MARK: - Authorization