Cleaned up received message handling and a few bugs with duplicate message handling
Updated the YDB to GRDB migrations to include some progress when importing swarms & interactions (ie. the slow parts we can't properly show progress for) Changed the MessageReceiveJob into a MessageHandlingJob (when receiving a message we now parse and store everything immediately to avoid a number of weird edge-cases) Fixed a bug where the Poller would drop a Snode when returning from the background because it's last request would generally time out Fixed a few bugs with invalid attachments Added the ability to retry downloading a failed attachment Added back the search results limit
This commit is contained in:
parent
93b54a3b7d
commit
af073657a2
|
@ -713,15 +713,34 @@ extension ConversationVC:
|
|||
guard let mediaView = albumView.mediaView(forLocation: locationInAlbumView) else { return }
|
||||
|
||||
switch mediaView.attachment.state {
|
||||
case .pendingDownload, .downloading, .uploading:
|
||||
// TODO: Tapped a failed incoming attachment
|
||||
break
|
||||
case .pendingDownload, .downloading, .uploading: break
|
||||
|
||||
// Failed uploads should be handled via the "resend" process instead
|
||||
case .failedUpload: break
|
||||
|
||||
case .failedDownload, .failedUpload:
|
||||
// TODO: Tapped a failed incoming attachment
|
||||
case .failedDownload:
|
||||
let threadId: String = self.viewModel.threadData.threadId
|
||||
|
||||
// Retry downloading the failed attachment
|
||||
GRDBStorage.shared.writeAsync { db in
|
||||
JobRunner.add(
|
||||
db,
|
||||
job: Job(
|
||||
variant: .attachmentDownload,
|
||||
threadId: threadId,
|
||||
interactionId: cellViewModel.id,
|
||||
details: AttachmentDownloadJob.Details(
|
||||
attachmentId: mediaView.attachment.id
|
||||
)
|
||||
)
|
||||
)
|
||||
}
|
||||
break
|
||||
|
||||
default:
|
||||
// Ignore invalid media
|
||||
guard mediaView.attachment.isValid else { return }
|
||||
|
||||
let viewController: UIViewController? = MediaGalleryViewModel.createDetailViewController(
|
||||
for: self.viewModel.threadData.threadId,
|
||||
threadVariant: self.viewModel.threadData.threadVariant,
|
||||
|
|
|
@ -86,6 +86,10 @@ public class MediaView: UIView {
|
|||
configure(forError: .failed)
|
||||
return
|
||||
}
|
||||
guard attachment.isValid else {
|
||||
configure(forError: .invalid)
|
||||
return
|
||||
}
|
||||
|
||||
if attachment.isAnimated {
|
||||
configureForAnimatedImage(attachment: attachment)
|
||||
|
@ -144,6 +148,7 @@ public class MediaView: UIView {
|
|||
animatedImageView.layer.minificationFilter = .trilinear
|
||||
animatedImageView.layer.magnificationFilter = .trilinear
|
||||
animatedImageView.backgroundColor = Colors.unimportant
|
||||
animatedImageView.isHidden = !attachment.isValid
|
||||
addSubview(animatedImageView)
|
||||
animatedImageView.autoPinEdgesToSuperviewEdges()
|
||||
_ = addUploadProgressIfNecessary(animatedImageView)
|
||||
|
@ -159,10 +164,7 @@ public class MediaView: UIView {
|
|||
}
|
||||
strongSelf.tryToLoadMedia(
|
||||
loadMediaBlock: { applyMediaBlock in
|
||||
guard attachment.isValid else {
|
||||
Logger.warn("Ignoring invalid attachment.")
|
||||
return
|
||||
}
|
||||
guard attachment.isValid else { return }
|
||||
guard let filePath: String = attachment.originalFilePath else {
|
||||
owsFailDebug("Attachment stream missing original file path.")
|
||||
return
|
||||
|
@ -200,6 +202,7 @@ public class MediaView: UIView {
|
|||
stillImageView.layer.minificationFilter = .trilinear
|
||||
stillImageView.layer.magnificationFilter = .trilinear
|
||||
stillImageView.backgroundColor = Colors.unimportant
|
||||
stillImageView.isHidden = !attachment.isValid
|
||||
addSubview(stillImageView)
|
||||
stillImageView.autoPinEdgesToSuperviewEdges()
|
||||
_ = addUploadProgressIfNecessary(stillImageView)
|
||||
|
@ -213,10 +216,7 @@ public class MediaView: UIView {
|
|||
}
|
||||
self?.tryToLoadMedia(
|
||||
loadMediaBlock: { applyMediaBlock in
|
||||
guard attachment.isValid else {
|
||||
Logger.warn("Ignoring invalid attachment.")
|
||||
return
|
||||
}
|
||||
guard attachment.isValid else { return }
|
||||
|
||||
attachment.thumbnail(
|
||||
size: .large,
|
||||
|
@ -254,6 +254,7 @@ public class MediaView: UIView {
|
|||
stillImageView.layer.minificationFilter = .trilinear
|
||||
stillImageView.layer.magnificationFilter = .trilinear
|
||||
stillImageView.backgroundColor = Colors.unimportant
|
||||
stillImageView.isHidden = !attachment.isValid
|
||||
|
||||
addSubview(stillImageView)
|
||||
stillImageView.autoPinEdgesToSuperviewEdges()
|
||||
|
@ -276,10 +277,7 @@ public class MediaView: UIView {
|
|||
}
|
||||
self?.tryToLoadMedia(
|
||||
loadMediaBlock: { applyMediaBlock in
|
||||
guard attachment.isValid else {
|
||||
Logger.warn("Ignoring invalid attachment.")
|
||||
return
|
||||
}
|
||||
guard attachment.isValid else { return }
|
||||
|
||||
attachment.thumbnail(
|
||||
size: .medium,
|
||||
|
|
|
@ -142,7 +142,6 @@ class GlobalSearchViewController: BaseVC, UITableViewDelegate, UITableViewDataSo
|
|||
let result: Result<[SectionModel], Error>? = GRDBStorage.shared.read { db -> Result<[SectionModel], Error> in
|
||||
do {
|
||||
let userPublicKey: String = getUserHexEncodedPublicKey(db)
|
||||
|
||||
let contactsAndGroupsResults: [SessionThreadViewModel] = try SessionThreadViewModel
|
||||
.contactsAndGroupsQuery(
|
||||
userPublicKey: userPublicKey,
|
||||
|
@ -150,7 +149,6 @@ class GlobalSearchViewController: BaseVC, UITableViewDelegate, UITableViewDataSo
|
|||
searchTerm: searchText
|
||||
)
|
||||
.fetchAll(db)
|
||||
|
||||
let messageResults: [SessionThreadViewModel] = try SessionThreadViewModel
|
||||
.messagesQuery(
|
||||
userPublicKey: userPublicKey,
|
||||
|
|
|
@ -322,13 +322,17 @@ public class MediaGalleryViewModel {
|
|||
.trackingConstantRegion { db -> [Item] in
|
||||
guard let interactionId: Int64 = interactionId else { return [] }
|
||||
|
||||
let attachment: TypedTableAlias<Attachment> = TypedTableAlias()
|
||||
let interaction: TypedTableAlias<Interaction> = TypedTableAlias()
|
||||
let interactionAttachment: TypedTableAlias<InteractionAttachment> = TypedTableAlias()
|
||||
|
||||
return try Item
|
||||
.baseQuery(
|
||||
orderSQL: SQL(interactionAttachment[.albumIndex]),
|
||||
baseFilterSQL: SQL("\(interaction[.id]) = \(interactionId)")
|
||||
baseFilterSQL: SQL("""
|
||||
\(attachment[.isValid]) = true AND
|
||||
\(interaction[.id]) = \(interactionId)
|
||||
""")
|
||||
)
|
||||
.fetchAll(db)
|
||||
}
|
||||
|
@ -342,13 +346,17 @@ public class MediaGalleryViewModel {
|
|||
// but to avoid displaying stale data we re-fetch from the database anyway
|
||||
let maybeAlbumInfo: AlbumInfo? = GRDBStorage.shared
|
||||
.read { db -> AlbumInfo in
|
||||
let attachment: TypedTableAlias<Attachment> = TypedTableAlias()
|
||||
let interaction: TypedTableAlias<Interaction> = TypedTableAlias()
|
||||
let interactionAttachment: TypedTableAlias<InteractionAttachment> = TypedTableAlias()
|
||||
|
||||
let newAlbumData: [Item] = try Item
|
||||
.baseQuery(
|
||||
orderSQL: SQL(interactionAttachment[.albumIndex]),
|
||||
baseFilterSQL: SQL("\(interaction[.id]) = \(interactionId)")
|
||||
baseFilterSQL: SQL("""
|
||||
\(attachment[.isValid]) = true AND
|
||||
\(interaction[.id]) = \(interactionId)
|
||||
""")
|
||||
)
|
||||
.fetchAll(db)
|
||||
|
||||
|
|
|
@ -125,15 +125,13 @@ public class NotificationPresenter: NSObject, NotificationsProtocol {
|
|||
AssertIsOnMainThread()
|
||||
|
||||
switch notification.object {
|
||||
case let incomingMessage as TSIncomingMessage:
|
||||
Logger.debug("canceled notification for message: \(incomingMessage)")
|
||||
if let identifier = incomingMessage.notificationIdentifier {
|
||||
cancelNotification(identifier)
|
||||
} else {
|
||||
cancelNotifications(threadId: incomingMessage.uniqueThreadId)
|
||||
}
|
||||
default:
|
||||
break
|
||||
case let interaction as Interaction:
|
||||
guard interaction.variant == .standardIncoming else { return }
|
||||
|
||||
Logger.debug("canceled notification for message: \(interaction)")
|
||||
cancelNotifications(identifiers: interaction.notificationIdentifiers)
|
||||
|
||||
default: break
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -73,27 +73,27 @@ public final class BackgroundPoller : NSObject {
|
|||
var threadMessages: [String: [MessageReceiveJob.Details.MessageInfo]] = [:]
|
||||
|
||||
messages.forEach { message in
|
||||
guard let envelope = SNProtoEnvelope.from(message) else { return }
|
||||
|
||||
// Extract the threadId and add that to the messageReceive job for
|
||||
// multi-threading and garbage collection purposes
|
||||
let threadId: String? = MessageReceiver.extractSenderPublicKey(db, from: envelope)
|
||||
|
||||
do {
|
||||
threadMessages[threadId ?? ""] = (threadMessages[threadId ?? ""] ?? [])
|
||||
.appending(
|
||||
MessageReceiveJob.Details.MessageInfo(
|
||||
data: try envelope.serializedData(),
|
||||
serverHash: message.info.hash,
|
||||
serverExpirationTimestamp: (TimeInterval(message.info.expirationDateMs) / 1000)
|
||||
)
|
||||
)
|
||||
let processedMessage: ProcessedMessage? = try Message.processRawReceivedMessage(db, rawMessage: message)
|
||||
let key: String = (processedMessage?.threadId ?? Message.nonThreadMessageId)
|
||||
|
||||
// Persist the received message after the MessageReceiveJob is created
|
||||
_ = try message.info.saved(db)
|
||||
threadMessages[key] = (threadMessages[key] ?? [])
|
||||
.appending(processedMessage?.messageInfo)
|
||||
}
|
||||
catch {
|
||||
SNLog("Failed to deserialize envelope due to error: \(error).")
|
||||
switch error {
|
||||
// Ignore duplicate & selfSend message errors (and don't bother logging
|
||||
// them as there will be a lot since we each service node duplicates messages)
|
||||
case DatabaseError.SQLITE_CONSTRAINT_UNIQUE,
|
||||
MessageReceiverError.duplicateMessage,
|
||||
MessageReceiverError.duplicateControlMessage,
|
||||
MessageReceiverError.selfSend:
|
||||
break
|
||||
|
||||
default: SNLog("Failed to deserialize envelope due to error: \(error).")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -105,7 +105,7 @@ public final class BackgroundPoller : NSObject {
|
|||
threadId: threadId,
|
||||
details: MessageReceiveJob.Details(
|
||||
messages: threadMessages,
|
||||
isBackgroundPoll: false
|
||||
isBackgroundPoll: true
|
||||
)
|
||||
)
|
||||
|
||||
|
|
|
@ -188,6 +188,20 @@ enum _003_YDBToGRDBMigration: Migration {
|
|||
|
||||
SNLog("[Migration Info] \(target.key(with: self)) - Processing Interactions")
|
||||
|
||||
/// **Note:** There is no index on the collection column so unfortunately it takes the same amount of time to enumerate through all
|
||||
/// collections as it does to just get the count of collections, due to this, if the database is very large, importing thecollections can be
|
||||
/// very slow (~15s with 2,000,000 rows) - we want to show some kind of progress while enumerating so the below code creates a
|
||||
/// very rought guess of the number of collections based on the file size of the database (this shouldn't affect most users at all)
|
||||
let roughKbPerRow: CGFloat = 2.25
|
||||
let oldDatabaseSizeBytes: CGFloat = (try? FileManager.default
|
||||
.attributesOfItem(atPath: SUKLegacy.legacyDatabaseFilepath)[.size]
|
||||
.asType(CGFloat.self))
|
||||
.defaulting(to: 0)
|
||||
let roughNumRows: CGFloat = ((oldDatabaseSizeBytes / 1024) / roughKbPerRow)
|
||||
let startProgress: CGFloat = 0.04
|
||||
let interactionsCompleteProgress: CGFloat = 0.19
|
||||
var rowIndex: CGFloat = 0
|
||||
|
||||
transaction.enumerateKeysAndObjects(inCollection: SMKLegacy.interactionCollection) { _, object, _ in
|
||||
guard let interaction: SMKLegacy._DBInteraction = object as? SMKLegacy._DBInteraction else {
|
||||
SNLog("[Migration Error] Unable to process interaction")
|
||||
|
@ -197,8 +211,19 @@ enum _003_YDBToGRDBMigration: Migration {
|
|||
|
||||
interactions[interaction.uniqueThreadId] = (interactions[interaction.uniqueThreadId] ?? [])
|
||||
.appending(interaction)
|
||||
|
||||
rowIndex += 1
|
||||
|
||||
GRDBStorage.shared.update(
|
||||
progress: min(
|
||||
interactionsCompleteProgress,
|
||||
((rowIndex / roughNumRows) * (interactionsCompleteProgress - startProgress))
|
||||
),
|
||||
for: self,
|
||||
in: target
|
||||
)
|
||||
}
|
||||
GRDBStorage.shared.update(progress: 0.19, for: self, in: target)
|
||||
GRDBStorage.shared.update(progress: interactionsCompleteProgress, for: self, in: target)
|
||||
|
||||
// MARK: --Attachments
|
||||
|
||||
|
@ -1066,39 +1091,21 @@ enum _003_YDBToGRDBMigration: Migration {
|
|||
return
|
||||
}
|
||||
|
||||
// We need to extract the `threadId` from the legacyJob data as the new
|
||||
// MessageReceiveJob requires it for multi-threading and garbage collection purposes
|
||||
guard let envelope: SNProtoEnvelope = try? SNProtoEnvelope.parseData(legacyJob.data) else {
|
||||
// We have changed how messageReceive jobs work - we now parse the message upon receipt and
|
||||
// the MessageReceiveJob only does the handling - as a result we need to do the same behaviour
|
||||
// here so we don't need to support the legacy behaviour
|
||||
guard let processedMessage: ProcessedMessage = try? Message.processRawReceivedMessage(db, serializedData: legacyJob.data, serverHash: legacyJob.serverHash) else {
|
||||
return
|
||||
}
|
||||
|
||||
let threadId: String?
|
||||
|
||||
switch envelope.type {
|
||||
// For closed group messages the 'groupPublicKey' is stored in the
|
||||
// 'envelope.source' value and that should be used for the 'threadId'
|
||||
case .closedGroupMessage:
|
||||
threadId = envelope.source
|
||||
break
|
||||
|
||||
default:
|
||||
threadId = MessageReceiver.extractSenderPublicKey(db, from: envelope)
|
||||
}
|
||||
|
||||
_ = try Job(
|
||||
failureCount: legacyJob.failureCount,
|
||||
variant: .messageReceive,
|
||||
behaviour: .runOnce,
|
||||
nextRunTimestamp: 0,
|
||||
threadId: threadId,
|
||||
threadId: processedMessage.threadId,
|
||||
details: MessageReceiveJob.Details(
|
||||
messages: [
|
||||
MessageReceiveJob.Details.MessageInfo(
|
||||
data: legacyJob.data,
|
||||
serverHash: legacyJob.serverHash,
|
||||
serverExpirationTimestamp: (Date().timeIntervalSince1970 + ControlMessageProcessRecord.defaultExpirationSeconds)
|
||||
)
|
||||
],
|
||||
messages: [processedMessage.messageInfo],
|
||||
isBackgroundPoll: legacyJob.isBackgroundPoll
|
||||
)
|
||||
)?.inserted(db)
|
||||
|
@ -1238,8 +1245,8 @@ enum _003_YDBToGRDBMigration: Migration {
|
|||
try autoreleasepool {
|
||||
try attachmentDownloadJobs.forEach { legacyJob in
|
||||
guard let interactionId: Int64 = legacyInteractionToIdMap[legacyJob.tsMessageID] else {
|
||||
SNLog("[Migration Error] attachmentDownload job unable to find interaction")
|
||||
throw StorageError.migrationFailed
|
||||
SNLog("[Migration Warning] attachmentDownload job with no interaction found - ignoring")
|
||||
return
|
||||
}
|
||||
guard processedAttachmentIds.contains(legacyJob.attachmentID) else {
|
||||
SNLog("[Migration Error] attachmentDownload job unable to find attachment")
|
||||
|
@ -1422,7 +1429,7 @@ enum _003_YDBToGRDBMigration: Migration {
|
|||
return (attachmentVailidityInfo.isValid, attachmentVailidityInfo.duration)
|
||||
}
|
||||
|
||||
if stream.isVideo {
|
||||
if stream.isVisualMedia {
|
||||
let attachmentVailidityInfo = Attachment.determineValidityAndDuration(
|
||||
contentType: stream.contentType,
|
||||
localRelativeFilePath: processedLocalRelativeFilePath,
|
||||
|
@ -1432,10 +1439,6 @@ enum _003_YDBToGRDBMigration: Migration {
|
|||
return (attachmentVailidityInfo.isValid, attachmentVailidityInfo.duration)
|
||||
}
|
||||
|
||||
if stream.isVisualMedia {
|
||||
return (stream.isValidVisualMedia, nil)
|
||||
}
|
||||
|
||||
return (true, nil)
|
||||
}()
|
||||
|
||||
|
@ -1460,7 +1463,13 @@ enum _003_YDBToGRDBMigration: Migration {
|
|||
duration: duration,
|
||||
isValid: isValid,
|
||||
encryptionKey: legacyAttachment.encryptionKey,
|
||||
digest: (legacyAttachment as? SMKLegacy._AttachmentStream)?.digest,
|
||||
digest: {
|
||||
switch legacyAttachment {
|
||||
case let stream as SMKLegacy._AttachmentStream: return stream.digest
|
||||
case let pointer as SMKLegacy._AttachmentPointer: return pointer.digest
|
||||
default: return nil
|
||||
}
|
||||
}(),
|
||||
caption: legacyAttachment.caption
|
||||
).inserted(db)
|
||||
|
||||
|
|
|
@ -662,7 +662,7 @@ extension Attachment {
|
|||
}
|
||||
|
||||
// Process image attachments
|
||||
if MIMETypeUtil.isImage(contentType) {
|
||||
if MIMETypeUtil.isImage(contentType) || MIMETypeUtil.isAnimated(contentType) {
|
||||
return (
|
||||
NSData.ows_isValidImage(atPath: targetPath, mimeType: contentType),
|
||||
nil
|
||||
|
|
|
@ -64,19 +64,12 @@ public struct ControlMessageProcessRecord: Codable, FetchableRecord, Persistable
|
|||
public init?(
|
||||
threadId: String,
|
||||
message: Message,
|
||||
serverExpirationTimestamp: TimeInterval?,
|
||||
isRetry: Bool = false
|
||||
serverExpirationTimestamp: TimeInterval?
|
||||
) {
|
||||
// All `VisibleMessage` values will have an associated `Interaction` so just let
|
||||
// the unique constraints on that table prevent duplicate messages
|
||||
if message is VisibleMessage { return nil }
|
||||
|
||||
// TODO: Need to allow duplicates for call messages
|
||||
|
||||
// If the message failed to process and we are retrying then there will already
|
||||
// be a `ControlMessageProcessRecord`, so return nil to prevent the insertion
|
||||
// causing a unique constraint violation
|
||||
if isRetry { return nil }
|
||||
|
||||
// Allow '.new' and 'encryptionKeyPair' closed group control message duplicates to avoid
|
||||
// the following situation:
|
||||
|
|
|
@ -25,7 +25,7 @@ public enum FailedMessageSendsJob: JobExecutor {
|
|||
.filter(Attachment.Columns.state == Attachment.State.uploading)
|
||||
.updateAll(db, Attachment.Columns.state.set(to: Attachment.State.failedUpload))
|
||||
|
||||
Logger.debug("Marked \(changeCount) message\(changeCount == 1 ? "" : "s") as failed (\(attachmentChangeCount) upload\(attachmentChangeCount == 1 ? "" : "s") cancelled)")
|
||||
SNLog("Marked \(changeCount) message\(changeCount == 1 ? "" : "s") as failed (\(attachmentChangeCount) upload\(attachmentChangeCount == 1 ? "" : "s") cancelled)")
|
||||
}
|
||||
|
||||
success(job, false)
|
||||
|
|
|
@ -32,46 +32,27 @@ public enum MessageReceiveJob: JobExecutor {
|
|||
|
||||
for messageInfo in details.messages {
|
||||
do {
|
||||
// Note: It generally shouldn't be possible for 'MessageReceiver.parse' to fail
|
||||
// the main situation where this can happen is when the jobs run out of order (eg.
|
||||
// a closed group message encrypted with a new key gets processed before the key
|
||||
// gets added - this shouldn't be as possible with the updated JobRunner)
|
||||
let isRetry: Bool = (job.failureCount > 0)
|
||||
let (message, proto) = try MessageReceiver.parse(
|
||||
db,
|
||||
data: messageInfo.data,
|
||||
serverExpirationTimestamp: messageInfo.serverExpirationTimestamp,
|
||||
isRetry: isRetry
|
||||
)
|
||||
message.serverHash = messageInfo.serverHash
|
||||
|
||||
try MessageReceiver.handle(
|
||||
db,
|
||||
message: message,
|
||||
associatedWithProto: proto,
|
||||
message: messageInfo.message,
|
||||
associatedWithProto: try SNProtoContent.parseData(messageInfo.serializedProtoData),
|
||||
openGroupId: nil,
|
||||
isBackgroundPoll: details.isBackgroundPoll
|
||||
)
|
||||
}
|
||||
catch {
|
||||
switch error {
|
||||
// Note: This is the same as the 'MessageReceiverError.duplicateMessage'
|
||||
// which is not retryable so just skip to the next message to process (no
|
||||
// longer logging this because all de-duping happens here now rather than
|
||||
// when parsing as it did previously - this change results in excessive
|
||||
// logging which isn't useful)
|
||||
case DatabaseError.SQLITE_CONSTRAINT_UNIQUE: continue
|
||||
|
||||
default: break
|
||||
}
|
||||
|
||||
// If the current message is a permanent failure then override it with the
|
||||
// new error (we want to retry if there is a single non-permanent error)
|
||||
switch error {
|
||||
// Ignore self-send errors (they will be permanently failed but no need
|
||||
// to log since we are going to have a lot of the due to the change to the
|
||||
// de-duping logic)
|
||||
case MessageReceiverError.selfSend: continue
|
||||
// Ignore duplicate and self-send errors (these will usually be caught during
|
||||
// parsing but sometimes can get past and conflict at database insertion - eg.
|
||||
// for open group messages) we also don't bother logging as it results in
|
||||
// excessive logging which isn't useful)
|
||||
case DatabaseError.SQLITE_CONSTRAINT_UNIQUE,
|
||||
MessageReceiverError.duplicateMessage,
|
||||
MessageReceiverError.duplicateControlMessage,
|
||||
MessageReceiverError.selfSend:
|
||||
break
|
||||
|
||||
case let receiverError as MessageReceiverError where !receiverError.isRetryable:
|
||||
SNLog("MessageReceiveJob permanently failed message due to error: \(error)")
|
||||
|
@ -107,7 +88,7 @@ public enum MessageReceiveJob: JobExecutor {
|
|||
failure(updatedJob, error, true)
|
||||
|
||||
case .some(let error):
|
||||
failure(updatedJob, error, false)
|
||||
failure(updatedJob, error, false) // TODO: Confirm the 'noKeyPair' errors here aren't an issue
|
||||
|
||||
case .none:
|
||||
success(updatedJob, false)
|
||||
|
@ -120,18 +101,64 @@ public enum MessageReceiveJob: JobExecutor {
|
|||
extension MessageReceiveJob {
|
||||
public struct Details: Codable {
|
||||
public struct MessageInfo: Codable {
|
||||
public let data: Data
|
||||
public let serverHash: String?
|
||||
public let serverExpirationTimestamp: TimeInterval?
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
case message
|
||||
case variant
|
||||
case serializedProtoData
|
||||
}
|
||||
|
||||
public let message: Message
|
||||
public let variant: Message.Variant
|
||||
public let serializedProtoData: Data
|
||||
|
||||
public init(
|
||||
data: Data,
|
||||
serverHash: String?,
|
||||
serverExpirationTimestamp: TimeInterval?
|
||||
message: Message,
|
||||
variant: Message.Variant,
|
||||
proto: SNProtoContent
|
||||
) throws {
|
||||
self.message = message
|
||||
self.variant = variant
|
||||
self.serializedProtoData = try proto.serializedData()
|
||||
}
|
||||
|
||||
private init(
|
||||
message: Message,
|
||||
variant: Message.Variant,
|
||||
serializedProtoData: Data
|
||||
) {
|
||||
self.data = data
|
||||
self.serverHash = serverHash
|
||||
self.serverExpirationTimestamp = serverExpirationTimestamp
|
||||
self.message = message
|
||||
self.variant = variant
|
||||
self.serializedProtoData = serializedProtoData
|
||||
}
|
||||
|
||||
// MARK: - Codable
|
||||
|
||||
public init(from decoder: Decoder) throws {
|
||||
let container: KeyedDecodingContainer<CodingKeys> = try decoder.container(keyedBy: CodingKeys.self)
|
||||
|
||||
guard let variant: Message.Variant = try? container.decode(Message.Variant.self, forKey: .variant) else {
|
||||
SNLog("Unable to decode messageReceive job due to missing variant")
|
||||
throw StorageError.decodingFailed
|
||||
}
|
||||
|
||||
self = MessageInfo(
|
||||
message: try variant.decode(from: container, forKey: .message),
|
||||
variant: variant,
|
||||
serializedProtoData: try container.decode(Data.self, forKey: .serializedProtoData)
|
||||
)
|
||||
}
|
||||
|
||||
public func encode(to encoder: Encoder) throws {
|
||||
var container: KeyedEncodingContainer<CodingKeys> = encoder.container(keyedBy: CodingKeys.self)
|
||||
|
||||
guard let variant: Message.Variant = Message.Variant(from: message) else {
|
||||
SNLog("Unable to encode messageReceive job due to unsupported variant")
|
||||
throw StorageError.objectNotFound
|
||||
}
|
||||
|
||||
try container.encode(message, forKey: .message)
|
||||
try container.encode(variant, forKey: .variant)
|
||||
try container.encode(serializedProtoData, forKey: .serializedProtoData)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -170,31 +170,15 @@ public enum MessageSendJob: JobExecutor {
|
|||
|
||||
extension MessageSendJob {
|
||||
public struct Details: Codable {
|
||||
// Note: This approach is less than ideal (since it needs to be manually maintained) but
|
||||
// I couldn't think of an easy way to support a generic decoded type for the 'message'
|
||||
// value in the database while using Codable
|
||||
private static let supportedMessageTypes: [String: Message.Type] = [
|
||||
"VisibleMessage": VisibleMessage.self,
|
||||
|
||||
"ReadReceipt": ReadReceipt.self,
|
||||
"TypingIndicator": TypingIndicator.self,
|
||||
"ClosedGroupControlMessage": ClosedGroupControlMessage.self,
|
||||
"DataExtractionNotification": DataExtractionNotification.self,
|
||||
"ExpirationTimerUpdate": ExpirationTimerUpdate.self,
|
||||
"ConfigurationMessage": ConfigurationMessage.self,
|
||||
"UnsendRequest": UnsendRequest.self,
|
||||
"MessageRequestResponse": MessageRequestResponse.self
|
||||
]
|
||||
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
case interactionId
|
||||
case destination
|
||||
case messageType
|
||||
case message
|
||||
case variant
|
||||
}
|
||||
|
||||
public let destination: Message.Destination
|
||||
public let message: Message
|
||||
public let variant: Message.Variant?
|
||||
|
||||
// MARK: - Initialization
|
||||
|
||||
|
@ -204,49 +188,36 @@ extension MessageSendJob {
|
|||
) {
|
||||
self.destination = destination
|
||||
self.message = message
|
||||
self.variant = Message.Variant(from: message)
|
||||
}
|
||||
|
||||
// MARK: - Codable
|
||||
|
||||
public init(from decoder: Decoder) throws {
|
||||
let container: KeyedDecodingContainer<CodingKeys> = try decoder.container(keyedBy: CodingKeys.self)
|
||||
|
||||
guard let messageType: String = try? container.decode(String.self, forKey: .messageType) else {
|
||||
Logger.error("Unable to decode messageSend job due to missing messageType")
|
||||
guard let variant: Message.Variant = try? container.decode(Message.Variant.self, forKey: .variant) else {
|
||||
SNLog("Unable to decode messageSend job due to missing variant")
|
||||
throw StorageError.decodingFailed
|
||||
}
|
||||
|
||||
/// Note: This **MUST** be a `Codable.Type` rather than a `Message.Type` otherwise the decoding will result
|
||||
/// in a `Message` object being returned rather than the desired subclass
|
||||
guard let MessageType: Codable.Type = MessageSendJob.Details.supportedMessageTypes[messageType] else {
|
||||
Logger.error("Unable to decode messageSend job due to unsupported messageType")
|
||||
throw StorageError.decodingFailed
|
||||
}
|
||||
guard let message: Message = try MessageType.decoded(with: container, forKey: .message) as? Message else {
|
||||
Logger.error("Unable to decode messageSend job due to message conversion issue")
|
||||
throw StorageError.decodingFailed
|
||||
}
|
||||
|
||||
self = Details(
|
||||
destination: try container.decode(Message.Destination.self, forKey: .destination),
|
||||
message: message
|
||||
message: try variant.decode(from: container, forKey: .message)
|
||||
)
|
||||
}
|
||||
|
||||
public func encode(to encoder: Encoder) throws {
|
||||
var container: KeyedEncodingContainer<CodingKeys> = encoder.container(keyedBy: CodingKeys.self)
|
||||
|
||||
let messageType: Codable.Type = type(of: message)
|
||||
let maybeMessageTypeString: String? = MessageSendJob.Details.supportedMessageTypes
|
||||
.first(where: { _, type in messageType == type })?
|
||||
.key
|
||||
|
||||
guard let messageTypeString: String = maybeMessageTypeString else {
|
||||
Logger.error("Unable to encode messageSend job due to unsupported messageType")
|
||||
guard let variant: Message.Variant = Message.Variant(from: message) else {
|
||||
SNLog("Unable to encode messageSend job due to unsupported variant")
|
||||
throw StorageError.objectNotFound
|
||||
}
|
||||
|
||||
try container.encode(destination, forKey: .destination)
|
||||
try container.encode(messageTypeString, forKey: .messageType)
|
||||
try container.encode(message, forKey: .message)
|
||||
try container.encode(variant, forKey: .variant)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
import Foundation
|
||||
import GRDB
|
||||
import SessionSnodeKit
|
||||
|
||||
/// Abstract base class for `VisibleMessage` and `ControlMessage`.
|
||||
public class Message: Codable {
|
||||
|
@ -76,6 +77,258 @@ public class Message: Codable {
|
|||
}
|
||||
}
|
||||
|
||||
// MARK: - Message Parsing/Processing
|
||||
|
||||
public typealias ProcessedMessage = (
|
||||
threadId: String?,
|
||||
proto: SNProtoContent,
|
||||
messageInfo: MessageReceiveJob.Details.MessageInfo
|
||||
)
|
||||
|
||||
public extension Message {
|
||||
static let nonThreadMessageId: String = "NON_THREAD_MESSAGE"
|
||||
|
||||
enum Variant: String, Codable {
|
||||
case readReceipt
|
||||
case typingIndicator
|
||||
case closedGroupControlMessage
|
||||
case dataExtractionNotification
|
||||
case expirationTimerUpdate
|
||||
case configurationMessage
|
||||
case unsendRequest
|
||||
case messageRequestResponse
|
||||
case visibleMessage
|
||||
|
||||
init?(from type: Message) {
|
||||
switch type {
|
||||
case is ReadReceipt: self = .readReceipt
|
||||
case is TypingIndicator: self = .typingIndicator
|
||||
case is ClosedGroupControlMessage: self = .closedGroupControlMessage
|
||||
case is DataExtractionNotification: self = .dataExtractionNotification
|
||||
case is ExpirationTimerUpdate: self = .expirationTimerUpdate
|
||||
case is ConfigurationMessage: self = .configurationMessage
|
||||
case is UnsendRequest: self = .unsendRequest
|
||||
case is MessageRequestResponse: self = .messageRequestResponse
|
||||
case is VisibleMessage: self = .visibleMessage
|
||||
default: return nil
|
||||
}
|
||||
}
|
||||
|
||||
var messageType: Message.Type {
|
||||
switch self {
|
||||
case .readReceipt: return ReadReceipt.self
|
||||
case .typingIndicator: return TypingIndicator.self
|
||||
case .closedGroupControlMessage: return ClosedGroupControlMessage.self
|
||||
case .dataExtractionNotification: return DataExtractionNotification.self
|
||||
case .expirationTimerUpdate: return ExpirationTimerUpdate.self
|
||||
case .configurationMessage: return ConfigurationMessage.self
|
||||
case .unsendRequest: return UnsendRequest.self
|
||||
case .messageRequestResponse: return MessageRequestResponse.self
|
||||
case .visibleMessage: return VisibleMessage.self
|
||||
}
|
||||
}
|
||||
|
||||
func decode<CodingKeys: CodingKey>(from container: KeyedDecodingContainer<CodingKeys>, forKey key: CodingKeys) throws -> Message {
|
||||
switch self {
|
||||
case .readReceipt: return try container.decode(ReadReceipt.self, forKey: key)
|
||||
case .typingIndicator: return try container.decode(TypingIndicator.self, forKey: key)
|
||||
|
||||
case .closedGroupControlMessage:
|
||||
return try container.decode(ClosedGroupControlMessage.self, forKey: key)
|
||||
|
||||
case .dataExtractionNotification:
|
||||
return try container.decode(DataExtractionNotification.self, forKey: key)
|
||||
|
||||
case .expirationTimerUpdate: return try container.decode(ExpirationTimerUpdate.self, forKey: key)
|
||||
case .configurationMessage: return try container.decode(ConfigurationMessage.self, forKey: key)
|
||||
case .unsendRequest: return try container.decode(UnsendRequest.self, forKey: key)
|
||||
case .messageRequestResponse: return try container.decode(MessageRequestResponse.self, forKey: key)
|
||||
case .visibleMessage: return try container.decode(VisibleMessage.self, forKey: key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static func createMessageFrom(_ proto: SNProtoContent, sender: String) -> Message? {
|
||||
// Note: This array is ordered intentionally to ensure the correct types are processed
|
||||
// and aren't parsed as the wrong type
|
||||
let prioritisedVariants: [Variant] = [
|
||||
.readReceipt,
|
||||
.typingIndicator,
|
||||
.closedGroupControlMessage,
|
||||
.dataExtractionNotification,
|
||||
.expirationTimerUpdate,
|
||||
.configurationMessage,
|
||||
.unsendRequest,
|
||||
.messageRequestResponse,
|
||||
.visibleMessage
|
||||
]
|
||||
|
||||
return prioritisedVariants
|
||||
.reduce(nil) { prev, variant in
|
||||
guard prev == nil else { return prev }
|
||||
|
||||
return variant.messageType.fromProto(proto, sender: sender)
|
||||
}
|
||||
}
|
||||
|
||||
static func processRawReceivedMessage(
|
||||
_ db: Database,
|
||||
rawMessage: SnodeReceivedMessage
|
||||
) throws -> ProcessedMessage? {
|
||||
guard let envelope = SNProtoEnvelope.from(rawMessage) else {
|
||||
throw MessageReceiverError.invalidMessage
|
||||
}
|
||||
|
||||
do {
|
||||
let processedMessage: ProcessedMessage? = try processRawReceivedMessage(
|
||||
db,
|
||||
envelope: envelope,
|
||||
serverExpirationTimestamp: (TimeInterval(rawMessage.info.expirationDateMs) / 1000),
|
||||
serverHash: rawMessage.info.hash,
|
||||
handleClosedGroupKeyUpdateMessages: true
|
||||
)
|
||||
|
||||
// Retrieve the number of entries we have for the hash of this message
|
||||
let numExistingHashes: Int = (try? SnodeReceivedMessageInfo
|
||||
.filter(SnodeReceivedMessageInfo.Columns.hash == rawMessage.info.hash)
|
||||
.fetchCount(db))
|
||||
.defaulting(to: 0)
|
||||
|
||||
// Try to insert the raw message info into the database (used for both request paging and
|
||||
// de-duping purposes)
|
||||
_ = try rawMessage.info.inserted(db)
|
||||
|
||||
// If the above insertion worked then we hadn't processed this message for this specific
|
||||
// service node, but may have done so for another node - if the hash already existed in
|
||||
// the database before we inserted it for this node then we can ignore this message as a
|
||||
// duplicate
|
||||
guard numExistingHashes == 0 else { throw MessageReceiverError.duplicateMessage }
|
||||
|
||||
return processedMessage
|
||||
}
|
||||
catch {
|
||||
// If we get 'selfSend' or 'duplicateControlMessage' errors then we still want to insert
|
||||
// the SnodeReceivedMessageInfo to prevent retrieving and attempting to process the same
|
||||
// message again (as well as ensure the next poll doesn't retrieve the same message)
|
||||
switch error {
|
||||
case MessageReceiverError.selfSend, MessageReceiverError.duplicateControlMessage:
|
||||
_ = try? rawMessage.info.inserted(db)
|
||||
break
|
||||
|
||||
default: break
|
||||
}
|
||||
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
static func processRawReceivedMessage(
|
||||
_ db: Database,
|
||||
serializedData: Data,
|
||||
serverHash: String?
|
||||
) throws -> ProcessedMessage? {
|
||||
guard let envelope = try? SNProtoEnvelope.parseData(serializedData) else {
|
||||
throw MessageReceiverError.invalidMessage
|
||||
}
|
||||
|
||||
return try processRawReceivedMessage(
|
||||
db,
|
||||
envelope: envelope,
|
||||
serverExpirationTimestamp: (Date().timeIntervalSince1970 + ControlMessageProcessRecord.defaultExpirationSeconds),
|
||||
serverHash: serverHash,
|
||||
handleClosedGroupKeyUpdateMessages: true
|
||||
)
|
||||
}
|
||||
|
||||
/// This method behaves slightly differently from the other `processRawReceivedMessage` methods as it doesn't
|
||||
/// insert the "message info" for deduping (we want the poller to re-process the message) and also avoids handling any
|
||||
/// closed group key update messages (the `NotificationServiceExtension` does this itself)
|
||||
static func processRawReceivedMessageAsNotification(
|
||||
_ db: Database,
|
||||
envelope: SNProtoEnvelope
|
||||
) throws -> ProcessedMessage? {
|
||||
let processedMessage: ProcessedMessage? = try processRawReceivedMessage(
|
||||
db,
|
||||
envelope: envelope,
|
||||
serverExpirationTimestamp: (Date().timeIntervalSince1970 + ControlMessageProcessRecord.defaultExpirationSeconds),
|
||||
serverHash: nil,
|
||||
handleClosedGroupKeyUpdateMessages: false
|
||||
)
|
||||
|
||||
return processedMessage
|
||||
}
|
||||
|
||||
private static func processRawReceivedMessage(
|
||||
_ db: Database,
|
||||
envelope: SNProtoEnvelope,
|
||||
serverExpirationTimestamp: TimeInterval,
|
||||
serverHash: String?,
|
||||
// TODO: These
|
||||
openGroupId: String? = nil,
|
||||
openGroupMessageServerId: UInt64? = nil,
|
||||
handleClosedGroupKeyUpdateMessages: Bool
|
||||
) throws -> ProcessedMessage? {
|
||||
let (message, proto, threadId) = try MessageReceiver.parse(
|
||||
db,
|
||||
envelope: envelope,
|
||||
serverExpirationTimestamp: serverExpirationTimestamp,
|
||||
openGroupId: openGroupId,
|
||||
openGroupMessageServerId: openGroupMessageServerId
|
||||
)
|
||||
message.serverHash = serverHash
|
||||
|
||||
// Ignore invalid messages and hashes for messages we have previously handled
|
||||
guard let variant: Message.Variant = Message.Variant(from: message) else {
|
||||
throw MessageReceiverError.invalidMessage
|
||||
}
|
||||
|
||||
/// **Note:** We want to immediately handle any `ClosedGroupControlMessage` with the kind `encryptionKeyPair` as
|
||||
/// we need the keyPair in storage in order to be able to parse and messages which were signed with the new key (also no need to add
|
||||
/// these as jobs as they will be fully handled in here)
|
||||
if handleClosedGroupKeyUpdateMessages {
|
||||
switch message {
|
||||
case let closedGroupControlMessage as ClosedGroupControlMessage:
|
||||
switch closedGroupControlMessage.kind {
|
||||
case .encryptionKeyPair:
|
||||
try MessageReceiver.handleClosedGroupControlMessage(db, closedGroupControlMessage)
|
||||
return nil
|
||||
|
||||
default: break
|
||||
}
|
||||
|
||||
default: break
|
||||
}
|
||||
}
|
||||
|
||||
// Prevent ControlMessages from being handled multiple times if not supported
|
||||
do {
|
||||
try ControlMessageProcessRecord(
|
||||
threadId: threadId,
|
||||
message: message,
|
||||
serverExpirationTimestamp: serverExpirationTimestamp
|
||||
)?.insert(db)
|
||||
}
|
||||
catch {
|
||||
// We want to custom handle this
|
||||
if case DatabaseError.SQLITE_CONSTRAINT_UNIQUE = error {
|
||||
throw MessageReceiverError.duplicateControlMessage
|
||||
}
|
||||
|
||||
throw error
|
||||
}
|
||||
|
||||
return (
|
||||
threadId,
|
||||
proto,
|
||||
try MessageReceiveJob.Details.MessageInfo(
|
||||
message: message,
|
||||
variant: variant,
|
||||
proto: proto
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Mutation
|
||||
|
||||
internal extension Message {
|
||||
|
|
|
@ -4,6 +4,7 @@ import Foundation
|
|||
|
||||
public enum MessageReceiverError: LocalizedError {
|
||||
case duplicateMessage
|
||||
case duplicateControlMessage
|
||||
case invalidMessage
|
||||
case unknownMessage
|
||||
case unknownEnvelopeType
|
||||
|
@ -20,7 +21,7 @@ public enum MessageReceiverError: LocalizedError {
|
|||
|
||||
public var isRetryable: Bool {
|
||||
switch self {
|
||||
case .duplicateMessage, .invalidMessage, .unknownMessage, .unknownEnvelopeType,
|
||||
case .duplicateMessage, .duplicateControlMessage, .invalidMessage, .unknownMessage, .unknownEnvelopeType,
|
||||
.invalidSignature, .noData, .senderBlocked, .noThread, .selfSend, .decryptionFailed:
|
||||
return false
|
||||
|
||||
|
@ -31,6 +32,7 @@ public enum MessageReceiverError: LocalizedError {
|
|||
public var errorDescription: String? {
|
||||
switch self {
|
||||
case .duplicateMessage: return "Duplicate message."
|
||||
case .duplicateControlMessage: return "Duplicate control message."
|
||||
case .invalidMessage: return "Invalid message."
|
||||
case .unknownMessage: return "Unknown message type."
|
||||
case .unknownEnvelopeType: return "Unknown envelope type."
|
||||
|
|
|
@ -924,7 +924,11 @@ extension MessageReceiver {
|
|||
).insert(db)
|
||||
}
|
||||
catch {
|
||||
return SNLog("Ignoring duplicate closed group encryption key pair.")
|
||||
if case DatabaseError.SQLITE_CONSTRAINT_UNIQUE = error {
|
||||
return SNLog("Ignoring duplicate closed group encryption key pair.")
|
||||
}
|
||||
|
||||
throw error
|
||||
}
|
||||
|
||||
SNLog("Received a new closed group encryption key pair.")
|
||||
|
|
|
@ -10,18 +10,14 @@ public enum MessageReceiver {
|
|||
|
||||
public static func parse(
|
||||
_ db: Database,
|
||||
data: Data,
|
||||
envelope: SNProtoEnvelope,
|
||||
serverExpirationTimestamp: TimeInterval?,
|
||||
openGroupId: String? = nil,
|
||||
openGroupMessageServerId: UInt64? = nil,
|
||||
isRetry: Bool = false
|
||||
) throws -> (Message, SNProtoContent) {
|
||||
openGroupId: String?,
|
||||
openGroupMessageServerId: UInt64?
|
||||
) throws -> (Message, SNProtoContent, String) {
|
||||
let userPublicKey: String = getUserHexEncodedPublicKey()
|
||||
let isOpenGroupMessage: Bool = (openGroupMessageServerId != nil)
|
||||
|
||||
// Parse the envelope
|
||||
let envelope = try SNProtoEnvelope.parseData(data)
|
||||
|
||||
// Decrypt the contents
|
||||
guard let ciphertext = envelope.content else { throw MessageReceiverError.noData }
|
||||
|
||||
|
@ -118,69 +114,50 @@ public enum MessageReceiver {
|
|||
}
|
||||
|
||||
// Parse the message
|
||||
let message: Message? = {
|
||||
if let readReceipt = ReadReceipt.fromProto(proto, sender: sender) { return readReceipt }
|
||||
if let typingIndicator = TypingIndicator.fromProto(proto, sender: sender) { return typingIndicator }
|
||||
if let closedGroupControlMessage = ClosedGroupControlMessage.fromProto(proto, sender: sender) { return closedGroupControlMessage }
|
||||
if let dataExtractionNotification = DataExtractionNotification.fromProto(proto, sender: sender) { return dataExtractionNotification }
|
||||
if let expirationTimerUpdate = ExpirationTimerUpdate.fromProto(proto, sender: sender) { return expirationTimerUpdate }
|
||||
if let configurationMessage = ConfigurationMessage.fromProto(proto, sender: sender) { return configurationMessage }
|
||||
if let unsendRequest = UnsendRequest.fromProto(proto, sender: sender) { return unsendRequest }
|
||||
if let messageRequestResponse = MessageRequestResponse.fromProto(proto, sender: sender) { return messageRequestResponse }
|
||||
if let visibleMessage = VisibleMessage.fromProto(proto, sender: sender) { return visibleMessage }
|
||||
return nil
|
||||
}()
|
||||
|
||||
if let message = message {
|
||||
// Ignore self sends if needed
|
||||
if !message.isSelfSendValid {
|
||||
guard sender != userPublicKey else { throw MessageReceiverError.selfSend }
|
||||
}
|
||||
|
||||
// Guard against control messages in open groups
|
||||
if isOpenGroupMessage {
|
||||
guard message is VisibleMessage else { throw MessageReceiverError.invalidMessage }
|
||||
}
|
||||
|
||||
// Finish parsing
|
||||
message.sender = sender
|
||||
message.recipient = userPublicKey
|
||||
message.sentTimestamp = envelope.timestamp
|
||||
message.receivedTimestamp = UInt64((Date().timeIntervalSince1970) * 1000)
|
||||
message.groupPublicKey = groupPublicKey
|
||||
message.openGroupServerMessageId = openGroupMessageServerId
|
||||
|
||||
// Validate
|
||||
var isValid: Bool = message.isValid
|
||||
if message is VisibleMessage && !isValid && proto.dataMessage?.attachments.isEmpty == false {
|
||||
isValid = true
|
||||
}
|
||||
|
||||
guard isValid else {
|
||||
throw MessageReceiverError.invalidMessage
|
||||
}
|
||||
|
||||
// Prevent ControlMessages from being handled multiple times if not supported
|
||||
try ControlMessageProcessRecord(
|
||||
threadId: {
|
||||
if let groupPublicKey: String = groupPublicKey { return groupPublicKey }
|
||||
if let openGroupId: String = openGroupId { return openGroupId }
|
||||
|
||||
switch message {
|
||||
case let message as VisibleMessage: return (message.syncTarget ?? sender)
|
||||
case let message as ExpirationTimerUpdate: return (message.syncTarget ?? sender)
|
||||
default: return sender
|
||||
}
|
||||
}(),
|
||||
message: message,
|
||||
serverExpirationTimestamp: serverExpirationTimestamp,
|
||||
isRetry: false
|
||||
)?.insert(db)
|
||||
|
||||
// Return
|
||||
return (message, proto)
|
||||
guard let message: Message = Message.createMessageFrom(proto, sender: sender) else {
|
||||
throw MessageReceiverError.unknownMessage
|
||||
}
|
||||
|
||||
throw MessageReceiverError.unknownMessage
|
||||
// Ignore self sends if needed
|
||||
guard message.isSelfSendValid || sender != userPublicKey else {
|
||||
throw MessageReceiverError.selfSend
|
||||
}
|
||||
|
||||
// Guard against control messages in open groups
|
||||
guard !isOpenGroupMessage || message is VisibleMessage else {
|
||||
throw MessageReceiverError.invalidMessage
|
||||
}
|
||||
|
||||
// Finish parsing
|
||||
message.sender = sender
|
||||
message.recipient = userPublicKey
|
||||
message.sentTimestamp = envelope.timestamp
|
||||
message.receivedTimestamp = UInt64((Date().timeIntervalSince1970) * 1000)
|
||||
message.groupPublicKey = groupPublicKey
|
||||
message.openGroupServerMessageId = openGroupMessageServerId
|
||||
|
||||
// Validate
|
||||
var isValid: Bool = message.isValid
|
||||
if message is VisibleMessage && !isValid && proto.dataMessage?.attachments.isEmpty == false {
|
||||
isValid = true
|
||||
}
|
||||
|
||||
guard isValid else {
|
||||
throw MessageReceiverError.invalidMessage
|
||||
}
|
||||
|
||||
// Extract the proper threadId for the message
|
||||
let threadId: String = {
|
||||
if let groupPublicKey: String = groupPublicKey { return groupPublicKey }
|
||||
if let openGroupId: String = openGroupId { return openGroupId }
|
||||
|
||||
switch message {
|
||||
case let message as VisibleMessage: return (message.syncTarget ?? sender)
|
||||
case let message as ExpirationTimerUpdate: return (message.syncTarget ?? sender)
|
||||
default: return sender
|
||||
}
|
||||
}()
|
||||
|
||||
return (message, proto, threadId)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -468,8 +468,7 @@ public final class MessageSender {
|
|||
}
|
||||
}(),
|
||||
message: message,
|
||||
serverExpirationTimestamp: (Date().timeIntervalSince1970 + ControlMessageProcessRecord.defaultExpirationSeconds),
|
||||
isRetry: false
|
||||
serverExpirationTimestamp: (Date().timeIntervalSince1970 + ControlMessageProcessRecord.defaultExpirationSeconds)
|
||||
)?.insert(db)
|
||||
|
||||
// Sync the message if:
|
||||
|
|
|
@ -174,29 +174,22 @@ public final class ClosedGroupPoller: NSObject {
|
|||
var jobDetailMessages: [MessageReceiveJob.Details.MessageInfo] = []
|
||||
|
||||
messages.forEach { message in
|
||||
guard let envelope = SNProtoEnvelope.from(message) else { return }
|
||||
|
||||
do {
|
||||
let serialisedData: Data = try envelope.serializedData()
|
||||
_ = try message.info.inserted(db)
|
||||
let processedMessage: ProcessedMessage? = try Message.processRawReceivedMessage(db, rawMessage: message)
|
||||
|
||||
// Ignore hashes for messages we have previously handled
|
||||
guard try SnodeReceivedMessageInfo.filter(SnodeReceivedMessageInfo.Columns.hash == message.info.hash).fetchCount(db) == 1 else {
|
||||
throw MessageReceiverError.duplicateMessage
|
||||
}
|
||||
|
||||
jobDetailMessages.append(
|
||||
MessageReceiveJob.Details.MessageInfo(
|
||||
data: serialisedData,
|
||||
serverHash: message.info.hash,
|
||||
serverExpirationTimestamp: (TimeInterval(message.info.expirationDateMs) / 1000)
|
||||
)
|
||||
)
|
||||
jobDetailMessages = jobDetailMessages
|
||||
.appending(processedMessage?.messageInfo)
|
||||
}
|
||||
catch {
|
||||
switch error {
|
||||
// Ignore duplicate messages
|
||||
case .SQLITE_CONSTRAINT_UNIQUE, MessageReceiverError.duplicateMessage: break
|
||||
// Ignore duplicate & selfSend message errors (and don't bother logging
|
||||
// them as there will be a lot since we each service node duplicates messages)
|
||||
case DatabaseError.SQLITE_CONSTRAINT_UNIQUE,
|
||||
MessageReceiverError.duplicateMessage,
|
||||
MessageReceiverError.duplicateControlMessage,
|
||||
MessageReceiverError.selfSend:
|
||||
break
|
||||
|
||||
default: SNLog("Failed to deserialize envelope due to error: \(error).")
|
||||
}
|
||||
}
|
||||
|
@ -218,7 +211,7 @@ public final class ClosedGroupPoller: NSObject {
|
|||
)
|
||||
}
|
||||
|
||||
SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") in closed group with public key: \(groupPublicKey) (\(messages.count - messageCount) duplicates)")
|
||||
SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") in closed group with public key: \(groupPublicKey) (duplicates: \(messages.count - messageCount))")
|
||||
}
|
||||
}
|
||||
.map { _ in }
|
||||
|
|
|
@ -109,6 +109,9 @@ public final class Poller {
|
|||
if let error = error as? Error, error == .pollLimitReached {
|
||||
self?.pollCount = 0
|
||||
}
|
||||
else if UserDefaults.sharedLokiProject?[.isMainAppActive] != true {
|
||||
// Do nothing when an error gets throws right after returning from the background (happens frequently)
|
||||
}
|
||||
else {
|
||||
SNLog("Polling \(nextSnode) failed; dropping it and switching to next snode.")
|
||||
SnodeAPI.dropSnodeFromSwarmIfNeeded(nextSnode, publicKey: userPublicKey)
|
||||
|
@ -123,7 +126,7 @@ public final class Poller {
|
|||
private func poll(_ snode: Snode, seal longTermSeal: Resolver<Void>) -> Promise<Void> {
|
||||
guard isPolling.wrappedValue else { return Promise { $0.fulfill(()) } }
|
||||
|
||||
let userPublicKey = getUserHexEncodedPublicKey()
|
||||
let userPublicKey: String = getUserHexEncodedPublicKey()
|
||||
|
||||
return SnodeAPI.getMessages(from: snode, associatedWith: userPublicKey)
|
||||
.then(on: Threading.pollerQueue) { [weak self] messages -> Promise<Void> in
|
||||
|
@ -136,43 +139,26 @@ public final class Poller {
|
|||
var threadMessages: [String: [MessageReceiveJob.Details.MessageInfo]] = [:]
|
||||
|
||||
messages.forEach { message in
|
||||
guard let envelope = SNProtoEnvelope.from(message) else { return }
|
||||
|
||||
// Extract the threadId and add that to the messageReceive job for
|
||||
// multi-threading and garbage collection purposes
|
||||
let threadId: String? = MessageReceiver.extractSenderPublicKey(db, from: envelope)
|
||||
|
||||
if threadId == nil {
|
||||
// TODO: I assume a configuration message doesn't need a 'threadId' (confirm this and set the 'requiresThreadId' requirement accordingly)
|
||||
// TODO: Does the configuration message come through here????
|
||||
print("RAWR WHAT CASES LETS THIS BE NIL????")
|
||||
}
|
||||
|
||||
do {
|
||||
let serialisedData: Data = try envelope.serializedData()
|
||||
_ = try message.info.inserted(db)
|
||||
let processedMessage: ProcessedMessage? = try Message.processRawReceivedMessage(db, rawMessage: message)
|
||||
let key: String = (processedMessage?.threadId ?? Message.nonThreadMessageId)
|
||||
|
||||
// Ignore hashes for messages we have previously handled
|
||||
guard try SnodeReceivedMessageInfo.filter(SnodeReceivedMessageInfo.Columns.hash == message.info.hash).fetchCount(db) == 1 else {
|
||||
throw MessageReceiverError.duplicateMessage
|
||||
}
|
||||
|
||||
threadMessages[threadId ?? ""] = (threadMessages[threadId ?? ""] ?? [])
|
||||
.appending(
|
||||
MessageReceiveJob.Details.MessageInfo(
|
||||
data: serialisedData,
|
||||
serverHash: message.info.hash,
|
||||
serverExpirationTimestamp: (TimeInterval(message.info.expirationDateMs) / 1000)
|
||||
)
|
||||
)
|
||||
threadMessages[key] = (threadMessages[key] ?? [])
|
||||
.appending(processedMessage?.messageInfo)
|
||||
}
|
||||
catch {
|
||||
switch error {
|
||||
// Ignore duplicate messages
|
||||
case .SQLITE_CONSTRAINT_UNIQUE, MessageReceiverError.duplicateMessage: break
|
||||
|
||||
default:
|
||||
SNLog("Failed to deserialize envelope due to error: \(error).")
|
||||
// Ignore duplicate & selfSend message errors (and don't bother logging
|
||||
// them as there will be a lot since we each service node duplicates messages)
|
||||
case DatabaseError.SQLITE_CONSTRAINT_UNIQUE,
|
||||
MessageReceiverError.duplicateMessage,
|
||||
MessageReceiverError.duplicateControlMessage,
|
||||
MessageReceiverError.selfSend:
|
||||
break
|
||||
|
||||
default: SNLog("Failed to deserialize envelope due to error: \(error).")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -197,7 +183,7 @@ public final class Poller {
|
|||
}
|
||||
}
|
||||
|
||||
SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") (\(messages.count - messageCount) duplicates)")
|
||||
SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") (duplicates: \(messages.count - messageCount))")
|
||||
}
|
||||
|
||||
self?.pollCount += 1
|
||||
|
|
|
@ -710,6 +710,8 @@ public extension SessionThreadViewModel {
|
|||
// MARK: - Search Queries
|
||||
|
||||
public extension SessionThreadViewModel {
|
||||
fileprivate static let searchResultsLimit: Int = 500
|
||||
|
||||
static func searchTermParts(_ searchTerm: String) -> [String] {
|
||||
/// Process the search term in order to extract the parts of the search pattern we want
|
||||
///
|
||||
|
@ -836,6 +838,7 @@ public extension SessionThreadViewModel {
|
|||
)
|
||||
|
||||
ORDER BY \(Column.rank), \(interaction[.timestampMs].desc)
|
||||
LIMIT \(SQL("\(SessionThreadViewModel.searchResultsLimit)"))
|
||||
"""
|
||||
|
||||
return request.adapted { db in
|
||||
|
@ -1194,6 +1197,7 @@ public extension SessionThreadViewModel {
|
|||
\(ViewModel.closedGroupNameKey),
|
||||
\(ViewModel.openGroupNameKey),
|
||||
\(ViewModel.threadIdKey)
|
||||
LIMIT \(SQL("\(SessionThreadViewModel.searchResultsLimit)"))
|
||||
"""
|
||||
|
||||
// Construct the actual request
|
||||
|
|
|
@ -35,8 +35,11 @@ public final class NotificationServiceExtension : UNNotificationServiceExtension
|
|||
}
|
||||
}
|
||||
let notificationContent = self.notificationContent!
|
||||
guard let base64EncodedData = notificationContent.userInfo["ENCRYPTED_DATA"] as! String?, let data = Data(base64Encoded: base64EncodedData),
|
||||
let envelope = try? MessageWrapper.unwrap(data: data), let envelopeAsData = try? envelope.serializedData() else {
|
||||
guard
|
||||
let base64EncodedData: String = notificationContent.userInfo["ENCRYPTED_DATA"] as? String,
|
||||
let data: Data = Data(base64Encoded: base64EncodedData),
|
||||
let envelope = try? MessageWrapper.unwrap(data: data)
|
||||
else {
|
||||
return self.handleFailure(for: notificationContent)
|
||||
}
|
||||
|
||||
|
@ -45,14 +48,20 @@ public final class NotificationServiceExtension : UNNotificationServiceExtension
|
|||
// is added to notification center
|
||||
GRDBStorage.shared.write { db in
|
||||
do {
|
||||
let (message, proto) = try MessageReceiver.parse(
|
||||
db,
|
||||
data: envelopeAsData,
|
||||
serverExpirationTimestamp: (Date().timeIntervalSince1970 + ControlMessageProcessRecord.defaultExpirationSeconds)
|
||||
)
|
||||
switch message {
|
||||
guard let processedMessage: ProcessedMessage = try Message.processRawReceivedMessageAsNotification(db, envelope: envelope) else {
|
||||
self.handleFailure(for: notificationContent)
|
||||
return
|
||||
}
|
||||
|
||||
switch processedMessage.messageInfo.message {
|
||||
case let visibleMessage as VisibleMessage:
|
||||
let interactionId: Int64 = try MessageReceiver.handleVisibleMessage(db, message: visibleMessage, associatedWithProto: proto, openGroupId: nil, isBackgroundPoll: false)
|
||||
let interactionId: Int64 = try MessageReceiver.handleVisibleMessage(
|
||||
db,
|
||||
message: visibleMessage,
|
||||
associatedWithProto: processedMessage.proto,
|
||||
openGroupId: nil,
|
||||
isBackgroundPoll: false
|
||||
)
|
||||
|
||||
// Remove the notifications if there is an outgoing messages from a linked device
|
||||
if
|
||||
|
|
|
@ -78,20 +78,21 @@ enum _003_YDBToGRDBMigration: Migration {
|
|||
|
||||
// MARK: --Swarms
|
||||
|
||||
// Note: There is no index on the collection column so unfortunately it takes the same amount of
|
||||
// time to enumerate through all collections as it does to just get the count of collections, as
|
||||
// a result if the database is very large this part can be slow (~15s with 2,000,000 rows) - we
|
||||
// want to show some kind of progress while doing this enumeration so the below code includes a
|
||||
// number of rough values to show some kind of progression while the enumeration occurs (most users
|
||||
// won't run into issues with this at all)
|
||||
var swarmCollections: Set<String> = []
|
||||
/// **Note:** There is no index on the collection column so unfortunately it takes the same amount of time to enumerate through all
|
||||
/// collections as it does to just get the count of collections, due to this, if the database is very large, importing thecollections can be
|
||||
/// very slow (~15s with 2,000,000 rows) - we want to show some kind of progress while enumerating so the below code creates a
|
||||
/// very rought guess of the number of collections based on the file size of the database (this shouldn't affect most users at all)
|
||||
let roughMbPerCollection: CGFloat = 2.5
|
||||
let oldDatabaseSizeBytes: CGFloat = (try? FileManager.default
|
||||
.attributesOfItem(atPath: SUKLegacy.legacyDatabaseFilepath)[.size]
|
||||
.asType(CGFloat.self))
|
||||
.defaulting(to: 0)
|
||||
let roughNumCollections: CGFloat = (((oldDatabaseSizeBytes / 1024) / 1024) / roughMbPerCollection)
|
||||
let startProgress: CGFloat = 0.02
|
||||
let swarmCompleteProgress: CGFloat = 0.90
|
||||
let interEnumerationMaxProgress: CGFloat = ((swarmCompleteProgress - startProgress) * 0.8)
|
||||
let maxCollectionsEstimate: CGFloat = 1000
|
||||
let numCollectionsToTriggerProgressUpdate: CGFloat = 20
|
||||
var swarmCollections: Set<String> = []
|
||||
var collectionIndex: CGFloat = 0
|
||||
var oldProgress: CGFloat = startProgress
|
||||
|
||||
transaction.enumerateCollections { collectionName, _ in
|
||||
if collectionName.starts(with: SSKLegacy.swarmCollectionPrefix) {
|
||||
swarmCollections.insert(collectionName.substring(from: SSKLegacy.swarmCollectionPrefix.count))
|
||||
|
@ -99,10 +100,14 @@ enum _003_YDBToGRDBMigration: Migration {
|
|||
|
||||
collectionIndex += 1
|
||||
|
||||
if collectionIndex.truncatingRemainder(dividingBy: numCollectionsToTriggerProgressUpdate) == 0 {
|
||||
oldProgress = (startProgress + (interEnumerationMaxProgress * (collectionIndex / maxCollectionsEstimate)))
|
||||
GRDBStorage.shared.update(progress: oldProgress, for: self, in: target)
|
||||
}
|
||||
GRDBStorage.shared.update(
|
||||
progress: min(
|
||||
swarmCompleteProgress,
|
||||
((collectionIndex / roughNumCollections) * (swarmCompleteProgress - startProgress))
|
||||
),
|
||||
for: self,
|
||||
in: target
|
||||
)
|
||||
}
|
||||
GRDBStorage.shared.update(progress: swarmCompleteProgress, for: self, in: target)
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ public enum SUKLegacy {
|
|||
|
||||
// MARK: - Database Functions
|
||||
|
||||
private static var legacyDatabaseFilepath: String {
|
||||
public static var legacyDatabaseFilepath: String {
|
||||
let sharedDirUrl: URL = URL(fileURLWithPath: OWSFileSystem.appSharedDataDirectoryPath())
|
||||
|
||||
return sharedDirUrl
|
||||
|
|
|
@ -63,7 +63,12 @@ public final class JobRunner {
|
|||
)
|
||||
let messageReceiveQueue: JobQueue = JobQueue(
|
||||
type: .messageReceive,
|
||||
executionType: .concurrent, // Allow as many jobs to run at once as supported by the device
|
||||
// Explicitly serial as executing concurrently means message receives getting processed at
|
||||
// different speeds which can result in:
|
||||
// • Small batches of messages appearing in the UI before larger batches
|
||||
// • Closed group messages encrypted with updated keys could start parsing before it's key
|
||||
// update message has been processed (ie. guaranteed to fail)
|
||||
executionType: .serial,
|
||||
qos: .default,
|
||||
jobVariants: [
|
||||
jobVariants.remove(.messageReceive)
|
||||
|
|
|
@ -15,6 +15,7 @@ typedef NS_ENUM(NSInteger, ImageFormat) {
|
|||
ImageFormat_Bmp,
|
||||
};
|
||||
|
||||
// FIXME: Refactor all of these to be in Swift against 'Data'
|
||||
@implementation NSData (Image)
|
||||
|
||||
+ (BOOL)ows_isValidImageAtPath:(NSString *)filePath
|
||||
|
|
Loading…
Reference in New Issue