mirror of
https://github.com/oxen-io/session-desktop.git
synced 2023-12-14 02:12:57 +01:00
feat: add job result enum with success, retry and permanent fail
This commit is contained in:
parent
08a15b210a
commit
90804491d6
|
@ -48,7 +48,6 @@ import { initializeLibSessionUtilWrappers } from '../../session/utils/libsession
|
||||||
import { isDarkTheme } from '../../state/selectors/theme';
|
import { isDarkTheme } from '../../state/selectors/theme';
|
||||||
import { ThemeStateType } from '../../themes/constants/colors';
|
import { ThemeStateType } from '../../themes/constants/colors';
|
||||||
import { switchThemeTo } from '../../themes/switchTheme';
|
import { switchThemeTo } from '../../themes/switchTheme';
|
||||||
import { AvatarDownloadJob } from '../../session/utils/job_runners/jobs/AvatarDownloadJob';
|
|
||||||
import { runners } from '../../session/utils/job_runners/JobRunner';
|
import { runners } from '../../session/utils/job_runners/JobRunner';
|
||||||
|
|
||||||
const Section = (props: { type: SectionType }) => {
|
const Section = (props: { type: SectionType }) => {
|
||||||
|
@ -64,20 +63,6 @@ const Section = (props: { type: SectionType }) => {
|
||||||
const handleClick = async () => {
|
const handleClick = async () => {
|
||||||
/* tslint:disable:no-void-expression */
|
/* tslint:disable:no-void-expression */
|
||||||
if (type === SectionType.Profile) {
|
if (type === SectionType.Profile) {
|
||||||
const us = UserUtils.getOurPubKeyStrFromCache();
|
|
||||||
const ourConvo = getConversationController().get(us);
|
|
||||||
|
|
||||||
const job = new AvatarDownloadJob({
|
|
||||||
conversationId: us,
|
|
||||||
currentRetry: 0,
|
|
||||||
delayBetweenRetries: 3000,
|
|
||||||
maxAttempts: 3,
|
|
||||||
profileKeyHex: ourConvo.get('profileKey') || null,
|
|
||||||
profilePictureUrl: ourConvo.get('avatarPointer') || null,
|
|
||||||
});
|
|
||||||
await runners.avatarDownloadRunner.loadJobsFromDb();
|
|
||||||
runners.avatarDownloadRunner.startProcessing();
|
|
||||||
await runners.avatarDownloadRunner.addJob(job);
|
|
||||||
dispatch(editProfileModal({}));
|
dispatch(editProfileModal({}));
|
||||||
} else if (type === SectionType.ColorMode) {
|
} else if (type === SectionType.ColorMode) {
|
||||||
const currentTheme = String(window.Events.getThemeSetting());
|
const currentTheme = String(window.Events.getThemeSetting());
|
||||||
|
@ -214,6 +199,9 @@ const doAppStartUp = async () => {
|
||||||
// this generates the key to encrypt attachments locally
|
// this generates the key to encrypt attachments locally
|
||||||
await Data.generateAttachmentKeyIfEmpty();
|
await Data.generateAttachmentKeyIfEmpty();
|
||||||
|
|
||||||
|
await runners.avatarDownloadRunner.loadJobsFromDb();
|
||||||
|
runners.avatarDownloadRunner.startProcessing();
|
||||||
|
|
||||||
// trigger a sync message if needed for our other devices
|
// trigger a sync message if needed for our other devices
|
||||||
void triggerSyncIfNeeded();
|
void triggerSyncIfNeeded();
|
||||||
void getSwarmPollingInstance().start();
|
void getSwarmPollingInstance().start();
|
||||||
|
|
|
@ -410,7 +410,6 @@ function getConversationCount() {
|
||||||
// tslint:disable-next-line: max-func-body-length
|
// tslint:disable-next-line: max-func-body-length
|
||||||
function saveConversation(data: ConversationAttributes, instance?: BetterSqlite3.Database) {
|
function saveConversation(data: ConversationAttributes, instance?: BetterSqlite3.Database) {
|
||||||
const formatted = assertValidConversationAttributes(data);
|
const formatted = assertValidConversationAttributes(data);
|
||||||
console.warn('formatted', formatted);
|
|
||||||
|
|
||||||
const {
|
const {
|
||||||
id,
|
id,
|
||||||
|
|
|
@ -2,11 +2,7 @@ import { to_hex } from 'libsodium-wrappers-sumo';
|
||||||
import { isEmpty } from 'lodash';
|
import { isEmpty } from 'lodash';
|
||||||
import { getConversationController } from '../conversations';
|
import { getConversationController } from '../conversations';
|
||||||
import { UserUtils } from '../utils';
|
import { UserUtils } from '../utils';
|
||||||
import { runners } from '../utils/job_runners/JobRunner';
|
import { AvatarDownload } from '../utils/job_runners/jobs/AvatarDownloadJob';
|
||||||
import {
|
|
||||||
AvatarDownloadJob,
|
|
||||||
shouldAddAvatarDownloadJob,
|
|
||||||
} from '../utils/job_runners/jobs/AvatarDownloadJob';
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This can be used to update our conversation display name with the given name right away, and plan an AvatarDownloadJob to retrieve the new avatar if needed to download it
|
* This can be used to update our conversation display name with the given name right away, and plan an AvatarDownloadJob to retrieve the new avatar if needed to download it
|
||||||
|
@ -16,18 +12,14 @@ async function updateOurProfileSync(
|
||||||
profileUrl: string | null,
|
profileUrl: string | null,
|
||||||
profileKey: Uint8Array | null
|
profileKey: Uint8Array | null
|
||||||
) {
|
) {
|
||||||
const ourConvo = getConversationController().get(UserUtils.getOurPubKeyStrFromCache());
|
const us = UserUtils.getOurPubKeyStrFromCache();
|
||||||
|
const ourConvo = getConversationController().get(us);
|
||||||
if (!ourConvo?.id) {
|
if (!ourConvo?.id) {
|
||||||
window?.log?.warn('[profileupdate] Cannot update our profile with empty convoid');
|
window?.log?.warn('[profileupdate] Cannot update our profile without convo associated');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
return updateProfileOfContact(
|
return updateProfileOfContact(us, displayName, profileUrl, profileKey);
|
||||||
UserUtils.getOurPubKeyStrFromCache(),
|
|
||||||
displayName,
|
|
||||||
profileUrl,
|
|
||||||
profileKey
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -56,15 +48,12 @@ async function updateProfileOfContact(
|
||||||
// add an avatar download job only if needed
|
// add an avatar download job only if needed
|
||||||
|
|
||||||
const profileKeyHex = !profileKey || isEmpty(profileKey) ? null : to_hex(profileKey);
|
const profileKeyHex = !profileKey || isEmpty(profileKey) ? null : to_hex(profileKey);
|
||||||
if (shouldAddAvatarDownloadJob({ pubkey, profileUrl, profileKeyHex })) {
|
|
||||||
const avatarDownloadJob = new AvatarDownloadJob({
|
|
||||||
conversationId: pubkey,
|
|
||||||
profileKeyHex,
|
|
||||||
profilePictureUrl: profileUrl || null,
|
|
||||||
});
|
|
||||||
|
|
||||||
await runners.avatarDownloadRunner.addJob(avatarDownloadJob);
|
await AvatarDownload.addAvatarDownloadJobIfNeeded({
|
||||||
}
|
profileKeyHex,
|
||||||
|
profileUrl,
|
||||||
|
pubkey,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
export const ProfileManager = {
|
export const ProfileManager = {
|
||||||
|
|
|
@ -3,7 +3,7 @@ import {
|
||||||
FakeSleepForJob,
|
FakeSleepForJob,
|
||||||
FakeSleepForMultiJob,
|
FakeSleepForMultiJob,
|
||||||
} from '../../../test/session/unit/utils/job_runner/FakeSleepForJob';
|
} from '../../../test/session/unit/utils/job_runner/FakeSleepForJob';
|
||||||
import { AvatarDownloadJob } from './jobs/AvatarDownloadJob';
|
import { AvatarDownload } from './jobs/AvatarDownloadJob';
|
||||||
import { ConfigurationSyncJob } from './jobs/ConfigurationSyncJob';
|
import { ConfigurationSyncJob } from './jobs/ConfigurationSyncJob';
|
||||||
import { PersistedJob, TypeOfPersistedData } from './PersistedJob';
|
import { PersistedJob, TypeOfPersistedData } from './PersistedJob';
|
||||||
|
|
||||||
|
@ -19,7 +19,7 @@ export function persistedJobFromData<T extends TypeOfPersistedData>(
|
||||||
return (new ConfigurationSyncJob(data) as unknown) as PersistedJob<T>;
|
return (new ConfigurationSyncJob(data) as unknown) as PersistedJob<T>;
|
||||||
|
|
||||||
case 'AvatarDownloadJobType':
|
case 'AvatarDownloadJobType':
|
||||||
return (new AvatarDownloadJob(data) as unknown) as PersistedJob<T>;
|
return (new AvatarDownload.AvatarDownloadJob(data) as unknown) as PersistedJob<T>;
|
||||||
case 'FakeSleepForJobType':
|
case 'FakeSleepForJobType':
|
||||||
return (new FakeSleepForJob(data) as unknown) as PersistedJob<T>;
|
return (new FakeSleepForJob(data) as unknown) as PersistedJob<T>;
|
||||||
case 'FakeSleepForJobMultiType':
|
case 'FakeSleepForJobMultiType':
|
||||||
|
|
|
@ -6,6 +6,7 @@ import {
|
||||||
AvatarDownloadPersistedData,
|
AvatarDownloadPersistedData,
|
||||||
ConfigurationSyncPersistedData,
|
ConfigurationSyncPersistedData,
|
||||||
PersistedJob,
|
PersistedJob,
|
||||||
|
RunJobResult,
|
||||||
TypeOfPersistedData,
|
TypeOfPersistedData,
|
||||||
} from './PersistedJob';
|
} from './PersistedJob';
|
||||||
|
|
||||||
|
@ -19,13 +20,6 @@ export type StartProcessingResult = 'job_in_progress' | 'job_deferred' | 'job_st
|
||||||
|
|
||||||
export type AddJobResult = 'job_deferred' | 'job_started';
|
export type AddJobResult = 'job_deferred' | 'job_started';
|
||||||
|
|
||||||
export type JobEventListener = {
|
|
||||||
onJobSuccess: (job: TypeOfPersistedData) => void;
|
|
||||||
onJobDeferred: (job: TypeOfPersistedData) => void;
|
|
||||||
onJobError: (job: TypeOfPersistedData) => void;
|
|
||||||
onJobStarted: (job: TypeOfPersistedData) => void;
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class is used to plan jobs and make sure they are retried until the success.
|
* This class is used to plan jobs and make sure they are retried until the success.
|
||||||
* By having a specific type, we can find the logic to be run by that type of job.
|
* By having a specific type, we can find the logic to be run by that type of job.
|
||||||
|
@ -43,11 +37,9 @@ export class PersistedJobRunner<T extends TypeOfPersistedData> {
|
||||||
private readonly jobRunnerType: JobRunnerType;
|
private readonly jobRunnerType: JobRunnerType;
|
||||||
private nextJobStartTimer: NodeJS.Timeout | null = null;
|
private nextJobStartTimer: NodeJS.Timeout | null = null;
|
||||||
private currentJob: PersistedJob<T> | null = null;
|
private currentJob: PersistedJob<T> | null = null;
|
||||||
private readonly jobEventsListener: JobEventListener | null;
|
|
||||||
|
|
||||||
constructor(jobRunnerType: JobRunnerType, jobEventsListener: JobEventListener | null) {
|
constructor(jobRunnerType: JobRunnerType, _jobEventsListener: null) {
|
||||||
this.jobRunnerType = jobRunnerType;
|
this.jobRunnerType = jobRunnerType;
|
||||||
this.jobEventsListener = jobEventsListener;
|
|
||||||
window?.log?.warn(`new runner of type ${jobRunnerType} built`);
|
window?.log?.warn(`new runner of type ${jobRunnerType} built`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,10 +88,8 @@ export class PersistedJobRunner<T extends TypeOfPersistedData> {
|
||||||
.map(k => k.serializeJob());
|
.map(k => k.serializeJob());
|
||||||
|
|
||||||
const addJobChecks = job.addJobCheck(serializedNonRunningJobs);
|
const addJobChecks = job.addJobCheck(serializedNonRunningJobs);
|
||||||
if (addJobChecks === 'skipAsJobTypeAlreadyPresent') {
|
if (addJobChecks === 'skipAddSameJobPresent') {
|
||||||
window.log.warn(
|
window.log.warn(`addjobCheck returned "${addJobChecks}" so not adding it`);
|
||||||
`job runner has already a job with type:"${job.persistedData.jobType}" planned so not adding another one`
|
|
||||||
);
|
|
||||||
return 'type_exists';
|
return 'type_exists';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -288,6 +278,7 @@ export class PersistedJobRunner<T extends TypeOfPersistedData> {
|
||||||
this.planNextJob();
|
this.planNextJob();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
let success: RunJobResult | null = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (this.currentJob) {
|
if (this.currentJob) {
|
||||||
|
@ -295,11 +286,9 @@ export class PersistedJobRunner<T extends TypeOfPersistedData> {
|
||||||
}
|
}
|
||||||
this.currentJob = nextJob;
|
this.currentJob = nextJob;
|
||||||
|
|
||||||
this.jobEventsListener?.onJobStarted(this.currentJob.serializeJob());
|
success = await this.currentJob.runJob();
|
||||||
|
|
||||||
const success = await this.currentJob.runJob();
|
if (success !== RunJobResult.Success) {
|
||||||
|
|
||||||
if (!success) {
|
|
||||||
throw new Error(`job ${nextJob.persistedData.identifier} failed`);
|
throw new Error(`job ${nextJob.persistedData.identifier} failed`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -307,30 +296,23 @@ export class PersistedJobRunner<T extends TypeOfPersistedData> {
|
||||||
this.deleteJobsByIdentifier([this.currentJob.persistedData.identifier]);
|
this.deleteJobsByIdentifier([this.currentJob.persistedData.identifier]);
|
||||||
await this.writeJobsToDB();
|
await this.writeJobsToDB();
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
if (nextJob.persistedData.currentRetry >= nextJob.persistedData.maxAttempts - 1) {
|
if (
|
||||||
|
success === RunJobResult.PermanentFailure ||
|
||||||
|
nextJob.persistedData.currentRetry >= nextJob.persistedData.maxAttempts - 1
|
||||||
|
) {
|
||||||
// we cannot restart this job anymore. Remove the entry completely
|
// we cannot restart this job anymore. Remove the entry completely
|
||||||
this.deleteJobsByIdentifier([nextJob.persistedData.identifier]);
|
this.deleteJobsByIdentifier([nextJob.persistedData.identifier]);
|
||||||
if (this.jobEventsListener && this.currentJob) {
|
|
||||||
this.jobEventsListener.onJobError(this.currentJob.serializeJob());
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
nextJob.persistedData.currentRetry = nextJob.persistedData.currentRetry + 1;
|
nextJob.persistedData.currentRetry = nextJob.persistedData.currentRetry + 1;
|
||||||
// that job can be restarted. Plan a retry later with the already defined retry
|
// that job can be restarted. Plan a retry later with the already defined retry
|
||||||
nextJob.persistedData.nextAttemptTimestamp =
|
nextJob.persistedData.nextAttemptTimestamp =
|
||||||
Date.now() + nextJob.persistedData.delayBetweenRetries;
|
Date.now() + nextJob.persistedData.delayBetweenRetries;
|
||||||
if (this.jobEventsListener && this.currentJob) {
|
|
||||||
this.jobEventsListener.onJobDeferred(this.currentJob.serializeJob());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// in any case, either we removed a job or changed one of the timestamp.
|
// in any case, either we removed a job or changed one of the timestamp.
|
||||||
// so sort the list again, and persist it
|
// so sort the list again, and persist it
|
||||||
this.sortJobsList();
|
this.sortJobsList();
|
||||||
await this.writeJobsToDB();
|
await this.writeJobsToDB();
|
||||||
} finally {
|
} finally {
|
||||||
if (this.jobEventsListener && this.currentJob) {
|
|
||||||
this.jobEventsListener.onJobSuccess(this.currentJob.serializeJob());
|
|
||||||
}
|
|
||||||
|
|
||||||
this.currentJob = null;
|
this.currentJob = null;
|
||||||
|
|
||||||
// start the next job if there is any to be started now, or just plan the wakeup of our runner for the right time.
|
// start the next job if there is any to be started now, or just plan the wakeup of our runner for the right time.
|
||||||
|
@ -341,7 +323,7 @@ export class PersistedJobRunner<T extends TypeOfPersistedData> {
|
||||||
private assertIsInitialized() {
|
private assertIsInitialized() {
|
||||||
if (!this.isInit) {
|
if (!this.isInit) {
|
||||||
throw new Error(
|
throw new Error(
|
||||||
'persisted job runner was not initlized yet. Call initWithData with what you have persisted first'
|
'persisted job runner was not initlized yet. Call loadJobsFromDb with what you have persisted first'
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,18 @@ export type TypeOfPersistedData =
|
||||||
| FakeSleepJobData
|
| FakeSleepJobData
|
||||||
| FakeSleepForMultiJobData;
|
| FakeSleepForMultiJobData;
|
||||||
|
|
||||||
|
export type AddJobCheckReturn =
|
||||||
|
| 'skipAddSameJobPresent'
|
||||||
|
| 'removeJobsFromQueue'
|
||||||
|
| 'sameJobDataAlreadyInQueue'
|
||||||
|
| null;
|
||||||
|
|
||||||
|
export enum RunJobResult {
|
||||||
|
Success = 1,
|
||||||
|
RetryJobIfPossible = 2,
|
||||||
|
PermanentFailure = 3,
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class can be used to save and run jobs from the database.
|
* This class can be used to save and run jobs from the database.
|
||||||
* Every child class must take the minimum amount of arguments, and make sure they are unlikely to change.
|
* Every child class must take the minimum amount of arguments, and make sure they are unlikely to change.
|
||||||
|
@ -55,7 +67,7 @@ export type TypeOfPersistedData =
|
||||||
export abstract class PersistedJob<T extends PersistedJobData> {
|
export abstract class PersistedJob<T extends PersistedJobData> {
|
||||||
public persistedData: T;
|
public persistedData: T;
|
||||||
|
|
||||||
private runningPromise: Promise<boolean> | null = null;
|
private runningPromise: Promise<RunJobResult> | null = null;
|
||||||
|
|
||||||
public constructor(data: T) {
|
public constructor(data: T) {
|
||||||
if (data.maxAttempts < 1) {
|
if (data.maxAttempts < 1) {
|
||||||
|
@ -109,13 +121,11 @@ export abstract class PersistedJob<T extends PersistedJobData> {
|
||||||
|
|
||||||
public abstract nonRunningJobsToRemove(jobs: Array<T>): Array<T>;
|
public abstract nonRunningJobsToRemove(jobs: Array<T>): Array<T>;
|
||||||
|
|
||||||
public abstract addJobCheck(
|
public abstract addJobCheck(jobs: Array<T>): AddJobCheckReturn;
|
||||||
jobs: Array<T>
|
|
||||||
): 'skipAsJobTypeAlreadyPresent' | 'removeJobsFromQueue' | null;
|
|
||||||
|
|
||||||
public addJobCheckSameTypePresent(jobs: Array<T>): 'skipAsJobTypeAlreadyPresent' | null {
|
public addJobCheckSameTypePresent(jobs: Array<T>): 'skipAddSameJobPresent' | null {
|
||||||
return jobs.some(j => j.jobType === this.persistedData.jobType)
|
return jobs.some(j => j.jobType === this.persistedData.jobType)
|
||||||
? 'skipAsJobTypeAlreadyPresent'
|
? 'skipAddSameJobPresent'
|
||||||
: null;
|
: null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,7 +136,7 @@ export abstract class PersistedJob<T extends PersistedJobData> {
|
||||||
*
|
*
|
||||||
* Note: you should check the this.isAborted() to know if you should cancel the current processing of your logic.
|
* Note: you should check the this.isAborted() to know if you should cancel the current processing of your logic.
|
||||||
*/
|
*/
|
||||||
protected abstract run(): Promise<boolean>;
|
protected abstract run(): Promise<RunJobResult>;
|
||||||
|
|
||||||
protected serializeBase(): T {
|
protected serializeBase(): T {
|
||||||
return cloneDeep(this.persistedData);
|
return cloneDeep(this.persistedData);
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
import { isEmpty, isEqual, isNumber } from 'lodash';
|
import { isEmpty, isEqual, isNumber, isString } from 'lodash';
|
||||||
import { v4 } from 'uuid';
|
import { v4 } from 'uuid';
|
||||||
import { UserUtils } from '../..';
|
import { UserUtils } from '../..';
|
||||||
import { downloadAttachment } from '../../../../receiver/attachments';
|
import { downloadAttachment } from '../../../../receiver/attachments';
|
||||||
|
@ -8,7 +8,13 @@ import { autoScaleForIncomingAvatar } from '../../../../util/attachmentsUtil';
|
||||||
import { decryptProfile } from '../../../../util/crypto/profileEncrypter';
|
import { decryptProfile } from '../../../../util/crypto/profileEncrypter';
|
||||||
import { getConversationController } from '../../../conversations';
|
import { getConversationController } from '../../../conversations';
|
||||||
import { fromHexToArray } from '../../String';
|
import { fromHexToArray } from '../../String';
|
||||||
import { AvatarDownloadPersistedData, PersistedJob } from '../PersistedJob';
|
import { runners } from '../JobRunner';
|
||||||
|
import {
|
||||||
|
AddJobCheckReturn,
|
||||||
|
AvatarDownloadPersistedData,
|
||||||
|
PersistedJob,
|
||||||
|
RunJobResult,
|
||||||
|
} from '../PersistedJob';
|
||||||
|
|
||||||
const defaultMsBetweenRetries = 10000;
|
const defaultMsBetweenRetries = 10000;
|
||||||
const defaultMaxAttemps = 3;
|
const defaultMaxAttemps = 3;
|
||||||
|
@ -16,7 +22,7 @@ const defaultMaxAttemps = 3;
|
||||||
/**
|
/**
|
||||||
* Returns true if given those details we should add an Avatar Download Job to the list of jobs to run
|
* Returns true if given those details we should add an Avatar Download Job to the list of jobs to run
|
||||||
*/
|
*/
|
||||||
export function shouldAddAvatarDownloadJob({
|
function shouldAddAvatarDownloadJob({
|
||||||
profileKeyHex,
|
profileKeyHex,
|
||||||
profileUrl,
|
profileUrl,
|
||||||
pubkey,
|
pubkey,
|
||||||
|
@ -36,22 +42,47 @@ export function shouldAddAvatarDownloadJob({
|
||||||
window.log.warn('shouldAddAvatarDownloadJob can only be used for private convos currently');
|
window.log.warn('shouldAddAvatarDownloadJob can only be used for private convos currently');
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (profileUrl && !isEmpty(profileKeyHex)) {
|
const prevPointer = conversation.get('avatarPointer');
|
||||||
const prevPointer = conversation.get('avatarPointer');
|
|
||||||
const needsUpdate = !prevPointer || !isEqual(prevPointer, profileUrl);
|
|
||||||
|
|
||||||
return needsUpdate;
|
if (!isEmpty(profileUrl) && !isEmpty(profileKeyHex) && !isEqual(prevPointer, profileUrl)) {
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function addAvatarDownloadJobIfNeeded({
|
||||||
|
profileKeyHex,
|
||||||
|
profileUrl,
|
||||||
|
pubkey,
|
||||||
|
}: {
|
||||||
|
pubkey: string;
|
||||||
|
profileUrl: string | null | undefined;
|
||||||
|
profileKeyHex: string | null | undefined;
|
||||||
|
}) {
|
||||||
|
if (profileKeyHex && shouldAddAvatarDownloadJob({ pubkey, profileUrl, profileKeyHex })) {
|
||||||
|
const avatarDownloadJob = new AvatarDownloadJob({
|
||||||
|
conversationId: pubkey,
|
||||||
|
profileKeyHex,
|
||||||
|
profilePictureUrl: profileUrl || null,
|
||||||
|
});
|
||||||
|
window.log.debug(
|
||||||
|
`addAvatarDownloadJobIfNeeded: adding job download for ${pubkey}:${profileUrl}:${profileKeyHex} `
|
||||||
|
);
|
||||||
|
await runners.avatarDownloadRunner.addJob(avatarDownloadJob);
|
||||||
|
} else {
|
||||||
|
window.log.debug(
|
||||||
|
`addAvatarDownloadJobIfNeeded: no download required for ${pubkey}:${profileUrl}:${profileKeyHex} `
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This job can be used to add the downloading of the avatar of a conversation to the list of jobs to be run.
|
* This job can be used to add the downloading of the avatar of a conversation to the list of jobs to be run.
|
||||||
* The conversationId is used as identifier so we can only have a single job per conversation.
|
* The conversationId is used as identifier so we can only have a single job per conversation.
|
||||||
* When the jobRunners starts this job, the job first checks if a download is required or not (avatarPointer changed and wasn't already downloaded).
|
* When the jobRunners starts this job, the job first checks if a download is required or not (avatarPointer changed and wasn't already downloaded).
|
||||||
* If yes, it downloads the new avatar, decrypt it and store it before updating the conversation with the new url,profilekey and local file storage.
|
* If yes, it downloads the new avatar, decrypt it and store it before updating the conversation with the new url,profilekey and local file storage.
|
||||||
*/
|
*/
|
||||||
export class AvatarDownloadJob extends PersistedJob<AvatarDownloadPersistedData> {
|
class AvatarDownloadJob extends PersistedJob<AvatarDownloadPersistedData> {
|
||||||
constructor({
|
constructor({
|
||||||
conversationId,
|
conversationId,
|
||||||
nextAttemptTimestamp,
|
nextAttemptTimestamp,
|
||||||
|
@ -86,7 +117,7 @@ export class AvatarDownloadJob extends PersistedJob<AvatarDownloadPersistedData>
|
||||||
}
|
}
|
||||||
|
|
||||||
// tslint:disable-next-line: cyclomatic-complexity
|
// tslint:disable-next-line: cyclomatic-complexity
|
||||||
public async run(): Promise<boolean> {
|
public async run(): Promise<RunJobResult> {
|
||||||
const convoId = this.persistedData.conversationId;
|
const convoId = this.persistedData.conversationId;
|
||||||
|
|
||||||
window.log.warn(
|
window.log.warn(
|
||||||
|
@ -95,7 +126,7 @@ export class AvatarDownloadJob extends PersistedJob<AvatarDownloadPersistedData>
|
||||||
|
|
||||||
if (!this.persistedData.identifier || !convoId) {
|
if (!this.persistedData.identifier || !convoId) {
|
||||||
// return true so we do not retry this task.
|
// return true so we do not retry this task.
|
||||||
return true;
|
return RunJobResult.PermanentFailure;
|
||||||
}
|
}
|
||||||
|
|
||||||
let conversation = getConversationController().get(convoId);
|
let conversation = getConversationController().get(convoId);
|
||||||
|
@ -103,11 +134,11 @@ export class AvatarDownloadJob extends PersistedJob<AvatarDownloadPersistedData>
|
||||||
// return true so we do not retry this task.
|
// return true so we do not retry this task.
|
||||||
window.log.warn('AvatarDownloadJob did not corresponding conversation');
|
window.log.warn('AvatarDownloadJob did not corresponding conversation');
|
||||||
|
|
||||||
return true;
|
return RunJobResult.PermanentFailure;
|
||||||
}
|
}
|
||||||
if (!conversation.isPrivate()) {
|
if (!conversation.isPrivate()) {
|
||||||
window.log.warn('AvatarDownloadJob can only be used for private convos currently');
|
window.log.warn('AvatarDownloadJob can only be used for private convos currently');
|
||||||
return true;
|
return RunJobResult.PermanentFailure;
|
||||||
}
|
}
|
||||||
let changes = false;
|
let changes = false;
|
||||||
|
|
||||||
|
@ -120,7 +151,7 @@ export class AvatarDownloadJob extends PersistedJob<AvatarDownloadPersistedData>
|
||||||
// return true so we do not retry this task.
|
// return true so we do not retry this task.
|
||||||
window.log.warn('AvatarDownloadJob shouldAddAvatarDownloadJob said no');
|
window.log.warn('AvatarDownloadJob shouldAddAvatarDownloadJob said no');
|
||||||
|
|
||||||
return true;
|
return RunJobResult.PermanentFailure;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.persistedData.profilePictureUrl && this.persistedData.profileKeyHex) {
|
if (this.persistedData.profilePictureUrl && this.persistedData.profileKeyHex) {
|
||||||
|
@ -162,16 +193,23 @@ export class AvatarDownloadJob extends PersistedJob<AvatarDownloadPersistedData>
|
||||||
({ path } = upgraded);
|
({ path } = upgraded);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
window?.log?.error(`[profileupdate] Could not decrypt profile image: ${e}`);
|
window?.log?.error(`[profileupdate] Could not decrypt profile image: ${e}`);
|
||||||
|
return RunJobResult.RetryJobIfPossible; // so we retry this job
|
||||||
}
|
}
|
||||||
|
|
||||||
conversation.set({ avatarInProfile: path || undefined });
|
conversation.set({ avatarInProfile: path || undefined });
|
||||||
|
|
||||||
changes = true;
|
changes = true;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
|
if (isString(e.message) && (e.message as string).includes('404')) {
|
||||||
|
window.log.warn(
|
||||||
|
`[profileupdate] Failed to download attachment at ${this.persistedData.profilePictureUrl}. We got 404 error: "${e.message}"`
|
||||||
|
);
|
||||||
|
return RunJobResult.PermanentFailure;
|
||||||
|
}
|
||||||
window.log.warn(
|
window.log.warn(
|
||||||
`[profileupdate] Failed to download attachment at ${this.persistedData.profilePictureUrl}. Maybe it expired? ${e.message}`
|
`[profileupdate] Failed to download attachment at ${this.persistedData.profilePictureUrl}. Maybe it expired? ${e.message}`
|
||||||
);
|
);
|
||||||
// do not return here, we still want to update the display name even if the avatar failed to download
|
return RunJobResult.RetryJobIfPossible;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -210,7 +248,7 @@ export class AvatarDownloadJob extends PersistedJob<AvatarDownloadPersistedData>
|
||||||
}
|
}
|
||||||
|
|
||||||
// return true so this job is marked as a success
|
// return true so this job is marked as a success
|
||||||
return true;
|
return RunJobResult.Success;
|
||||||
}
|
}
|
||||||
|
|
||||||
public serializeJob(): AvatarDownloadPersistedData {
|
public serializeJob(): AvatarDownloadPersistedData {
|
||||||
|
@ -222,12 +260,27 @@ export class AvatarDownloadJob extends PersistedJob<AvatarDownloadPersistedData>
|
||||||
return jobs.filter(j => j.conversationId === this.persistedData.conversationId);
|
return jobs.filter(j => j.conversationId === this.persistedData.conversationId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public addJobCheck(
|
public addJobCheck(jobs: Array<AvatarDownloadPersistedData>): AddJobCheckReturn {
|
||||||
jobs: Array<AvatarDownloadPersistedData>
|
// avoid adding the same job if the exact same one is already planned
|
||||||
): 'skipAsJobTypeAlreadyPresent' | 'removeJobsFromQueue' | null {
|
const hasSameJob = jobs.some(j => {
|
||||||
|
return (
|
||||||
|
j.conversationId === this.persistedData.conversationId &&
|
||||||
|
j.profileKeyHex === this.persistedData.profileKeyHex &&
|
||||||
|
j.profilePictureUrl === this.persistedData.profilePictureUrl
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
if (hasSameJob) {
|
||||||
|
return 'skipAddSameJobPresent';
|
||||||
|
}
|
||||||
if (this.nonRunningJobsToRemove(jobs).length) {
|
if (this.nonRunningJobsToRemove(jobs).length) {
|
||||||
return 'removeJobsFromQueue';
|
return 'removeJobsFromQueue';
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const AvatarDownload = {
|
||||||
|
AvatarDownloadJob,
|
||||||
|
addAvatarDownloadJobIfNeeded,
|
||||||
|
};
|
||||||
|
|
|
@ -1,6 +1,11 @@
|
||||||
import { v4 } from 'uuid';
|
import { v4 } from 'uuid';
|
||||||
import { sleepFor } from '../../Promise';
|
import { sleepFor } from '../../Promise';
|
||||||
import { ConfigurationSyncPersistedData, PersistedJob } from '../PersistedJob';
|
import {
|
||||||
|
AddJobCheckReturn,
|
||||||
|
ConfigurationSyncPersistedData,
|
||||||
|
PersistedJob,
|
||||||
|
RunJobResult,
|
||||||
|
} from '../PersistedJob';
|
||||||
|
|
||||||
const defaultMsBetweenRetries = 3000;
|
const defaultMsBetweenRetries = 3000;
|
||||||
|
|
||||||
|
@ -22,7 +27,7 @@ export class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersiste
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public async run() {
|
public async run(): Promise<RunJobResult> {
|
||||||
// blablha do everything from the notion page, and if success, return true.
|
// blablha do everything from the notion page, and if success, return true.
|
||||||
window.log.warn(
|
window.log.warn(
|
||||||
`running job ${this.persistedData.jobType} with id:"${this.persistedData.identifier}" `
|
`running job ${this.persistedData.jobType} with id:"${this.persistedData.identifier}" `
|
||||||
|
@ -33,7 +38,7 @@ export class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersiste
|
||||||
`running job ${this.persistedData.jobType} with id:"${this.persistedData.identifier}" done and returning failed `
|
`running job ${this.persistedData.jobType} with id:"${this.persistedData.identifier}" done and returning failed `
|
||||||
);
|
);
|
||||||
|
|
||||||
return false;
|
return RunJobResult.RetryJobIfPossible;
|
||||||
}
|
}
|
||||||
|
|
||||||
public serializeJob(): ConfigurationSyncPersistedData {
|
public serializeJob(): ConfigurationSyncPersistedData {
|
||||||
|
@ -41,9 +46,7 @@ export class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersiste
|
||||||
return fromParent;
|
return fromParent;
|
||||||
}
|
}
|
||||||
|
|
||||||
public addJobCheck(
|
public addJobCheck(jobs: Array<ConfigurationSyncPersistedData>): AddJobCheckReturn {
|
||||||
jobs: Array<ConfigurationSyncPersistedData>
|
|
||||||
): 'skipAsJobTypeAlreadyPresent' | 'removeJobsFromQueue' | null {
|
|
||||||
return this.addJobCheckSameTypePresent(jobs);
|
return this.addJobCheckSameTypePresent(jobs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,9 +2,11 @@ import { isNumber } from 'lodash';
|
||||||
import { v4 } from 'uuid';
|
import { v4 } from 'uuid';
|
||||||
import { sleepFor } from '../../../../../session/utils/Promise';
|
import { sleepFor } from '../../../../../session/utils/Promise';
|
||||||
import {
|
import {
|
||||||
|
AddJobCheckReturn,
|
||||||
FakeSleepForMultiJobData,
|
FakeSleepForMultiJobData,
|
||||||
FakeSleepJobData,
|
FakeSleepJobData,
|
||||||
PersistedJob,
|
PersistedJob,
|
||||||
|
RunJobResult,
|
||||||
} from '../../../../../session/utils/job_runners/PersistedJob';
|
} from '../../../../../session/utils/job_runners/PersistedJob';
|
||||||
|
|
||||||
export class FakeSleepForMultiJob extends PersistedJob<FakeSleepForMultiJobData> {
|
export class FakeSleepForMultiJob extends PersistedJob<FakeSleepForMultiJobData> {
|
||||||
|
@ -34,7 +36,7 @@ export class FakeSleepForMultiJob extends PersistedJob<FakeSleepForMultiJobData>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async run() {
|
public async run(): Promise<RunJobResult> {
|
||||||
window.log.warn(
|
window.log.warn(
|
||||||
`running job ${this.persistedData.jobType} with id:"${this.persistedData.identifier}". sleeping for ${this.persistedData.sleepDuration} & returning ${this.persistedData.returnResult} `
|
`running job ${this.persistedData.jobType} with id:"${this.persistedData.identifier}". sleeping for ${this.persistedData.sleepDuration} & returning ${this.persistedData.returnResult} `
|
||||||
);
|
);
|
||||||
|
@ -42,7 +44,10 @@ export class FakeSleepForMultiJob extends PersistedJob<FakeSleepForMultiJobData>
|
||||||
window.log.warn(
|
window.log.warn(
|
||||||
`${this.persistedData.jobType} with id:"${this.persistedData.identifier}" done. returning success `
|
`${this.persistedData.jobType} with id:"${this.persistedData.identifier}" done. returning success `
|
||||||
);
|
);
|
||||||
return this.persistedData.returnResult;
|
if (this.persistedData.returnResult) {
|
||||||
|
return RunJobResult.Success;
|
||||||
|
}
|
||||||
|
return RunJobResult.RetryJobIfPossible;
|
||||||
}
|
}
|
||||||
|
|
||||||
public serializeJob(): FakeSleepForMultiJobData {
|
public serializeJob(): FakeSleepForMultiJobData {
|
||||||
|
@ -52,9 +57,7 @@ export class FakeSleepForMultiJob extends PersistedJob<FakeSleepForMultiJobData>
|
||||||
/**
|
/**
|
||||||
* For the fakesleep for multi, we want to allow as many job as we want, so this returns null
|
* For the fakesleep for multi, we want to allow as many job as we want, so this returns null
|
||||||
*/
|
*/
|
||||||
public addJobCheck(
|
public addJobCheck(_jobs: Array<FakeSleepForMultiJobData>): AddJobCheckReturn {
|
||||||
_jobs: Array<FakeSleepForMultiJobData>
|
|
||||||
): 'skipAsJobTypeAlreadyPresent' | 'removeJobsFromQueue' | null {
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,7 +92,7 @@ export class FakeSleepForJob extends PersistedJob<FakeSleepJobData> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async run() {
|
public async run(): Promise<RunJobResult> {
|
||||||
window.log.warn(
|
window.log.warn(
|
||||||
`running job ${this.persistedData.jobType} with id:"${this.persistedData.identifier}" `
|
`running job ${this.persistedData.jobType} with id:"${this.persistedData.identifier}" `
|
||||||
);
|
);
|
||||||
|
@ -97,16 +100,14 @@ export class FakeSleepForJob extends PersistedJob<FakeSleepJobData> {
|
||||||
window.log.warn(
|
window.log.warn(
|
||||||
`${this.persistedData.jobType} with id:"${this.persistedData.identifier}" done. returning failed `
|
`${this.persistedData.jobType} with id:"${this.persistedData.identifier}" done. returning failed `
|
||||||
);
|
);
|
||||||
return false;
|
return RunJobResult.RetryJobIfPossible;
|
||||||
}
|
}
|
||||||
|
|
||||||
public serializeJob(): FakeSleepJobData {
|
public serializeJob(): FakeSleepJobData {
|
||||||
return super.serializeBase();
|
return super.serializeBase();
|
||||||
}
|
}
|
||||||
|
|
||||||
public addJobCheck(
|
public addJobCheck(jobs: Array<FakeSleepJobData>): AddJobCheckReturn {
|
||||||
jobs: Array<FakeSleepJobData>
|
|
||||||
): 'skipAsJobTypeAlreadyPresent' | 'removeJobsFromQueue' | null {
|
|
||||||
return this.addJobCheckSameTypePresent(jobs);
|
return this.addJobCheckSameTypePresent(jobs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,15 +2,11 @@ import { expect } from 'chai';
|
||||||
import _, { isUndefined } from 'lodash';
|
import _, { isUndefined } from 'lodash';
|
||||||
import Sinon from 'sinon';
|
import Sinon from 'sinon';
|
||||||
import { v4 } from 'uuid';
|
import { v4 } from 'uuid';
|
||||||
import {
|
import { PersistedJobRunner } from '../../../../../session/utils/job_runners/JobRunner';
|
||||||
JobEventListener,
|
|
||||||
PersistedJobRunner,
|
|
||||||
} from '../../../../../session/utils/job_runners/JobRunner';
|
|
||||||
import { FakeSleepForJob, FakeSleepForMultiJob } from './FakeSleepForJob';
|
import { FakeSleepForJob, FakeSleepForMultiJob } from './FakeSleepForJob';
|
||||||
import {
|
import {
|
||||||
FakeSleepForMultiJobData,
|
FakeSleepForMultiJobData,
|
||||||
FakeSleepJobData,
|
FakeSleepJobData,
|
||||||
TypeOfPersistedData,
|
|
||||||
} from '../../../../../session/utils/job_runners/PersistedJob';
|
} from '../../../../../session/utils/job_runners/PersistedJob';
|
||||||
import { sleepFor } from '../../../../../session/utils/Promise';
|
import { sleepFor } from '../../../../../session/utils/Promise';
|
||||||
import { stubData } from '../../../../test-utils/utils';
|
import { stubData } from '../../../../test-utils/utils';
|
||||||
|
@ -55,31 +51,13 @@ describe('JobRunner', () => {
|
||||||
let clock: Sinon.SinonFakeTimers;
|
let clock: Sinon.SinonFakeTimers;
|
||||||
let runner: PersistedJobRunner<FakeSleepJobData>;
|
let runner: PersistedJobRunner<FakeSleepJobData>;
|
||||||
let runnerMulti: PersistedJobRunner<FakeSleepForMultiJobData>;
|
let runnerMulti: PersistedJobRunner<FakeSleepForMultiJobData>;
|
||||||
let jobEventsListener: JobEventListener;
|
|
||||||
|
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
getItemById = stubData('getItemById');
|
getItemById = stubData('getItemById');
|
||||||
stubData('createOrUpdateItem');
|
stubData('createOrUpdateItem');
|
||||||
clock = Sinon.useFakeTimers({ shouldAdvanceTime: true });
|
clock = Sinon.useFakeTimers({ shouldAdvanceTime: true });
|
||||||
jobEventsListener = {
|
runner = new PersistedJobRunner<FakeSleepJobData>('FakeSleepForJob', null);
|
||||||
onJobDeferred: (_job: TypeOfPersistedData) => {
|
runnerMulti = new PersistedJobRunner<FakeSleepForMultiJobData>('FakeSleepForMultiJob', null);
|
||||||
// window.log.warn('listener got deferred for job ', job);
|
|
||||||
},
|
|
||||||
onJobSuccess: (_job: TypeOfPersistedData) => {
|
|
||||||
// window.log.warn('listener got success for job ', job);
|
|
||||||
},
|
|
||||||
onJobError: (_job: TypeOfPersistedData) => {
|
|
||||||
// window.log.warn('listener got error for job ', job);
|
|
||||||
},
|
|
||||||
onJobStarted: (_job: TypeOfPersistedData) => {
|
|
||||||
// window.log.warn('listener got started for job ', job);
|
|
||||||
},
|
|
||||||
};
|
|
||||||
runner = new PersistedJobRunner<FakeSleepJobData>('FakeSleepForJob', jobEventsListener);
|
|
||||||
runnerMulti = new PersistedJobRunner<FakeSleepForMultiJobData>(
|
|
||||||
'FakeSleepForMultiJob',
|
|
||||||
jobEventsListener
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
afterEach(() => {
|
afterEach(() => {
|
||||||
|
@ -95,8 +73,9 @@ describe('JobRunner', () => {
|
||||||
id: '',
|
id: '',
|
||||||
value: JSON.stringify([]),
|
value: JSON.stringify([]),
|
||||||
});
|
});
|
||||||
|
const job = getFakeSleepForJob(123);
|
||||||
|
|
||||||
await runner.loadJobsFromDb();
|
await runner.addJob(job);
|
||||||
throw new Error('PLOP'); // the line above should throw something else
|
throw new Error('PLOP'); // the line above should throw something else
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
expect(e.message).to.not.eq('PLOP');
|
expect(e.message).to.not.eq('PLOP');
|
||||||
|
|
Loading…
Reference in a new issue