Minor job type refactoring

This commit is contained in:
nielsandriesse 2021-05-12 16:17:25 +10:00
parent f5238982c3
commit bb850cf99e
11 changed files with 136 additions and 118 deletions

View File

@ -75,6 +75,7 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor
val userPublicKey = TextSecurePreferences.getLocalNumber(context)!!
val privateChatsPromise = SnodeAPI.getMessages(userPublicKey).map { envelopes ->
envelopes.map { envelope ->
// FIXME: Using a job here seems like a bad idea...
MessageReceiveJob(envelope.toByteArray(), false).executeAsync()
}
}

View File

@ -18,7 +18,8 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
const val jobType = "job_type"
const val failureCount = "failure_count"
const val serializedData = "serialized_data"
@JvmStatic val createSessionJobTableCommand = "CREATE TABLE $sessionJobTable ($jobID INTEGER PRIMARY KEY, $jobType STRING, $failureCount INTEGER DEFAULT 0, $serializedData TEXT);"
@JvmStatic val createSessionJobTableCommand
= "CREATE TABLE $sessionJobTable ($jobID INTEGER PRIMARY KEY, $jobType STRING, $failureCount INTEGER DEFAULT 0, $serializedData TEXT);"
}
fun persistJob(job: Job) {
@ -31,40 +32,41 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(jobID))
}
fun markJobAsSucceeded(jobId: String) {
databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(jobId))
fun markJobAsSucceeded(jobID: String) {
databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf( jobID ))
}
fun markJobAsFailed(jobId: String) {
databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(jobId))
fun markJobAsFailed(jobID: String) {
databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf( jobID ))
}
fun getAllPendingJobs(type: String): Map<String, Job?> {
val database = databaseHelper.readableDatabase
return database.getAll(sessionJobTable, "$jobType = ?", arrayOf(type)) { cursor ->
val jobId = cursor.getString(jobID)
return database.getAll(sessionJobTable, "$jobType = ?", arrayOf( type )) { cursor ->
val jobID = cursor.getString(jobID)
try {
jobId to jobFromCursor(cursor)
jobID to jobFromCursor(cursor)
} catch (e: Exception) {
Log.e("Loki", "Error serializing Job of type $type",e)
jobId to null
Log.e("Loki", "Error deserializing job of type: $type.", e)
jobID to null
}
}.toMap()
}
fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? {
val database = databaseHelper.readableDatabase
var result = mutableListOf<AttachmentUploadJob>()
database.getAll(sessionJobTable, "$jobType = ?", arrayOf(AttachmentUploadJob.KEY)) { cursor ->
result.add(jobFromCursor(cursor) as AttachmentUploadJob)
val result = mutableListOf<AttachmentUploadJob>()
database.getAll(sessionJobTable, "$jobType = ?", arrayOf( AttachmentUploadJob.KEY )) { cursor ->
val job = jobFromCursor(cursor) as AttachmentUploadJob?
if (job != null) { result.add(job) }
}
return result.firstOrNull { job -> job.attachmentID == attachmentID }
}
fun getMessageSendJob(messageSendJobID: String): MessageSendJob? {
val database = databaseHelper.readableDatabase
return database.get(sessionJobTable, "$jobID = ? AND $jobType = ?", arrayOf(messageSendJobID, MessageSendJob.KEY)) { cursor ->
jobFromCursor(cursor) as MessageSendJob
return database.get(sessionJobTable, "$jobID = ? AND $jobType = ?", arrayOf( messageSendJobID, MessageSendJob.KEY )) { cursor ->
jobFromCursor(cursor) as MessageSendJob?
}
}
@ -72,7 +74,7 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
val database = databaseHelper.readableDatabase
var cursor: android.database.Cursor? = null
try {
cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf(job.id))
cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf( job.id ))
return cursor != null && cursor.moveToFirst()
} catch (e: Exception) {
// Do nothing
@ -82,10 +84,10 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
return false
}
private fun jobFromCursor(cursor: Cursor): Job {
private fun jobFromCursor(cursor: Cursor): Job? {
val type = cursor.getString(jobType)
val data = SessionJobHelper.dataSerializer.deserialize(cursor.getString(serializedData))
val job = SessionJobHelper.sessionJobInstantiator.instantiate(type, data)
val job = SessionJobHelper.sessionJobInstantiator.instantiate(type, data) ?: return null
job.id = cursor.getString(jobID)
job.failureCount = cursor.getInt(failureCount)
return job

View File

@ -3,7 +3,6 @@ package org.session.libsession.messaging.jobs
import okhttp3.HttpUrl
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.file_server.FileServerAPI
import org.session.libsession.messaging.file_server.FileServerAPIV2
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentState
import org.session.libsession.messaging.utilities.DotNetAPI
@ -31,8 +30,8 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
val KEY: String = "AttachmentDownloadJob"
// Keys used for database storage
private val KEY_ATTACHMENT_ID = "attachment_id"
private val KEY_TS_INCOMING_MESSAGE_ID = "tsIncoming_message_id"
private val ATTACHMENT_ID_KEY = "attachment_id"
private val TS_INCOMING_MESSAGE_ID_KEY = "tsIncoming_message_id"
}
override fun execute() {
@ -52,18 +51,19 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
try {
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val attachment = messageDataProvider.getDatabaseAttachment(attachmentID)
?: return handleFailure(Error.NoAttachment)
?: return handleFailure(Error.NoAttachment)
messageDataProvider.setAttachmentState(AttachmentState.STARTED, attachmentID, this.databaseMessageID)
val tempFile = createTempFile()
val threadId = MessagingModuleConfiguration.shared.storage.getThreadIdForMms(databaseMessageID)
val openGroupV2 = MessagingModuleConfiguration.shared.storage.getV2OpenGroup(threadId.toString())
val stream = if (openGroupV2 == null) {
DownloadUtilities.downloadFile(tempFile, attachment.url, FileServerAPI.maxFileSize, null)
// Assume we're retrieving an attachment for an open group server if the digest is not set
if (attachment.digest?.size ?: 0 == 0 || attachment.key.isNullOrEmpty()) FileInputStream(tempFile)
else AttachmentCipherInputStream.createForAttachment(tempFile, attachment.size, Base64.decode(attachment.key), attachment.digest)
if (attachment.digest?.size ?: 0 == 0 || attachment.key.isNullOrEmpty()) {
FileInputStream(tempFile)
} else {
AttachmentCipherInputStream.createForAttachment(tempFile, attachment.size, Base64.decode(attachment.key), attachment.digest)
}
} else {
val url = HttpUrl.parse(attachment.url)!!
val fileId = url.pathSegments().last()
@ -100,8 +100,9 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
}
override fun serialize(): Data {
return Data.Builder().putLong(KEY_ATTACHMENT_ID, attachmentID)
.putLong(KEY_TS_INCOMING_MESSAGE_ID, databaseMessageID)
return Data.Builder()
.putLong(ATTACHMENT_ID_KEY, attachmentID)
.putLong(TS_INCOMING_MESSAGE_ID_KEY, databaseMessageID)
.build();
}
@ -110,8 +111,9 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
}
class Factory : Job.Factory<AttachmentDownloadJob> {
override fun create(data: Data): AttachmentDownloadJob {
return AttachmentDownloadJob(data.getLong(KEY_ATTACHMENT_ID), data.getLong(KEY_TS_INCOMING_MESSAGE_ID))
return AttachmentDownloadJob(data.getLong(ATTACHMENT_ID_KEY), data.getLong(TS_INCOMING_MESSAGE_ID_KEY))
}
}
}

View File

@ -30,44 +30,39 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
// Settings
override val maxFailureCount: Int = 20
companion object {
val TAG = AttachmentUploadJob::class.simpleName
val KEY: String = "AttachmentUploadJob"
// Keys used for database storage
private val KEY_ATTACHMENT_ID = "attachment_id"
private val KEY_THREAD_ID = "thread_id"
private val KEY_MESSAGE = "message"
private val KEY_MESSAGE_SEND_JOB_ID = "message_send_job_id"
private val ATTACHMENT_ID_KEY = "attachment_id"
private val THREAD_ID_KEY = "thread_id"
private val MESSAGE_KEY = "message"
private val MESSAGE_SEND_JOB_ID_KEY = "message_send_job_id"
}
override fun execute() {
try {
val attachment = MessagingModuleConfiguration.shared.messageDataProvider.getScaledSignalAttachmentStream(attachmentID)
?: return handleFailure(Error.NoAttachment)
val usePadding = false
val openGroupV2 = MessagingModuleConfiguration.shared.storage.getV2OpenGroup(threadID)
val openGroup = MessagingModuleConfiguration.shared.storage.getOpenGroup(threadID)
val server = openGroup?.let {
it.server
} ?: openGroupV2?.let {
it.server
} ?: FileServerAPI.shared.server
val server = openGroupV2?.server ?: openGroup?.server ?: FileServerAPI.shared.server
val shouldEncrypt = (openGroup == null && openGroupV2 == null) // Encrypt if this isn't an open group
val attachmentKey = Util.getSecretBytes(64)
val paddedLength = if (usePadding) PaddingInputStream.getPaddedSize(attachment.length) else attachment.length
val dataStream = if (usePadding) PaddingInputStream(attachment.inputStream, attachment.length) else attachment.inputStream
val ciphertextLength = if (shouldEncrypt) AttachmentCipherOutputStream.getCiphertextLength(paddedLength) else attachment.length
val outputStreamFactory = if (shouldEncrypt) AttachmentCipherOutputStreamFactory(attachmentKey) else PlaintextOutputStreamFactory()
val attachmentData = PushAttachmentData(attachment.contentType, dataStream, ciphertextLength, outputStreamFactory, attachment.listener)
val uploadResult = if (openGroupV2 == null) FileServerAPI.shared.uploadAttachment(server, attachmentData) else {
val uploadResult = if (openGroupV2 != null) {
val dataBytes = attachmentData.data.readBytes()
val result = OpenGroupAPIV2.upload(dataBytes, openGroupV2.room, openGroupV2.server).get()
DotNetAPI.UploadResult(result, "${openGroupV2.server}/files/$result", byteArrayOf())
} else {
FileServerAPI.shared.uploadAttachment(server, attachmentData)
}
handleSuccess(attachment, attachmentKey, uploadResult)
} catch (e: java.lang.Exception) {
@ -82,7 +77,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
}
private fun handleSuccess(attachment: SignalServiceAttachmentStream, attachmentKey: ByteArray, uploadResult: DotNetAPI.UploadResult) {
Log.w(TAG, "Attachment uploaded successfully.")
Log.d(TAG, "Attachment uploaded successfully.")
delegate?.handleJobSucceeded(this)
MessagingModuleConfiguration.shared.messageDataProvider.updateAttachmentAfterUploadSucceeded(attachmentID, attachment, attachmentKey, uploadResult)
MessagingModuleConfiguration.shared.storage.resumeMessageSendJobIfNeeded(messageSendJobID)
@ -119,10 +114,11 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
val output = Output(serializedMessage)
kryo.writeObject(output, message)
output.close()
return Data.Builder().putLong(KEY_ATTACHMENT_ID, attachmentID)
.putString(KEY_THREAD_ID, threadID)
.putByteArray(KEY_MESSAGE, serializedMessage)
.putString(KEY_MESSAGE_SEND_JOB_ID, messageSendJobID)
return Data.Builder()
.putLong(ATTACHMENT_ID_KEY, attachmentID)
.putString(THREAD_ID_KEY, threadID)
.putByteArray(MESSAGE_KEY, serializedMessage)
.putString(MESSAGE_SEND_JOB_ID_KEY, messageSendJobID)
.build();
}
@ -133,12 +129,17 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
class Factory: Job.Factory<AttachmentUploadJob> {
override fun create(data: Data): AttachmentUploadJob {
val serializedMessage = data.getByteArray(KEY_MESSAGE)
val serializedMessage = data.getByteArray(MESSAGE_KEY)
val kryo = Kryo()
val input = Input(serializedMessage)
val message: Message = kryo.readObject(input, Message::class.java)
input.close()
return AttachmentUploadJob(data.getLong(KEY_ATTACHMENT_ID), data.getString(KEY_THREAD_ID)!!, message, data.getString(KEY_MESSAGE_SEND_JOB_ID)!!)
return AttachmentUploadJob(
data.getLong(ATTACHMENT_ID_KEY),
data.getString(THREAD_ID_KEY)!!,
message,
data.getString(MESSAGE_SEND_JOB_ID_KEY)!!
)
}
}
}

View File

@ -8,21 +8,20 @@ interface Job {
val maxFailureCount: Int
companion object {
// Keys used for database storage
private val KEY_ID = "id"
private val KEY_FAILURE_COUNT = "failure_count"
private val ID_KEY = "id"
private val FAILURE_COUNT_KEY = "failure_count"
}
fun execute()
fun serialize(): Data
/**
* Returns the key that can be used to find the relevant factory needed to create your job.
*/
fun getFactoryKey(): String
interface Factory<T : Job> {
fun create(data: Data): T
}
}

View File

@ -44,14 +44,15 @@ class JobQueue : JobDelegate {
}
companion object {
@JvmStatic
val shared: JobQueue by lazy { JobQueue() }
}
private fun Job.canExecuteParallel(): Boolean {
return this.javaClass in arrayOf(
AttachmentUploadJob::class.java,
AttachmentDownloadJob::class.java
AttachmentUploadJob::class.java,
AttachmentDownloadJob::class.java
)
}
@ -68,7 +69,6 @@ class JobQueue : JobDelegate {
val currentTime = System.currentTimeMillis()
jobTimestampMap.putIfAbsent(currentTime, AtomicInteger())
job.id = currentTime.toString() + jobTimestampMap[currentTime]!!.getAndIncrement().toString()
MessagingModuleConfiguration.shared.storage.persistJob(job)
}
@ -78,25 +78,26 @@ class JobQueue : JobDelegate {
return
}
hasResumedPendingJobs = true
val allJobTypes = listOf(AttachmentUploadJob.KEY,
AttachmentDownloadJob.KEY,
MessageReceiveJob.KEY,
MessageSendJob.KEY,
NotifyPNServerJob.KEY
val allJobTypes = listOf(
AttachmentUploadJob.KEY,
AttachmentDownloadJob.KEY,
MessageReceiveJob.KEY,
MessageSendJob.KEY,
NotifyPNServerJob.KEY
)
allJobTypes.forEach { type ->
val allPendingJobs = MessagingModuleConfiguration.shared.storage.getAllPendingJobs(type)
val pendingJobs = mutableListOf<Job>()
for ((id, job) in allPendingJobs) {
if (job == null) {
// job failed to serialize, remove it from the DB
// Job failed to deserialize, remove it from the DB
handleJobFailedPermanently(id)
} else {
pendingJobs.add(job)
}
}
pendingJobs.sortedBy { it.id }.forEach { job ->
Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.")
Log.i("Loki", "Resuming pending job of type: ${job::class.simpleName}.")
queue.offer(job) // Offer always called on unlimited capacity
}
}
@ -110,15 +111,15 @@ class JobQueue : JobDelegate {
override fun handleJobFailed(job: Job, error: Exception) {
job.failureCount += 1
val storage = MessagingModuleConfiguration.shared.storage
if (storage.isJobCanceled(job)) { return Log.i("Jobs", "${job::class.simpleName} canceled.")}
if (storage.isJobCanceled(job)) { return Log.i("Loki", "${job::class.simpleName} canceled.")}
if (job.failureCount == job.maxFailureCount) {
handleJobFailedPermanently(job, error)
} else {
storage.persistJob(job)
val retryInterval = getRetryInterval(job)
Log.i("Jobs", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).")
Log.i("Loki", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).")
timer.schedule(delay = retryInterval) {
Log.i("Jobs", "Retrying ${job::class.simpleName}.")
Log.i("Loki", "Retrying ${job::class.simpleName}.")
queue.offer(job)
}
}

View File

@ -11,7 +11,6 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val
override var id: String? = null
override var failureCount: Int = 0
// Settings
override val maxFailureCount: Int = 10
companion object {
val TAG = MessageReceiveJob::class.simpleName
@ -20,10 +19,11 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val
private val RECEIVE_LOCK = Object()
// Keys used for database storage
private val KEY_DATA = "data"
private val KEY_IS_BACKGROUND_POLL = "is_background_poll"
private val KEY_OPEN_GROUP_MESSAGE_SERVER_ID = "openGroupMessageServerID"
private val KEY_OPEN_GROUP_ID = "open_group_id"
private val DATA_KEY = "data"
// FIXME: We probably shouldn't be using this job when background polling
private val IS_BACKGROUND_POLL_KEY = "is_background_poll"
private val OPEN_GROUP_MESSAGE_SERVER_ID_KEY = "openGroupMessageServerID"
private val OPEN_GROUP_ID_KEY = "open_group_id"
}
override fun execute() {
@ -35,19 +35,18 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val
try {
val isRetry: Boolean = failureCount != 0
val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID, isRetry)
synchronized(RECEIVE_LOCK) {
synchronized(RECEIVE_LOCK) { // FIXME: Do we need this?
MessageReceiver.handle(message, proto, this.openGroupID)
}
this.handleSuccess()
deferred.resolve(Unit)
} catch (e: Exception) {
Log.e(TAG, "Couldn't receive message due to error", e)
val error = e as? MessageReceiver.Error
if (error != null && !error.isRetryable) {
Log.e("Loki", "Message receive job permanently failed due to error", e)
this.handlePermanentFailure(error)
Log.e(TAG, "Couldn't receive message.", e)
if (e is MessageReceiver.Error && !e.isRetryable) {
Log.e("Loki", "Message receive job permanently failed.", e)
this.handlePermanentFailure(e)
} else {
Log.e("Loki", "Couldn't receive message due to error", e)
Log.e("Loki", "Couldn't receive message.", e)
this.handleFailure(e)
}
deferred.resolve(Unit) // The promise is just used to keep track of when we're done
@ -68,10 +67,10 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val
}
override fun serialize(): Data {
val builder = Data.Builder().putByteArray(KEY_DATA, data)
.putBoolean(KEY_IS_BACKGROUND_POLL, isBackgroundPoll)
openGroupMessageServerID?.let { builder.putLong(KEY_OPEN_GROUP_MESSAGE_SERVER_ID, openGroupMessageServerID) }
openGroupID?.let { builder.putString(KEY_OPEN_GROUP_ID, openGroupID) }
val builder = Data.Builder().putByteArray(DATA_KEY, data)
.putBoolean(IS_BACKGROUND_POLL_KEY, isBackgroundPoll)
openGroupMessageServerID?.let { builder.putLong(OPEN_GROUP_MESSAGE_SERVER_ID_KEY, it) }
openGroupID?.let { builder.putString(OPEN_GROUP_ID_KEY, it) }
return builder.build();
}
@ -82,7 +81,12 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val
class Factory: Job.Factory<MessageReceiveJob> {
override fun create(data: Data): MessageReceiveJob {
return MessageReceiveJob(data.getByteArray(KEY_DATA), data.getBoolean(KEY_IS_BACKGROUND_POLL), data.getLong(KEY_OPEN_GROUP_MESSAGE_SERVER_ID), data.getString(KEY_OPEN_GROUP_ID))
return MessageReceiveJob(
data.getByteArray(DATA_KEY),
data.getBoolean(IS_BACKGROUND_POLL_KEY),
data.getLong(OPEN_GROUP_MESSAGE_SERVER_ID_KEY),
data.getString(OPEN_GROUP_ID_KEY)
)
}
}
}

View File

@ -15,22 +15,22 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
override var id: String? = null
override var failureCount: Int = 0
// Settings
override val maxFailureCount: Int = 10
companion object {
val TAG = MessageSendJob::class.simpleName
val KEY: String = "MessageSendJob"
// Keys used for database storage
private val KEY_MESSAGE = "message"
private val KEY_DESTINATION = "destination"
private val MESSAGE_KEY = "message"
private val DESTINATION_KEY = "destination"
}
override fun execute() {
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val message = message as? VisibleMessage
message?.let {
if(!messageDataProvider.isOutgoingMessage(message.sentTimestamp!!)) return // The message has been deleted
if (message != null) {
if (!messageDataProvider.isOutgoingMessage(message.sentTimestamp!!)) return // The message has been deleted
val attachmentIDs = mutableListOf<Long>()
attachmentIDs.addAll(message.attachmentIDs)
message.quote?.let { it.attachmentID?.let { attachmentID -> attachmentIDs.add(attachmentID) } }
@ -51,9 +51,8 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
this.handleSuccess()
}.fail { exception ->
Log.e(TAG, "Couldn't send message due to error: $exception.")
val e = exception as? MessageSender.Error
e?.let {
if (!e.isRetryable) this.handlePermanentFailure(e)
if (exception is MessageSender.Error) {
if (!exception.isRetryable) { this.handlePermanentFailure(exception) }
}
this.handleFailure(exception)
}
@ -70,8 +69,10 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
private fun handleFailure(error: Exception) {
Log.w(TAG, "Failed to send $message::class.simpleName.")
val message = message as? VisibleMessage
message?.let {
if(!MessagingModuleConfiguration.shared.messageDataProvider.isOutgoingMessage(message.sentTimestamp!!)) return // The message has been deleted
if (message != null) {
if (!MessagingModuleConfiguration.shared.messageDataProvider.isOutgoingMessage(message.sentTimestamp!!)) {
return // The message has been deleted
}
}
delegate?.handleJobFailed(this, error)
}
@ -80,34 +81,42 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
val kryo = Kryo()
kryo.isRegistrationRequired = false
val output = Output(ByteArray(4096), 10_000_000)
// Message
kryo.writeClassAndObject(output, message)
output.close()
val serializedMessage = output.toBytes()
output.clear()
// Destination
kryo.writeClassAndObject(output, destination)
output.close()
val serializedDestination = output.toBytes()
return Data.Builder().putByteArray(KEY_MESSAGE, serializedMessage)
.putByteArray(KEY_DESTINATION, serializedDestination)
.build();
output.clear()
// Serialize
return Data.Builder()
.putByteArray(MESSAGE_KEY, serializedMessage)
.putByteArray(DESTINATION_KEY, serializedDestination)
.build()
}
override fun getFactoryKey(): String {
return KEY
}
class Factory: Job.Factory<MessageSendJob> {
class Factory : Job.Factory<MessageSendJob> {
override fun create(data: Data): MessageSendJob {
val serializedMessage = data.getByteArray(KEY_MESSAGE)
val serializedDestination = data.getByteArray(KEY_DESTINATION)
val serializedMessage = data.getByteArray(MESSAGE_KEY)
val serializedDestination = data.getByteArray(DESTINATION_KEY)
val kryo = Kryo()
var input = Input(serializedMessage)
val message = kryo.readClassAndObject(input) as Message
input.close()
input = Input(serializedDestination)
val destination = kryo.readClassAndObject(input) as Destination
input.close()
// Message
val messageInput = Input(serializedMessage)
val message = kryo.readClassAndObject(messageInput) as Message
messageInput.close()
// Destination
val destinationInput = Input(serializedDestination)
val destination = kryo.readClassAndObject(destinationInput) as Destination
destinationInput.close()
// Return
return MessageSendJob(message, destination)
}
}

View File

@ -21,16 +21,14 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job {
override var id: String? = null
override var failureCount: Int = 0
// Settings
override val maxFailureCount: Int = 20
companion object {
val KEY: String = "NotifyPNServerJob"
// Keys used for database storage
private val KEY_MESSAGE = "message"
private val MESSAGE_KEY = "message"
}
// Running
override fun execute() {
val server = PushNotificationAPI.server
val parameters = mapOf( "data" to message.data, "send_to" to message.recipient )
@ -41,10 +39,10 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job {
OnionRequestAPI.sendOnionRequest(request.build(), server, PushNotificationAPI.serverPublicKey, "/loki/v2/lsrpc").map { json ->
val code = json["code"] as? Int
if (code == null || code == 0) {
Log.d("Loki", "[Loki] Couldn't notify PN server due to error: ${json["message"] as? String ?: "null"}.")
Log.d("Loki", "Couldn't notify PN server due to error: ${json["message"] as? String ?: "null"}.")
}
}.fail { exception ->
Log.d("Loki", "[Loki] Couldn't notify PN server due to error: $exception.")
Log.d("Loki", "Couldn't notify PN server due to error: $exception.")
}
}.success {
handleSuccess()
@ -68,17 +66,17 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job {
val output = Output(serializedMessage)
kryo.writeObject(output, message)
output.close()
return Data.Builder().putByteArray(KEY_MESSAGE, serializedMessage).build();
return Data.Builder().putByteArray(MESSAGE_KEY, serializedMessage).build();
}
override fun getFactoryKey(): String {
return KEY
}
class Factory: Job.Factory<NotifyPNServerJob> {
class Factory : Job.Factory<NotifyPNServerJob> {
override fun create(data: Data): NotifyPNServerJob {
val serializedMessage = data.getByteArray(KEY_MESSAGE)
val serializedMessage = data.getByteArray(MESSAGE_KEY)
val kryo = Kryo()
val input = Input(serializedMessage)
val message: SnodeMessage = kryo.readObject(input, SnodeMessage::class.java)

View File

@ -2,11 +2,11 @@ package org.session.libsession.messaging.jobs
class SessionJobInstantiator(private val jobFactories: Map<String, Job.Factory<out Job>>) {
fun instantiate(jobFactoryKey: String, data: Data): Job {
fun instantiate(jobFactoryKey: String, data: Data): Job? {
if (jobFactories.containsKey(jobFactoryKey)) {
return jobFactories[jobFactoryKey]?.create(data) ?: throw IllegalStateException("Tried to instantiate a job with key '$jobFactoryKey', but no matching factory was found.")
return jobFactories[jobFactoryKey]?.create(data)
} else {
throw IllegalStateException("Tried to instantiate a job with key '$jobFactoryKey', but no matching factory was found.")
return null
}
}
}

View File

@ -3,6 +3,7 @@ package org.session.libsession.messaging.jobs
class SessionJobManagerFactories {
companion object {
fun getSessionJobFactories(): Map<String, Job.Factory<out Job>> {
return mapOf(
AttachmentDownloadJob.KEY to AttachmentDownloadJob.Factory(),