Integrate swarm changes into message receiving code

This commit is contained in:
Niels Andriesse 2019-05-22 12:06:02 +10:00
parent e5463e545a
commit adcb469b53
4 changed files with 54 additions and 37 deletions

View File

@ -47,8 +47,30 @@ public class MessageFetcherJob: NSObject {
// }
// ========
Logger.info("fetching messages via REST.")
Logger.info("Fetching messages via REST.")
let promise = fetchUndeliveredMessages().then { promises -> Promise<Void> in
let promises = promises.map { promise -> Promise<Void> in
return promise.then { envelopes -> Promise<Void> in
for envelope in envelopes {
Logger.info("Envelope received.")
do {
let envelopeData = try envelope.serializedData()
self.messageReceiver.handleReceivedEnvelopeData(envelopeData)
} catch {
owsFailDebug("Failed to serialize envelope.")
}
self.acknowledgeDelivery(envelope: envelope)
}
return Promise.value(())
}
}
return when(resolved: promises).asVoid()
}
promise.retainUntilComplete()
return promise
/* Loki: Original code
* ========
let promise = self.fetchUndeliveredMessages().then { (envelopes: [SSKProtoEnvelope], more: Bool) -> Promise<Void> in
for envelope in envelopes {
Logger.info("received envelope.")
@ -73,6 +95,8 @@ public class MessageFetcherJob: NSObject {
promise.retainUntilComplete()
return promise
* ========
*/
}
@objc
@ -174,37 +198,30 @@ public class MessageFetcherJob: NSObject {
}
}
private func fetchUndeliveredMessages() -> Promise<(envelopes: [SSKProtoEnvelope], more: Bool)> {
notImplemented()
// return Promise { resolver in
// LokiAPI.getMessages().done { envelopes in
// resolver.fulfill((envelopes: envelopes, more: false))
// }.catch { error in
private func fetchUndeliveredMessages() -> Promise<Set<Promise<[SSKProtoEnvelope]>>> {
return LokiAPI.getMessages()
// Loki: Original code
// ========
// let request = OWSRequestFactory.getMessagesRequest()
// self.networkManager.makeRequest(
// request,
// success: { (_: URLSessionDataTask?, responseObject: Any?) -> Void in
// guard let (envelopes, more) = self.parseMessagesResponse(responseObject: responseObject) else {
// Logger.error("response object had unexpected content")
// return resolver.reject(OWSErrorMakeUnableToProcessServerResponseError())
// }
//
// resolver.fulfill((envelopes: envelopes, more: more))
// },
// failure: { (_: URLSessionDataTask?, error: Error?) in
// guard let error = error else {
// Logger.error("error was surpringly nil. sheesh rough day.")
// return resolver.reject(OWSErrorMakeUnableToProcessServerResponseError())
// }
//
// resolver.reject(error)
// }
// Loki: Original code
// ========
// let request = OWSRequestFactory.getMessagesRequest()
// self.networkManager.makeRequest(
// request,
// success: { (_: URLSessionDataTask?, responseObject: Any?) -> Void in
// guard let (envelopes, more) = self.parseMessagesResponse(responseObject: responseObject) else {
// Logger.error("response object had unexpected content")
// return resolver.reject(OWSErrorMakeUnableToProcessServerResponseError())
// }
//
// resolver.fulfill((envelopes: envelopes, more: more))
// },
// failure: { (_: URLSessionDataTask?, error: Error?) in
// guard let error = error else {
// Logger.error("error was surpringly nil. sheesh rough day.")
// return resolver.reject(OWSErrorMakeUnableToProcessServerResponseError())
// }
//
// resolver.reject(error)
// })
// ========
// }
// })
// ========
}
private func acknowledgeDelivery(envelope: SSKProtoEnvelope) {

View File

@ -118,7 +118,7 @@ import PromiseKit
}
private static func removeDuplicates(from rawMessages: [JSON]) -> [JSON] {
var receivedMessageHashValues = getReceivedMessageHashValues()
var receivedMessageHashValues = getReceivedMessageHashValues() ?? []
return rawMessages.filter { rawMessage in
guard let hashValue = rawMessage["hash"] as? String else {
Logger.warn("[Loki] Missing hash value for message: \(rawMessage).")
@ -161,8 +161,8 @@ import PromiseKit
}
}
private static func getReceivedMessageHashValues() -> Set<String> {
var result: Set<String> = []
private static func getReceivedMessageHashValues() -> Set<String>? {
var result: Set<String>? = nil
storage.dbReadConnection.read { transaction in
result = storage.getReceivedMessageHashes(with: transaction)
}

View File

@ -95,7 +95,7 @@ NS_ASSUME_NONNULL_BEGIN
*/
- (void)setLastMessageHashForServiceNode:(NSString *)serviceNode hash:(NSString *)hash expiresAt:(u_int64_t)expiresAt transaction:(YapDatabaseReadWriteTransaction *)transaction NS_SWIFT_NAME(setLastMessageHash(forServiceNode:hash:expiresAt:transaction:));
- (NSSet<NSString *> *)getReceivedMessageHashesWithTransaction:(YapDatabaseReadTransaction *)transaction;
- (NSSet<NSString *> *_Nullable)getReceivedMessageHashesWithTransaction:(YapDatabaseReadTransaction *)transaction;
- (void)setReceivedMessageHashes:(NSSet<NSString *> *)receivedMessageHashes withTransaction:(YapDatabaseReadWriteTransaction *)transaction;
@end

View File

@ -153,7 +153,7 @@
[transaction removeObjectForKey:serviceNode inCollection:LKLastMessageHashCollection];
}
- (NSSet<NSString *> *)getReceivedMessageHashesWithTransaction:(YapDatabaseReadTransaction *)transaction {
- (NSSet<NSString *> *_Nullable)getReceivedMessageHashesWithTransaction:(YapDatabaseReadTransaction *)transaction {
return (NSSet *)[[transaction objectForKey:LKReceivedMessageHashesKey inCollection:LKReceivedMessageHashesCollection] as:NSSet.class];
}