From 3cab81c329437072dc1308681d55d0e546c0bb26 Mon Sep 17 00:00:00 2001 From: Niels Andriesse Date: Thu, 13 May 2021 09:38:39 +1000 Subject: [PATCH] Fix message send job attachment upload handling --- .../securesms/database/Storage.kt | 2 +- .../loki/database/SessionJobDatabase.kt | 8 +++--- .../libsession/messaging/StorageProtocol.kt | 2 +- .../messaging/jobs/AttachmentUploadJob.kt | 2 +- .../libsession/messaging/jobs/JobQueue.kt | 26 ++++++++++++++----- .../messaging/jobs/MessageSendJob.kt | 4 +-- .../ReceivedMessageHandler.kt | 2 +- 7 files changed, 29 insertions(+), 17 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index f93670ae8..f1602a8e1 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -189,7 +189,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, DatabaseFactory.getSessionJobDatabase(context).markJobAsSucceeded(jobId) } - override fun markJobAsFailed(jobId: String) { + override fun markJobAsFailedPermanently(jobId: String) { DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(jobId) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt index d8c072dd5..f684a778a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt @@ -25,18 +25,18 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa fun persistJob(job: Job) { val database = databaseHelper.writableDatabase val contentValues = ContentValues(4) - contentValues.put(jobID, job.id) + contentValues.put(jobID, job.id!!) contentValues.put(jobType, job.getFactoryKey()) contentValues.put(failureCount, job.failureCount) contentValues.put(serializedData, SessionJobHelper.dataSerializer.serialize(job.serialize())) - database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(job.id!!)) + database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf( job.id!! )) } fun markJobAsSucceeded(jobID: String) { databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID )) } - fun markJobAsFailed(jobID: String) { + fun markJobAsFailedPermanently(jobID: String) { databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID )) } @@ -74,7 +74,7 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa val database = databaseHelper.readableDatabase var cursor: android.database.Cursor? = null try { - cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf( job.id )) + cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf( job.id!! )) return cursor == null || !cursor.moveToFirst() } catch (e: Exception) { // Do nothing diff --git a/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt index da604264d..d850e3066 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt @@ -45,7 +45,7 @@ interface StorageProtocol { // Jobs fun persistJob(job: Job) fun markJobAsSucceeded(jobId: String) - fun markJobAsFailed(jobId: String) + fun markJobAsFailedPermanently(jobId: String) fun getAllPendingJobs(type: String): Map fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? fun getMessageSendJob(messageSendJobID: String): MessageSendJob? diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt index cbdcd42fc..690caf512 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt @@ -103,7 +103,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess val messageSendJob = storage.getMessageSendJob(messageSendJobID) MessageSender.handleFailedMessageSend(this.message, e) if (messageSendJob != null) { - storage.markJobAsFailed(messageSendJobID) + storage.markJobAsFailedPermanently(messageSendJobID) } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index fab49384f..a2f47556b 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -5,6 +5,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsignal.utilities.logging.Log +import java.lang.IllegalStateException import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors @@ -47,16 +48,15 @@ class JobQueue : JobDelegate { while (isActive) { for (job in queue) { when (job) { - is NotifyPNServerJob, - is AttachmentUploadJob, - is MessageSendJob -> txQueue.send(job) + is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> txQueue.send(job) is AttachmentDownloadJob -> attachmentQueue.send(job) - else -> rxQueue.send(job) + is MessageReceiveJob -> rxQueue.send(job) + else -> throw IllegalStateException("Unexpected job type.") } } } - // job has been cancelled + // The job has been cancelled receiveJob.cancel() txJob.cancel() attachmentJob.cancel() @@ -123,11 +123,23 @@ class JobQueue : JobDelegate { } override fun handleJobFailed(job: Job, error: Exception) { - job.failureCount += 1 + // Canceled val storage = MessagingModuleConfiguration.shared.storage if (storage.isJobCanceled(job)) { return Log.i("Loki", "${job::class.simpleName} canceled.") } + // Message send jobs waiting for the attachment to upload + if (job is MessageSendJob && error is MessageSendJob.AwaitingAttachmentUploadException) { + val retryInterval: Long = 1000 * 4 + Log.i("Loki", "Message send job waiting for attachment upload to finish.") + timer.schedule(delay = retryInterval) { + Log.i("Loki", "Retrying ${job::class.simpleName}.") + queue.offer(job) + } + return + } + // Regular job failure + job.failureCount += 1 if (job.failureCount >= job.maxFailureCount) { handleJobFailedPermanently(job, error) } else { @@ -148,7 +160,7 @@ class JobQueue : JobDelegate { private fun handleJobFailedPermanently(jobId: String) { val storage = MessagingModuleConfiguration.shared.storage - storage.markJobAsFailed(jobId) + storage.markJobAsFailedPermanently(jobId) } private fun getRetryInterval(job: Job): Long { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index 1a0e4e57f..b93aa13dc 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -13,7 +13,7 @@ import org.session.libsignal.utilities.logging.Log class MessageSendJob(val message: Message, val destination: Destination) : Job { - object AwaitingUploadException: Exception("Awaiting attachment upload") + object AwaitingAttachmentUploadException : Exception("Awaiting attachment upload.") override var delegate: JobDelegate? = null override var id: String? = null @@ -50,7 +50,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { } } if (attachmentsToUpload.isNotEmpty()) { - this.handleFailure(AwaitingUploadException) + this.handleFailure(AwaitingAttachmentUploadException) return } // Wait for all attachments to upload before continuing } diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt index 41eb261b9..be331891b 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt @@ -153,7 +153,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS // Get or create thread val threadID = storage.getOrCreateThreadIdFor(message.syncTarget - ?: message.sender!!, message.groupPublicKey, openGroupID) + ?: message.sender!!, message.groupPublicKey, openGroupID) if (threadID < 0) { // thread doesn't exist, should only be reached in a case where we are processing open group messages for no longer existent thread