Converted getMessage function to more granular functions.

This is because we only want to process messages from long polling if it hasn't been stopped.
This commit is contained in:
Mikunj 2019-06-12 12:00:40 +10:00
parent 04bdaff3c6
commit 8c839090e1
4 changed files with 35 additions and 24 deletions

2
Pods

@ -1 +1 @@
Subproject commit 0c0008c2306e7b5be87c2c943fade9bd5612ee17
Subproject commit 788d43379038478b34765fbc9538470172ba0730

View File

@ -84,13 +84,16 @@ public extension LokiAPI {
func getMessagesInfinitely(from target: Target) -> Promise<Void> {
// The only way to exit the infinite loop is to throw an error 3 times or cancel
return getMessages(from: target).then { messages -> Promise<Void> in
// Send our messages as a notification
NotificationCenter.default.post(name: .receivedNewMessages, object: nil, userInfo: ["messages": messages])
return getRawMessages(from: target).then { rawMessages -> Promise<Void> in
// Check if we need to abort
guard !isCancelled else { throw PMKError.cancelled }
// Process the messages
let messages = process(rawMessages: rawMessages, from: target)
// Send our messages as a notification
NotificationCenter.default.post(name: .receivedNewMessages, object: nil, userInfo: ["messages": messages])
// Continue fetching if we haven't cancelled
return getMessagesInfinitely(from: target)
}.retryingIfNeeded(maxRetryCount: 3)
@ -99,15 +102,13 @@ public extension LokiAPI {
// Keep getting messages for this snode
// If we errored out then connect to the next snode
return getMessagesInfinitely(from: nextSnode).recover { _ -> Promise<Void> in
// Connect to the next snode if we haven't cancelled
if (!isCancelled) {
// We also need to remove the cached snode so we don't contact it again
removeCachedSnode(nextSnode, for: hexEncodedPublicKey)
return connectToNextSnode()
}
// Cancelled, so just return successfully
return Promise.value(())
guard !isCancelled else { return Promise.value(()) }
// Connect to the next snode if we haven't cancelled
// We also need to remove the cached snode so we don't contact it again
removeCachedSnode(nextSnode, for: hexEncodedPublicKey)
return connectToNextSnode()
}
}

View File

@ -43,15 +43,12 @@ import PromiseKit
.handlingSwarmSpecificErrorsIfNeeded(for: target, associatedWith: hexEncodedPublicKey).recoveringNetworkErrorsIfNeeded()
}
// MARK: Public API
public static func getMessages() -> Promise<Set<MessageListPromise>> {
let hexEncodedPublicKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey
return getTargetSnodes(for: hexEncodedPublicKey).mapValues { targetSnode in
return getMessages(from: targetSnode, longPolling: false)
}.map { Set($0) }.retryingIfNeeded(maxRetryCount: maxRetryCount)
}
internal static func getMessages(from target: Target, longPolling: Bool = true) -> MessageListPromise {
return getRawMessages(from: target, longPolling: longPolling).map { process(rawMessages: $0, from: target) }
}
internal static func getRawMessages(from target: Target, longPolling: Bool = true) -> Promise<[JSON]> {
let hexEncodedPublicKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey
let lastHashValue = getLastMessageHashValue(for: target) ?? ""
let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey, "lastHash" : lastHashValue ]
@ -59,12 +56,25 @@ import PromiseKit
let timeout: TimeInterval? = longPolling ? 40 : nil // 40 second timeout
return invoke(.getMessages, on: target, associatedWith: hexEncodedPublicKey, parameters: parameters, headers: headers, timeout: timeout).map { rawResponse in
guard let json = rawResponse as? JSON, let rawMessages = json["messages"] as? [JSON] else { return [] }
updateLastMessageHashValueIfPossible(for: target, from: rawMessages)
let newRawMessages = removeDuplicates(from: rawMessages)
return parseProtoEnvelopes(from: newRawMessages)
return rawMessages
}
}
internal static func process(rawMessages: [JSON], from target: Target) -> [SSKProtoEnvelope] {
updateLastMessageHashValueIfPossible(for: target, from: rawMessages)
let newRawMessages = removeDuplicates(from: rawMessages)
return parseProtoEnvelopes(from: newRawMessages)
}
// MARK: Public API
public static func getMessages() -> Promise<Set<MessageListPromise>> {
let hexEncodedPublicKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey
return getTargetSnodes(for: hexEncodedPublicKey).mapValues { targetSnode in
return getMessages(from: targetSnode, longPolling: false)
}.map { Set($0) }.retryingIfNeeded(maxRetryCount: maxRetryCount)
}
public static func sendSignalMessage(_ signalMessage: SignalMessage, with timestamp: UInt64, onP2PSuccess: @escaping () -> Void) -> Promise<Set<RawResponsePromise>> {
guard let lokiMessage = Message.from(signalMessage: signalMessage, with: timestamp) else { return Promise(error: Error.messageConversionFailed) }
let destination = lokiMessage.destination

View File

@ -174,7 +174,7 @@
AssertIsOnMainThread()
guard let message = onlineBroadcastMessage(forThread: thread) else {
owsFailDebug("P2P Address not set")
Logger.warn("[Loki] P2P Address not set")
return
}