refactor: fix the closed group message sending and remove sig_timestamp

This commit is contained in:
0x330a 2023-10-05 17:14:32 +11:00
parent 0f7b17b384
commit c013a276f1
No known key found for this signature in database
GPG Key ID: 267811D6E6A2698C
10 changed files with 212 additions and 131 deletions

View File

@ -238,7 +238,7 @@ class ConfigFactory(
val (userSk, _) = maybeGetUserInfo() ?: return null
GroupKeysConfig.newInstance(
userSk,
Hex.fromStringCondensed(groupSessionId.hexString()),
Hex.fromStringCondensed(groupSessionId.publicKey),
sk,
info = info,
members = members

View File

@ -32,22 +32,35 @@ import org.thoughtcrime.securesms.database.GroupDatabase
import org.thoughtcrime.securesms.database.ThreadDatabase
import org.thoughtcrime.securesms.dependencies.DatabaseComponent
import java.util.Timer
import java.util.concurrent.ConcurrentLinkedDeque
object ConfigurationMessageUtilities {
private val debouncer = WindowDebouncer(3000, Timer())
private val destinationUpdater = Any()
private val pendingDestinations = ConcurrentLinkedDeque<Destination>()
private fun scheduleConfigSync(destination: Destination) {
synchronized(destinationUpdater) {
pendingDestinations.add(destination)
}
debouncer.publish {
// don't schedule job if we already have one
val storage = MessagingModuleConfiguration.shared.storage
val currentStorageJob = storage.getConfigSyncJob(destination)
if (currentStorageJob != null) {
(currentStorageJob as ConfigurationSyncJob).shouldRunAgain.set(true)
return@publish
val destinations = synchronized(destinationUpdater) {
val objects = pendingDestinations.toList()
pendingDestinations.clear()
objects
}
destinations.forEach { destination ->
val currentStorageJob = storage.getConfigSyncJob(destination)
if (currentStorageJob != null) {
(currentStorageJob as ConfigurationSyncJob).shouldRunAgain.set(true)
return@publish
}
val newConfigSync = ConfigurationSyncJob(destination)
JobQueue.shared.add(newConfigSync)
}
val newConfigSync = ConfigurationSyncJob(destination)
JobQueue.shared.add(newConfigSync)
}
}

View File

@ -42,25 +42,17 @@ add_library( # Sets the name of the library.
# Provides a relative path to your source file(s).
${SOURCES})
if (LINUX)
message("Linux machine detected")
set(JAVA_INCLUDE_PATH "$ENV{JAVA_HOME}/include;$ENV{JAVA_HOME}/include/linux")
find_package(JNI REQUIRED)
include_directories(${JAVA_INCLUDE_PATH})
endif()
# Searches for a specified prebuilt library and stores the path as a
# variable. Because CMake includes system libraries in the search path by
# default, you only need to specify the name of the public NDK library
# you want to add. CMake verifies that the library exists before
# completing its build.
#find_library( # Sets the name of the path variable.
# log-lib
#
# # Specifies the name of the NDK library that
# # you want CMake to locate.
# log)
find_library( # Sets the name of the path variable.
log-lib
# Specifies the name of the NDK library that
# you want CMake to locate.
log)
# Specifies libraries CMake should link to your target library. You
# can link multiple libraries, such as libraries you define in this
@ -73,5 +65,4 @@ target_link_libraries( # Specifies the target library.
libsession::crypto
# Links the target library to the log library
# included in the NDK.
)
#${log-lib})
${log-lib})

View File

@ -1,6 +1,7 @@
#include "util.h"
//#include <sodium/crypto_sign.h>
#include "../../../libsession-util/external/libsodium-internal/src/libsodium/include/sodium/crypto_sign.h"
#include <string>
#include <sodium/crypto_sign.h>
namespace util {
@ -334,6 +335,12 @@ Java_org_session_libsignal_utilities_Namespace_ENCRYPTION_1KEYS(JNIEnv *env, job
return (int) session::config::Namespace::GroupKeys;
}
extern "C"
JNIEXPORT jint JNICALL
Java_org_session_libsignal_utilities_Namespace_CLOSED_1GROUP_1MESSAGES(JNIEnv *env, jobject thiz) {
return (int) session::config::Namespace::GroupMessages;
}
extern "C"
JNIEXPORT void JNICALL
Java_network_loki_messenger_libsession_1util_Config_free(JNIEnv *env, jobject thiz) {

View File

@ -21,11 +21,13 @@ import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.SessionId
import java.util.concurrent.atomic.AtomicBoolean
class InvalidDestination: Exception("Trying to push configs somewhere other than our swarm or a closed group")
class InvalidContactDestination: Exception("Trying to push to non-user config swarm")
class InvalidDestination :
Exception("Trying to push configs somewhere other than our swarm or a closed group")
class InvalidContactDestination : Exception("Trying to push to non-user config swarm")
// only contact (self) and closed group destinations will be supported
data class ConfigurationSyncJob(val destination: Destination): Job {
data class ConfigurationSyncJob(val destination: Destination) : Job {
override var delegate: JobDelegate? = null
override var id: String? = null
@ -34,65 +36,106 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
val shouldRunAgain = AtomicBoolean(false)
data class ConfigMessageInformation(val batch: SnodeBatchRequestInfo, val config: Config, val seqNo: Long?) // seqNo will be null for keys type
data class ConfigMessageInformation(
val batch: SnodeBatchRequestInfo,
val config: Config,
val seqNo: Long?
) // seqNo will be null for keys type
data class SyncInformation(val configs: List<ConfigMessageInformation>, val toDelete: List<String>)
data class SyncInformation(
val configs: List<ConfigMessageInformation>,
val toDelete: List<String>
)
private fun destinationConfigs(delegate: JobDelegate,
dispatcherName: String,
configFactoryProtocol: ConfigFactoryProtocol): SyncInformation {
private fun destinationConfigs(
delegate: JobDelegate,
dispatcherName: String,
configFactoryProtocol: ConfigFactoryProtocol
): SyncInformation {
val toDelete = mutableListOf<String>()
val configsRequiringPush = if (destination is Destination.ClosedGroup) {
val sentTimestamp = SnodeAPI.nowWithOffset
// destination is a closed group, get all configs requiring push here
val groupId = SessionId.from(destination.publicKey)
val configsRequiringPush =
if (destination is Destination.ClosedGroup) {
val sentTimestamp = SnodeAPI.nowWithOffset
// destination is a closed group, get all configs requiring push here
val groupId = SessionId.from(destination.publicKey)
val signingKey = configFactoryProtocol.userGroups!!.getClosedGroup(destination.publicKey)!!.signingKey()
val signingKey =
configFactoryProtocol.userGroups!!.getClosedGroup(
destination.publicKey
)!!
.signingKey()
val keys = configFactoryProtocol.getGroupKeysConfig(groupId)!!
val info = configFactoryProtocol.getGroupInfoConfig(groupId)!!
val members = configFactoryProtocol.getGroupMemberConfig(groupId)!!
val keys = configFactoryProtocol.getGroupKeysConfig(groupId)!!
val info = configFactoryProtocol.getGroupInfoConfig(groupId)!!
val members = configFactoryProtocol.getGroupMemberConfig(groupId)!!
val requiringPush = listOf(keys, info, members).filter {
when (it) {
is GroupKeysConfig -> it.pendingConfig()?.isNotEmpty() == true
is ConfigBase -> it.needsPush()
else -> false
val requiringPush =
listOf(keys, info, members).filter {
when (it) {
is GroupKeysConfig -> it.pendingConfig()?.isNotEmpty() == true
is ConfigBase -> it.needsPush()
else -> false
}
}
// free the objects that were created but won't be used after this point
// in case any of the configs don't need pushing, they won't be freed later
(listOf(keys, info, members) subtract requiringPush).forEach(Config::free)
requiringPush.map { config ->
val (push, seqNo, obsoleteHashes) =
if (config is GroupKeysConfig) {
ConfigPush(
config.pendingConfig()!!,
0,
emptyList()
) // should not be null from filter step previous
} else if (config is ConfigBase) {
config.push()
} else
throw IllegalArgumentException(
"Got a non group keys or config base object for config sync"
)
toDelete += obsoleteHashes
val message =
SnodeMessage(
destination.publicKey,
Base64.encodeBytes(push),
SnodeMessage.CONFIG_TTL,
sentTimestamp
)
ConfigMessageInformation(
SnodeAPI.buildAuthenticatedStoreBatchInfo(
config.namespace(),
message,
signingKey
),
config,
seqNo
)
}
} else {
// assume our own user as check already takes place in `execute` for our own key
// if contact
configFactoryProtocol.getUserConfigs().filter { it.needsPush() }.map { config ->
val (bytes, seqNo, obsoleteHashes) = config.push()
toDelete += obsoleteHashes
val message =
messageForConfig(config, bytes, seqNo)
?: throw NullPointerException(
"SnodeBatchRequest message was null, check group keys exists"
)
ConfigMessageInformation(message, config, seqNo)
}
}
}
// free the objects that were created but won't be used after this point
// in case any of the configs don't need pushing, they won't be freed later
(listOf(keys,info,members) subtract requiringPush).forEach(Config::free)
requiringPush.map { config ->
val (push, seqNo, obsoleteHashes) = if (config is GroupKeysConfig) {
ConfigPush(config.pendingConfig()!!, 0, emptyList()) // should not be null from filter step previous
} else if (config is ConfigBase) {
config.push()
} else throw IllegalArgumentException("Got a non group keys or config base object for config sync")
toDelete += obsoleteHashes
val message = SnodeMessage(destination.publicKey, Base64.encodeBytes(push), SnodeMessage.CONFIG_TTL, sentTimestamp)
ConfigMessageInformation(SnodeAPI.buildAuthenticatedStoreBatchInfo(config.namespace(), message, signingKey), config, seqNo)
}
} else {
// assume our own user as check already takes place in `execute` for our own key if contact
configFactoryProtocol.getUserConfigs().filter { it.needsPush() }.map { config ->
val (bytes, seqNo, obsoleteHashes) = config.push()
toDelete += obsoleteHashes
val message = messageForConfig(config, bytes, seqNo)
?: throw NullPointerException("SnodeBatchRequest message was null, check group keys exists")
ConfigMessageInformation(message, config, seqNo)
}
}
return SyncInformation(configsRequiringPush, toDelete)
}
private fun messageForConfig(
config: ConfigBase,
bytes: ByteArray,
seqNo: Long
config: ConfigBase,
bytes: ByteArray,
seqNo: Long
): SnodeBatchRequestInfo? {
val message = SharedConfigurationMessage(config.protoKindFor(), bytes, seqNo)
val snodeMessage = MessageSender.buildWrappedMessageToSnode(destination, message, true)
@ -104,22 +147,32 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
val userPublicKey = storage.getUserPublicKey()
val delegate = delegate ?: return Log.e("ConfigurationSyncJob", "No Delegate")
if (destination !is Destination.ClosedGroup && (destination !is Destination.Contact || destination.publicKey != userPublicKey)) {
if (destination !is Destination.ClosedGroup &&
(destination !is Destination.Contact ||
destination.publicKey != userPublicKey)
) {
return delegate.handleJobFailedPermanently(this, dispatcherName, InvalidDestination())
}
// configFactory singleton instance will come in handy for modifying hashes and fetching configs for namespace etc
// configFactory singleton instance will come in handy for modifying hashes and fetching
// configs for namespace etc
val configFactory = MessagingModuleConfiguration.shared.configFactory
// allow null results here so the list index matches configsRequiringPush
val (batchObjects, toDeleteHashes) = destinationConfigs(delegate, dispatcherName, configFactory)
val (batchObjects, toDeleteHashes) =
destinationConfigs(delegate, dispatcherName, configFactory)
if (batchObjects.isEmpty()) return delegate.handleJobSucceeded(this, dispatcherName)
val toDeleteRequest = toDeleteHashes.let { toDeleteFromAllNamespaces ->
if (toDeleteFromAllNamespaces.isEmpty()) null
else SnodeAPI.buildAuthenticatedDeleteBatchInfo(destination.destinationPublicKey(), toDeleteFromAllNamespaces)
}
val toDeleteRequest =
toDeleteHashes.let { toDeleteFromAllNamespaces ->
if (toDeleteFromAllNamespaces.isEmpty()) null
else
SnodeAPI.buildAuthenticatedDeleteBatchInfo(
destination.destinationPublicKey(),
toDeleteFromAllNamespaces
)
}
val allRequests = mutableListOf<SnodeBatchRequestInfo>()
allRequests += batchObjects.map { (request) -> request }
@ -129,14 +182,15 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
Log.d(TAG, "Including delete request for current hashes")
}
val batchResponse = SnodeAPI.getSingleTargetSnode(destination.destinationPublicKey()).bind { snode ->
SnodeAPI.getRawBatchResponse(
snode,
destination.destinationPublicKey(),
allRequests,
sequence = true
)
}
val batchResponse =
SnodeAPI.getSingleTargetSnode(destination.destinationPublicKey()).bind { snode ->
SnodeAPI.getRawBatchResponse(
snode,
destination.destinationPublicKey(),
allRequests,
sequence = true
)
}
try {
val rawResponses = batchResponse.get()
@ -147,25 +201,32 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
batchObjects.forEachIndexed { index, (message, config, seqNo) ->
val response = responseList[index]
val responseBody = response["body"] as? RawResponse
val insertHash = responseBody?.get("hash") as? String ?: run {
Log.w(TAG, "No hash returned for the configuration in namespace ${config.namespace()}")
return@forEachIndexed
}
val insertHash =
responseBody?.get("hash") as? String
?: run {
Log.w(
TAG,
"No hash returned for the configuration in namespace ${config.namespace()}"
)
return@forEachIndexed
}
Log.d(TAG, "Hash ${insertHash.take(4)} returned from store request for new config")
// confirm pushed seqno
if (config is ConfigBase) {
seqNo?.let {
config.confirmPushed(it, insertHash)
}
seqNo?.let { config.confirmPushed(it, insertHash) }
}
Log.d(TAG, "Successfully removed the deleted hashes from ${config.javaClass.simpleName}")
Log.d(
TAG,
"Successfully removed the deleted hashes from ${config.javaClass.simpleName}"
)
// dump and write config after successful
if (config is ConfigBase && config.needsDump()) { // usually this will be true?
configFactory.persist(config, (message.params["timestamp"] as String).toLong())
} else if (config is GroupKeysConfig && config.needsDump()) {
Log.d("Loki", "Should persist the GroupKeysConfig")
}
if (destination is Destination.ClosedGroup) {
config.free() // after they are used, free the temporary group configs
}
@ -181,22 +242,24 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
}
}
fun Destination.destinationPublicKey(): String = when (this) {
is Destination.Contact -> publicKey
is Destination.ClosedGroup -> publicKey
else -> throw NullPointerException("Not public key for this destination")
}
fun Destination.destinationPublicKey(): String =
when (this) {
is Destination.Contact -> publicKey
is Destination.ClosedGroup -> publicKey
else -> throw NullPointerException("Not public key for this destination")
}
override fun serialize(): Data {
val (type, address) = when (destination) {
is Destination.Contact -> CONTACT_TYPE to destination.publicKey
is Destination.ClosedGroup -> GROUP_TYPE to destination.publicKey
else -> return Data.EMPTY
}
val (type, address) =
when (destination) {
is Destination.Contact -> CONTACT_TYPE to destination.publicKey
is Destination.ClosedGroup -> GROUP_TYPE to destination.publicKey
else -> return Data.EMPTY
}
return Data.Builder()
.putInt(DESTINATION_TYPE_KEY, type)
.putString(DESTINATION_ADDRESS_KEY, address)
.build()
.putInt(DESTINATION_TYPE_KEY, type)
.putString(DESTINATION_ADDRESS_KEY, address)
.build()
}
override fun getFactoryKey(): String = KEY
@ -212,21 +275,23 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
// type mappings
const val CONTACT_TYPE = 1
const val GROUP_TYPE = 2
}
class Factory: Job.Factory<ConfigurationSyncJob> {
class Factory : Job.Factory<ConfigurationSyncJob> {
override fun create(data: Data): ConfigurationSyncJob? {
if (!data.hasInt(DESTINATION_TYPE_KEY) || !data.hasString(DESTINATION_ADDRESS_KEY)) return null
if (!data.hasInt(DESTINATION_TYPE_KEY) || !data.hasString(DESTINATION_ADDRESS_KEY))
return null
val address = data.getString(DESTINATION_ADDRESS_KEY)
val destination = when (data.getInt(DESTINATION_TYPE_KEY)) {
CONTACT_TYPE -> Destination.Contact(address)
GROUP_TYPE -> Destination.ClosedGroup(address)
else -> return null
}
val destination =
when (data.getInt(DESTINATION_TYPE_KEY)) {
CONTACT_TYPE -> Destination.Contact(address)
GROUP_TYPE -> Destination.ClosedGroup(address)
else -> return null
}
return ConfigurationSyncJob(destination)
}
}
}
}

View File

@ -97,9 +97,11 @@ object MessageReceiver {
if (sessionId.prefix == IdPrefix.GROUP) {
val configFactory = MessagingModuleConfiguration.shared.configFactory
configFactory.getGroupKeysConfig(sessionId)?.use { config ->
plaintext = config.decrypt(ciphertext.toByteArray())
sender = userPublicKey
groupPublicKey = envelope.source
config.decrypt(ciphertext.toByteArray())?.let { (decrypted, senderSessionId) ->
plaintext = decrypted
sender = senderSessionId.hexString()
groupPublicKey = envelope.source
}
}
if (plaintext == null) {
throw Error.DecryptionFailed

View File

@ -138,7 +138,7 @@ object MessageSender {
is Destination.ClosedGroup -> {
val groupKeys = configFactory.getGroupKeysConfig(SessionId.from(destination.publicKey)) ?: throw Error.NoKeyPair
groupKeys.use { keys ->
keys.encrypt(plaintext)
keys.encrypt(proto.toByteArray())
}
}
else -> throw IllegalStateException("Destination should not be open group.")
@ -200,6 +200,7 @@ object MessageSender {
&& forkInfo.defaultRequiresAuth() -> listOf(Namespace.UNAUTHENTICATED_CLOSED_GROUP())
destination is Destination.LegacyClosedGroup
&& forkInfo.hasNamespaces() -> listOf(Namespace.UNAUTHENTICATED_CLOSED_GROUP(), Namespace.DEFAULT())
destination is Destination.ClosedGroup -> listOf(Namespace.CLOSED_GROUP_MESSAGES())
else -> listOf(Namespace.DEFAULT())
}
namespaces.map { namespace ->

View File

@ -57,7 +57,7 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
companion object {
const val POLL_INTERVAL = 3_000L
const val ENABLE_LOGGING = false
const val ENABLE_LOGGING = true
}
private var isRunning: Boolean = false
@ -109,12 +109,10 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
val membersIndex = 2
val messageIndex = 3
val requiresSync = info.needsPush() || members.needsPush() || keys.needsRekey() || keys.pendingConfig() != null
val messagePoll = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode,
closedGroupSessionId.hexString(),
Namespace.DEFAULT(),
Namespace.CLOSED_GROUP_MESSAGES(),
maxSize = null,
group.signingKey()
) ?: return null
@ -171,6 +169,8 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
}
}
val requiresSync = info.needsPush() || members.needsPush() || keys.needsRekey() || keys.pendingConfig() != null
configFactoryProtocol.saveGroupConfigs(keys, info, members)
keys.free()
info.free()
@ -222,7 +222,7 @@ class ClosedGroupPoller(private val executor: CoroutineScope,
if (ENABLE_LOGGING) Log.d("ClosedGroupPoller", "Merged $hash for info on ${closedGroupSessionId.hexString()}")
}
if (messages.isNotEmpty()) {
MessagingModuleConfiguration.shared.storage.notifyConfigUpdates(infoConfig) // TODO: figure this out
MessagingModuleConfiguration.shared.storage.notifyConfigUpdates(infoConfig)
}
}

View File

@ -709,8 +709,9 @@ object SnodeAPI {
val parameters = message.toJSON().toMutableMap<String,Any>()
parameters += mapOf(
"sig_timestamp" to sigTimestamp,
"signature" to Base64.encodeBytes(signature)
"timestamp" to sigTimestamp,
"signature" to Base64.encodeBytes(signature),
"namespace" to namespace
)
getSingleTargetSnode(pubKey).bind { targetSnode ->

View File

@ -11,4 +11,5 @@ object Namespace {
external fun CLOSED_GROUP_INFO(): Int
external fun CLOSED_GROUP_MEMBERS(): Int
external fun ENCRYPTION_KEYS(): Int
external fun CLOSED_GROUP_MESSAGES(): Int
}