Filter duplicate messages & make parsing strategy consistent

This commit is contained in:
Niels Andriesse 2019-05-22 11:32:32 +10:00
parent f3e2737043
commit e5463e545a
8 changed files with 145 additions and 93 deletions

View file

@ -15,7 +15,7 @@
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "D221A088169C9E5E00537ABF"
BuildableName = "Session.app"
BuildableName = "Signal.app"
BlueprintName = "Signal"
ReferencedContainer = "container:Signal.xcodeproj">
</BuildableReference>
@ -33,7 +33,7 @@
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "D221A088169C9E5E00537ABF"
BuildableName = "Session.app"
BuildableName = "Signal.app"
BlueprintName = "Signal"
ReferencedContainer = "container:Signal.xcodeproj">
</BuildableReference>
@ -56,7 +56,7 @@
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "D221A088169C9E5E00537ABF"
BuildableName = "Session.app"
BuildableName = "Signal.app"
BlueprintName = "Signal"
ReferencedContainer = "container:Signal.xcodeproj">
</BuildableReference>
@ -87,7 +87,7 @@
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "D221A088169C9E5E00537ABF"
BuildableName = "Session.app"
BuildableName = "Signal.app"
BlueprintName = "Signal"
ReferencedContainer = "container:Signal.xcodeproj">
</BuildableReference>

View file

@ -15,7 +15,7 @@
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "D221A088169C9E5E00537ABF"
BuildableName = "Session.app"
BuildableName = "Signal.app"
BlueprintName = "Signal"
ReferencedContainer = "container:Signal.xcodeproj">
</BuildableReference>
@ -117,7 +117,7 @@
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "D221A088169C9E5E00537ABF"
BuildableName = "Session.app"
BuildableName = "Signal.app"
BlueprintName = "Signal"
ReferencedContainer = "container:Signal.xcodeproj">
</BuildableReference>
@ -152,7 +152,7 @@
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "D221A088169C9E5E00537ABF"
BuildableName = "Session.app"
BuildableName = "Signal.app"
BlueprintName = "Signal"
ReferencedContainer = "container:Signal.xcodeproj">
</BuildableReference>
@ -193,7 +193,7 @@
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "D221A088169C9E5E00537ABF"
BuildableName = "Session.app"
BuildableName = "Signal.app"
BlueprintName = "Signal"
ReferencedContainer = "container:Signal.xcodeproj">
</BuildableReference>

View file

@ -30,7 +30,7 @@
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "D221A088169C9E5E00537ABF"
BuildableName = "Session.app"
BuildableName = "Signal.app"
BlueprintName = "Signal"
ReferencedContainer = "container:Signal.xcodeproj">
</BuildableReference>
@ -72,7 +72,7 @@
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "D221A088169C9E5E00537ABF"
BuildableName = "Session.app"
BuildableName = "Signal.app"
BlueprintName = "Signal"
ReferencedContainer = "container:Signal.xcodeproj">
</BuildableReference>
@ -92,7 +92,7 @@
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "D221A088169C9E5E00537ABF"
BuildableName = "Session.app"
BuildableName = "Signal.app"
BlueprintName = "Signal"
ReferencedContainer = "container:Signal.xcodeproj">
</BuildableReference>

View file

@ -7,7 +7,7 @@
<key>CarthageVersion</key>
<string>0.33.0</string>
<key>OSXVersion</key>
<string>10.14.4</string>
<string>10.14.5</string>
<key>WebRTCCommit</key>
<string>1445d719bf05280270e9f77576f80f973fd847f8 M73</string>
</dict>

View file

@ -35,22 +35,6 @@ extension LokiAPI {
}
}
/// Unwrap data sent by the storage server.
///
/// - Parameter data: The data from the storage server (not base 64 encoded).
/// - Returns: An `SSKProtoEnvelope` object.
/// - Throws: A `WrappingError` if something went wrong.
static func unwrap(data: Data) throws -> SSKProtoEnvelope {
do {
let webSocketMessage = try WebSocketProtoWebSocketMessage.parseData(data)
let envelope = webSocketMessage.request!.body!
return try SSKProtoEnvelope.parseData(envelope)
} catch let error {
owsFailDebug("[Loki] Failed to unwrap data: \(error).")
throw WrappingError.failedToUnwrapData
}
}
/// Wrap an `SSKProtoEnvelope` in a `WebSocketProtoWebSocketMessage`.
private static func createWebSocketMessage(around envelope: SSKProtoEnvelope) throws -> WebSocketProtoWebSocketMessage {
do {
@ -91,4 +75,20 @@ extension LokiAPI {
throw WrappingError.failedToWrapMessageInEnvelope
}
}
/// Unwrap data sent by the storage server.
///
/// - Parameter data: The data from the storage server (not base 64 encoded).
/// - Returns: An `SSKProtoEnvelope` object.
/// - Throws: A `WrappingError` if something went wrong.
static func unwrap(data: Data) throws -> SSKProtoEnvelope {
do {
let webSocketMessage = try WebSocketProtoWebSocketMessage.parseData(data)
let envelope = webSocketMessage.request!.body!
return try SSKProtoEnvelope.parseData(envelope)
} catch let error {
owsFailDebug("[Loki] Failed to unwrap data: \(error).")
throw WrappingError.failedToUnwrapData
}
}
}

View file

@ -3,15 +3,15 @@ import PromiseKit
@objc public final class LokiAPI : NSObject {
private static let storage = OWSPrimaryStorage.shared()
// MARK: Caching
private static var swarmCache: [String:Set<Target>] = [:]
// MARK: Settings
private static let version = "v1"
private static let defaultSnodePort: UInt16 = 8080
private static let targetSnodeCount = 2
public static let defaultMessageTTL: UInt64 = 4 * 24 * 60 * 60
// MARK: Caching
private static var swarmCache: [String:[Target]] = [:]
// MARK: Types
private struct Target : Hashable {
let address: String
@ -50,45 +50,43 @@ import PromiseKit
return Promise<Target> { _ in notImplemented() } // TODO: Implement
}
private static func getSwarm(for hexEncodedPublicKey: String) -> Promise<Set<Target>> {
private static func getSwarm(for hexEncodedPublicKey: String) -> Promise<[Target]> {
if let cachedSwarm = swarmCache[hexEncodedPublicKey], cachedSwarm.count >= targetSnodeCount {
return Promise<Set<Target>> { $0.fulfill(cachedSwarm) }
return Promise<[Target]> { $0.fulfill(cachedSwarm) }
} else {
return getRandomSnode().then { invoke(.getSwarm, on: $0, with: [ "pubKey" : hexEncodedPublicKey ]) }
.map { parseTargets(from: $0) }.get { swarmCache[hexEncodedPublicKey] = $0 }
let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey ]
return getRandomSnode().then { invoke(.getSwarm, on: $0, with: parameters) }.map { parseTargets(from: $0) }.get { swarmCache[hexEncodedPublicKey] = $0 }
}
}
private static func getTargetSnodes(for hexEncodedPublicKey: String) -> Promise<Set<Target>> {
return getSwarm(for: hexEncodedPublicKey).map { Set(Array($0).shuffled().prefix(targetSnodeCount)) }
private static func getTargetSnodes(for hexEncodedPublicKey: String) -> Promise<[Target]> {
// shuffled() uses the system's default random generator, which is cryptographically secure
return getSwarm(for: hexEncodedPublicKey).map { Array($0.shuffled().prefix(targetSnodeCount)) }
}
// MARK: Public API
public static func getMessages() -> Promise<Set<Promise<Set<SSKProtoEnvelope>>>> {
public static func getMessages() -> Promise<Set<Promise<[SSKProtoEnvelope]>>> {
let hexEncodedPublicKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey
return getTargetSnodes(for: hexEncodedPublicKey).mapValues { targetSnode in
let lastHash = getLastHash(for: targetSnode) ?? ""
let lastHash = getLastMessageHashValue(for: targetSnode) ?? ""
let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey, "lastHash" : lastHash ]
return invoke(.getMessages, on: targetSnode, with: parameters).map { rawResponse in
if let json = rawResponse as? JSON, let messages = json["messages"] as? [JSON], let lastMessage = messages.last,
let hash = lastMessage["hash"] as? String, let expiresAt = lastMessage["expiration"] as? Int {
setLastHash(for: targetSnode, hash: hash, expiresAt: UInt64(expiresAt))
}
return parseProtoEnvelopes(from: rawResponse)
guard let json = rawResponse as? JSON, let rawMessages = json["messages"] as? [JSON] else { return [] }
updateLastMessageHashValueIfPossible(for: targetSnode, from: rawMessages)
let newRawMessages = removeDuplicates(from: rawMessages)
return parseProtoEnvelopes(from: newRawMessages)
}
}.map { Set($0) }
}
public static func sendMessage(_ lokiMessage: Message) -> Promise<[Promise<RawResponse>]> {
public static func sendMessage(_ lokiMessage: Message) -> Promise<Set<Promise<RawResponse>>> {
let parameters = lokiMessage.toJSON()
return getTargetSnodes(for: lokiMessage.destination).mapValues { invoke(.sendMessage, on: $0, with: parameters)
.recoverNetworkErrorIfNeeded(on: DispatchQueue.global()) }
return getTargetSnodes(for: lokiMessage.destination).mapValues { invoke(.sendMessage, on: $0, with: parameters).recoverNetworkErrorIfNeeded(on: DispatchQueue.global()) }.map { Set($0) }
}
public static func ping(_ hexEncodedPublicKey: String) -> Promise<[Promise<RawResponse>]> {
public static func ping(_ hexEncodedPublicKey: String) -> Promise<Set<Promise<RawResponse>>> {
let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey ] // TODO: Figure out correct parameters
return getTargetSnodes(for: hexEncodedPublicKey).mapValues { invoke(.sendMessage, on: $0, with: parameters)
.recoverNetworkErrorIfNeeded(on: DispatchQueue.global()) }
return getTargetSnodes(for: hexEncodedPublicKey).mapValues { invoke(.sendMessage, on: $0, with: parameters).recoverNetworkErrorIfNeeded(on: DispatchQueue.global()) }.map { Set($0) }
}
// MARK: Public API (Obj-C)
@ -99,43 +97,86 @@ import PromiseKit
return anyPromise
}
// MARK: Last Hash
private static func setLastHash(for target: Target, hash: String, expiresAt: UInt64) {
storage.dbReadWriteConnection.readWrite { transaction in
storage.setLastMessageHash(forServiceNode: target.address, hash: hash, expiresAt: expiresAt, transaction: transaction)
}
}
private static func getLastHash(for target: Target) -> String? {
var lastHash: String?
storage.dbReadWriteConnection.readWrite { transaction in
lastHash = storage.getLastMessageHash(forServiceNode: target.address, transaction: transaction)
}
return lastHash
}
// MARK: Parsing
private static func parseTargets(from rawResponse: Any) -> Set<Target> {
guard let json = rawResponse as? JSON, let addresses = json["snodes"] as? [String] else { return [] }
return Set(addresses.map { Target(address: $0, port: defaultSnodePort) })
// The parsing utilities below use a best attempt approach to parsing; they warn for parsing failures but don't throw exceptions.
private static func parseTargets(from rawResponse: Any) -> [Target] {
guard let json = rawResponse as? JSON, let addresses = json["snodes"] as? [String] else {
Logger.warn("[Loki] Failed to parse targets from: \(rawResponse).")
return []
}
return addresses.map { Target(address: $0, port: defaultSnodePort) }
}
private static func parseProtoEnvelopes(from rawResponse: Any) -> Set<SSKProtoEnvelope> {
guard let json = rawResponse as? JSON, let messages = json["messages"] as? [JSON] else { return [] }
return Set(messages.compactMap { message in
guard let base64EncodedData = message["data"] as? String, let data = Data(base64Encoded: base64EncodedData) else {
Logger.warn("[Loki] Failed to decode data for message: \(message).")
private static func updateLastMessageHashValueIfPossible(for target: Target, from rawMessages: [JSON]) {
guard let lastMessage = rawMessages.last, let hashValue = lastMessage["hash"] as? String, let expiresAt = lastMessage["expiration"] as? Int else {
Logger.warn("[Loki] Failed to update last message hash value from: \(rawMessages).")
return
}
setLastMessageHashValue(for: target, hashValue: hashValue, expiresAt: UInt64(expiresAt))
}
private static func removeDuplicates(from rawMessages: [JSON]) -> [JSON] {
var receivedMessageHashValues = getReceivedMessageHashValues()
return rawMessages.filter { rawMessage in
guard let hashValue = rawMessage["hash"] as? String else {
Logger.warn("[Loki] Missing hash value for message: \(rawMessage).")
return false
}
let isDuplicate = receivedMessageHashValues.contains(hashValue)
receivedMessageHashValues.insert(hashValue)
setReceivedMessageHashValues(to: receivedMessageHashValues)
return !isDuplicate
}
}
private static func parseProtoEnvelopes(from rawMessages: [JSON]) -> [SSKProtoEnvelope] {
return rawMessages.compactMap { rawMessage in
guard let base64EncodedData = rawMessage["data"] as? String, let data = Data(base64Encoded: base64EncodedData) else {
Logger.warn("[Loki] Failed to decode data for message: \(rawMessage).")
return nil
}
guard let envelope = try? unwrap(data: data) else {
Logger.warn("[Loki] Failed to unwrap data for message: \(message).")
Logger.warn("[Loki] Failed to unwrap data for message: \(rawMessage).")
return nil
}
return envelope
})
}
}
// MARK: Convenience
private static func getLastMessageHashValue(for target: Target) -> String? {
var result: String? = nil
// Uses a read/write connection because getting the last message hash value also removes expired messages as needed
storage.dbReadWriteConnection.readWrite { transaction in
result = storage.getLastMessageHash(forServiceNode: target.address, transaction: transaction)
}
return result
}
private static func setLastMessageHashValue(for target: Target, hashValue: String, expiresAt: UInt64) {
storage.dbReadWriteConnection.readWrite { transaction in
storage.setLastMessageHash(forServiceNode: target.address, hash: hashValue, expiresAt: expiresAt, transaction: transaction)
}
}
private static func getReceivedMessageHashValues() -> Set<String> {
var result: Set<String> = []
storage.dbReadConnection.read { transaction in
result = storage.getReceivedMessageHashes(with: transaction)
}
return result
}
private static func setReceivedMessageHashValues(to receivedMessageHashValues: Set<String>) {
storage.dbReadWriteConnection.readWrite { transaction in
storage.setReceivedMessageHashes(receivedMessageHashValues, with: transaction)
}
}
}
// MARK: Error Handling
private extension Promise {
func recoverNetworkErrorIfNeeded(on queue: DispatchQueue) -> Promise<T> {

View file

@ -72,7 +72,7 @@ NS_ASSUME_NONNULL_BEGIN
*/
- (void)removePreKeyBundleForContact:(NSString *)pubKey transaction:(YapDatabaseReadWriteTransaction *)transaction;
# pragma mark - Last Hash
# pragma mark - Last Hash Handling
/**
Get the last message hash for the given service node.
@ -95,6 +95,9 @@ NS_ASSUME_NONNULL_BEGIN
*/
- (void)setLastMessageHashForServiceNode:(NSString *)serviceNode hash:(NSString *)hash expiresAt:(u_int64_t)expiresAt transaction:(YapDatabaseReadWriteTransaction *)transaction NS_SWIFT_NAME(setLastMessageHash(forServiceNode:hash:expiresAt:transaction:));
- (NSSet<NSString *> *)getReceivedMessageHashesWithTransaction:(YapDatabaseReadTransaction *)transaction;
- (void)setReceivedMessageHashes:(NSSet<NSString *> *)receivedMessageHashes withTransaction:(YapDatabaseReadWriteTransaction *)transaction;
@end
NS_ASSUME_NONNULL_END

View file

@ -10,11 +10,14 @@
#import "YapDatabaseConnection+OWS.h"
#import "YapDatabaseTransaction+OWS.h"
#import <AxolotlKit/NSData+keyVersionByte.h>
#import "NSObject+Casting.h"
#define OWSPrimaryStoragePreKeyStoreCollection @"TSStorageManagerPreKeyStoreCollection"
#define LokiPreKeyContactCollection @"LokiPreKeyContactCollection"
#define LokiPreKeyBundleCollection @"LokiPreKeyBundleCollection"
#define LokiLastHashCollection @"LokiLastHashCollection"
#define LKPreKeyContactCollection @"LKPreKeyContactCollection"
#define LKPreKeyBundleCollection @"LKPreKeyBundleCollection"
#define LKLastMessageHashCollection @"LKLastMessageHashCollection"
#define LKReceivedMessageHashesKey @"LKReceivedMessageHashesKey"
#define LKReceivedMessageHashesCollection @"LKReceivedMessageHashesCollection"
@implementation OWSPrimaryStorage (Loki)
@ -31,13 +34,13 @@
# pragma mark - Prekey for Contact
- (BOOL)hasPreKeyForContact:(NSString *)pubKey {
int preKeyId = [self.dbReadWriteConnection intForKey:pubKey inCollection:LokiPreKeyContactCollection];
int preKeyId = [self.dbReadWriteConnection intForKey:pubKey inCollection:LKPreKeyContactCollection];
return preKeyId > 0;
}
- (PreKeyRecord *_Nullable)getPreKeyForContact:(NSString *)pubKey transaction:(YapDatabaseReadTransaction *)transaction {
OWSAssertDebug(pubKey.length > 0);
int preKeyId = [transaction intForKey:pubKey inCollection:LokiPreKeyContactCollection];
int preKeyId = [transaction intForKey:pubKey inCollection:LKPreKeyContactCollection];
// If we don't have an id then return nil
if (preKeyId <= 0) { return nil; }
@ -48,7 +51,7 @@
- (PreKeyRecord *)getOrCreatePreKeyForContact:(NSString *)pubKey {
OWSAssertDebug(pubKey.length > 0);
int preKeyId = [self.dbReadWriteConnection intForKey:pubKey inCollection:LokiPreKeyContactCollection];
int preKeyId = [self.dbReadWriteConnection intForKey:pubKey inCollection:LKPreKeyContactCollection];
// If we don't have an id then generate and store a new one
if (preKeyId <= 0) {
@ -73,7 +76,7 @@
OWSAssertDebug(records.count > 0);
PreKeyRecord *record = records.firstObject;
[self.dbReadWriteConnection setInt:record.Id forKey:pubKey inCollection:LokiPreKeyContactCollection];
[self.dbReadWriteConnection setInt:record.Id forKey:pubKey inCollection:LKPreKeyContactCollection];
return record;
}
@ -107,23 +110,23 @@
}
- (PreKeyBundle *_Nullable)getPreKeyBundleForContact:(NSString *)pubKey {
return [self.dbReadConnection preKeyBundleForKey:pubKey inCollection:LokiPreKeyBundleCollection];
return [self.dbReadConnection preKeyBundleForKey:pubKey inCollection:LKPreKeyBundleCollection];
}
- (void)setPreKeyBundle:(PreKeyBundle *)bundle forContact:(NSString *)pubKey transaction:(YapDatabaseReadWriteTransaction *)transaction {
[transaction setObject:bundle
forKey:pubKey
inCollection:LokiPreKeyBundleCollection];
inCollection:LKPreKeyBundleCollection];
}
- (void)removePreKeyBundleForContact:(NSString *)pubKey transaction:(YapDatabaseReadWriteTransaction *)transaction {
[transaction removeObjectForKey:pubKey inCollection:LokiPreKeyBundleCollection];
[transaction removeObjectForKey:pubKey inCollection:LKPreKeyBundleCollection];
}
# pragma mark - Last Hash
- (NSString *_Nullable)getLastMessageHashForServiceNode:(NSString *)serviceNode transaction:(YapDatabaseReadWriteTransaction *)transaction {
NSDictionary *_Nullable dict = [transaction objectForKey:serviceNode inCollection:LokiLastHashCollection];
NSDictionary *_Nullable dict = [transaction objectForKey:serviceNode inCollection:LKLastMessageHashCollection];
if (!dict) { return nil; }
NSString *_Nullable hash = dict[@"hash"];
@ -142,15 +145,20 @@
}
- (void)setLastMessageHashForServiceNode:(NSString *)serviceNode hash:(NSString *)hash expiresAt:(u_int64_t)expiresAt transaction:(YapDatabaseReadWriteTransaction *)transaction {
NSDictionary *dict = @{
@"hash": hash,
@"expiresAt": @(expiresAt)
};
[transaction setObject:dict forKey:serviceNode inCollection:LokiLastHashCollection];
NSDictionary *dict = @{ @"hash" : hash, @"expiresAt": @(expiresAt) };
[transaction setObject:dict forKey:serviceNode inCollection:LKLastMessageHashCollection];
}
- (void)removeLastMessageHashForServiceNode:(NSString *)serviceNode transaction:(YapDatabaseReadWriteTransaction *)transaction {
[transaction removeObjectForKey:serviceNode inCollection:LokiLastHashCollection];
[transaction removeObjectForKey:serviceNode inCollection:LKLastMessageHashCollection];
}
- (NSSet<NSString *> *)getReceivedMessageHashesWithTransaction:(YapDatabaseReadTransaction *)transaction {
return (NSSet *)[[transaction objectForKey:LKReceivedMessageHashesKey inCollection:LKReceivedMessageHashesCollection] as:NSSet.class];
}
- (void)setReceivedMessageHashes:(NSSet<NSString *> *)receivedMessageHashes withTransaction:(YapDatabaseReadWriteTransaction *)transaction {
[transaction setObject:receivedMessageHashes forKey:LKReceivedMessageHashesKey inCollection:LKReceivedMessageHashesCollection];
}
@end