Morgan Pretty 5b6be3912d Fixed an edge-case crash, a couple of minor bugs and made future-proofing tweaks
Fixed a bit of the OnionRequest error handling to better send through server error messages for debugging
Fixed a bug where the initial offset could be negative if the number of messages was less than the page size resulting in a crash
Fixed a crash due to a code path which was thought to be impossible exiting but is actually possible (so just erroring)
Added the 'expire' SnodeAPI endpoint
Removed the 'openGroupServerTimestamp' property (was unused and just added confusion)
Updated the logic to always handle the 'fileId' for uploads/downloads as a string instead of casting it to an Int64
Updated the OpenGroup room parsing to support either Int or String values for image ids
2022-07-12 17:43:52 +10:00

437 lines
18 KiB

// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import Foundation
import GRDB
import SessionSnodeKit
/// Abstract base class for `VisibleMessage` and `ControlMessage`.
public class Message: Codable {
public var id: String?
public var threadId: String?
public var sentTimestamp: UInt64?
public var receivedTimestamp: UInt64?
public var recipient: String?
public var sender: String?
public var groupPublicKey: String?
public var openGroupServerMessageId: UInt64?
public var serverHash: String?
public var ttl: UInt64 { 14 * 24 * 60 * 60 * 1000 }
public var isSelfSendValid: Bool { false }
public var shouldBeRetryable: Bool { false }
// MARK: - Validation
public var isValid: Bool {
if let sentTimestamp = sentTimestamp { guard sentTimestamp > 0 else { return false } }
if let receivedTimestamp = receivedTimestamp { guard receivedTimestamp > 0 else { return false } }
return sender != nil && recipient != nil
// MARK: - Initialization
public init(
id: String? = nil,
threadId: String? = nil,
sentTimestamp: UInt64? = nil,
receivedTimestamp: UInt64? = nil,
recipient: String? = nil,
sender: String? = nil,
groupPublicKey: String? = nil,
openGroupServerMessageId: UInt64? = nil,
serverHash: String? = nil
) { = id
self.threadId = threadId
self.sentTimestamp = sentTimestamp
self.receivedTimestamp = receivedTimestamp
self.recipient = recipient
self.sender = sender
self.groupPublicKey = groupPublicKey
self.openGroupServerMessageId = openGroupServerMessageId
self.serverHash = serverHash
// MARK: - Proto Conversion
public class func fromProto(_ proto: SNProtoContent, sender: String) -> Self? {
preconditionFailure("fromProto(_:sender:) is abstract and must be overridden.")
public func toProto(_ db: Database) -> SNProtoContent? {
preconditionFailure("toProto(_:) is abstract and must be overridden.")
public func setGroupContextIfNeeded(_ db: Database, on dataMessage: SNProtoDataMessage.SNProtoDataMessageBuilder) throws {
let threadId: String = threadId,
(try? ClosedGroup.exists(db, id: threadId)) == true,
let legacyGroupId: Data = "\(SMKLegacy.closedGroupIdPrefix)\(threadId)".data(using: .utf8)
else { return }
// Android needs a group context or it'll interpret the message as a one-to-one message
let groupProto = SNProtoGroupContext.builder(id: legacyGroupId, type: .deliver)
// MARK: - Message Parsing/Processing
public typealias ProcessedMessage = (
threadId: String?,
proto: SNProtoContent,
messageInfo: MessageReceiveJob.Details.MessageInfo
public extension Message {
static let nonThreadMessageId: String = "NON_THREAD_MESSAGE"
enum Variant: String, Codable {
case readReceipt
case typingIndicator
case closedGroupControlMessage
case dataExtractionNotification
case expirationTimerUpdate
case configurationMessage
case unsendRequest
case messageRequestResponse
case visibleMessage
case callMessage
init?(from type: Message) {
switch type {
case is ReadReceipt: self = .readReceipt
case is TypingIndicator: self = .typingIndicator
case is ClosedGroupControlMessage: self = .closedGroupControlMessage
case is DataExtractionNotification: self = .dataExtractionNotification
case is ExpirationTimerUpdate: self = .expirationTimerUpdate
case is ConfigurationMessage: self = .configurationMessage
case is UnsendRequest: self = .unsendRequest
case is MessageRequestResponse: self = .messageRequestResponse
case is VisibleMessage: self = .visibleMessage
case is CallMessage: self = .callMessage
default: return nil
var messageType: Message.Type {
switch self {
case .readReceipt: return ReadReceipt.self
case .typingIndicator: return TypingIndicator.self
case .closedGroupControlMessage: return ClosedGroupControlMessage.self
case .dataExtractionNotification: return DataExtractionNotification.self
case .expirationTimerUpdate: return ExpirationTimerUpdate.self
case .configurationMessage: return ConfigurationMessage.self
case .unsendRequest: return UnsendRequest.self
case .messageRequestResponse: return MessageRequestResponse.self
case .visibleMessage: return VisibleMessage.self
case .callMessage: return CallMessage.self
func decode<CodingKeys: CodingKey>(from container: KeyedDecodingContainer<CodingKeys>, forKey key: CodingKeys) throws -> Message {
switch self {
case .readReceipt: return try container.decode(ReadReceipt.self, forKey: key)
case .typingIndicator: return try container.decode(TypingIndicator.self, forKey: key)
case .closedGroupControlMessage:
return try container.decode(ClosedGroupControlMessage.self, forKey: key)
case .dataExtractionNotification:
return try container.decode(DataExtractionNotification.self, forKey: key)
case .expirationTimerUpdate: return try container.decode(ExpirationTimerUpdate.self, forKey: key)
case .configurationMessage: return try container.decode(ConfigurationMessage.self, forKey: key)
case .unsendRequest: return try container.decode(UnsendRequest.self, forKey: key)
case .messageRequestResponse: return try container.decode(MessageRequestResponse.self, forKey: key)
case .visibleMessage: return try container.decode(VisibleMessage.self, forKey: key)
case .callMessage: return try container.decode(CallMessage.self, forKey: key)
static func createMessageFrom(_ proto: SNProtoContent, sender: String) -> Message? {
// Note: This array is ordered intentionally to ensure the correct types are processed
// and aren't parsed as the wrong type
let prioritisedVariants: [Variant] = [
return prioritisedVariants
.reduce(nil) { prev, variant in
guard prev == nil else { return prev }
return variant.messageType.fromProto(proto, sender: sender)
static func shouldSync(message: Message) -> Bool {
switch message {
case let controlMessage as ClosedGroupControlMessage:
switch controlMessage.kind {
case .new: return true
default: return false
case let callMessage as CallMessage:
switch callMessage.kind {
case .answer, .endCall: return true
default: return false
case is ConfigurationMessage: return true
case is UnsendRequest: return true
default: return false
static func processRawReceivedMessage(
_ db: Database,
rawMessage: SnodeReceivedMessage
) throws -> ProcessedMessage? {
guard let envelope = SNProtoEnvelope.from(rawMessage) else {
throw MessageReceiverError.invalidMessage
do {
let processedMessage: ProcessedMessage? = try processRawReceivedMessage(
envelope: envelope,
serverExpirationTimestamp: (TimeInterval( / 1000),
handleClosedGroupKeyUpdateMessages: true
// Retrieve the number of entries we have for the hash of this message
let numExistingHashes: Int = (try? SnodeReceivedMessageInfo
.filter(SnodeReceivedMessageInfo.Columns.hash ==
.defaulting(to: 0)
// Try to insert the raw message info into the database (used for both request paging and
// de-duping purposes)
_ = try
// If the above insertion worked then we hadn't processed this message for this specific
// service node, but may have done so for another node - if the hash already existed in
// the database before we inserted it for this node then we can ignore this message as a
// duplicate
guard numExistingHashes == 0 else { throw MessageReceiverError.duplicateMessage }
return processedMessage
catch {
// If we get 'selfSend' or 'duplicateControlMessage' errors then we still want to insert
// the SnodeReceivedMessageInfo to prevent retrieving and attempting to process the same
// message again (as well as ensure the next poll doesn't retrieve the same message)
switch error {
case MessageReceiverError.selfSend, MessageReceiverError.duplicateControlMessage:
_ = try?
default: break
throw error
static func processRawReceivedMessage(
_ db: Database,
serializedData: Data,
serverHash: String?
) throws -> ProcessedMessage? {
guard let envelope = try? SNProtoEnvelope.parseData(serializedData) else {
throw MessageReceiverError.invalidMessage
return try processRawReceivedMessage(
envelope: envelope,
serverExpirationTimestamp: (Date().timeIntervalSince1970 + ControlMessageProcessRecord.defaultExpirationSeconds),
serverHash: serverHash,
handleClosedGroupKeyUpdateMessages: true
/// This method behaves slightly differently from the other `processRawReceivedMessage` methods as it doesn't
/// insert the "message info" for deduping (we want the poller to re-process the message) and also avoids handling any
/// closed group key update messages (the `NotificationServiceExtension` does this itself)
static func processRawReceivedMessageAsNotification(
_ db: Database,
envelope: SNProtoEnvelope
) throws -> ProcessedMessage? {
let processedMessage: ProcessedMessage? = try processRawReceivedMessage(
envelope: envelope,
serverExpirationTimestamp: (Date().timeIntervalSince1970 + ControlMessageProcessRecord.defaultExpirationSeconds),
serverHash: nil,
handleClosedGroupKeyUpdateMessages: false
return processedMessage
static func processReceivedOpenGroupMessage(
_ db: Database,
openGroupId: String,
openGroupServerPublicKey: String,
message: OpenGroupAPI.Message,
data: Data,
dependencies: SMKDependencies = SMKDependencies()
) throws -> ProcessedMessage? {
// Need a sender in order to process the message
guard let sender: String = message.sender else { return nil }
// Note: The `posted` value is in seconds but all messages in the database use milliseconds for timestamps
let envelopeBuilder = SNProtoEnvelope.builder(type: .sessionMessage, timestamp: UInt64(floor(message.posted * 1000)))
guard let envelope = try? else {
throw MessageReceiverError.invalidMessage
return try processRawReceivedMessage(
envelope: envelope,
serverExpirationTimestamp: nil,
serverHash: nil,
openGroupId: openGroupId,
openGroupServerPublicKey: openGroupServerPublicKey,
handleClosedGroupKeyUpdateMessages: false,
dependencies: dependencies
static func processReceivedOpenGroupDirectMessage(
_ db: Database,
openGroupServerPublicKey: String,
message: OpenGroupAPI.DirectMessage,
data: Data,
isOutgoing: Bool? = nil,
otherBlindedPublicKey: String? = nil,
dependencies: SMKDependencies = SMKDependencies()
) throws -> ProcessedMessage? {
// Note: The `posted` value is in seconds but all messages in the database use milliseconds for timestamps
let envelopeBuilder = SNProtoEnvelope.builder(type: .sessionMessage, timestamp: UInt64(floor(message.posted * 1000)))
guard let envelope = try? else {
throw MessageReceiverError.invalidMessage
return try processRawReceivedMessage(
envelope: envelope,
serverExpirationTimestamp: nil,
serverHash: nil,
openGroupId: nil, // Explicitly null since it shouldn't be handled as an open group message
openGroupServerPublicKey: openGroupServerPublicKey,
isOutgoing: isOutgoing,
otherBlindedPublicKey: otherBlindedPublicKey,
handleClosedGroupKeyUpdateMessages: false,
dependencies: dependencies
private static func processRawReceivedMessage(
_ db: Database,
envelope: SNProtoEnvelope,
serverExpirationTimestamp: TimeInterval?,
serverHash: String?,
openGroupId: String? = nil,
openGroupMessageServerId: Int64? = nil,
openGroupServerPublicKey: String? = nil,
isOutgoing: Bool? = nil,
otherBlindedPublicKey: String? = nil,
handleClosedGroupKeyUpdateMessages: Bool,
dependencies: SMKDependencies = SMKDependencies()
) throws -> ProcessedMessage? {
let (message, proto, threadId) = try MessageReceiver.parse(
envelope: envelope,
serverExpirationTimestamp: serverExpirationTimestamp,
openGroupId: openGroupId,
openGroupMessageServerId: openGroupMessageServerId,
openGroupServerPublicKey: openGroupServerPublicKey,
isOutgoing: isOutgoing,
otherBlindedPublicKey: otherBlindedPublicKey,
dependencies: dependencies
message.serverHash = serverHash
// Ignore invalid messages and hashes for messages we have previously handled
guard let variant: Message.Variant = Message.Variant(from: message) else {
throw MessageReceiverError.invalidMessage
/// **Note:** We want to immediately handle any `ClosedGroupControlMessage` with the kind `encryptionKeyPair` as
/// we need the keyPair in storage in order to be able to parse and messages which were signed with the new key (also no need to add
/// these as jobs as they will be fully handled in here)
if handleClosedGroupKeyUpdateMessages {
switch message {
case let closedGroupControlMessage as ClosedGroupControlMessage:
switch closedGroupControlMessage.kind {
case .encryptionKeyPair:
try MessageReceiver.handleClosedGroupControlMessage(db, closedGroupControlMessage)
return nil
default: break
default: break
// Prevent ControlMessages from being handled multiple times if not supported
do {
try ControlMessageProcessRecord(
threadId: threadId,
message: message,
serverExpirationTimestamp: serverExpirationTimestamp
catch {
// We want to custom handle this
if case DatabaseError.SQLITE_CONSTRAINT_UNIQUE = error {
throw MessageReceiverError.duplicateControlMessage
throw error
return (
try MessageReceiveJob.Details.MessageInfo(
message: message,
variant: variant,
proto: proto
// MARK: - Mutation
internal extension Message {
func with(sentTimestamp: UInt64) -> Message {
self.sentTimestamp = sentTimestamp
return self