This commit is contained in:
Niels Andriesse 2020-12-07 16:00:21 +11:00
parent 3f5bc18f6b
commit efe8f1c8bb
20 changed files with 76 additions and 64 deletions

View File

@ -1,17 +1,19 @@
import PromiseKit
// TODO: Since we now have YapDatabase as a dependency in all modules we can work with YapDatabaseTransactions directly
// rather than passing transactions around as Any everywhere.
extension Storage { extension Storage {
// TODO: This is essentially a duplicate of Storage.writeSync @discardableResult
public func with(_ work: @escaping (Any) -> Void) { public func write(with block: @escaping (Any) -> Void) -> Promise<Void> {
Storage.writeSync { work($0) } Storage.write(with: { block($0) })
} }
// TODO: This is essentially a duplicate of Storage.write @discardableResult
public func withAsync(_ work: @escaping (Any) -> Void, completion: @escaping () -> Void) { public func write(with block: @escaping (Any) -> Void, completion: @escaping () -> Void) -> Promise<Void> {
Storage.write(with: { work($0) }, completion: completion) Storage.write(with: { block($0) }, completion: completion)
}
public func writeSync(with block: @escaping (Any) -> Void) {
Storage.writeSync { block($0) }
} }
@objc public func getUserPublicKey() -> String? { @objc public func getUserPublicKey() -> String? {

View File

@ -53,21 +53,21 @@ public final class AttachmentDownloadJob : NSObject, Job, NSCoding { // NSObject
return handleFailure(error: Error.noAttachment) return handleFailure(error: Error.noAttachment)
} }
let storage = SNMessagingKitConfiguration.shared.storage let storage = SNMessagingKitConfiguration.shared.storage
storage.withAsync({ transaction in storage.write(with: { transaction in
storage.setAttachmentState(to: .downloading, for: pointer, associatedWith: self.tsIncomingMessageID, using: transaction) storage.setAttachmentState(to: .downloading, for: pointer, associatedWith: self.tsIncomingMessageID, using: transaction)
}, completion: { }) }, completion: { })
let temporaryFilePath = URL(fileURLWithPath: OWSTemporaryDirectoryAccessibleAfterFirstAuth() + UUID().uuidString) let temporaryFilePath = URL(fileURLWithPath: OWSTemporaryDirectoryAccessibleAfterFirstAuth() + UUID().uuidString)
let handleFailure: (Swift.Error) -> Void = { error in // Intentionally capture self let handleFailure: (Swift.Error) -> Void = { error in // Intentionally capture self
OWSFileSystem.deleteFile(temporaryFilePath.absoluteString) OWSFileSystem.deleteFile(temporaryFilePath.absoluteString)
if let error = error as? Error, case .noAttachment = error { if let error = error as? Error, case .noAttachment = error {
storage.withAsync({ transaction in storage.write(with: { transaction in
storage.setAttachmentState(to: .failed, for: pointer, associatedWith: self.tsIncomingMessageID, using: transaction) storage.setAttachmentState(to: .failed, for: pointer, associatedWith: self.tsIncomingMessageID, using: transaction)
}, completion: { }) }, completion: { })
self.handlePermanentFailure(error: error) self.handlePermanentFailure(error: error)
} else if let error = error as? DotNetAPI.Error, case .parsingFailed = error { } else if let error = error as? DotNetAPI.Error, case .parsingFailed = error {
// No need to retry if the response is invalid. Most likely this means we (incorrectly) // No need to retry if the response is invalid. Most likely this means we (incorrectly)
// got a "Cannot GET ..." error from the file server. // got a "Cannot GET ..." error from the file server.
storage.withAsync({ transaction in storage.write(with: { transaction in
storage.setAttachmentState(to: .failed, for: pointer, associatedWith: self.tsIncomingMessageID, using: transaction) storage.setAttachmentState(to: .failed, for: pointer, associatedWith: self.tsIncomingMessageID, using: transaction)
}, completion: { }) }, completion: { })
self.handlePermanentFailure(error: error) self.handlePermanentFailure(error: error)
@ -98,7 +98,7 @@ public final class AttachmentDownloadJob : NSObject, Job, NSCoding { // NSObject
return handleFailure(error) return handleFailure(error)
} }
OWSFileSystem.deleteFile(temporaryFilePath.absoluteString) OWSFileSystem.deleteFile(temporaryFilePath.absoluteString)
storage.withAsync({ transaction in storage.write(with: { transaction in
storage.persist(stream, associatedWith: self.tsIncomingMessageID, using: transaction) storage.persist(stream, associatedWith: self.tsIncomingMessageID, using: transaction)
}, completion: { }) }, completion: { })
}.catch(on: DispatchQueue.global()) { error in }.catch(on: DispatchQueue.global()) { error in

View File

@ -81,7 +81,7 @@ public final class AttachmentUploadJob : NSObject, Job, NSCoding { // NSObject/N
SNLog("Attachment uploaded successfully.") SNLog("Attachment uploaded successfully.")
delegate?.handleJobSucceeded(self) delegate?.handleJobSucceeded(self)
SNMessagingKitConfiguration.shared.storage.resumeMessageSendJobIfNeeded(messageSendJobID) SNMessagingKitConfiguration.shared.storage.resumeMessageSendJobIfNeeded(messageSendJobID)
Storage.shared.withAsync({ transaction in Storage.shared.write(with: { transaction in
var interaction: TSInteraction? var interaction: TSInteraction?
let transaction = transaction as! YapDatabaseReadWriteTransaction let transaction = transaction as! YapDatabaseReadWriteTransaction
TSDatabaseSecondaryIndexes.enumerateMessages(withTimestamp: self.message.sentTimestamp!, with: { _, key, _ in TSDatabaseSecondaryIndexes.enumerateMessages(withTimestamp: self.message.sentTimestamp!, with: { _, key, _ in
@ -108,7 +108,7 @@ public final class AttachmentUploadJob : NSObject, Job, NSCoding { // NSObject/N
private func failAssociatedMessageSendJob(with error: Swift.Error) { private func failAssociatedMessageSendJob(with error: Swift.Error) {
let storage = SNMessagingKitConfiguration.shared.storage let storage = SNMessagingKitConfiguration.shared.storage
let messageSendJob = storage.getMessageSendJob(for: messageSendJobID) let messageSendJob = storage.getMessageSendJob(for: messageSendJobID)
storage.withAsync({ transaction in // Intentionally capture self storage.write(with: { transaction in // Intentionally capture self
MessageSender.handleFailedMessageSend(self.message, with: error, using: transaction) MessageSender.handleFailedMessageSend(self.message, with: error, using: transaction)
if let messageSendJob = messageSendJob { if let messageSendJob = messageSendJob {
storage.markJobAsFailed(messageSendJob, using: transaction) storage.markJobAsFailed(messageSendJob, using: transaction)

View File

@ -39,7 +39,7 @@ public final class JobQueue : NSObject, JobDelegate {
} }
public func handleJobSucceeded(_ job: Job) { public func handleJobSucceeded(_ job: Job) {
SNMessagingKitConfiguration.shared.storage.withAsync({ transaction in SNMessagingKitConfiguration.shared.storage.write(with: { transaction in
SNMessagingKitConfiguration.shared.storage.markJobAsSucceeded(job, using: transaction) SNMessagingKitConfiguration.shared.storage.markJobAsSucceeded(job, using: transaction)
}, completion: { }, completion: {
// Do nothing // Do nothing
@ -50,11 +50,11 @@ public final class JobQueue : NSObject, JobDelegate {
job.failureCount += 1 job.failureCount += 1
let storage = SNMessagingKitConfiguration.shared.storage let storage = SNMessagingKitConfiguration.shared.storage
guard !storage.isJobCanceled(job) else { return SNLog("\(type(of: job)) canceled.") } guard !storage.isJobCanceled(job) else { return SNLog("\(type(of: job)) canceled.") }
storage.withAsync({ transaction in storage.write(with: { transaction in
storage.persist(job, using: transaction) storage.persist(job, using: transaction)
}, completion: { // Intentionally capture self }, completion: { // Intentionally capture self
if job.failureCount == type(of: job).maxFailureCount { if job.failureCount == type(of: job).maxFailureCount {
storage.withAsync({ transaction in storage.write(with: { transaction in
storage.markJobAsFailed(job, using: transaction) storage.markJobAsFailed(job, using: transaction)
}, completion: { }, completion: {
// Do nothing // Do nothing
@ -70,10 +70,10 @@ public final class JobQueue : NSObject, JobDelegate {
public func handleJobFailedPermanently(_ job: Job, with error: Error) { public func handleJobFailedPermanently(_ job: Job, with error: Error) {
job.failureCount += 1 job.failureCount += 1
let storage = SNMessagingKitConfiguration.shared.storage let storage = SNMessagingKitConfiguration.shared.storage
storage.withAsync({ transaction in storage.write(with: { transaction in
storage.persist(job, using: transaction) storage.persist(job, using: transaction)
}, completion: { // Intentionally capture self }, completion: { // Intentionally capture self
storage.withAsync({ transaction in storage.write(with: { transaction in
storage.markJobAsFailed(job, using: transaction) storage.markJobAsFailed(job, using: transaction)
}, completion: { }, completion: {
// Do nothing // Do nothing

View File

@ -55,7 +55,7 @@ public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSC
public func execute() -> Promise<Void> { public func execute() -> Promise<Void> {
let (promise, seal) = Promise<Void>.pending() let (promise, seal) = Promise<Void>.pending()
SNMessagingKitConfiguration.shared.storage.withAsync({ transaction in // Intentionally capture self SNMessagingKitConfiguration.shared.storage.write(with: { transaction in // Intentionally capture self
do { do {
let (message, proto) = try MessageReceiver.parse(self.data, openGroupMessageServerID: self.openGroupMessageServerID, using: transaction) let (message, proto) = try MessageReceiver.parse(self.data, openGroupMessageServerID: self.openGroupMessageServerID, using: transaction)
try MessageReceiver.handle(message, associatedWithProto: proto, openGroupID: self.openGroupID, isBackgroundPoll: self.isBackgroundPoll, using: transaction) try MessageReceiver.handle(message, associatedWithProto: proto, openGroupID: self.openGroupID, isBackgroundPoll: self.isBackgroundPoll, using: transaction)

View File

@ -71,14 +71,14 @@ public final class MessageSendJob : NSObject, Job, NSCoding { // NSObject/NSCodi
// Wait for it to finish // Wait for it to finish
} else { } else {
let job = AttachmentUploadJob(attachmentID: attachment.uniqueId!, threadID: message.threadID!, message: message, messageSendJobID: id!) let job = AttachmentUploadJob(attachmentID: attachment.uniqueId!, threadID: message.threadID!, message: message, messageSendJobID: id!)
storage.withAsync({ transaction in storage.write(with: { transaction in
JobQueue.shared.add(job, using: transaction) JobQueue.shared.add(job, using: transaction)
}, completion: { }) }, completion: { })
} }
} }
if !attachmentsToUpload.isEmpty { return } // Wait for all attachments to upload before continuing if !attachmentsToUpload.isEmpty { return } // Wait for all attachments to upload before continuing
} }
storage.withAsync({ transaction in // Intentionally capture self storage.write(with: { transaction in // Intentionally capture self
MessageSender.send(self.message, to: self.destination, using: transaction).done(on: DispatchQueue.global(qos: .userInitiated)) { MessageSender.send(self.message, to: self.destination, using: transaction).done(on: DispatchQueue.global(qos: .userInitiated)) {
self.handleSuccess() self.handleSuccess()
}.catch(on: DispatchQueue.global(qos: .userInitiated)) { error in }.catch(on: DispatchQueue.global(qos: .userInitiated)) { error in

View File

@ -34,7 +34,7 @@ public final class SessionRequest : ControlMessage {
public override class func fromProto(_ proto: SNProtoContent) -> SessionRequest? { public override class func fromProto(_ proto: SNProtoContent) -> SessionRequest? {
guard proto.nullMessage != nil, let preKeyBundleProto = proto.prekeyBundleMessage else { return nil } guard proto.nullMessage != nil, let preKeyBundleProto = proto.prekeyBundleMessage else { return nil }
var registrationID: UInt32 = 0 var registrationID: UInt32 = 0
SNMessagingKitConfiguration.shared.storage.with { transaction in SNMessagingKitConfiguration.shared.storage.writeSync { transaction in
registrationID = SNMessagingKitConfiguration.shared.storage.getOrGenerateRegistrationID(using: transaction) registrationID = SNMessagingKitConfiguration.shared.storage.getOrGenerateRegistrationID(using: transaction)
} }
guard let preKeyBundle = PreKeyBundle( guard let preKeyBundle = PreKeyBundle(

View File

@ -27,7 +27,7 @@ public final class OpenGroupAPI : DotNetAPI {
let url = URL(string: server)! let url = URL(string: server)!
let request = TSRequest(url: url) let request = TSRequest(url: url)
return OnionRequestAPI.sendOnionRequest(request, to: server, using: publicKey, isJSONRequired: false).map(on: DispatchQueue.global(qos: .default)) { _ -> String in return OnionRequestAPI.sendOnionRequest(request, to: server, using: publicKey, isJSONRequired: false).map(on: DispatchQueue.global(qos: .default)) { _ -> String in
SNMessagingKitConfiguration.shared.storage.with { transaction in SNMessagingKitConfiguration.shared.storage.writeSync { transaction in
SNMessagingKitConfiguration.shared.storage.setOpenGroupPublicKey(for: server, to: publicKey, using: transaction) SNMessagingKitConfiguration.shared.storage.setOpenGroupPublicKey(for: server, to: publicKey, using: transaction)
} }
return publicKey return publicKey
@ -81,7 +81,7 @@ public final class OpenGroupAPI : DotNetAPI {
} }
let lastMessageServerID = storage.getLastMessageServerID(for: channel, on: server) let lastMessageServerID = storage.getLastMessageServerID(for: channel, on: server)
if serverID > (lastMessageServerID ?? 0) { if serverID > (lastMessageServerID ?? 0) {
storage.with { transaction in storage.writeSync { transaction in
storage.setLastMessageServerID(for: channel, on: server, to: serverID, using: transaction) storage.setLastMessageServerID(for: channel, on: server, to: serverID, using: transaction)
} }
} }
@ -205,7 +205,7 @@ public final class OpenGroupAPI : DotNetAPI {
} }
let lastDeletionServerID = storage.getLastDeletionServerID(for: channel, on: server) let lastDeletionServerID = storage.getLastDeletionServerID(for: channel, on: server)
if serverID > (lastDeletionServerID ?? 0) { if serverID > (lastDeletionServerID ?? 0) {
storage.with { transaction in storage.writeSync { transaction in
storage.setLastDeletionServerID(for: channel, on: server, to: serverID, using: transaction) storage.setLastDeletionServerID(for: channel, on: server, to: serverID, using: transaction)
} }
} }
@ -256,7 +256,7 @@ public final class OpenGroupAPI : DotNetAPI {
throw Error.parsingFailed throw Error.parsingFailed
} }
let storage = SNMessagingKitConfiguration.shared.storage let storage = SNMessagingKitConfiguration.shared.storage
storage.with { transaction in storage.writeSync { transaction in
data.forEach { data in data.forEach { data in
guard let user = data["user"] as? JSON, let hexEncodedPublicKey = user["username"] as? String, let rawDisplayName = user["name"] as? String else { return } guard let user = data["user"] as? JSON, let hexEncodedPublicKey = user["username"] as? String, let rawDisplayName = user["name"] as? String else { return }
let endIndex = hexEncodedPublicKey.endIndex let endIndex = hexEncodedPublicKey.endIndex
@ -346,7 +346,7 @@ public final class OpenGroupAPI : DotNetAPI {
throw Error.parsingFailed throw Error.parsingFailed
} }
let storage = SNMessagingKitConfiguration.shared.storage let storage = SNMessagingKitConfiguration.shared.storage
storage.with { transaction in storage.writeSync { transaction in
storage.setUserCount(to: memberCount, forOpenGroupWithID: "\(server).\(channel)", using: transaction) storage.setUserCount(to: memberCount, forOpenGroupWithID: "\(server).\(channel)", using: transaction)
} }
let openGroupInfo = OpenGroupInfo(displayName: displayName, profilePictureURL: profilePictureURL, memberCount: memberCount) let openGroupInfo = OpenGroupInfo(displayName: displayName, profilePictureURL: profilePictureURL, memberCount: memberCount)
@ -360,7 +360,8 @@ public final class OpenGroupAPI : DotNetAPI {
public static func updateProfileIfNeeded(for channel: UInt64, on server: String, from info: OpenGroupInfo) { public static func updateProfileIfNeeded(for channel: UInt64, on server: String, from info: OpenGroupInfo) {
let openGroupID = "\(server).\(channel)" let openGroupID = "\(server).\(channel)"
Storage.write { transaction in SNMessagingKitConfiguration.shared.storage.write { transaction in
let transaction = transaction as! YapDatabaseReadWriteTransaction
// Update user count // Update user count
Storage.shared.setUserCount(to: info.memberCount, forOpenGroupWithID: openGroupID, using: transaction) Storage.shared.setUserCount(to: info.memberCount, forOpenGroupWithID: openGroupID, using: transaction)
let thread = TSGroupThread.getOrCreateThread(withGroupId: openGroupID.data(using: .utf8)!, groupType: .openGroup, transaction: transaction) let thread = TSGroupThread.getOrCreateThread(withGroupId: openGroupID.data(using: .utf8)!, groupType: .openGroup, transaction: transaction)
@ -474,7 +475,7 @@ internal extension Promise {
if case OnionRequestAPI.Error.httpRequestFailedAtDestination(let statusCode, _) = error, statusCode == 401 || statusCode == 403 { if case OnionRequestAPI.Error.httpRequestFailedAtDestination(let statusCode, _) = error, statusCode == 401 || statusCode == 403 {
SNLog("Auth token for: \(server) expired; dropping it.") SNLog("Auth token for: \(server) expired; dropping it.")
let storage = SNMessagingKitConfiguration.shared.storage let storage = SNMessagingKitConfiguration.shared.storage
storage.with { transaction in storage.writeSync { transaction in
storage.removeAuthToken(for: server, using: transaction) storage.removeAuthToken(for: server, using: transaction)
} }
} }

View File

@ -89,7 +89,7 @@ extension MessageSender : SharedSenderKeysDelegate {
} }
when(resolved: promises).done2 { _ in seal.fulfill(()) }.catch2 { seal.reject($0) } when(resolved: promises).done2 { _ in seal.fulfill(()) }.catch2 { seal.reject($0) }
let _ = promise.done { let _ = promise.done {
Storage.writeSync { transaction in SNMessagingKitConfiguration.shared.storage.writeSync { transaction in
let allOldRatchets = Storage.shared.getAllClosedGroupRatchets(for: groupPublicKey) let allOldRatchets = Storage.shared.getAllClosedGroupRatchets(for: groupPublicKey)
for (senderPublicKey, oldRatchet) in allOldRatchets { for (senderPublicKey, oldRatchet) in allOldRatchets {
let collection = ClosedGroupRatchetCollectionType.old let collection = ClosedGroupRatchetCollectionType.old
@ -106,6 +106,7 @@ extension MessageSender : SharedSenderKeysDelegate {
} else { } else {
// Send closed group update messages to any new members using established channels // Send closed group update messages to any new members using established channels
for member in newMembers { for member in newMembers {
let transaction = transaction as! YapDatabaseReadWriteTransaction
let thread = TSContactThread.getOrCreateThread(withContactId: member, transaction: transaction) let thread = TSContactThread.getOrCreateThread(withContactId: member, transaction: transaction)
thread.save(with: transaction) thread.save(with: transaction)
let closedGroupUpdateKind = ClosedGroupUpdate.Kind.new(groupPublicKey: Data(hex: groupPublicKey), name: name, let closedGroupUpdateKind = ClosedGroupUpdate.Kind.new(groupPublicKey: Data(hex: groupPublicKey), name: name,
@ -118,6 +119,7 @@ extension MessageSender : SharedSenderKeysDelegate {
let userRatchet = SharedSenderKeys.generateRatchet(for: groupPublicKey, senderPublicKey: userPublicKey, using: transaction) let userRatchet = SharedSenderKeys.generateRatchet(for: groupPublicKey, senderPublicKey: userPublicKey, using: transaction)
let userSenderKey = ClosedGroupSenderKey(chainKey: Data(hex: userRatchet.chainKey), keyIndex: userRatchet.keyIndex, publicKey: Data(hex: userPublicKey)) let userSenderKey = ClosedGroupSenderKey(chainKey: Data(hex: userRatchet.chainKey), keyIndex: userRatchet.keyIndex, publicKey: Data(hex: userPublicKey))
for member in members { for member in members {
let transaction = transaction as! YapDatabaseReadWriteTransaction
guard member != userPublicKey else { continue } guard member != userPublicKey else { continue }
let thread = TSContactThread.getOrCreateThread(withContactId: member, transaction: transaction) let thread = TSContactThread.getOrCreateThread(withContactId: member, transaction: transaction)
thread.save(with: transaction) thread.save(with: transaction)

View File

@ -133,7 +133,7 @@ public final class MessageSender : NSObject {
guard message.isValid else { handleFailure(with: Error.invalidMessage, using: transaction); return promise } guard message.isValid else { handleFailure(with: Error.invalidMessage, using: transaction); return promise }
// Stop here if this is a self-send // Stop here if this is a self-send
guard !isSelfSend else { guard !isSelfSend else {
storage.withAsync({ transaction in storage.write(with: { transaction in
MessageSender.handleSuccessfulMessageSend(message, to: destination, using: transaction) MessageSender.handleSuccessfulMessageSend(message, to: destination, using: transaction)
seal.fulfill(()) seal.fulfill(())
}, completion: { }) }, completion: { })
@ -237,7 +237,7 @@ public final class MessageSender : NSObject {
NotificationCenter.default.post(name: .messageSent, object: NSNumber(value: message.sentTimestamp!)) NotificationCenter.default.post(name: .messageSent, object: NSNumber(value: message.sentTimestamp!))
} }
} }
storage.withAsync({ transaction in storage.write(with: { transaction in
MessageSender.handleSuccessfulMessageSend(message, to: destination, using: transaction) MessageSender.handleSuccessfulMessageSend(message, to: destination, using: transaction)
var shouldNotify = (message is VisibleMessage) var shouldNotify = (message is VisibleMessage)
if let closedGroupUpdate = message as? ClosedGroupUpdate, case .new = closedGroupUpdate.kind { if let closedGroupUpdate = message as? ClosedGroupUpdate, case .new = closedGroupUpdate.kind {
@ -262,14 +262,14 @@ public final class MessageSender : NSObject {
$0.catch(on: DispatchQueue.global(qos: .userInitiated)) { error in $0.catch(on: DispatchQueue.global(qos: .userInitiated)) { error in
errorCount += 1 errorCount += 1
guard errorCount == promiseCount else { return } // Only error out if all promises failed guard errorCount == promiseCount else { return } // Only error out if all promises failed
storage.withAsync({ transaction in storage.write(with: { transaction in
handleFailure(with: error, using: transaction as! YapDatabaseReadWriteTransaction) handleFailure(with: error, using: transaction as! YapDatabaseReadWriteTransaction)
}, completion: { }) }, completion: { })
} }
} }
}.catch(on: DispatchQueue.global(qos: .userInitiated)) { error in }.catch(on: DispatchQueue.global(qos: .userInitiated)) { error in
SNLog("Couldn't send message due to error: \(error).") SNLog("Couldn't send message due to error: \(error).")
storage.withAsync({ transaction in storage.write(with: { transaction in
handleFailure(with: error, using: transaction as! YapDatabaseReadWriteTransaction) handleFailure(with: error, using: transaction as! YapDatabaseReadWriteTransaction)
}, completion: { }) }, completion: { })
} }
@ -322,12 +322,12 @@ public final class MessageSender : NSObject {
// Send the result // Send the result
OpenGroupAPI.sendMessage(openGroupMessage, to: channel, on: server).done(on: DispatchQueue.global(qos: .userInitiated)) { openGroupMessage in OpenGroupAPI.sendMessage(openGroupMessage, to: channel, on: server).done(on: DispatchQueue.global(qos: .userInitiated)) { openGroupMessage in
message.openGroupServerMessageID = openGroupMessage.serverID message.openGroupServerMessageID = openGroupMessage.serverID
storage.withAsync({ transaction in storage.write(with: { transaction in
MessageSender.handleSuccessfulMessageSend(message, to: destination, using: transaction) MessageSender.handleSuccessfulMessageSend(message, to: destination, using: transaction)
seal.fulfill(()) seal.fulfill(())
}, completion: { }) }, completion: { })
}.catch(on: DispatchQueue.global(qos: .userInitiated)) { error in }.catch(on: DispatchQueue.global(qos: .userInitiated)) { error in
storage.withAsync({ transaction in storage.write(with: { transaction in
handleFailure(with: error, using: transaction as! YapDatabaseReadWriteTransaction) handleFailure(with: error, using: transaction as! YapDatabaseReadWriteTransaction)
}, completion: { }) }, completion: { })
} }

View File

@ -68,7 +68,7 @@ public final class ClosedGroupPoller : NSObject {
do { do {
let data = try envelope.serializedData() let data = try envelope.serializedData()
let job = MessageReceiveJob(data: data, isBackgroundPoll: false) let job = MessageReceiveJob(data: data, isBackgroundPoll: false)
Storage.write { transaction in SNMessagingKitConfiguration.shared.storage.write { transaction in
SessionMessagingKit.JobQueue.shared.add(job, using: transaction) SessionMessagingKit.JobQueue.shared.add(job, using: transaction)
} }
} catch { } catch {

View File

@ -167,7 +167,7 @@ public final class OpenGroupPoller : NSObject {
envelope.setSourceDevice(1) envelope.setSourceDevice(1)
envelope.setContent(try! content.build().serializedData()) envelope.setContent(try! content.build().serializedData())
envelope.setServerTimestamp(message.serverTimestamp) envelope.setServerTimestamp(message.serverTimestamp)
Storage.write { transaction in SNMessagingKitConfiguration.shared.storage.write { transaction in
Storage.shared.setOpenGroupDisplayName(to: senderDisplayName, for: senderPublicKey, inOpenGroupWithID: openGroup.id, using: transaction) Storage.shared.setOpenGroupDisplayName(to: senderDisplayName, for: senderPublicKey, inOpenGroupWithID: openGroup.id, using: transaction)
let messageServerID = message.serverID let messageServerID = message.serverID
let job = MessageReceiveJob(data: try! envelope.buildSerializedData(), openGroupMessageServerID: messageServerID, openGroupID: openGroup.id, isBackgroundPoll: isBackgroundPoll) let job = MessageReceiveJob(data: try! envelope.buildSerializedData(), openGroupMessageServerID: messageServerID, openGroupID: openGroup.id, isBackgroundPoll: isBackgroundPoll)
@ -193,9 +193,9 @@ public final class OpenGroupPoller : NSObject {
let openGroup = self.openGroup let openGroup = self.openGroup
let _ = OpenGroupAPI.getDeletedMessageServerIDs(for: openGroup.channel, on: openGroup.server).done(on: DispatchQueue.global(qos: .default)) { deletedMessageServerIDs in let _ = OpenGroupAPI.getDeletedMessageServerIDs(for: openGroup.channel, on: openGroup.server).done(on: DispatchQueue.global(qos: .default)) { deletedMessageServerIDs in
let deletedMessageIDs = deletedMessageServerIDs.compactMap { Storage.shared.getIDForMessage(withServerID: UInt64($0)) } let deletedMessageIDs = deletedMessageServerIDs.compactMap { Storage.shared.getIDForMessage(withServerID: UInt64($0)) }
Storage.writeSync { transaction in SNMessagingKitConfiguration.shared.storage.writeSync { transaction in
deletedMessageIDs.forEach { messageID in deletedMessageIDs.forEach { messageID in
TSMessage.fetch(uniqueId: String(messageID))?.remove(with: transaction) TSMessage.fetch(uniqueId: String(messageID))?.remove(with: transaction as! YapDatabaseReadWriteTransaction)
} }
} }
} }

View File

@ -98,7 +98,7 @@ public final class Poller : NSObject {
do { do {
let data = try envelope.serializedData() let data = try envelope.serializedData()
let job = MessageReceiveJob(data: data, isBackgroundPoll: false) let job = MessageReceiveJob(data: data, isBackgroundPoll: false)
Storage.write { transaction in SNMessagingKitConfiguration.shared.storage.write { transaction in
SessionMessagingKit.JobQueue.shared.add(job, using: transaction) SessionMessagingKit.JobQueue.shared.add(job, using: transaction)
} }
} catch { } catch {

View File

@ -272,8 +272,8 @@ public class TypingIndicatorsImpl: NSObject, TypingIndicators {
let typingIndicator = TypingIndicator() let typingIndicator = TypingIndicator()
typingIndicator.kind = action typingIndicator.kind = action
Storage.write { transaction in SNMessagingKitConfiguration.shared.storage.write { transaction in
MessageSender.send(typingIndicator, in: thread, using: transaction) MessageSender.send(typingIndicator, in: thread, using: transaction as! YapDatabaseReadWriteTransaction)
} }
} }
} }

View File

@ -1,11 +1,15 @@
import SessionProtocolKit import SessionProtocolKit
import PromiseKit
public protocol SessionMessagingKitStorageProtocol { public protocol SessionMessagingKitStorageProtocol {
// MARK: - Shared // MARK: - Shared
func with(_ work: @escaping (Any) -> Void) @discardableResult
func withAsync(_ work: @escaping (Any) -> Void, completion: @escaping () -> Void) func write(with block: @escaping (Any) -> Void) -> Promise<Void>
@discardableResult
func write(with block: @escaping (Any) -> Void, completion: @escaping () -> Void) -> Promise<Void>
func writeSync(with block: @escaping (Any) -> Void)
// MARK: - General // MARK: - General

View File

@ -94,7 +94,7 @@ public class DotNetAPI : NSObject {
return Promise.value(token) return Promise.value(token)
} else { } else {
return requestNewAuthToken(for: server).then(on: DispatchQueue.global(qos: .userInitiated)) { submitAuthToken($0, for: server) }.map(on: DispatchQueue.global(qos: .userInitiated)) { token in return requestNewAuthToken(for: server).then(on: DispatchQueue.global(qos: .userInitiated)) { submitAuthToken($0, for: server) }.map(on: DispatchQueue.global(qos: .userInitiated)) { token in
storage.with { transaction in storage.writeSync { transaction in
storage.setAuthToken(for: server, to: token, using: transaction) storage.setAuthToken(for: server, to: token, using: transaction)
} }
return token return token

View File

@ -5,7 +5,7 @@ public enum ClosedGroupRatchetCollectionType {
public protocol SessionProtocolKitStorageProtocol { public protocol SessionProtocolKitStorageProtocol {
func with(_ work: @escaping (Any) -> Void) func writeSync(with block: @escaping (Any) -> Void)
func getUserKeyPair() -> ECKeyPair? func getUserKeyPair() -> ECKeyPair?
func getClosedGroupRatchet(for groupPublicKey: String, senderPublicKey: String, from collection: ClosedGroupRatchetCollectionType) -> ClosedGroupRatchet? func getClosedGroupRatchet(for groupPublicKey: String, senderPublicKey: String, from collection: ClosedGroupRatchetCollectionType) -> ClosedGroupRatchet?

View File

@ -140,7 +140,7 @@ public enum OnionRequestAPI {
} }
}.map2 { paths in }.map2 { paths in
OnionRequestAPI.paths = paths + reusablePaths OnionRequestAPI.paths = paths + reusablePaths
SNSnodeKitConfiguration.shared.storage.with { transaction in SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
SNLog("Persisting onion request paths to database.") SNLog("Persisting onion request paths to database.")
SNSnodeKitConfiguration.shared.storage.setOnionRequestPaths(to: paths, using: transaction) SNSnodeKitConfiguration.shared.storage.setOnionRequestPaths(to: paths, using: transaction)
} }
@ -226,7 +226,7 @@ public enum OnionRequestAPI {
oldPaths.remove(at: pathIndex) oldPaths.remove(at: pathIndex)
let newPaths = oldPaths + [ path ] let newPaths = oldPaths + [ path ]
paths = newPaths paths = newPaths
SNSnodeKitConfiguration.shared.storage.with { transaction in SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
SNLog("Persisting onion request paths to database.") SNLog("Persisting onion request paths to database.")
SNSnodeKitConfiguration.shared.storage.setOnionRequestPaths(to: newPaths, using: transaction) SNSnodeKitConfiguration.shared.storage.setOnionRequestPaths(to: newPaths, using: transaction)
} }
@ -241,7 +241,7 @@ public enum OnionRequestAPI {
guard let pathIndex = paths.firstIndex(of: path) else { return } guard let pathIndex = paths.firstIndex(of: path) else { return }
paths.remove(at: pathIndex) paths.remove(at: pathIndex)
OnionRequestAPI.paths = paths OnionRequestAPI.paths = paths
SNSnodeKitConfiguration.shared.storage.with { transaction in SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
if !paths.isEmpty { if !paths.isEmpty {
SNLog("Persisting onion request paths to database.") SNLog("Persisting onion request paths to database.")
SNSnodeKitConfiguration.shared.storage.setOnionRequestPaths(to: paths, using: transaction) SNSnodeKitConfiguration.shared.storage.setOnionRequestPaths(to: paths, using: transaction)

View File

@ -67,7 +67,7 @@ public final class SnodeAPI : NSObject {
let isSnodePoolExpired = given(Storage.shared.getLastSnodePoolRefreshDate()) { now.timeIntervalSince($0) > 24 * 60 * 60 } ?? true let isSnodePoolExpired = given(Storage.shared.getLastSnodePoolRefreshDate()) { now.timeIntervalSince($0) > 24 * 60 * 60 } ?? true
let isRefreshNeeded = (snodePool.count < minimumSnodePoolCount) || isSnodePoolExpired let isRefreshNeeded = (snodePool.count < minimumSnodePoolCount) || isSnodePoolExpired
if isRefreshNeeded { if isRefreshNeeded {
Storage.write { transaction in SNSnodeKitConfiguration.shared.storage.write { transaction in
Storage.shared.setLastSnodePoolRefreshDate(to: now, using: transaction) Storage.shared.setLastSnodePoolRefreshDate(to: now, using: transaction)
} }
let target = seedNodePool.randomElement()! let target = seedNodePool.randomElement()!
@ -104,7 +104,7 @@ public final class SnodeAPI : NSObject {
} }
}.done2 { snode in }.done2 { snode in
seal.fulfill(snode) seal.fulfill(snode)
SNSnodeKitConfiguration.shared.storage.with { transaction in SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
SNLog("Persisting snode pool to database.") SNLog("Persisting snode pool to database.")
SNSnodeKitConfiguration.shared.storage.setSnodePool(to: SnodeAPI.snodePool, using: transaction) SNSnodeKitConfiguration.shared.storage.setSnodePool(to: SnodeAPI.snodePool, using: transaction)
} }
@ -129,7 +129,7 @@ public final class SnodeAPI : NSObject {
var snodePool = SnodeAPI.snodePool var snodePool = SnodeAPI.snodePool
snodePool.remove(snode) snodePool.remove(snode)
SnodeAPI.snodePool = snodePool SnodeAPI.snodePool = snodePool
SNSnodeKitConfiguration.shared.storage.with { transaction in SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
SNSnodeKitConfiguration.shared.storage.setSnodePool(to: snodePool, using: transaction) SNSnodeKitConfiguration.shared.storage.setSnodePool(to: snodePool, using: transaction)
} }
} }
@ -137,7 +137,7 @@ public final class SnodeAPI : NSObject {
// MARK: Public API // MARK: Public API
@objc public static func clearSnodePool() { @objc public static func clearSnodePool() {
snodePool.removeAll() snodePool.removeAll()
SNSnodeKitConfiguration.shared.storage.with { transaction in SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
SNSnodeKitConfiguration.shared.storage.setSnodePool(to: [], using: transaction) SNSnodeKitConfiguration.shared.storage.setSnodePool(to: [], using: transaction)
} }
} }
@ -150,7 +150,7 @@ public final class SnodeAPI : NSObject {
if var swarm = swarm, let index = swarm.firstIndex(of: snode) { if var swarm = swarm, let index = swarm.firstIndex(of: snode) {
swarm.remove(at: index) swarm.remove(at: index)
SnodeAPI.swarmCache[publicKey] = swarm SnodeAPI.swarmCache[publicKey] = swarm
SNSnodeKitConfiguration.shared.storage.with { transaction in SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
SNSnodeKitConfiguration.shared.storage.setSwarm(to: swarm, for: publicKey, using: transaction) SNSnodeKitConfiguration.shared.storage.setSwarm(to: swarm, for: publicKey, using: transaction)
} }
} }
@ -177,7 +177,7 @@ public final class SnodeAPI : NSObject {
}.map2 { rawSnodes in }.map2 { rawSnodes in
let swarm = parseSnodes(from: rawSnodes) let swarm = parseSnodes(from: rawSnodes)
swarmCache[publicKey] = swarm swarmCache[publicKey] = swarm
SNSnodeKitConfiguration.shared.storage.with { transaction in SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
SNSnodeKitConfiguration.shared.storage.setSwarm(to: swarm, for: publicKey, using: transaction) SNSnodeKitConfiguration.shared.storage.setSwarm(to: swarm, for: publicKey, using: transaction)
} }
return swarm return swarm
@ -187,7 +187,7 @@ public final class SnodeAPI : NSObject {
public static func getRawMessages(from snode: Snode, associatedWith publicKey: String) -> RawResponsePromise { public static func getRawMessages(from snode: Snode, associatedWith publicKey: String) -> RawResponsePromise {
let storage = SNSnodeKitConfiguration.shared.storage let storage = SNSnodeKitConfiguration.shared.storage
storage.with { transaction in storage.writeSync { transaction in
storage.pruneLastMessageHashInfoIfExpired(for: snode, associatedWith: publicKey, using: transaction) storage.pruneLastMessageHashInfoIfExpired(for: snode, associatedWith: publicKey, using: transaction)
} }
let lastHash = storage.getLastMessageHash(for: snode, associatedWith: publicKey) ?? "" let lastHash = storage.getLastMessageHash(for: snode, associatedWith: publicKey) ?? ""
@ -201,7 +201,7 @@ public final class SnodeAPI : NSObject {
Threading.workQueue.async { Threading.workQueue.async {
attempt(maxRetryCount: maxRetryCount, recoveringOn: Threading.workQueue) { attempt(maxRetryCount: maxRetryCount, recoveringOn: Threading.workQueue) {
getTargetSnodes(for: publicKey).mapValues2 { targetSnode in getTargetSnodes(for: publicKey).mapValues2 { targetSnode in
storage.with { transaction in storage.writeSync { transaction in
storage.pruneLastMessageHashInfoIfExpired(for: targetSnode, associatedWith: publicKey, using: transaction) storage.pruneLastMessageHashInfoIfExpired(for: targetSnode, associatedWith: publicKey, using: transaction)
} }
let lastHash = storage.getLastMessageHash(for: targetSnode, associatedWith: publicKey) ?? "" let lastHash = storage.getLastMessageHash(for: targetSnode, associatedWith: publicKey) ?? ""
@ -267,7 +267,7 @@ public final class SnodeAPI : NSObject {
private static func updateLastMessageHashValueIfPossible(for snode: Snode, associatedWith publicKey: String, from rawMessages: [JSON]) { private static func updateLastMessageHashValueIfPossible(for snode: Snode, associatedWith publicKey: String, from rawMessages: [JSON]) {
if let lastMessage = rawMessages.last, let lastHash = lastMessage["hash"] as? String, let expirationDate = lastMessage["expiration"] as? UInt64 { if let lastMessage = rawMessages.last, let lastHash = lastMessage["hash"] as? String, let expirationDate = lastMessage["expiration"] as? UInt64 {
SNSnodeKitConfiguration.shared.storage.with { transaction in SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
SNSnodeKitConfiguration.shared.storage.setLastMessageHashInfo(for: snode, associatedWith: publicKey, SNSnodeKitConfiguration.shared.storage.setLastMessageHashInfo(for: snode, associatedWith: publicKey,
to: [ "hash" : lastHash, "expirationDate" : NSNumber(value: expirationDate) ], using: transaction) to: [ "hash" : lastHash, "expirationDate" : NSNumber(value: expirationDate) ], using: transaction)
} }
@ -285,7 +285,7 @@ public final class SnodeAPI : NSObject {
} }
let isDuplicate = receivedMessages.contains(hash) let isDuplicate = receivedMessages.contains(hash)
receivedMessages.insert(hash) receivedMessages.insert(hash)
SNSnodeKitConfiguration.shared.storage.with { transaction in SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
SNSnodeKitConfiguration.shared.storage.setReceivedMessages(to: receivedMessages, for: publicKey, using: transaction) SNSnodeKitConfiguration.shared.storage.setReceivedMessages(to: receivedMessages, for: publicKey, using: transaction)
} }
return !isDuplicate return !isDuplicate

View File

@ -1,8 +1,11 @@
import SessionUtilitiesKit import SessionUtilitiesKit
import PromiseKit
public protocol SessionSnodeKitStorageProtocol { public protocol SessionSnodeKitStorageProtocol {
func with(_ work: @escaping (Any) -> Void) @discardableResult
func write(with block: @escaping (Any) -> Void) -> Promise<Void>
func writeSync(with block: @escaping (Any) -> Void)
func getUserPublicKey() -> String? func getUserPublicKey() -> String?
func getOnionRequestPaths() -> [OnionRequestAPI.Path] func getOnionRequestPaths() -> [OnionRequestAPI.Path]