Fix message send job attachment upload handling

This commit is contained in:
Niels Andriesse 2021-05-13 09:38:39 +10:00
parent eb58872b93
commit 3cab81c329
7 changed files with 29 additions and 17 deletions

View file

@ -189,7 +189,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
DatabaseFactory.getSessionJobDatabase(context).markJobAsSucceeded(jobId) DatabaseFactory.getSessionJobDatabase(context).markJobAsSucceeded(jobId)
} }
override fun markJobAsFailed(jobId: String) { override fun markJobAsFailedPermanently(jobId: String) {
DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(jobId) DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(jobId)
} }

View file

@ -25,18 +25,18 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
fun persistJob(job: Job) { fun persistJob(job: Job) {
val database = databaseHelper.writableDatabase val database = databaseHelper.writableDatabase
val contentValues = ContentValues(4) val contentValues = ContentValues(4)
contentValues.put(jobID, job.id) contentValues.put(jobID, job.id!!)
contentValues.put(jobType, job.getFactoryKey()) contentValues.put(jobType, job.getFactoryKey())
contentValues.put(failureCount, job.failureCount) contentValues.put(failureCount, job.failureCount)
contentValues.put(serializedData, SessionJobHelper.dataSerializer.serialize(job.serialize())) contentValues.put(serializedData, SessionJobHelper.dataSerializer.serialize(job.serialize()))
database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(job.id!!)) database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf( job.id!! ))
} }
fun markJobAsSucceeded(jobID: String) { fun markJobAsSucceeded(jobID: String) {
databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID )) databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID ))
} }
fun markJobAsFailed(jobID: String) { fun markJobAsFailedPermanently(jobID: String) {
databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID )) databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID ))
} }
@ -74,7 +74,7 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
val database = databaseHelper.readableDatabase val database = databaseHelper.readableDatabase
var cursor: android.database.Cursor? = null var cursor: android.database.Cursor? = null
try { try {
cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf( job.id )) cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf( job.id!! ))
return cursor == null || !cursor.moveToFirst() return cursor == null || !cursor.moveToFirst()
} catch (e: Exception) { } catch (e: Exception) {
// Do nothing // Do nothing

View file

@ -45,7 +45,7 @@ interface StorageProtocol {
// Jobs // Jobs
fun persistJob(job: Job) fun persistJob(job: Job)
fun markJobAsSucceeded(jobId: String) fun markJobAsSucceeded(jobId: String)
fun markJobAsFailed(jobId: String) fun markJobAsFailedPermanently(jobId: String)
fun getAllPendingJobs(type: String): Map<String,Job?> fun getAllPendingJobs(type: String): Map<String,Job?>
fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob?
fun getMessageSendJob(messageSendJobID: String): MessageSendJob? fun getMessageSendJob(messageSendJobID: String): MessageSendJob?

View file

@ -103,7 +103,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
val messageSendJob = storage.getMessageSendJob(messageSendJobID) val messageSendJob = storage.getMessageSendJob(messageSendJobID)
MessageSender.handleFailedMessageSend(this.message, e) MessageSender.handleFailedMessageSend(this.message, e)
if (messageSendJob != null) { if (messageSendJob != null) {
storage.markJobAsFailed(messageSendJobID) storage.markJobAsFailedPermanently(messageSendJobID)
} }
} }

View file

@ -5,6 +5,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsignal.utilities.logging.Log import org.session.libsignal.utilities.logging.Log
import java.lang.IllegalStateException
import java.util.* import java.util.*
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors import java.util.concurrent.Executors
@ -47,16 +48,15 @@ class JobQueue : JobDelegate {
while (isActive) { while (isActive) {
for (job in queue) { for (job in queue) {
when (job) { when (job) {
is NotifyPNServerJob, is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> txQueue.send(job)
is AttachmentUploadJob,
is MessageSendJob -> txQueue.send(job)
is AttachmentDownloadJob -> attachmentQueue.send(job) is AttachmentDownloadJob -> attachmentQueue.send(job)
else -> rxQueue.send(job) is MessageReceiveJob -> rxQueue.send(job)
else -> throw IllegalStateException("Unexpected job type.")
} }
} }
} }
// job has been cancelled // The job has been cancelled
receiveJob.cancel() receiveJob.cancel()
txJob.cancel() txJob.cancel()
attachmentJob.cancel() attachmentJob.cancel()
@ -123,11 +123,23 @@ class JobQueue : JobDelegate {
} }
override fun handleJobFailed(job: Job, error: Exception) { override fun handleJobFailed(job: Job, error: Exception) {
job.failureCount += 1 // Canceled
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
if (storage.isJobCanceled(job)) { if (storage.isJobCanceled(job)) {
return Log.i("Loki", "${job::class.simpleName} canceled.") return Log.i("Loki", "${job::class.simpleName} canceled.")
} }
// Message send jobs waiting for the attachment to upload
if (job is MessageSendJob && error is MessageSendJob.AwaitingAttachmentUploadException) {
val retryInterval: Long = 1000 * 4
Log.i("Loki", "Message send job waiting for attachment upload to finish.")
timer.schedule(delay = retryInterval) {
Log.i("Loki", "Retrying ${job::class.simpleName}.")
queue.offer(job)
}
return
}
// Regular job failure
job.failureCount += 1
if (job.failureCount >= job.maxFailureCount) { if (job.failureCount >= job.maxFailureCount) {
handleJobFailedPermanently(job, error) handleJobFailedPermanently(job, error)
} else { } else {
@ -148,7 +160,7 @@ class JobQueue : JobDelegate {
private fun handleJobFailedPermanently(jobId: String) { private fun handleJobFailedPermanently(jobId: String) {
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
storage.markJobAsFailed(jobId) storage.markJobAsFailedPermanently(jobId)
} }
private fun getRetryInterval(job: Job): Long { private fun getRetryInterval(job: Job): Long {

View file

@ -13,7 +13,7 @@ import org.session.libsignal.utilities.logging.Log
class MessageSendJob(val message: Message, val destination: Destination) : Job { class MessageSendJob(val message: Message, val destination: Destination) : Job {
object AwaitingUploadException: Exception("Awaiting attachment upload") object AwaitingAttachmentUploadException : Exception("Awaiting attachment upload.")
override var delegate: JobDelegate? = null override var delegate: JobDelegate? = null
override var id: String? = null override var id: String? = null
@ -50,7 +50,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
} }
} }
if (attachmentsToUpload.isNotEmpty()) { if (attachmentsToUpload.isNotEmpty()) {
this.handleFailure(AwaitingUploadException) this.handleFailure(AwaitingAttachmentUploadException)
return return
} // Wait for all attachments to upload before continuing } // Wait for all attachments to upload before continuing
} }

View file

@ -153,7 +153,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
// Get or create thread // Get or create thread
val threadID = storage.getOrCreateThreadIdFor(message.syncTarget val threadID = storage.getOrCreateThreadIdFor(message.syncTarget
?: message.sender!!, message.groupPublicKey, openGroupID) ?: message.sender!!, message.groupPublicKey, openGroupID)
if (threadID < 0) { if (threadID < 0) {
// thread doesn't exist, should only be reached in a case where we are processing open group messages for no longer existent thread // thread doesn't exist, should only be reached in a case where we are processing open group messages for no longer existent thread