Further debug attachment sending

This commit is contained in:
nielsandriesse 2020-11-27 09:07:24 +11:00
parent f04db2afb1
commit 77c1f721b9
14 changed files with 68 additions and 17 deletions

View File

@ -56,4 +56,16 @@ extension Storage {
#endif
return result.first
}
public func getMessageSendJob(for messageSendJobID: String) -> MessageSendJob? {
var result: MessageSendJob?
Storage.read { transaction in
result = transaction.object(forKey: messageSendJobID, inCollection: MessageSendJob.collection) as? MessageSendJob
}
return result
}
public func resumeMessageSendJobIfNeeded(_ messageSendJobID: String) {
getMessageSendJob(for: messageSendJobID)?.execute()
}
}

View File

@ -21,7 +21,7 @@ public final class AttachmentDownloadJob : NSObject, Job, NSCoding { // NSObject
// MARK: Settings
public class var collection: String { return "AttachmentDownloadJobCollection" }
public static let maxFailureCount: UInt = 20
public static let maxFailureCount: UInt = 100
// MARK: Initialization
public init(attachmentID: String, tsIncomingMessageID: String) {

View File

@ -3,6 +3,8 @@ import SessionUtilitiesKit
public final class AttachmentUploadJob : NSObject, Job, NSCoding { // NSObject/NSCoding conformance is needed for YapDatabase compatibility
public let attachmentID: String
public let threadID: String
public let message: Message
public let messageSendJobID: String
public var delegate: JobDelegate?
public var id: String?
public var failureCount: UInt = 0
@ -22,18 +24,24 @@ public final class AttachmentUploadJob : NSObject, Job, NSCoding { // NSObject/N
public static let maxFailureCount: UInt = 20
// MARK: Initialization
public init(attachmentID: String, threadID: String) {
public init(attachmentID: String, threadID: String, message: Message, messageSendJobID: String) {
self.attachmentID = attachmentID
self.threadID = threadID
self.message = message
self.messageSendJobID = messageSendJobID
}
// MARK: Coding
public init?(coder: NSCoder) {
guard let attachmentID = coder.decodeObject(forKey: "attachmentID") as! String?,
let threadID = coder.decodeObject(forKey: "threadID") as! String?,
let message = coder.decodeObject(forKey: "message") as! Message?,
let messageSendJobID = coder.decodeObject(forKey: "messageSendJobID") as! String?,
let id = coder.decodeObject(forKey: "id") as! String? else { return nil }
self.attachmentID = attachmentID
self.threadID = threadID
self.message = message
self.messageSendJobID = messageSendJobID
self.id = id
self.failureCount = coder.decodeObject(forKey: "failureCount") as! UInt? ?? 0
}
@ -41,6 +49,8 @@ public final class AttachmentUploadJob : NSObject, Job, NSCoding { // NSObject/N
public func encode(with coder: NSCoder) {
coder.encode(attachmentID, forKey: "attachmentID")
coder.encode(threadID, forKey: "threadID")
coder.encode(message, forKey: "message")
coder.encode(messageSendJobID, forKey: "messageSendJobID")
coder.encode(id, forKey: "id")
coder.encode(failureCount, forKey: "failureCount")
}
@ -69,14 +79,30 @@ public final class AttachmentUploadJob : NSObject, Job, NSCoding { // NSObject/N
private func handleSuccess() {
delegate?.handleJobSucceeded(self)
Configuration.shared.storage.resumeMessageSendJobIfNeeded(messageSendJobID)
}
private func handlePermanentFailure(error: Swift.Error) {
delegate?.handleJobFailedPermanently(self, with: error)
failAssociatedMessageSendJob(with: error)
}
private func handleFailure(error: Swift.Error) {
delegate?.handleJobFailed(self, with: error)
if failureCount + 1 == AttachmentUploadJob.maxFailureCount {
failAssociatedMessageSendJob(with: error)
}
}
private func failAssociatedMessageSendJob(with error: Swift.Error) {
let storage = Configuration.shared.storage
let messageSendJob = storage.getMessageSendJob(for: messageSendJobID)
storage.withAsync({ transaction in // Intentionally capture self
MessageSender.handleFailedMessageSend(self.message, with: error, using: transaction)
if let messageSendJob = messageSendJob {
storage.markJobAsFailed(messageSendJob, using: transaction)
}
}, completion: { })
}
}

View File

@ -5,5 +5,4 @@ public protocol JobDelegate {
func handleJobSucceeded(_ job: Job)
func handleJobFailed(_ job: Job, with error: Error)
func handleJobFailedPermanently(_ job: Job, with error: Error)
func postpone(_ job: Job)
}

View File

@ -1,5 +1,7 @@
import SessionUtilitiesKit
// TODO: Check that retrying works
@objc(SNJobQueue)
public final class JobQueue : NSObject, JobDelegate {
private var hasResumedPendingJobs = false // Just for debugging
@ -9,7 +11,7 @@ public final class JobQueue : NSObject, JobDelegate {
@objc public func add(_ job: Job, using transaction: Any) {
let transaction = transaction as! YapDatabaseReadWriteTransaction
addWithoutExecuting(job, using: transaction)
transaction.addCompletionQueue(DispatchQueue.global(qos: .userInitiated)) {
transaction.addCompletionQueue(Threading.jobQueue) {
job.execute()
}
}
@ -56,7 +58,7 @@ public final class JobQueue : NSObject, JobDelegate {
})
} else {
let retryInterval = self.getRetryInterval(for: job)
Timer.weakScheduledTimer(withTimeInterval: retryInterval, target: self, selector: #selector(self.retry(_:)), userInfo: job, repeats: false)
Timer.scheduledTimer(timeInterval: retryInterval, target: self, selector: #selector(self.retry(_:)), userInfo: job, repeats: false)
}
})
}
@ -74,10 +76,6 @@ public final class JobQueue : NSObject, JobDelegate {
})
})
}
public func postpone(_ job: Job) {
Timer.weakScheduledTimer(withTimeInterval: 3, target: self, selector: #selector(self.retry(_:)), userInfo: job, repeats: false)
}
private func getRetryInterval(for job: Job) -> TimeInterval {
// Arbitrary backoff factor...

View File

@ -1,6 +1,6 @@
import SessionUtilitiesKit
// TODO: Cancel when a message is deleted
// TODO: Cancel when a message/conversation is deleted
@objc(SNMessageSendJob)
public final class MessageSendJob : NSObject, Job, NSCoding { // NSObject/NSCoding conformance is needed for YapDatabase compatibility
@ -71,13 +71,13 @@ public final class MessageSendJob : NSObject, Job, NSCoding { // NSObject/NSCodi
if storage.getAttachmentUploadJob(for: attachment.uniqueId!) != nil {
// Wait for it to finish
} else {
let job = AttachmentUploadJob(attachmentID: attachment.uniqueId!, threadID: message.threadID!)
let job = AttachmentUploadJob(attachmentID: attachment.uniqueId!, threadID: message.threadID!, message: message, messageSendJobID: id!)
storage.withAsync({ transaction in
JobQueue.shared.add(job, using: transaction)
}, completion: { })
}
}
if !attachmentsToUpload.isEmpty { delegate?.postpone(self); return } // Wait for all attachments to upload before continuing
if !attachmentsToUpload.isEmpty { return } // Wait for all attachments to upload before continuing
}
// FIXME: This doesn't yet handle the attachment side of link previews, quotes, etc.
storage.withAsync({ transaction in // Intentionally capture self

View File

@ -12,7 +12,7 @@ public extension TSIncomingMessage {
in: thread,
authorId: sender,
sourceDeviceId: 1,
messageBody: visibleMessage.text!,
messageBody: visibleMessage.text,
attachmentIds: visibleMessage.attachmentIDs,
expiresInSeconds: expiration,
quotedMessage: TSQuotedMessage.from(visibleMessage.quote),

View File

@ -1059,8 +1059,6 @@ NSString *NSStringForOutgoingMessageRecipientState(OWSOutgoingMessageRecipientSt
return [result copy];
}
- (uint)ttl { return 2 * 24 * 60 * 60 * 1000; }
@end
NS_ASSUME_NONNULL_END

View File

@ -84,7 +84,11 @@ internal enum MessageReceiver {
message.receivedTimestamp = NSDate.millisecondTimestamp()
message.groupPublicKey = groupPublicKey
message.openGroupServerMessageID = messageServerID
guard message.isValid else { throw Error.invalidMessage }
var isValid = message.isValid
if message is VisibleMessage && !isValid && proto.dataMessage?.attachments.isEmpty == false {
isValid = true
}
guard isValid else { throw Error.invalidMessage }
return (message, proto)
} else {
throw Error.unknownMessage

View File

@ -133,6 +133,8 @@ NSString *const kOutgoingReadReceiptManagerCollection = @"kOutgoingReadReceiptMa
}
- (NSArray<AnyPromise *> *)sendReceiptsForReceiptType:(OWSReceiptType)receiptType {
if (receiptType == OWSReceiptType_Delivery) { return @[]; } // Don't send delivery receipts
NSString *collection = [self collectionForReceiptType:receiptType];
NSMutableDictionary<NSString *, NSSet<NSNumber *> *> *queuedReceiptMap = [NSMutableDictionary new];

View File

@ -32,6 +32,8 @@ public protocol SessionMessagingKitStorageProtocol {
func markJobAsFailed(_ job: Job, using transaction: Any)
func getAllPendingJobs(of type: Job.Type) -> [Job]
func getAttachmentUploadJob(for attachmentID: String) -> AttachmentUploadJob?
func getMessageSendJob(for messageSendJobID: String) -> MessageSendJob?
func resumeMessageSendJobIfNeeded(_ messageSendJobID: String)
// MARK: - Authorization

View File

@ -0,0 +1,6 @@
import Foundation
internal enum Threading {
internal static let jobQueue = DispatchQueue(label: "SessionMessagingKit.jobQueue", qos: .userInitiated)
}

View File

@ -15,7 +15,7 @@ NS_ASSUME_NONNULL_BEGIN
return value;
}
- (nullable NSDictionary *)dictionaryForKey : (NSString *)key inCollection : (NSString *)collection
- (nullable NSDictionary *)dictionaryForKey : (NSString *)key inCollection : (NSString *)collection
{
return [self objectForKey:key inCollection:collection ofExpectedType:[NSDictionary class]];
}

View File

@ -913,6 +913,7 @@
C3DFFAC623E96F0D0058DAF8 /* Sheet.swift in Sources */ = {isa = PBXBuildFile; fileRef = C3DFFAC523E96F0D0058DAF8 /* Sheet.swift */; };
C3E5C2FA251DBABB0040DFFC /* EditClosedGroupVC.swift in Sources */ = {isa = PBXBuildFile; fileRef = C3E5C2F9251DBABB0040DFFC /* EditClosedGroupVC.swift */; };
C3E7134F251C867C009649BB /* Sodium+Conversion.swift in Sources */ = {isa = PBXBuildFile; fileRef = C3E7134E251C867C009649BB /* Sodium+Conversion.swift */; };
C3ECBF7B257056B700EA7FCE /* Threading.swift in Sources */ = {isa = PBXBuildFile; fileRef = C3ECBF7A257056B700EA7FCE /* Threading.swift */; };
C3F0A530255C80BC007BE2A3 /* NoopNotificationsManager.swift in Sources */ = {isa = PBXBuildFile; fileRef = C3F0A52F255C80BC007BE2A3 /* NoopNotificationsManager.swift */; };
C3F0A5EC255C970D007BE2A3 /* Configuration.swift in Sources */ = {isa = PBXBuildFile; fileRef = C3F0A5EB255C970D007BE2A3 /* Configuration.swift */; };
D2179CFC16BB0B3A0006F3AB /* CoreTelephony.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = D2179CFB16BB0B3A0006F3AB /* CoreTelephony.framework */; };
@ -2012,6 +2013,7 @@
C3DFFAC523E96F0D0058DAF8 /* Sheet.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Sheet.swift; sourceTree = "<group>"; };
C3E5C2F9251DBABB0040DFFC /* EditClosedGroupVC.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = EditClosedGroupVC.swift; sourceTree = "<group>"; };
C3E7134E251C867C009649BB /* Sodium+Conversion.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Sodium+Conversion.swift"; sourceTree = "<group>"; };
C3ECBF7A257056B700EA7FCE /* Threading.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Threading.swift; sourceTree = "<group>"; };
C3F0A52F255C80BC007BE2A3 /* NoopNotificationsManager.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NoopNotificationsManager.swift; sourceTree = "<group>"; };
C3F0A5B2255C915C007BE2A3 /* en */ = {isa = PBXFileReference; lastKnownFileType = text.plist.strings; name = en; path = en.lproj/Localizable.strings; sourceTree = "<group>"; };
C3F0A5EB255C970D007BE2A3 /* Configuration.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Configuration.swift; sourceTree = "<group>"; };
@ -3351,6 +3353,7 @@
C33FDACD255A580200E217F9 /* SSKJobRecord.m */,
C3A3A098256E17B2004D228D /* SSKJobRecordFinder.swift */,
C3A3A170256E1D25004D228D /* SSKReachabilityManager.swift */,
C3ECBF7A257056B700EA7FCE /* Threading.swift */,
C33FDB5F255A580E00E217F9 /* YapDatabaseConnection+OWS.h */,
C33FDB43255A580C00E217F9 /* YapDatabaseConnection+OWS.m */,
C33FDA88255A57FD00E217F9 /* YapDatabaseTransaction+OWS.h */,
@ -5216,6 +5219,7 @@
B8856D69256F141F001CE70E /* OWSWindowManager.m in Sources */,
C32C5F5F256DFD90003C73A2 /* TSInvalidIdentityKeyErrorMessage.m in Sources */,
C3D9E3BE25676AD70040E4F3 /* TSAttachmentPointer.m in Sources */,
C3ECBF7B257056B700EA7FCE /* Threading.swift in Sources */,
C32C5AAB256DBE8F003C73A2 /* TSIncomingMessage+Conversion.swift in Sources */,
C32C5A88256DBCF9003C73A2 /* MessageReceiver+Handling.swift in Sources */,
C32C5C1B256DC9E0003C73A2 /* GeneralUtilities.swift in Sources */,