This commit is contained in:
nielsandriesse 2020-11-24 20:09:23 +11:00
parent 5e476e8330
commit 178ab7e3e2
15 changed files with 84 additions and 60 deletions

View File

@ -3238,7 +3238,10 @@ typedef enum : NSUInteger {
TSOutgoingMessage *tsMessage = [TSOutgoingMessage from:message associatedWith:thread];
[LKStorage writeWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
[tsMessage saveWithTransaction:transaction];
[SNMessageSender send:message withAttachments:attachments inThread:thread usingTransaction:transaction];
} completion:^{
[LKStorage writeWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
[SNMessageSender send:message withAttachments:attachments inThread:thread usingTransaction:transaction];
}];
}];
[self messageWasSent:tsMessage];
});

View File

@ -3,9 +3,9 @@ import SessionUtilitiesKit
import SignalCoreKit
public final class AttachmentDownloadJob : NSObject, Job, NSCoding { // NSObject/NSCoding conformance is needed for YapDatabase compatibility
public let attachmentID: String
public let tsIncomingMessageID: String
public var delegate: JobDelegate?
private let attachmentID: String
private let tsIncomingMessageID: String
public var id: String?
public var failureCount: UInt = 0

View File

@ -1,9 +1,9 @@
import SessionUtilitiesKit
public final class AttachmentUploadJob : NSObject, Job, NSCoding { // NSObject/NSCoding conformance is needed for YapDatabase compatibility
public var delegate: JobDelegate?
public let attachmentID: String
public let threadID: String
public var delegate: JobDelegate?
public var id: String?
public var failureCount: UInt = 0

View File

@ -2,13 +2,14 @@ import SessionUtilitiesKit
@objc(SNJobQueue)
public final class JobQueue : NSObject, JobDelegate {
private var hasResumedPendingJobs = false // Just for debugging
@objc public static let shared = JobQueue()
@objc public func add(_ job: Job, using transaction: Any) {
let transaction = transaction as! YapDatabaseReadWriteTransaction
addWithoutExecuting(job, using: transaction)
transaction.addCompletionQueue(Threading.workQueue) {
transaction.addCompletionQueue(DispatchQueue.global(qos: .userInitiated)) {
job.execute()
}
}
@ -20,6 +21,12 @@ public final class JobQueue : NSObject, JobDelegate {
}
@objc public func resumePendingJobs() {
if hasResumedPendingJobs {
#if DEBUG
preconditionFailure("resumePendingJobs() should only be called once.")
#endif
}
hasResumedPendingJobs = true
let allJobTypes: [Job.Type] = [ AttachmentDownloadJob.self, AttachmentUploadJob.self, MessageReceiveJob.self, MessageSendJob.self, NotifyPNServerJob.self ]
allJobTypes.forEach { type in
let allPendingJobs = Configuration.shared.storage.getAllPendingJobs(of: type)

View File

@ -1,10 +1,10 @@
import SessionUtilitiesKit
public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSCoding conformance is needed for YapDatabase compatibility
public let data: Data
public let messageServerID: UInt64?
public var delegate: JobDelegate?
private let data: Data
public var id: String?
private let messageServerID: UInt64?
public var failureCount: UInt = 0
// MARK: Settings
@ -22,33 +22,31 @@ public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSC
guard let data = coder.decodeObject(forKey: "data") as! Data?,
let id = coder.decodeObject(forKey: "id") as! String? else { return nil }
self.data = data
self.id = id
self.messageServerID = coder.decodeObject(forKey: "messageServerUD") as! UInt64?
self.id = id
self.failureCount = coder.decodeObject(forKey: "failureCount") as! UInt? ?? 0
}
public func encode(with coder: NSCoder) {
coder.encode(data, forKey: "data")
coder.encode(id, forKey: "id")
coder.encode(messageServerID, forKey: "messageServerID")
coder.encode(id, forKey: "id")
coder.encode(failureCount, forKey: "failureCount")
}
// MARK: Running
public func execute() {
Configuration.shared.storage.withAsync({ transaction in // Intentionally capture self
Threading.workQueue.async {
do {
let (message, proto) = try MessageReceiver.parse(self.data, messageServerID: self.messageServerID, using: transaction)
try MessageReceiver.handle(message, associatedWithProto: proto, using: transaction)
self.handleSuccess()
} catch {
SNLog("Couldn't parse message due to error: \(error).")
if let error = error as? MessageReceiver.Error, !error.isRetryable {
self.handlePermanentFailure(error: error)
} else {
self.handleFailure(error: error)
}
do {
let (message, proto) = try MessageReceiver.parse(self.data, messageServerID: self.messageServerID, using: transaction)
try MessageReceiver.handle(message, associatedWithProto: proto, using: transaction)
self.handleSuccess()
} catch {
SNLog("Couldn't parse message due to error: \(error).")
if let error = error as? MessageReceiver.Error, !error.isRetryable {
self.handlePermanentFailure(error: error)
} else {
self.handleFailure(error: error)
}
}
}, completion: { })

View File

@ -2,9 +2,9 @@ import SessionUtilitiesKit
@objc(SNMessageSendJob)
public final class MessageSendJob : NSObject, Job, NSCoding { // NSObject/NSCoding conformance is needed for YapDatabase compatibility
public var delegate: JobDelegate?
public let message: Message
private let destination: Message.Destination
public let destination: Message.Destination
public var delegate: JobDelegate?
public var id: String?
public var failureCount: UInt = 0
@ -50,12 +50,12 @@ public final class MessageSendJob : NSObject, Job, NSCoding { // NSObject/NSCodi
public func encode(with coder: NSCoder) {
coder.encode(message, forKey: "message")
coder.encode(id, forKey: "id")
switch destination {
case .contact(let publicKey): coder.encode("contact(\(publicKey))", forKey: "destination")
case .closedGroup(let groupPublicKey): coder.encode("closedGroup(\(groupPublicKey))", forKey: "destination")
case .openGroup(let channel, let server): coder.encode("openGroup(\(channel), \(server))")
}
coder.encode(id, forKey: "id")
coder.encode(failureCount, forKey: "failureCount")
}
@ -79,16 +79,14 @@ public final class MessageSendJob : NSObject, Job, NSCoding { // NSObject/NSCodi
}
// FIXME: This doesn't yet handle the attachment side of link previews, quotes, etc.
storage.withAsync({ transaction in // Intentionally capture self
Threading.workQueue.async {
MessageSender.send(self.message, to: self.destination, using: transaction).done(on: Threading.workQueue) {
self.handleSuccess()
}.catch(on: Threading.workQueue) { error in
SNLog("Couldn't send message due to error: \(error).")
if let error = error as? MessageSender.Error, !error.isRetryable {
self.handlePermanentFailure(error: error)
} else {
self.handleFailure(error: error)
}
MessageSender.send(self.message, to: self.destination, using: transaction).done(on: DispatchQueue.global(qos: .userInitiated)) {
self.handleSuccess()
}.catch(on: DispatchQueue.global(qos: .userInitiated)) { error in
SNLog("Couldn't send message due to error: \(error).")
if let error = error as? MessageSender.Error, !error.isRetryable {
self.handlePermanentFailure(error: error)
} else {
self.handleFailure(error: error)
}
}
}, completion: { })

View File

@ -3,8 +3,8 @@ import SessionSnodeKit
import SessionUtilitiesKit
public final class NotifyPNServerJob : NSObject, Job, NSCoding { // NSObject/NSCoding conformance is needed for YapDatabase compatibility
public let message: SnodeMessage
public var delegate: JobDelegate?
private let message: SnodeMessage
public var id: String?
public var failureCount: UInt = 0

View File

@ -16,7 +16,6 @@ public final class OpenGroupAPI : DotNetAPI {
private static let maxRetryCount: UInt = 4
public static let profilePictureType = "network.loki.messenger.avatar"
@objc public static let openGroupMessageType = "network.loki.messenger.publicChat"
// MARK: Open Group Public Key Validation

View File

@ -2,6 +2,7 @@ import SessionUtilitiesKit
internal enum MessageReceiver {
// MARK: Error
internal enum Error : LocalizedError {
case invalidMessage
case unknownMessage
@ -41,6 +42,7 @@ internal enum MessageReceiver {
}
}
// MARK: Parsing
internal static func parse(_ data: Data, messageServerID: UInt64?, using transaction: Any) throws -> (Message, SNProtoContent) {
let userPublicKey = Configuration.shared.storage.getUserPublicKey()
// Parse the envelope
@ -91,6 +93,7 @@ internal enum MessageReceiver {
}
}
// MARK: Handling
internal static func handle(_ message: Message, associatedWithProto proto: SNProtoContent, using transaction: Any) throws {
switch message {
case let message as ReadReceipt: handleReadReceipt(message, using: transaction)

View File

@ -5,6 +5,7 @@ import SessionUtilitiesKit
@objc(SNMessageSender)
public final class MessageSender : NSObject {
// MARK: Error
public enum Error : LocalizedError {
case invalidMessage
case protoConversionFailed
@ -27,9 +28,11 @@ public final class MessageSender : NSObject {
}
}
}
// MARK: Initialization
private override init() { }
// MARK: Convenience
public static func send(_ message: Message, to destination: Message.Destination, using transaction: Any) -> Promise<Void> {
switch destination {
case .contact(_), .closedGroup(_): return sendToSnodeDestination(destination, message: message, using: transaction)
@ -37,10 +40,11 @@ public final class MessageSender : NSObject {
}
}
// MARK: One-on-One Chats & Closed Groups
internal static func sendToSnodeDestination(_ destination: Message.Destination, message: Message, using transaction: Any) -> Promise<Void> {
let (promise, seal) = Promise<Void>.pending()
let storage = Configuration.shared.storage
if message.sentTimestamp == nil { // Visible messages will already have the sent timestamp set
if message.sentTimestamp == nil { // Visible messages will already have their sent timestamp set
message.sentTimestamp = NSDate.millisecondTimestamp()
}
message.sender = storage.getUserPublicKey()
@ -49,7 +53,7 @@ public final class MessageSender : NSObject {
case .closedGroup(let groupPublicKey): message.recipient = groupPublicKey
case .openGroup(_, _): preconditionFailure()
}
// Set the failure handler
// Set the failure handler (for precondition failure handling)
let _ = promise.catch(on: DispatchQueue.main) { error in
storage.withAsync({ transaction in
Configuration.shared.messageSenderDelegate.handleFailedMessageSend(message, with: error, using: transaction)
@ -130,26 +134,27 @@ public final class MessageSender : NSObject {
}
}
let snodeMessage = SnodeMessage(recipient: recipient, data: base64EncodedData, ttl: type(of: message).ttl, timestamp: timestamp, nonce: nonce)
SnodeAPI.sendMessage(snodeMessage).done(on: Threading.workQueue) { promises in
SnodeAPI.sendMessage(snodeMessage).done(on: DispatchQueue.global(qos: .userInitiated)) { promises in
var isSuccess = false
let promiseCount = promises.count
var errorCount = 0
promises.forEach {
let _ = $0.done(on: Threading.workQueue) { _ in
let _ = $0.done(on: DispatchQueue.global(qos: .userInitiated)) { _ in
guard !isSuccess else { return } // Succeed as soon as the first promise succeeds
isSuccess = true
seal.fulfill(())
}
$0.catch(on: Threading.workQueue) { error in
$0.catch(on: DispatchQueue.global(qos: .userInitiated)) { error in
errorCount += 1
guard errorCount == promiseCount else { return } // Only error out if all promises failed
seal.reject(error)
}
}
}.catch(on: Threading.workQueue) { error in
}.catch(on: DispatchQueue.global(qos: .userInitiated)) { error in
SNLog("Couldn't send message due to error: \(error).")
seal.reject(error)
}
// Handle completion
let _ = promise.done(on: DispatchQueue.main) {
storage.withAsync({ transaction in
Configuration.shared.messageSenderDelegate.handleSuccessfulMessageSend(message, using: transaction)
@ -157,14 +162,18 @@ public final class MessageSender : NSObject {
if case .contact(_) = destination {
NotificationCenter.default.post(name: .messageSent, object: NSNumber(value: message.sentTimestamp!))
}
let notifyPNServerJob = NotifyPNServerJob(message: snodeMessage)
storage.withAsync({ transaction in
JobQueue.shared.add(notifyPNServerJob, using: transaction)
}, completion: { })
if message is VisibleMessage {
let notifyPNServerJob = NotifyPNServerJob(message: snodeMessage)
storage.withAsync({ transaction in
JobQueue.shared.add(notifyPNServerJob, using: transaction)
}, completion: { })
}
}
// Return
return promise
}
// MARK: Open Groups
internal static func sendToOpenGroupDestination(_ destination: Message.Destination, message: Message, using transaction: Any) -> Promise<Void> {
let (promise, seal) = Promise<Void>.pending()
let storage = Configuration.shared.storage
@ -174,12 +183,15 @@ public final class MessageSender : NSObject {
case .closedGroup(_): preconditionFailure()
case .openGroup(let channel, let server): message.recipient = "\(server).\(channel)"
}
// Set the failure handler (for precondition failure handling)
let _ = promise.catch(on: DispatchQueue.global(qos: .userInitiated)) { error in
storage.withAsync({ transaction in
Configuration.shared.messageSenderDelegate.handleFailedMessageSend(message, with: error, using: transaction)
}, completion: { })
}
// Validate the message
guard message.isValid else { seal.reject(Error.invalidMessage); return promise }
// Convert the message to an open group message
let (channel, server) = { () -> (UInt64, String) in
switch destination {
case .openGroup(let channel, let server): return (channel, server)
@ -188,17 +200,20 @@ public final class MessageSender : NSObject {
}()
guard let message = message as? VisibleMessage,
let openGroupMessage = OpenGroupMessage.from(message, for: server) else { seal.reject(Error.invalidMessage); return promise }
// Send the result
OpenGroupAPI.sendMessage(openGroupMessage, to: channel, on: server).done(on: DispatchQueue.global(qos: .userInitiated)) { openGroupMessage in
message.openGroupServerMessageID = openGroupMessage.serverID
seal.fulfill(())
}.catch(on: DispatchQueue.global(qos: .userInitiated)) { error in
seal.reject(error)
}
// Handle completion
let _ = promise.done(on: DispatchQueue.global(qos: .userInitiated)) {
storage.withAsync({ transaction in
Configuration.shared.messageSenderDelegate.handleSuccessfulMessageSend(message, using: transaction)
}, completion: { })
}
// Return
return promise
}
}

View File

@ -1,6 +0,0 @@
import Foundation
internal enum Threading {
internal static let workQueue = DispatchQueue(label: "SessionMessagingKit.workQueue", qos: .userInitiated) // It's important that this is a serial queue
}

View File

@ -538,7 +538,6 @@
C3402FE52559036600EA6424 /* SessionUIKit.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = C331FF1B2558F9D300070591 /* SessionUIKit.framework */; };
C3471ECB2555356A00297E91 /* MessageSender+Encryption.swift in Sources */ = {isa = PBXBuildFile; fileRef = C3471ECA2555356A00297E91 /* MessageSender+Encryption.swift */; };
C3471ED42555386B00297E91 /* AESGCM.swift in Sources */ = {isa = PBXBuildFile; fileRef = C3C2A5D72553860B00C340D1 /* AESGCM.swift */; };
C3471F4225553A4D00297E91 /* Threading.swift in Sources */ = {isa = PBXBuildFile; fileRef = C3471F4125553A4D00297E91 /* Threading.swift */; };
C3471F4C25553AB000297E91 /* MessageReceiver+Decryption.swift in Sources */ = {isa = PBXBuildFile; fileRef = C3471F4B25553AB000297E91 /* MessageReceiver+Decryption.swift */; };
C3471FA42555439E00297E91 /* Notification+MessageSender.swift in Sources */ = {isa = PBXBuildFile; fileRef = C3471FA32555439E00297E91 /* Notification+MessageSender.swift */; };
C34C8F7423A7830B00D82669 /* SpaceMono-Bold.ttf in Resources */ = {isa = PBXBuildFile; fileRef = C34C8F7323A7830A00D82669 /* SpaceMono-Bold.ttf */; };
@ -1658,7 +1657,6 @@
C33FDC1E255A581F00E217F9 /* OWSUploadOperation.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = OWSUploadOperation.m; sourceTree = "<group>"; };
C33FDC1F255A581F00E217F9 /* LokiSessionRestorationImplementation.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = LokiSessionRestorationImplementation.swift; sourceTree = "<group>"; };
C3471ECA2555356A00297E91 /* MessageSender+Encryption.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "MessageSender+Encryption.swift"; sourceTree = "<group>"; };
C3471F4125553A4D00297E91 /* Threading.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Threading.swift; sourceTree = "<group>"; };
C3471F4B25553AB000297E91 /* MessageReceiver+Decryption.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "MessageReceiver+Decryption.swift"; sourceTree = "<group>"; };
C3471FA32555439E00297E91 /* Notification+MessageSender.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Notification+MessageSender.swift"; sourceTree = "<group>"; };
C34C8F7323A7830A00D82669 /* SpaceMono-Bold.ttf */ = {isa = PBXFileReference; lastKnownFileType = file; path = "SpaceMono-Bold.ttf"; sourceTree = "<group>"; };
@ -3410,7 +3408,6 @@
C3BBE0C62554F1570050F1E3 /* FixedWidthInteger+BigEndian.swift */,
C3A71D0A2558989C0043A11F /* MessageWrapper.swift */,
C3BBE0B42554F0E10050F1E3 /* ProofOfWork.swift */,
C3471F4125553A4D00297E91 /* Threading.swift */,
);
path = Utilities;
sourceTree = "<group>";
@ -5309,7 +5306,6 @@
C300A5BD2554B00D00555489 /* ReadReceipt.swift in Sources */,
C3BBE0762554CDA60050F1E3 /* Configuration.swift in Sources */,
C3D9E3BE25676AD70040E4F3 /* TSAttachmentPointer.m in Sources */,
C3471F4225553A4D00297E91 /* Threading.swift in Sources */,
C300A5DD2554B06600555489 /* ClosedGroupUpdate.swift in Sources */,
C3471FA42555439E00297E91 /* Notification+MessageSender.swift in Sources */,
C3A7222A2558C1E40043A11F /* DotNetAPI.swift in Sources */,

View File

@ -1,10 +1,15 @@
// TODO: Since we now have YapDatabase as a dependency in all modules we can work with YapDatabaseTransactions directly
// rather than passing transactions around as Any everywhere.
extension Storage {
// TODO: This is essentially a duplicate of Storage.writeSync
public func with(_ work: @escaping (Any) -> Void) {
Storage.writeSync { work($0) }
}
// TODO: This is essentially a duplicate of Storage.write
public func withAsync(_ work: @escaping (Any) -> Void, completion: @escaping () -> Void) {
Storage.write(with: { work($0) }, completion: completion)
}

View File

@ -5,10 +5,16 @@ public extension MessageSender {
@objc(send:withAttachments:inThread:usingTransaction:)
static func send(_ message: Message, with attachments: [SignalAttachment] = [], in thread: TSThread, using transaction: YapDatabaseReadWriteTransaction) {
if let message = message as? VisibleMessage {
guard let tsMessage = TSOutgoingMessage.find(withTimestamp: message.sentTimestamp!) else {
#if DEBUG
preconditionFailure()
#endif
return
}
var streams: [TSAttachmentStream] = []
attachments.forEach {
let stream = TSAttachmentStream(contentType: $0.mimeType, byteCount: UInt32($0.dataLength), sourceFilename: $0.sourceFilename,
caption: $0.captionText, albumMessageId: nil)
caption: $0.captionText, albumMessageId: tsMessage.uniqueId!)
streams.append(stream)
stream.write($0.dataSource)
stream.save(with: transaction)

View File

@ -22,9 +22,9 @@ public final class OpenGroupAPIDelegate : SessionMessagingKit.OpenGroupAPIDelega
Storage.shared.setProfilePictureURL(to: info.profilePictureURL, forOpenGroupWithID: openGroupID, using: transaction)
if let profilePictureURL = info.profilePictureURL {
var sanitizedServerURL = server
while sanitizedServerURL.hasSuffix("/") { sanitizedServerURL.removeLast() }
var sanitizedProfilePictureURL = profilePictureURL
while sanitizedServerURL.hasSuffix("/") { sanitizedServerURL.removeLast(1) }
while sanitizedProfilePictureURL.hasPrefix("/") { sanitizedProfilePictureURL.removeFirst(1) }
while sanitizedProfilePictureURL.hasPrefix("/") { sanitizedProfilePictureURL.removeFirst() }
let url = "\(sanitizedServerURL)/\(sanitizedProfilePictureURL)"
FileServerAPI.downloadAttachment(from: url).map2 { data in
let attachmentStream = TSAttachmentStream(contentType: OWSMimeTypeImageJpeg, byteCount: UInt32(data.count), sourceFilename: nil, caption: nil, albumMessageId: nil)