From f870eff45bd691607a27a2a63683c8ba05d0524e Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Mon, 13 Feb 2023 13:28:00 +1100 Subject: [PATCH] fix: make syncJob throttled every 3s and start right away if possible also enable sig_timestamp on mainnet use as every node should have upgraded now --- stylesheets/_modal.scss | 2 - ts/mains/main_node.ts | 2 +- ts/node/sql.ts | 1 - .../apis/snode_api/SnodeRequestTypes.ts | 15 +- ts/session/apis/snode_api/configMessages.ts | 82 --------- ts/session/apis/snode_api/snodeSignatures.ts | 2 + ts/session/profile_manager/ProfileManager.ts | 4 - ts/session/sending/MessageSender.ts | 1 + ts/session/utils/job_runners/JobRunner.ts | 7 +- .../job_runners/jobs/ConfigurationSyncJob.ts | 168 +++++++++++------- 10 files changed, 117 insertions(+), 167 deletions(-) delete mode 100644 ts/session/apis/snode_api/configMessages.ts diff --git a/stylesheets/_modal.scss b/stylesheets/_modal.scss index 2f12235d6..8446da1a3 100644 --- a/stylesheets/_modal.scss +++ b/stylesheets/_modal.scss @@ -132,8 +132,6 @@ .loki-dialog { & ~ .index.inbox { - // filter: blur(2px); // FIXME enable back once modals are moved to react - // currently it cause an issues with toast being on the foreground when a modal is shown transition: filter 0.1s; } diff --git a/ts/mains/main_node.ts b/ts/mains/main_node.ts index a8929e289..0df1e3bc1 100644 --- a/ts/mains/main_node.ts +++ b/ts/mains/main_node.ts @@ -35,7 +35,7 @@ import electronLocalshortcut from 'electron-localshortcut'; const getRealPath = pify(fs.realpath); -// FIXME Hardcoding appId to prevent build failures on release. +// Hardcoding appId to prevent build failures on release. // const appUserModelId = packageJson.build.appId; const appUserModelId = 'com.loki-project.messenger-desktop'; console.log('Set Windows Application User Model ID (AUMID)', { diff --git a/ts/node/sql.ts b/ts/node/sql.ts index 1adfe0c5b..fa0757ebb 100644 --- a/ts/node/sql.ts +++ b/ts/node/sql.ts @@ -445,7 +445,6 @@ function saveConversation(data: ConversationAttributes, instance?: BetterSqlite3 // identityPrivateKey, } = formatted; - //FIXME console.warn('FIXME omit(formatted, identityPrivateKey);'); const omited = omit(formatted, 'identityPrivateKey', 'markedAsUnread'); const keys = Object.keys(omited); diff --git a/ts/session/apis/snode_api/SnodeRequestTypes.ts b/ts/session/apis/snode_api/SnodeRequestTypes.ts index e0b14d0af..9900feee5 100644 --- a/ts/session/apis/snode_api/SnodeRequestTypes.ts +++ b/ts/session/apis/snode_api/SnodeRequestTypes.ts @@ -82,23 +82,22 @@ export type GetServiceNodesSubRequest = { }; }; -export type StoreOnNodeParamsNoSig = { - pubkey: string; - ttl: number; - timestamp: number; - data64: string; - namespace: number; -}; - export type StoreOnNodeParams = { pubkey: string; ttl: number; timestamp: number; data: string; namespace: number; + sig_timestamp?: number; signature?: string; pubkey_ed25519?: string; }; + +export type StoreOnNodeParamsNoSig = Pick< + StoreOnNodeParams, + 'pubkey' | 'ttl' | 'timestamp' | 'ttl' | 'namespace' +> & { data64: string }; + export type DeleteFromNodeWithTimestampParams = { timestamp: string | number; } & DeleteSigParameters; diff --git a/ts/session/apis/snode_api/configMessages.ts b/ts/session/apis/snode_api/configMessages.ts deleted file mode 100644 index a1fc030ee..000000000 --- a/ts/session/apis/snode_api/configMessages.ts +++ /dev/null @@ -1,82 +0,0 @@ -// import { isEmpty, sample } from 'lodash'; -// import pRetry from 'p-retry'; -// import { Snode } from '../../../data/data'; -// import { ed25519Str } from '../../onions/onionPath'; -// import { SingleDestinationChanges } from '../../utils/job_runners/jobs/ConfigurationSyncJob'; -// import { doSnodeBatchRequest } from './batchRequest'; -// import { SnodeAPI } from './SNodeAPI'; -// import { getSwarmFor } from './snodePool'; -// import { StoreOnNodeSubRequest } from './SnodeRequestTypes'; - -// function prepareRequest(singleDestChange: SingleDestinationChanges): Array { -// if (isEmpty(singleDestChange) || isEmpty(singleDestChange.messages)) { -// return []; -// } - -// return singleDestChange.messages.map(message => { -// return { method: 'store', params: {} }; -// }); -// } - -/** - * Locally deletes message and deletes message on the network (all nodes that contain the message) - */ -// const sendConfigMessages = async ( -// singleDestChange: SingleDestinationChanges -// ): Promise | null> => { -// if (isEmpty(singleDestChange) || isEmpty(singleDestChange.messages)) { -// return true; -// } -// try { -// const result = await pRetry( -// async () => { -// const swarmToSendTo = await getSwarmFor(singleDestChange.destination); -// const snodeToMakeRequestTo: Snode | undefined = sample(swarmToSendTo); - -// if (!snodeToMakeRequestTo) { -// window?.log?.warn('Cannot networkDeleteMessages, without a valid swarm node.'); -// return null; -// } - -// return pRetry( -// async () => { -// const ret = await doSnodeBatchRequest([{method: 'store', params: {}}]); -// if (!ret) { -// throw new Error( -// `Empty response got for delete on snode ${ed25519Str( -// snodeToMakeRequestTo.pubkey_ed25519 -// )}` -// ); -// } - -// return results; -// } -// }, -// { -// retries: 3, -// minTimeout: SnodeAPI.TEST_getMinTimeout(), -// onFailedAttempt: e => { -// window?.log?.warn( -// `delete INNER request attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...` -// ); -// }, -// } -// ); -// }, -// { -// retries: 3, -// minTimeout: SnodeAPI.TEST_getMinTimeout(), -// onFailedAttempt: e => { -// window?.log?.warn( -// `delete OUTER request attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...` -// ); -// }, -// } -// ); - -// return maliciousSnodes; -// } catch (e) { -// window?.log?.warn('failed to delete message on network:', e); -// return null; -// } -// }; diff --git a/ts/session/apis/snode_api/snodeSignatures.ts b/ts/session/apis/snode_api/snodeSignatures.ts index 206feb0e0..dcc49af5a 100644 --- a/ts/session/apis/snode_api/snodeSignatures.ts +++ b/ts/session/apis/snode_api/snodeSignatures.ts @@ -5,6 +5,7 @@ import { GetNetworkTime } from './getNetworkTime'; export type SnodeSignatureResult = { timestamp: number; + sig_timestamp: number; signature: string; pubkey_ed25519: string; pubkey: string; // this is the x25519 key of the pubkey we are doing the request to (ourself for our swarm usually) @@ -81,6 +82,7 @@ async function getSnodeSignatureParams(params: { const signatureBase64 = fromUInt8ArrayToBase64(signature); return { + sig_timestamp: signatureTimestamp, timestamp: signatureTimestamp, signature: signatureBase64, pubkey_ed25519: ourEd25519Key.pubKey, diff --git a/ts/session/profile_manager/ProfileManager.ts b/ts/session/profile_manager/ProfileManager.ts index f036c1bff..a7f2c0299 100644 --- a/ts/session/profile_manager/ProfileManager.ts +++ b/ts/session/profile_manager/ProfileManager.ts @@ -42,10 +42,6 @@ async function updateProfileOfContact( // avoid setting the display name to an invalid value if (existingDisplayName !== displayName && !isEmpty(displayName)) { - console.warn( - `updateProfileOfContact overriding old "${existingDisplayName}: with "${displayName}"` - ); - conversation.set('displayNameInProfile', displayName || undefined); await conversation.commit(); } diff --git a/ts/session/sending/MessageSender.ts b/ts/session/sending/MessageSender.ts index f7a2857b7..1b79431d0 100644 --- a/ts/session/sending/MessageSender.ts +++ b/ts/session/sending/MessageSender.ts @@ -204,6 +204,7 @@ async function sendMessagesDataToSnode( namespace: item.namespace, pubkey: item.pubkey, timestamp: item.timestamp, + sig_timestamp: item.timestamp, ttl: item.ttl, ...signOpts, }; diff --git a/ts/session/utils/job_runners/JobRunner.ts b/ts/session/utils/job_runners/JobRunner.ts index 1e014d890..3e62b0c90 100644 --- a/ts/session/utils/job_runners/JobRunner.ts +++ b/ts/session/utils/job_runners/JobRunner.ts @@ -318,10 +318,9 @@ export class PersistedJobRunner { ); } else { window.log.info( - `Too many failures for ${jobToLogId( - this.jobRunnerType, - nextJob - )} out of nextJob.persistedData.maxAttempts` + `Too many failures for ${jobToLogId(this.jobRunnerType, nextJob)}: ${ + nextJob.persistedData.currentRetry + } out of ${nextJob.persistedData.maxAttempts}` ); } // we cannot restart this job anymore. Remove the entry completely diff --git a/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts b/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts index 05f3fbddd..b85295c70 100644 --- a/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts +++ b/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts @@ -23,6 +23,12 @@ import { const defaultMsBetweenRetries = 3000; const defaultMaxAttempts = 3; +/** + * We want to run each of those jobs at least 3seconds apart. + * So every time one of that job finishes, update this timestamp, so we know when adding a new job, what is the next minimun date to run it. + */ +let lastRunConfigSyncJobTimestamp: number | null = null; + export type SingleDestinationChanges = { destination: string; messages: Array; @@ -175,76 +181,84 @@ class ConfigurationSyncJob extends PersistedJob delayBetweenRetries: defaultMsBetweenRetries, maxAttempts: isNumber(maxAttempts) ? maxAttempts : defaultMaxAttempts, currentRetry: isNumber(currentRetry) ? currentRetry : 0, - nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + defaultMsBetweenRetries, + nextAttemptTimestamp: nextAttemptTimestamp || Date.now(), }); } public async run(): Promise { - if (!window.sessionFeatureFlags.useSharedUtilForUserConfig) { + try { + if (!window.sessionFeatureFlags.useSharedUtilForUserConfig) { + return RunJobResult.Success; + } + window.log.debug(`ConfigurationSyncJob starting ${this.persistedData.identifier}`); + + const us = UserUtils.getOurPubKeyStrFromCache(); + const ed25519Key = await UserUtils.getUserED25519KeyPairBytes(); + const conversation = getConversationController().get(us); + if (!us || !conversation || !ed25519Key) { + // we check for ed25519Key because it is needed for authenticated requests + window.log.warn('did not find our own conversation'); + return RunJobResult.PermanentFailure; + } + const name = conversation.get('displayNameInProfile'); + const pointer = conversation.get('avatarPointer'); + const profileKey = conversation.get('profileKey'); + await UserConfigWrapperActions.setName(name || ''); + + if (profileKey && pointer) { + const profileKeyArray = fromHexToArray(profileKey); + await UserConfigWrapperActions.setProfilePicture(pointer, profileKeyArray); + } else { + await UserConfigWrapperActions.setProfilePicture('', new Uint8Array()); + } + + const singleDestChanges = await retrieveSingleDestinationChanges(); + + // If there are no pending changes then the job can just complete (next time something + // is updated we want to try and run immediately so don't scuedule another run in this case) + + if (isEmpty(singleDestChanges)) { + return RunJobResult.Success; + } + + const allResults = await Promise.allSettled( + singleDestChanges.map(async dest => { + const msgs = dest.messages.map(item => { + return { + namespace: item.namespace, + pubkey: item.destination, + timestamp: item.message.timestamp, + ttl: item.message.ttl(), + message: item.message, + }; + }); + const asSet = new Set(dest.allOldHashes); + return MessageSender.sendMessagesToSnode(msgs, dest.destination, asSet); + }) + ); + + // we do a sequence call here. If we do not have the right expected number of results, consider it + + if (!isArray(allResults) || allResults.length !== singleDestChanges.length) { + return RunJobResult.RetryJobIfPossible; + } + + const changes = resultsToSuccessfulChange(allResults, singleDestChanges); + if (isEmpty(changes)) { + return RunJobResult.RetryJobIfPossible; + } + // Now that we have the successful changes, we need to mark them as pushed and + // generate any config dumps which need to be stored + + await buildAndSaveDumpsToDB(changes); return RunJobResult.Success; + } catch (e) { + throw e; + } finally { + // this is a simple way to make sure whatever happens here, we update the lastest timestamp. + // (a finally statement is always executed (no matter if exception or returns in other try/catch block) + this.updateLastTickTimestamp(); } - window.log.debug(`ConfigurationSyncJob starting ${this.persistedData.identifier}`); - - const us = UserUtils.getOurPubKeyStrFromCache(); - const ed25519Key = await UserUtils.getUserED25519KeyPairBytes(); - const conversation = getConversationController().get(us); - if (!us || !conversation || !ed25519Key) { - // we check for ed25519Key because it is needed for authenticated requests - window.log.warn('did not find our own conversation'); - return RunJobResult.PermanentFailure; - } - const name = conversation.get('displayNameInProfile'); - const pointer = conversation.get('avatarPointer'); - const profileKey = conversation.get('profileKey'); - await UserConfigWrapperActions.setName(name || ''); - - if (profileKey && pointer) { - const profileKeyArray = fromHexToArray(profileKey); - await UserConfigWrapperActions.setProfilePicture(pointer, profileKeyArray); - } else { - await UserConfigWrapperActions.setProfilePicture('', new Uint8Array()); - } - - const singleDestChanges = await retrieveSingleDestinationChanges(); - - // If there are no pending changes then the job can just complete (next time something - // is updated we want to try and run immediately so don't scuedule another run in this case) - - if (isEmpty(singleDestChanges)) { - return RunJobResult.Success; - } - - const allResults = await Promise.allSettled( - singleDestChanges.map(async dest => { - const msgs = dest.messages.map(item => { - return { - namespace: item.namespace, - pubkey: item.destination, - timestamp: item.message.timestamp, - ttl: item.message.ttl(), - message: item.message, - }; - }); - const asSet = new Set(dest.allOldHashes); - return MessageSender.sendMessagesToSnode(msgs, dest.destination, asSet); - }) - ); - - // we do a sequence call here. If we do not have the right expected number of results, consider it - - if (!isArray(allResults) || allResults.length !== singleDestChanges.length) { - return RunJobResult.RetryJobIfPossible; - } - - const changes = resultsToSuccessfulChange(allResults, singleDestChanges); - if (isEmpty(changes)) { - return RunJobResult.RetryJobIfPossible; - } - // Now that we have the successful changes, we need to mark them as pushed and - // generate any config dumps which need to be stored - - await buildAndSaveDumpsToDB(changes); - return RunJobResult.Success; } public serializeJob(): ConfigurationSyncPersistedData { @@ -268,6 +282,10 @@ class ConfigurationSyncJob extends PersistedJob public getJobTimeoutMs(): number { return 20000; } + + private updateLastTickTimestamp() { + lastRunConfigSyncJobTimestamp = Date.now(); + } } /** @@ -275,7 +293,27 @@ class ConfigurationSyncJob extends PersistedJob * A ConfigurationSyncJob can only be added if there is none of the same type queued already. */ async function queueNewJobIfNeeded() { - await runners.configurationSyncRunner.addJob(new ConfigurationSyncJob({})); + if ( + !lastRunConfigSyncJobTimestamp || + lastRunConfigSyncJobTimestamp < Date.now() - defaultMsBetweenRetries + ) { + window.log.debug('scheduling conf sync job in asap'); + + // this call will make sure that there is only one configuration sync job at all times + await runners.configurationSyncRunner.addJob( + new ConfigurationSyncJob({ nextAttemptTimestamp: Date.now() }) + ); + } else { + // if we did run at 100, and it is currently 110, diff is 10 + const diff = Math.max(Date.now() - lastRunConfigSyncJobTimestamp, 0); + // but we want to run every 30, so what we need is actually `30-10` from now = 20 + const leftBeforeNextTick = Math.max(defaultMsBetweenRetries - diff, 0); + window.log.debug(`scheduling conf sync job in ${leftBeforeNextTick} ms`); + + await runners.configurationSyncRunner.addJob( + new ConfigurationSyncJob({ nextAttemptTimestamp: Date.now() + leftBeforeNextTick }) + ); + } } export const ConfigurationSync = {