Fixed a few small issues found when testing

Fixed a couple of issues with the ConfigurationSyncJob logic
Moved the proto parsing out of the MessageReceiveJob write block to reduce the time spent blocking writes (sketched below)
Removed difficulty from the SendMessagesResponse (it was deprecated and has since been removed)
Morgan Pretty 2023-01-18 10:56:18 +11:00
parent af9a135c08
commit 0abb09c0cf
6 changed files with 66 additions and 27 deletions
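The MessageReceiveJob change is essentially an ordering change: decode the protos first, then only hand already-parsed messages to the write block. Below is a minimal sketch of that pattern using hypothetical stand-in types (MessageInfo, ParsedContent, parse) rather than the real SNProtoContent / Storage APIs, so it is illustrative only, not the actual implementation.

import Foundation

// Hypothetical stand-ins for the real Session types (Details.MessageInfo,
// SNProtoContent, Storage) - just enough to show the ordering change
struct MessageInfo { let serializedProtoData: Data }
struct ParsedContent { let data: Data }
enum ParseError: Error { case invalidData }

func parse(_ data: Data) throws -> ParsedContent {
    guard !data.isEmpty else { throw ParseError.invalidData }
    return ParsedContent(data: data)
}

func receive(_ messages: [MessageInfo], write: ([ParsedContent]) -> Void) -> [MessageInfo] {
    var failedToParse: [MessageInfo] = []

    // Decode every proto *before* entering the write block so the decoding
    // work no longer holds up database writes
    let parsed: [ParsedContent] = messages.compactMap { info in
        do { return try parse(info.serializedProtoData) }
        catch {
            failedToParse.append(info)  // retried on a later run, as the job does
            return nil
        }
    }

    // Only the successfully parsed messages are handled inside the write block
    write(parsed)

    return failedToParse
}

Messages that fail to parse are collected and returned so they can be retried later, mirroring what the job does with remainingMessagesToProcess.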

View File

@@ -104,10 +104,11 @@ public class ConversationViewModel: OWSAudioPlayerDelegate {
public private(set) lazy var threadData: SessionThreadViewModel = SessionThreadViewModel(
threadId: self.threadId,
threadVariant: self.initialThreadVariant,
threadIsNoteToSelf: (self.threadId == getUserHexEncodedPublicKey()),
currentUserIsClosedGroupMember: (self.initialThreadVariant != .closedGroup ?
nil :
Storage.shared.read { db in
try GroupMember
GroupMember
.filter(GroupMember.Columns.groupId == self.threadId)
.filter(GroupMember.Columns.profileId == getUserHexEncodedPublicKey(db))
.filter(GroupMember.Columns.role == GroupMember.Role.standard)

View File

@@ -110,10 +110,17 @@ public enum ConfigurationSyncJob: JobExecutor {
.collect()
.eraseToAnyPublisher()
}
.map { (responses: [HTTP.BatchResponse]) -> [SuccessfulChange] in
.flatMap { (responses: [HTTP.BatchResponse]) -> AnyPublisher<[SuccessfulChange], Error> in
// We make a sequence call for this, so it's possible to get fewer responses than
// expected; if that happens, fail and re-run later
guard responses.count == pendingSwarmConfigChanges.count else {
return Fail(error: HTTPError.invalidResponse)
.eraseToAnyPublisher()
}
// Process the response data into an easy-to-understand form (this isn't strictly
// needed but the code gets convoluted without it)
zip(responses, pendingSwarmConfigChanges)
let successfulChanges: [SuccessfulChange] = zip(responses, pendingSwarmConfigChanges)
.compactMap { (batchResponse: HTTP.BatchResponse, pendingSwarmChange: SingleDestinationChanges) -> [SuccessfulChange]? in
let maybePublicKey: String? = {
switch pendingSwarmChange.destination {
@@ -145,6 +152,7 @@ public enum ConfigurationSyncJob: JobExecutor {
guard
let subResponse: HTTP.BatchSubResponse<SendMessagesResponse> = (next.response as? HTTP.BatchSubResponse<SendMessagesResponse>),
200...299 ~= subResponse.code,
!subResponse.failedToParseBody,
let sendMessageResponse: SendMessagesResponse = subResponse.body
else { return }
@@ -162,6 +170,10 @@ public enum ConfigurationSyncJob: JobExecutor {
}
}
.flatMap { $0 }
return Just(successfulChanges)
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
.map { (successfulChanges: [SuccessfulChange]) -> [ConfigDump] in
// Now that we have the successful changes, we need to mark them as pushed and
@@ -189,6 +201,13 @@ public enum ConfigurationSyncJob: JobExecutor {
}
}
.sinkUntilComplete(
receiveCompletion: { result in
switch result {
case .finished: break
case .failure(let error):
failure(job, error, false)
}
},
receiveValue: { (configDumps: [ConfigDump]) in
// Flag to indicate whether the job should be finished or will run again
var shouldFinishCurrentJob: Bool = false
@@ -209,12 +228,17 @@ public enum ConfigurationSyncJob: JobExecutor {
let existingJob: Job = try? Job
.filter(Job.Columns.id != job.id)
.filter(Job.Columns.variant == Job.Variant.configurationSync)
.fetchOne(db),
!JobRunner.isCurrentlyRunning(existingJob)
.fetchOne(db)
{
_ = try existingJob
.with(nextRunTimestamp: nextRunTimestamp)
.saved(db)
// If the next job isn't currently running then delay its start time
// until the 'nextRunTimestamp'
if !JobRunner.isCurrentlyRunning(existingJob) {
_ = try existingJob
.with(nextRunTimestamp: nextRunTimestamp)
.saved(db)
}
// If there is another job then we should finish this one
shouldFinishCurrentJob = true
return job
}
@@ -302,10 +326,10 @@ public extension ConfigurationSyncJob {
@discardableResult static func createOrUpdateIfNeeded(_ db: Database) -> Job {
// Try to get an existing job (if there is one that's not running)
if
let existingJob: Job = try? Job
let existingJobs: [Job] = try? Job
.filter(Job.Columns.variant == Job.Variant.configurationSync)
.fetchOne(db),
!JobRunner.isCurrentlyRunning(existingJob)
.fetchAll(db),
let existingJob: Job = existingJobs.first(where: { !JobRunner.isCurrentlyRunning($0) })
{
return existingJob
}
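The map-to-flatMap change earlier in this file follows a common Combine pattern: do the validation inside the operator, short-circuit with Fail when the responses don't line up with the requests, and otherwise wrap the computed value in Just with a matching failure type. A self-contained sketch of that pattern, using placeholder Response / Change types instead of the real HTTP.BatchResponse / SuccessfulChange ones:

import Combine
import Foundation

// Placeholder types standing in for HTTP.BatchResponse and SuccessfulChange
struct Response { let code: Int }
struct Change { let id: Int }

enum ValidationError: Error { case missingResponses }

func successfulChanges(
    in responsesPublisher: AnyPublisher<[Response], Error>,
    pendingChanges: [Change]
) -> AnyPublisher<[Change], Error> {
    responsesPublisher
        .flatMap { (responses: [Response]) -> AnyPublisher<[Change], Error> in
            // Fewer responses than requests means something was dropped, so fail
            // the pipeline and let the job re-run later
            guard responses.count == pendingChanges.count else {
                return Fail(error: ValidationError.missingResponses)
                    .eraseToAnyPublisher()
            }

            // Keep only the changes whose paired response succeeded
            let successful: [Change] = zip(responses, pendingChanges)
                .compactMap { (response, change) -> Change? in
                    guard 200...299 ~= response.code else { return nil }
                    return change
                }

            return Just(successful)
                .setFailureType(to: Error.self)
                .eraseToAnyPublisher()
        }
        .eraseToAnyPublisher()
}

The real job does more work in the success branch (matching each sub-response back to its destination), but the Fail/Just shape is the same.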

View File

@@ -25,9 +25,24 @@ public enum MessageReceiveJob: JobExecutor {
}
var updatedJob: Job = job
var leastSevereError: Error?
let nonConfigMessages: [Details.MessageInfo] = details.messages
var lastError: Error?
var remainingMessagesToProcess: [Details.MessageInfo] = []
let nonConfigMessages: [(info: Details.MessageInfo, proto: SNProtoContent)] = details.messages
.filter { $0.variant != .sharedConfigMessage }
.compactMap { messageInfo -> (info: Details.MessageInfo, proto: SNProtoContent)? in
do {
return (messageInfo, try SNProtoContent.parseData(messageInfo.serializedProtoData))
}
catch {
SNLog("Couldn't receive message due to error: \(error)")
lastError = error
// We failed to process this message but it is a retryable error
// so add it to the list to re-process
remainingMessagesToProcess.append(messageInfo)
return nil
}
}
let sharedConfigMessages: [SharedConfigMessage] = details.messages
.compactMap { $0.message as? SharedConfigMessage }
@@ -40,14 +55,12 @@
)
// Handle the remaining messages
var remainingMessagesToProcess: [Details.MessageInfo] = []
for messageInfo in nonConfigMessages {
for (messageInfo, protoContent) in nonConfigMessages {
do {
try MessageReceiver.handle(
db,
message: messageInfo.message,
associatedWithProto: try SNProtoContent.parseData(messageInfo.serializedProtoData),
associatedWithProto: protoContent,
openGroupId: nil
)
}
@@ -71,7 +84,7 @@
default:
SNLog("Couldn't receive message due to error: \(error)")
leastSevereError = error
lastError = error
// We failed to process this message but it is a retryable error
// so add it to the list to re-process
@@ -94,12 +107,12 @@
}
// Handle the result
switch leastSevereError {
switch lastError {
case let error as MessageReceiverError where !error.isRetryable:
failure(updatedJob, error, true)
case .some(let error):
failure(updatedJob, error, false) // TODO: Confirm the 'noKeyPair' errors here aren't an issue
failure(updatedJob, error, false)
case .none:
success(updatedJob, false)

View File

@@ -4,12 +4,10 @@ import Foundation
public class SendMessagesResponse: SnodeRecursiveResponse<SendMessagesResponse.SwarmItem> {
private enum CodingKeys: String, CodingKey {
case difficulty
case hash
case swarm
}
public let difficulty: Int64
public let hash: String
// MARK: - Initialization
@@ -17,7 +15,6 @@ public class SendMessagesResponse: SnodeRecursiveResponse<SendMessagesResponse.S
required init(from decoder: Decoder) throws {
let container: KeyedDecodingContainer<CodingKeys> = try decoder.container(keyedBy: CodingKeys.self)
difficulty = try container.decode(Int64.self, forKey: .difficulty)
hash = try container.decode(String.self, forKey: .hash)
try super.init(from: decoder)

View File

@@ -549,7 +549,7 @@ public final class SnodeAPI {
var requests: [SnodeAPI.BatchRequest.Info] = targetedMessages
.map { message, namespace in
// Check if this namespace requires authentication
guard namespace.requiresReadAuthentication else {
guard namespace.requiresWriteAuthentication else {
return BatchRequest.Info(
request: SnodeRequest(
endpoint: .sendMessage,
@@ -618,7 +618,7 @@
using: dependencies
)
.eraseToAnyPublisher()
.decoded(as: responseTypes, using: dependencies)
.decoded(as: responseTypes, requireAllResults: false, using: dependencies)
.eraseToAnyPublisher()
}
.retry(maxRetryCount)

View File

@@ -81,6 +81,7 @@ public extension Decodable {
public extension AnyPublisher where Output == (ResponseInfoType, Data?), Failure == Error {
func decoded(
as types: HTTP.BatchResponseTypes,
requireAllResults: Bool = true,
using dependencies: Dependencies = Dependencies()
) -> AnyPublisher<HTTP.BatchResponse, Error> {
self
@@ -101,7 +102,7 @@ public extension AnyPublisher where Output == (ResponseInfoType, Data?), Failure
case let anyArray as [Any]:
dataArray = anyArray.compactMap { try? JSONSerialization.data(withJSONObject: $0) }
guard dataArray.count == types.count else {
guard !requireAllResults || dataArray.count == types.count else {
return Fail(error: HTTPError.parsingFailed)
.eraseToAnyPublisher()
}
@@ -110,7 +111,10 @@ public extension AnyPublisher where Output == (ResponseInfoType, Data?), Failure
guard
let resultsArray: [Data] = (anyDict["results"] as? [Any])?
.compactMap({ try? JSONSerialization.data(withJSONObject: $0) }),
resultsArray.count == types.count
(
!requireAllResults ||
resultsArray.count == types.count
)
else {
return Fail(error: HTTPError.parsingFailed)
.eraseToAnyPublisher()
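
The new requireAllResults flag only relaxes this count check. A rough stand-alone sketch of the guard, using a hypothetical splitBatch helper and plain JSONSerialization instead of the real HTTP.BatchResponse decoding:

import Foundation

enum BatchDecodeError: Error { case parsingFailed }

// Rough stand-in for the batch-splitting step: a top-level JSON array is split
// into one Data blob per sub-response, and the expected count is only enforced
// when requireAllResults is true
func splitBatch(
    _ data: Data,
    expectedCount: Int,
    requireAllResults: Bool = true
) throws -> [Data] {
    guard let anyArray = try JSONSerialization.jsonObject(with: data) as? [Any] else {
        throw BatchDecodeError.parsingFailed
    }

    let dataArray: [Data] = anyArray.compactMap { try? JSONSerialization.data(withJSONObject: $0) }

    guard !requireAllResults || dataArray.count == expectedCount else {
        throw BatchDecodeError.parsingFailed
    }

    return dataArray
}

Because the flag defaults to true, existing call sites keep the strict behaviour; only the SnodeAPI call shown above opts into accepting a partial batch.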