Compare commits
2 Commits
98fd834367
...
5cfbb8405c
Author | SHA1 | Date |
---|---|---|
Audric Ackermann | 5cfbb8405c | |
Audric Ackermann | 82c6f0897b |
|
@ -127,7 +127,7 @@ export const ReadableMessage = (props: ReadableMessageProps) => {
|
|||
dispatch(showScrollToBottomButton(false));
|
||||
getConversationController()
|
||||
.get(selectedConversationKey)
|
||||
?.markConversationRead(receivedAt || 0); // TODOLATER this should be `sentAt || serverTimestamp` I think
|
||||
?.markConversationRead({ newestUnreadDate: receivedAt || 0, fromConfigMessage: false }); // TODOLATER this should be `sentAt || serverTimestamp` I think
|
||||
|
||||
dispatch(markConversationFullyRead(selectedConversationKey));
|
||||
} else if (inView === false) {
|
||||
|
@ -172,7 +172,7 @@ export const ReadableMessage = (props: ReadableMessageProps) => {
|
|||
if (foundSentAt) {
|
||||
getConversationController()
|
||||
.get(selectedConversationKey)
|
||||
?.markConversationRead(foundSentAt, Date.now());
|
||||
?.markConversationRead({ newestUnreadDate: foundSentAt, fromConfigMessage: false });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
// eslint:disable: no-require-imports no-var-requires one-variable-per-declaration no-void-expression function-name
|
||||
|
||||
import _ from 'lodash';
|
||||
import _, { isEmpty } from 'lodash';
|
||||
import { MessageResultProps } from '../components/search/MessageSearchResults';
|
||||
import { ConversationModel } from '../models/conversation';
|
||||
import { ConversationAttributes } from '../models/conversationAttributes';
|
||||
|
@ -307,6 +307,14 @@ async function getMessageById(
|
|||
return new MessageModel(message);
|
||||
}
|
||||
|
||||
async function getMessagesById(ids: Array<string>): Promise<Array<MessageModel>> {
|
||||
const messages = await channels.getMessagesById(ids);
|
||||
if (!messages || isEmpty(messages)) {
|
||||
return [];
|
||||
}
|
||||
return messages.map((msg: any) => new MessageModel(msg));
|
||||
}
|
||||
|
||||
async function getMessageByServerId(
|
||||
conversationId: string,
|
||||
serverId: number,
|
||||
|
@ -361,12 +369,12 @@ async function getUnreadByConversation(
|
|||
async function getUnreadDisappearingByConversation(
|
||||
conversationId: string,
|
||||
sentBeforeTimestamp: number
|
||||
): Promise<MessageCollection> {
|
||||
): Promise<Array<MessageModel>> {
|
||||
const messages = await channels.getUnreadDisappearingByConversation(
|
||||
conversationId,
|
||||
sentBeforeTimestamp
|
||||
);
|
||||
return new MessageCollection(messages);
|
||||
return new MessageCollection(messages).models;
|
||||
}
|
||||
|
||||
async function markAllAsReadByConversationNoExpiration(
|
||||
|
@ -816,6 +824,7 @@ export const Data = {
|
|||
cleanUpExpirationTimerUpdateHistory,
|
||||
getMessageIdsFromServerIds,
|
||||
getMessageById,
|
||||
getMessagesById,
|
||||
getMessagesBySenderAndSentAt,
|
||||
getMessageByServerId,
|
||||
filterAlreadyFetchedOpengroupMessage,
|
||||
|
|
|
@ -53,6 +53,7 @@ const channelsToMake = new Set([
|
|||
'getMessagesBySenderAndSentAt',
|
||||
'getMessageIdsFromServerIds',
|
||||
'getMessageById',
|
||||
'getMessagesById',
|
||||
'getMessagesBySentAt',
|
||||
'getMessageByServerId',
|
||||
'getExpiredMessages',
|
||||
|
|
|
@ -1,19 +1,19 @@
|
|||
import { compact } from 'lodash';
|
||||
import { SessionButtonColor } from '../../components/basic/SessionButton';
|
||||
import { Data } from '../../data/data';
|
||||
import { ConversationModel } from '../../models/conversation';
|
||||
import { MessageModel } from '../../models/message';
|
||||
import { getMessageQueue } from '../../session';
|
||||
import { deleteSogsMessageByServerIds } from '../../session/apis/open_group_api/sogsv3/sogsV3DeleteMessages';
|
||||
import { SnodeAPI } from '../../session/apis/snode_api/SNodeAPI';
|
||||
import { SnodeNamespaces } from '../../session/apis/snode_api/namespaces';
|
||||
import { getConversationController } from '../../session/conversations';
|
||||
import { UnsendMessage } from '../../session/messages/outgoing/controlMessage/UnsendMessage';
|
||||
import { ed25519Str } from '../../session/onions/onionPath';
|
||||
import { SnodeAPI } from '../../session/apis/snode_api/SNodeAPI';
|
||||
import { PubKey } from '../../session/types';
|
||||
import { ToastUtils, UserUtils } from '../../session/utils';
|
||||
import { resetSelectedMessageIds } from '../../state/ducks/conversations';
|
||||
import { updateConfirmModal } from '../../state/ducks/modalDialog';
|
||||
import { SessionButtonColor } from '../../components/basic/SessionButton';
|
||||
import { deleteSogsMessageByServerIds } from '../../session/apis/open_group_api/sogsv3/sogsV3DeleteMessages';
|
||||
import { SnodeNamespaces } from '../../session/apis/snode_api/namespaces';
|
||||
|
||||
/**
|
||||
* Deletes messages for everyone in a 1-1 or everyone in a closed group conversation.
|
||||
|
@ -204,7 +204,6 @@ export async function deleteMessageLocallyOnly({
|
|||
} else {
|
||||
// just mark the message as deleted but still show in conversation
|
||||
await message.markAsDeleted();
|
||||
await message.markMessageAsRead(Date.now());
|
||||
}
|
||||
conversation.updateLastMessage();
|
||||
}
|
||||
|
|
|
@ -116,6 +116,10 @@ async function startJobRunners() {
|
|||
runners.avatarDownloadRunner.startProcessing();
|
||||
await runners.configurationSyncRunner.loadJobsFromDb();
|
||||
runners.configurationSyncRunner.startProcessing();
|
||||
await runners.updateMsgExpiryRunner.loadJobsFromDb();
|
||||
runners.updateMsgExpiryRunner.startProcessing();
|
||||
await runners.fetchSwarmMsgExpiryRunner.loadJobsFromDb();
|
||||
runners.fetchSwarmMsgExpiryRunner.startProcessing();
|
||||
}
|
||||
|
||||
// We need this 'first' check because we don't want to start the app up any other time
|
||||
|
|
|
@ -47,7 +47,6 @@ import {
|
|||
actions as conversationActions,
|
||||
conversationsChanged,
|
||||
markConversationFullyRead,
|
||||
MessageModelPropsWithoutConvoProps,
|
||||
messagesDeleted,
|
||||
ReduxConversationType,
|
||||
} from '../state/ducks/conversations';
|
||||
|
@ -120,6 +119,8 @@ import {
|
|||
|
||||
import { DisappearingMessages } from '../session/disappearing_messages';
|
||||
import { DisappearingMessageConversationModeType } from '../session/disappearing_messages/types';
|
||||
import { FetchMsgExpirySwarm } from '../session/utils/job_runners/jobs/FetchMsgExpirySwarmJob';
|
||||
import { UpdateMsgExpirySwarm } from '../session/utils/job_runners/jobs/UpdateMsgExpirySwarmJob';
|
||||
import { ReleasedFeatures } from '../util/releaseFeature';
|
||||
import { markAttributesAsReadIfNeeded } from './messageFactory';
|
||||
|
||||
|
@ -138,7 +139,10 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
|
|||
public updateLastMessage: () => any;
|
||||
public throttledBumpTyping: () => void;
|
||||
public throttledNotify: (message: MessageModel) => void;
|
||||
public markConversationRead: (newestUnreadDate: number, readAt?: number) => void;
|
||||
public markConversationRead: (opts: {
|
||||
newestUnreadDate: number;
|
||||
fromConfigMessage?: boolean;
|
||||
}) => void;
|
||||
public initialPromise: any;
|
||||
|
||||
private typingRefreshTimer?: NodeJS.Timeout | null;
|
||||
|
@ -1143,7 +1147,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
|
|||
}
|
||||
|
||||
// otherwise, do it the slow and expensive way
|
||||
await this.markConversationReadBouncy(Date.now());
|
||||
await this.markConversationReadBouncy({ newestUnreadDate: Date.now() });
|
||||
}
|
||||
|
||||
public getUsInThatConversation() {
|
||||
|
@ -1814,7 +1818,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
|
|||
* This call is not debounced and can be quite heavy, so only call it when handling config messages updates
|
||||
*/
|
||||
public async markReadFromConfigMessage(newestUnreadDate: number) {
|
||||
return this.markConversationReadBouncy(newestUnreadDate);
|
||||
return this.markConversationReadBouncy({ newestUnreadDate, fromConfigMessage: true });
|
||||
}
|
||||
|
||||
private async sendMessageJob(message: MessageModel) {
|
||||
|
@ -2066,7 +2070,14 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
|
|||
}
|
||||
}
|
||||
|
||||
private async markConversationReadBouncy(newestUnreadDate: number, readAt: number = Date.now()) {
|
||||
private async markConversationReadBouncy({
|
||||
newestUnreadDate,
|
||||
fromConfigMessage = false,
|
||||
}: {
|
||||
newestUnreadDate: number;
|
||||
fromConfigMessage?: boolean;
|
||||
}) {
|
||||
const readAt = Date.now();
|
||||
const conversationId = this.id;
|
||||
Notifications.clearByConversationID(conversationId);
|
||||
|
||||
|
@ -2079,32 +2090,39 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
|
|||
|
||||
// Build the list of updated message models so we can mark them all as read on a single sqlite call
|
||||
const readDetails = [];
|
||||
const msgsIdsToUpdateExpireOnSwarm: Array<string> = [];
|
||||
// eslint-disable-next-line no-restricted-syntax
|
||||
for (const nowRead of oldUnreadNowRead) {
|
||||
nowRead.markMessageReadNoCommit(readAt);
|
||||
const shouldUpdateSwarmExpiry = nowRead.markMessageReadNoCommit(readAt);
|
||||
if (shouldUpdateSwarmExpiry) {
|
||||
msgsIdsToUpdateExpireOnSwarm.push(nowRead.get('id') as string);
|
||||
}
|
||||
|
||||
const validTimestamp = nowRead.get('sent_at') || nowRead.get('serverTimestamp');
|
||||
if (nowRead.get('source') && validTimestamp && isFinite(validTimestamp)) {
|
||||
const sentAt = nowRead.get('sent_at') || nowRead.get('serverTimestamp');
|
||||
if (nowRead.get('source') && sentAt && isFinite(sentAt)) {
|
||||
readDetails.push({
|
||||
sender: nowRead.get('source'),
|
||||
timestamp: validTimestamp,
|
||||
timestamp: sentAt,
|
||||
});
|
||||
}
|
||||
}
|
||||
const oldUnreadNowReadAttrs = oldUnreadNowRead.map(m => m.attributes);
|
||||
if (oldUnreadNowReadAttrs?.length) {
|
||||
await Data.saveMessages(oldUnreadNowReadAttrs);
|
||||
}
|
||||
const allProps: Array<MessageModelPropsWithoutConvoProps> = [];
|
||||
|
||||
// eslint-disable-next-line no-restricted-syntax
|
||||
for (const nowRead of oldUnreadNowRead) {
|
||||
allProps.push(nowRead.getMessageModelProps());
|
||||
}
|
||||
|
||||
if (allProps.length) {
|
||||
window.inboxStore?.dispatch(conversationActions.messagesChanged(allProps));
|
||||
if (!isEmpty(msgsIdsToUpdateExpireOnSwarm)) {
|
||||
if (fromConfigMessage) {
|
||||
// when we mark a message as read through a convo volatile update,
|
||||
// it means those messages have already an up to date expiry on the server side
|
||||
// so we can just fetch those expiries for all the hashes we are marking as read, and trust it.
|
||||
await FetchMsgExpirySwarm.queueNewJobIfNeeded(msgsIdsToUpdateExpireOnSwarm);
|
||||
} else {
|
||||
await UpdateMsgExpirySwarm.queueNewJobIfNeeded(msgsIdsToUpdateExpireOnSwarm);
|
||||
}
|
||||
}
|
||||
// save all the attributes in a single call
|
||||
await Data.saveMessages(oldUnreadNowRead.map(m => m.attributes));
|
||||
// trigger all the ui updates in a single call
|
||||
window.inboxStore?.dispatch(
|
||||
conversationActions.messagesChanged(oldUnreadNowRead.map(m => m.getMessageModelProps()))
|
||||
);
|
||||
|
||||
await this.commit();
|
||||
|
||||
|
@ -2527,6 +2545,7 @@ async function cleanUpExpireHistoryFromConvo(conversationId: string, isPrivate:
|
|||
conversationId,
|
||||
isPrivate
|
||||
);
|
||||
console.warn('cleanUpExpirationTimerUpdateHistory', conversationId, isPrivate, updateIdsRemoved);
|
||||
|
||||
window.inboxStore.dispatch(
|
||||
messagesDeleted(updateIdsRemoved.map(m => ({ conversationKey: conversationId, messageId: m })))
|
||||
|
|
|
@ -539,8 +539,8 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
|
|||
props.isDeleted = this.get('isDeleted');
|
||||
}
|
||||
|
||||
if (this.get('messageHash')) {
|
||||
props.messageHash = this.get('messageHash');
|
||||
if (this.getMessageHash()) {
|
||||
props.messageHash = this.getMessageHash();
|
||||
}
|
||||
if (this.get('received_at')) {
|
||||
props.receivedAt = this.get('received_at');
|
||||
|
@ -824,8 +824,14 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
|
|||
reacts: undefined,
|
||||
reactsIndex: undefined,
|
||||
});
|
||||
await this.markMessageAsRead(Date.now());
|
||||
// we can ignore the result of that markMessageReadNoCommit as it would only be used
|
||||
// to refresh the expiry of it(but it is already marked as "deleted", so we don't care)
|
||||
this.markMessageReadNoCommit(Date.now());
|
||||
await this.commit();
|
||||
// the line below makes sure that getNextExpiringMessage will find this message as expiring.
|
||||
// getNextExpiringMessage is used on app start to clean already expired messages which should have been removed already, but are not
|
||||
await this.setToExpire();
|
||||
await this.getConversation()?.refreshInMemoryDetails();
|
||||
}
|
||||
|
||||
// One caller today: event handler for the 'Retry Send' entry on right click of a failed send message
|
||||
|
@ -1096,17 +1102,16 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
|
|||
return id;
|
||||
}
|
||||
|
||||
public async markMessageAsRead(readAt: number) {
|
||||
this.markMessageReadNoCommit(readAt);
|
||||
await this.commit();
|
||||
// the line below makes sure that getNextExpiringMessage will find this message as expiring.
|
||||
// getNextExpiringMessage is used on app start to clean already expired messages which should have been removed already, but are not
|
||||
await this.setToExpire();
|
||||
/**
|
||||
* Mark a message as read if it was not already read.
|
||||
* @param readAt the timestamp at which this message was read
|
||||
* @returns true if the message was marked as read, and if its expiry should be updated on the swarm, false otherwise
|
||||
*/
|
||||
public markMessageReadNoCommit(readAt: number): boolean {
|
||||
if (!this.isUnread()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
await this.getConversation()?.refreshInMemoryDetails();
|
||||
}
|
||||
|
||||
public markMessageReadNoCommit(readAt: number) {
|
||||
this.set({ unread: READ_MESSAGE_STATE.read });
|
||||
|
||||
const convo = this.getConversation();
|
||||
|
@ -1123,27 +1128,28 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
|
|||
|
||||
if (expirationMode === 'legacy' || expirationMode === 'deleteAfterRead') {
|
||||
if (this.isIncoming() && !this.isExpiring()) {
|
||||
// NOTE We want to trigger disappearing now and then the TTL can update itself while it is running. Otherwise the UI is blocked until the request is completed.
|
||||
void DisappearingMessages.updateMessageExpiryOnSwarm(
|
||||
this,
|
||||
'markMessageReadNoCommit()',
|
||||
true
|
||||
);
|
||||
}
|
||||
if (!this.getExpirationStartTimestamp()) {
|
||||
this.set({
|
||||
expirationStartTimestamp: DisappearingMessages.setExpirationStartTimestamp(
|
||||
expirationMode,
|
||||
readAt,
|
||||
'markMessageReadNoCommit',
|
||||
this.get('id')
|
||||
),
|
||||
});
|
||||
// only if that message has not started to expire already, set its "start expiry".
|
||||
// this is because a message can have a expire start timestamp set when receiving it, if the convo volatile said that the message was read by another device.
|
||||
if (!this.getExpirationStartTimestamp()) {
|
||||
this.set({
|
||||
expirationStartTimestamp: DisappearingMessages.setExpirationStartTimestamp(
|
||||
expirationMode,
|
||||
readAt,
|
||||
'markMessageReadNoCommit',
|
||||
this.get('id')
|
||||
),
|
||||
});
|
||||
// return true, we want to update/refresh the real expiry of this message from the swarm
|
||||
return true;
|
||||
}
|
||||
// return true, we want to update/refresh the real expiry of this message from the swarm
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Notifications.clearByMessageId(this.id);
|
||||
return false;
|
||||
}
|
||||
|
||||
public isExpiring() {
|
||||
|
@ -1151,26 +1157,18 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
|
|||
}
|
||||
|
||||
public isExpired() {
|
||||
return this.msTilExpire() <= 0;
|
||||
}
|
||||
|
||||
public msTilExpire() {
|
||||
if (!this.isExpiring()) {
|
||||
return Infinity;
|
||||
return false;
|
||||
}
|
||||
const now = Date.now();
|
||||
const start = this.getExpirationStartTimestamp();
|
||||
if (!start) {
|
||||
return Infinity;
|
||||
return false;
|
||||
}
|
||||
const delta = this.getExpireTimer() * 1000;
|
||||
let msFromNow = start + delta - now;
|
||||
if (msFromNow < 0) {
|
||||
msFromNow = 0;
|
||||
}
|
||||
return msFromNow;
|
||||
const msFromNow = start + delta - now;
|
||||
return msFromNow < 0;
|
||||
}
|
||||
|
||||
public async setToExpire() {
|
||||
if (this.isExpiring() && !this.getExpiresAt()) {
|
||||
const start = this.getExpirationStartTimestamp();
|
||||
|
@ -1185,8 +1183,8 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
|
|||
this.set({
|
||||
expires_at: expiresAt,
|
||||
});
|
||||
const id = this.get('id');
|
||||
if (id) {
|
||||
|
||||
if (this.get('id')) {
|
||||
await this.commit();
|
||||
}
|
||||
|
||||
|
@ -1378,6 +1376,10 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
|
|||
return this.get('expires_at');
|
||||
}
|
||||
|
||||
public getMessageHash() {
|
||||
return this.get('messageHash');
|
||||
}
|
||||
|
||||
public getExpirationTimerUpdate() {
|
||||
return this.get('expirationTimerUpdate');
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ export interface MessageAttributes {
|
|||
expirationType?: DisappearingMessageType;
|
||||
/** in seconds, 0 means no expiration */
|
||||
expireTimer: number;
|
||||
/** in milliseconds */
|
||||
expirationStartTimestamp: number;
|
||||
expires_at?: number;
|
||||
expirationTimerUpdate?: ExpirationTimerUpdate;
|
||||
|
|
|
@ -1091,6 +1091,19 @@ function getMessageById(id: string) {
|
|||
return jsonToObject(row.json);
|
||||
}
|
||||
|
||||
function getMessagesById(ids: Array<string>) {
|
||||
if (!isArray(ids)) {
|
||||
throw new Error('getMessagesById expect an array of strings');
|
||||
}
|
||||
const rows = assertGlobalInstance()
|
||||
.prepare(`SELECT json FROM ${MESSAGES_TABLE} WHERE id IN ( ${ids.map(() => '?').join(', ')} );`)
|
||||
.all(ids);
|
||||
if (!rows || isEmpty(rows)) {
|
||||
return null;
|
||||
}
|
||||
return map(rows, row => jsonToObject(row.json));
|
||||
}
|
||||
|
||||
// serverIds are not unique so we need the conversationId
|
||||
function getMessageByServerId(conversationId: string, serverId: number) {
|
||||
const row = assertGlobalInstance()
|
||||
|
@ -2469,6 +2482,7 @@ export const sqlNode = {
|
|||
getMessagesBySenderAndSentAt,
|
||||
getMessageIdsFromServerIds,
|
||||
getMessageById,
|
||||
getMessagesById,
|
||||
getMessagesBySentAt,
|
||||
getMessageByServerId,
|
||||
getSeenMessagesByHashList,
|
||||
|
|
|
@ -17,7 +17,6 @@ import { getOpenGroupManager } from '../session/apis/open_group_api/opengroupV2/
|
|||
import { OpenGroupUtils } from '../session/apis/open_group_api/utils';
|
||||
import { getOpenGroupV2ConversationId } from '../session/apis/open_group_api/utils/OpenGroupUtils';
|
||||
import { getSwarmPollingInstance } from '../session/apis/snode_api';
|
||||
import { getExpiriesFromSnode } from '../session/apis/snode_api/getExpiriesRequest';
|
||||
import { getConversationController } from '../session/conversations';
|
||||
import { IncomingMessage } from '../session/messages/incoming/IncomingMessage';
|
||||
import { Profile, ProfileManager } from '../session/profile_manager/ProfileManager';
|
||||
|
@ -42,6 +41,7 @@ import {
|
|||
setLastProfileUpdateTimestamp,
|
||||
} from '../util/storage';
|
||||
// eslint-disable-next-line import/no-unresolved, import/extensions
|
||||
import { FetchMsgExpirySwarm } from '../session/utils/job_runners/jobs/FetchMsgExpirySwarmJob';
|
||||
import { ConfigWrapperObjectTypes } from '../webworker/workers/browser/libsession_worker_functions';
|
||||
import {
|
||||
ContactsWrapperActions,
|
||||
|
@ -700,55 +700,25 @@ async function applyConvoVolatileUpdateFromWrapper(
|
|||
}
|
||||
|
||||
try {
|
||||
const canBeDeleteAfterRead = foundConvo && !foundConvo.isMe() && foundConvo.isPrivate();
|
||||
// TODO legacy messages support will be removed in a future release
|
||||
if (
|
||||
canBeDeleteAfterRead &&
|
||||
(foundConvo.getExpirationMode() === 'deleteAfterRead' ||
|
||||
foundConvo.getExpirationMode() === 'legacy') &&
|
||||
foundConvo.getExpireTimer() > 0
|
||||
) {
|
||||
const messages2Expire = await Data.getUnreadDisappearingByConversation(
|
||||
if (foundConvo.isPrivate() && !foundConvo.isMe() && foundConvo.getExpireTimer() > 0) {
|
||||
const messagesExpiring = await Data.getUnreadDisappearingByConversation(
|
||||
convoId,
|
||||
lastReadMessageTimestamp
|
||||
);
|
||||
|
||||
if (messages2Expire.length) {
|
||||
const messageHashes = compact(
|
||||
messages2Expire
|
||||
.filter(
|
||||
m =>
|
||||
m.getExpirationType() !== undefined &&
|
||||
m.getExpirationType() !== 'deleteAfterSend' &&
|
||||
m.getExpireTimer() > 0
|
||||
)
|
||||
.map(m => m.get('messageHash'))
|
||||
);
|
||||
const currentExpiryTimestamps = await getExpiriesFromSnode({
|
||||
messageHashes,
|
||||
timestamp: lastReadMessageTimestamp,
|
||||
});
|
||||
const messagesExpiringAfterRead = messagesExpiring.filter(
|
||||
m => m.getExpirationType() === 'deleteAfterRead' && m.getExpireTimer() > 0
|
||||
);
|
||||
|
||||
if (currentExpiryTimestamps.length) {
|
||||
for (let index = 0; index < messages2Expire.length; index++) {
|
||||
if (currentExpiryTimestamps[index] === -1) {
|
||||
window.log.debug(
|
||||
`[applyConvoVolatileUpdateFromWrapper] invalid expiry value returned from snode. We will keep the local value of ${messages2Expire
|
||||
.get(index)
|
||||
.get('id')}.\nmessageHash: ${messageHashes[index]},`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
messages2Expire.at(index).set('expires_at', currentExpiryTimestamps[index]);
|
||||
}
|
||||
}
|
||||
const messageIdsToFetchExpiriesFor = compact(messagesExpiringAfterRead.map(m => m.id));
|
||||
|
||||
if (messageIdsToFetchExpiriesFor.length) {
|
||||
await FetchMsgExpirySwarm.queueNewJobIfNeeded(messageIdsToFetchExpiriesFor);
|
||||
}
|
||||
}
|
||||
|
||||
// window.log.debug(
|
||||
// `applyConvoVolatileUpdateFromWrapper: ${convoId}: forcedUnread:${forcedUnread}, lastReadMessage:${lastReadMessageTimestamp}`
|
||||
// );
|
||||
// this should mark all the messages sent before fromWrapper.lastRead as read and update the unreadCount
|
||||
// this mark all the messages sent before fromWrapper.lastRead as read and update the unreadCount
|
||||
await foundConvo.markReadFromConfigMessage(lastReadMessageTimestamp);
|
||||
// this commits to the DB, if needed
|
||||
await foundConvo.markAsUnread(forcedUnread, true);
|
||||
|
|
|
@ -19,7 +19,6 @@ import { showMessageRequestBannerOutsideRedux } from '../state/ducks/userConfig'
|
|||
import { getHideMessageRequestBannerOutsideRedux } from '../state/selectors/userConfig';
|
||||
import { GoogleChrome } from '../util';
|
||||
import { LinkPreviews } from '../util/linkPreviews';
|
||||
import { ReleasedFeatures } from '../util/releaseFeature';
|
||||
|
||||
function contentTypeSupported(type: string): boolean {
|
||||
const Chrome = GoogleChrome;
|
||||
|
@ -174,14 +173,6 @@ async function processProfileKeyNoCommit(
|
|||
}
|
||||
}
|
||||
|
||||
function handleSyncedReceiptsNoCommit(message: MessageModel, conversation: ConversationModel) {
|
||||
// If the newly received message is from us, we assume that we've seen the messages up until that point
|
||||
const sentTimestamp = message.get('sent_at');
|
||||
if (sentTimestamp) {
|
||||
conversation.markConversationRead(sentTimestamp);
|
||||
}
|
||||
}
|
||||
|
||||
export type RegularMessageType = Pick<
|
||||
SignalService.DataMessage,
|
||||
| 'attachments'
|
||||
|
@ -285,19 +276,7 @@ async function handleRegularMessage(
|
|||
// should only occur after isOutgoing request as it relies on didApproveMe being false.
|
||||
await conversation.setDidApproveMe(true);
|
||||
}
|
||||
} else if (type === 'outgoing') {
|
||||
const userConfigLibsession = await ReleasedFeatures.checkIsUserConfigFeatureReleased();
|
||||
|
||||
if (!userConfigLibsession) {
|
||||
// we want to do this for all types of conversations, not just private chats
|
||||
handleSyncedReceiptsNoCommit(message, conversation);
|
||||
|
||||
if (conversation.isPrivate()) {
|
||||
await conversation.setIsApproved(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const conversationActiveAt = conversation.get('active_at');
|
||||
if (
|
||||
!conversationActiveAt ||
|
||||
|
@ -362,7 +341,10 @@ async function markConvoAsReadIfOutgoingMessage(
|
|||
await message.commit();
|
||||
}
|
||||
}
|
||||
conversation.markConversationRead(sentAt);
|
||||
conversation.markConversationRead({
|
||||
newestUnreadDate: sentAt,
|
||||
fromConfigMessage: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -179,3 +179,7 @@ export type NotEmptyArrayOfBatchResults = NonEmptyArray<{
|
|||
code: number;
|
||||
body: Record<string, any>;
|
||||
}>;
|
||||
|
||||
export type WithShortenOrExtend = { shortenOrExtend: 'shorten' | 'extend' | '' };
|
||||
|
||||
export const MAX_SUBREQUESTS_COUNT = 20;
|
||||
|
|
|
@ -2,7 +2,11 @@ import { isArray } from 'lodash';
|
|||
import { Snode } from '../../../data/data';
|
||||
import { processOnionRequestErrorAtDestination, SnodeResponse } from './onions';
|
||||
import { snodeRpc } from './sessionRpc';
|
||||
import { NotEmptyArrayOfBatchResults, SnodeApiSubRequests } from './SnodeRequestTypes';
|
||||
import {
|
||||
MAX_SUBREQUESTS_COUNT,
|
||||
NotEmptyArrayOfBatchResults,
|
||||
SnodeApiSubRequests,
|
||||
} from './SnodeRequestTypes';
|
||||
|
||||
/**
|
||||
* This is the equivalent to the batch send on sogs. The target node runs each sub request and returns a list of all the sub status and bodies.
|
||||
|
@ -21,6 +25,14 @@ export async function doSnodeBatchRequest(
|
|||
associatedWith: string | null,
|
||||
method: 'batch' | 'sequence' = 'batch'
|
||||
): Promise<NotEmptyArrayOfBatchResults> {
|
||||
if (subRequests.length > MAX_SUBREQUESTS_COUNT) {
|
||||
window.log.error(
|
||||
`batch subRequests count cannot be more than ${MAX_SUBREQUESTS_COUNT}. Got ${subRequests.length}`
|
||||
);
|
||||
throw new Error(
|
||||
`batch subRequests count cannot be more than ${MAX_SUBREQUESTS_COUNT}. Got ${subRequests.length}`
|
||||
);
|
||||
}
|
||||
const result = await snodeRpc({
|
||||
method,
|
||||
params: { requests: subRequests },
|
||||
|
|
|
@ -1,5 +1,15 @@
|
|||
/* eslint-disable no-restricted-syntax */
|
||||
import { isEmpty, sample } from 'lodash';
|
||||
import {
|
||||
chunk,
|
||||
compact,
|
||||
difference,
|
||||
flatten,
|
||||
isArray,
|
||||
isEmpty,
|
||||
isNumber,
|
||||
sample,
|
||||
uniqBy,
|
||||
} from 'lodash';
|
||||
import pRetry from 'p-retry';
|
||||
import { Snode } from '../../../data/data';
|
||||
import { getSodiumRenderer } from '../../crypto';
|
||||
|
@ -7,16 +17,19 @@ import { StringUtils, UserUtils } from '../../utils';
|
|||
import { fromBase64ToArray, fromHexToArray } from '../../utils/String';
|
||||
import { EmptySwarmError } from '../../utils/errors';
|
||||
import { SeedNodeAPI } from '../seed_node_api';
|
||||
import { UpdateExpiryOnNodeSubRequest } from './SnodeRequestTypes';
|
||||
import {
|
||||
MAX_SUBREQUESTS_COUNT,
|
||||
UpdateExpiryOnNodeSubRequest,
|
||||
WithShortenOrExtend,
|
||||
} from './SnodeRequestTypes';
|
||||
import { doSnodeBatchRequest } from './batchRequest';
|
||||
import { GetNetworkTime } from './getNetworkTime';
|
||||
import { getSwarmFor } from './snodePool';
|
||||
import { SnodeSignature } from './snodeSignatures';
|
||||
import { ExpireMessageResultItem, ExpireMessagesResultsContent } from './types';
|
||||
|
||||
export type verifyExpireMsgsResponseSignatureProps = ExpireMessageResultItem & {
|
||||
pubkey: string;
|
||||
snodePubkey: any;
|
||||
snodePubkey: string;
|
||||
messageHashes: Array<string>;
|
||||
};
|
||||
|
||||
|
@ -99,13 +112,14 @@ export async function processExpireRequestResponse(
|
|||
const signature = swarm[nodeKey].signature;
|
||||
|
||||
if (!updatedHashes || !expiry || !signature) {
|
||||
window.log.warn(
|
||||
`[processExpireRequestResponse] Missing arguments on ${
|
||||
targetNode.pubkey_ed25519
|
||||
} so we will ignore this result on (${nodeKey}) and move onto the next node.\n${JSON.stringify(
|
||||
swarm[nodeKey]
|
||||
)}`
|
||||
);
|
||||
// most likely just a timeout from one of the swarm members
|
||||
// window.log.debug(
|
||||
// `[processExpireRequestResponse] Missing arguments on ${
|
||||
// targetNode.pubkey_ed25519
|
||||
// } so we will ignore this result on (${nodeKey}) and move onto the next node.\n${JSON.stringify(
|
||||
// swarm[nodeKey]
|
||||
// )}`
|
||||
// );
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -134,24 +148,23 @@ export async function processExpireRequestResponse(
|
|||
return results;
|
||||
}
|
||||
|
||||
async function expireOnNodes(
|
||||
targetNode: Snode,
|
||||
expireRequest: UpdateExpiryOnNodeSubRequest
|
||||
): Promise<number> {
|
||||
try {
|
||||
const result = await doSnodeBatchRequest(
|
||||
[expireRequest],
|
||||
targetNode,
|
||||
4000,
|
||||
expireRequest.params.pubkey,
|
||||
'batch'
|
||||
);
|
||||
type UpdatedExpiryWithHashes = { messageHashes: Array<string>; updatedExpiryMs: number };
|
||||
type UpdatedExpiryWithHash = { messageHash: string; updatedExpiryMs: number };
|
||||
|
||||
if (!result || result.length !== 1) {
|
||||
async function updateExpiryOnNodes(
|
||||
targetNode: Snode,
|
||||
ourPubKey: string,
|
||||
expireRequests: Array<UpdateExpiryOnNodeSubRequest>
|
||||
): Promise<Array<UpdatedExpiryWithHash>> {
|
||||
try {
|
||||
const result = await doSnodeBatchRequest(expireRequests, targetNode, 4000, ourPubKey, 'batch');
|
||||
|
||||
if (!result || result.length !== expireRequests.length) {
|
||||
window.log.error(
|
||||
`There was an issue with the results. updateExpiryOnNodes ${targetNode.ip}:${targetNode.port}. expected length or results ${expireRequests.length} but got ${result.length}`
|
||||
);
|
||||
throw Error(
|
||||
`There was an issue with the results. sessionRpc ${targetNode.ip}:${
|
||||
targetNode.port
|
||||
} expireRequest ${JSON.stringify(expireRequest)}`
|
||||
`There was an issue with the results. updateExpiryOnNodes ${targetNode.ip}:${targetNode.port}`
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -163,30 +176,63 @@ async function expireOnNodes(
|
|||
throw Error(`result is not 200 but ${firstResult.code}`);
|
||||
}
|
||||
|
||||
const bodyFirstResult = firstResult.body;
|
||||
const expirationResults = await processExpireRequestResponse(
|
||||
expireRequest.params.pubkey,
|
||||
targetNode,
|
||||
bodyFirstResult.swarm as ExpireMessagesResultsContent,
|
||||
expireRequest.params.messages
|
||||
// Note: expirationResults is an array of `Map<snode pubkeys, {msgHashes,expiry}>` changed/unchanged which have a valid signature
|
||||
const expirationResults: Array<ExpireRequestResponseResults> = await Promise.all(
|
||||
expireRequests.map((request, index) => {
|
||||
const bodyIndex = result[index]?.body?.swarm;
|
||||
if (!bodyIndex || isEmpty(bodyIndex)) {
|
||||
return {};
|
||||
}
|
||||
|
||||
return processExpireRequestResponse(
|
||||
ourPubKey,
|
||||
targetNode,
|
||||
bodyIndex as ExpireMessagesResultsContent,
|
||||
request.params.messages
|
||||
);
|
||||
})
|
||||
);
|
||||
const firstExpirationResult = Object.entries(expirationResults).at(0);
|
||||
if (!firstExpirationResult) {
|
||||
throw new Error('firstExpirationResult is null');
|
||||
|
||||
const changesValid: Array<UpdatedExpiryWithHashes> = [];
|
||||
// then we need to iterate over each subrequests result to find the snodes which reporting a valid update of the expiry
|
||||
|
||||
for (let index = 0; index < expirationResults.length; index++) {
|
||||
// the 0 gets the first snode of the swarm (they all report the same *sig verified* changes).
|
||||
// the 1 discard the snode_pk entry and access the request result (i.e. a record with hashes: [string] and the expiry)
|
||||
const expirationResult = Object.entries(expirationResults?.[index])?.[0]?.[1];
|
||||
if (
|
||||
!expirationResult ||
|
||||
isEmpty(expirationResult) ||
|
||||
!isArray(expirationResult.hashes) ||
|
||||
!isNumber(expirationResult.expiry)
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
changesValid.push({
|
||||
messageHashes: expirationResult.hashes,
|
||||
updatedExpiryMs: expirationResult.expiry,
|
||||
});
|
||||
}
|
||||
|
||||
const messageHash = firstExpirationResult[0];
|
||||
const expiry = firstExpirationResult[1].expiry;
|
||||
|
||||
if (!expiry || !messageHash) {
|
||||
throw new Error(
|
||||
`Something is wrong with the firstExpirationResult: ${JSON.stringify(
|
||||
JSON.stringify(firstExpirationResult)
|
||||
)}`
|
||||
);
|
||||
const hashesRequestedButNotInResults = difference(
|
||||
flatten(expireRequests.map(m => m.params.messages)),
|
||||
flatten(changesValid.map(c => c.messageHashes))
|
||||
);
|
||||
if (!isEmpty(hashesRequestedButNotInResults)) {
|
||||
// we requested hashes which are not part of the result. They most likely expired already so let's mark those messages as expiring now.
|
||||
changesValid.push({
|
||||
messageHashes: hashesRequestedButNotInResults,
|
||||
updatedExpiryMs: Date.now(),
|
||||
});
|
||||
}
|
||||
|
||||
return expiry;
|
||||
const expiryWithIndividualHash: Array<UpdatedExpiryWithHash> = flatten(
|
||||
changesValid.map(change =>
|
||||
change.messageHashes.map(h => ({ messageHash: h, updatedExpiryMs: change.updatedExpiryMs }))
|
||||
)
|
||||
);
|
||||
window.log.debug('update expiry expiryWithIndividualHash: ', expiryWithIndividualHash);
|
||||
return expiryWithIndividualHash;
|
||||
} catch (err) {
|
||||
// NOTE batch requests have their own retry logic which includes abort errors that will break our retry logic so we need to catch them and throw regular errors
|
||||
if (err instanceof pRetry.AbortError) {
|
||||
|
@ -197,122 +243,198 @@ async function expireOnNodes(
|
|||
}
|
||||
}
|
||||
|
||||
export type ExpireMessageOnSnodeProps = {
|
||||
messageHash: string;
|
||||
expireTimer: number;
|
||||
extend?: boolean;
|
||||
shorten?: boolean;
|
||||
};
|
||||
export type ExpireMessageWithTimerOnSnodeProps = {
|
||||
messageHashes: Array<string>;
|
||||
expireTimerMs: number;
|
||||
readAt: number;
|
||||
} & WithShortenOrExtend;
|
||||
|
||||
export async function buildExpireRequest(
|
||||
props: ExpireMessageOnSnodeProps
|
||||
): Promise<UpdateExpiryOnNodeSubRequest | null> {
|
||||
const { messageHash, expireTimer, extend, shorten } = props;
|
||||
export type ExpireMessageWithExpiryOnSnodeProps = Pick<
|
||||
ExpireMessageWithTimerOnSnodeProps,
|
||||
'messageHashes'
|
||||
> &
|
||||
WithShortenOrExtend & {
|
||||
expiryMs: number;
|
||||
};
|
||||
|
||||
if (extend && shorten) {
|
||||
window.log.error(
|
||||
'[buildExpireRequest] We cannot extend and shorten a message at the same time',
|
||||
messageHash
|
||||
);
|
||||
return null;
|
||||
/**
|
||||
* Exported for testing for testing only. Used to shorten/extend expiries of an array of array of messagehashes.
|
||||
* @param expireDetails the subrequest to do
|
||||
* @returns
|
||||
*/
|
||||
export async function buildExpireRequestBatchExpiry(
|
||||
expireDetails: Array<ExpireMessageWithExpiryOnSnodeProps>
|
||||
) {
|
||||
if (expireDetails.length > MAX_SUBREQUESTS_COUNT) {
|
||||
throw new Error(`batch request can only have ${MAX_SUBREQUESTS_COUNT} subrequests at most`);
|
||||
}
|
||||
const results = await Promise.all(expireDetails.map(m => buildExpireRequestSingleExpiry(m)));
|
||||
return compact(results);
|
||||
}
|
||||
|
||||
// NOTE empty string means we want to hardcode the expiry to a TTL value, otherwise it's a shorten or extension of the TTL
|
||||
const shortenOrExtend = shorten ? 'shorten' : extend ? 'extend' : ('' as const);
|
||||
|
||||
export async function buildExpireRequestSingleExpiry(
|
||||
expireDetails: ExpireMessageWithExpiryOnSnodeProps
|
||||
): Promise<UpdateExpiryOnNodeSubRequest | null> {
|
||||
const ourPubKey = UserUtils.getOurPubKeyStrFromCache();
|
||||
if (!ourPubKey) {
|
||||
window.log.error('[buildExpireRequest] No pubkey found', messageHash);
|
||||
window.log.error('[buildExpireRequestSingleExpiry] No user pubkey');
|
||||
return null;
|
||||
}
|
||||
const { messageHashes, expiryMs, shortenOrExtend } = expireDetails;
|
||||
|
||||
// NOTE for shortenOrExtend, '' means we want to hardcode the expiry to a TTL value, otherwise it's a shorten or extension of the TTL
|
||||
|
||||
const expiry = GetNetworkTime.getNowWithNetworkOffset() + expireTimer;
|
||||
const signResult = await SnodeSignature.generateUpdateExpirySignature({
|
||||
shortenOrExtend,
|
||||
timestamp: expiry,
|
||||
messageHashes: [messageHash],
|
||||
timestamp: expiryMs,
|
||||
messageHashes,
|
||||
});
|
||||
|
||||
if (!signResult) {
|
||||
window.log.error(
|
||||
`[buildExpireRequest] SnodeSignature.generateUpdateExpirySignature returned an empty result ${messageHash}`
|
||||
`[buildExpireRequestSingleExpiry] SnodeSignature.generateUpdateExpirySignature returned an empty result`
|
||||
);
|
||||
return null;
|
||||
}
|
||||
|
||||
const expireParams: UpdateExpiryOnNodeSubRequest = {
|
||||
method: 'expire',
|
||||
return {
|
||||
method: 'expire' as const,
|
||||
params: {
|
||||
pubkey: ourPubKey,
|
||||
pubkey_ed25519: signResult.pubkey_ed25519.toUpperCase(),
|
||||
messages: [messageHash],
|
||||
expiry,
|
||||
extend: extend || undefined,
|
||||
shorten: shorten || undefined,
|
||||
messages: messageHashes,
|
||||
expiry: expiryMs,
|
||||
extend: shortenOrExtend === 'extend' || undefined,
|
||||
shorten: shortenOrExtend === 'shorten' || undefined,
|
||||
signature: signResult?.signature,
|
||||
},
|
||||
};
|
||||
|
||||
return expireParams;
|
||||
}
|
||||
|
||||
type GroupedBySameExpiry = Record<string, Array<string>>;
|
||||
|
||||
function getBatchExpiryChunk({
|
||||
expiryChunk,
|
||||
groupedBySameExpiry,
|
||||
shortenOrExtend,
|
||||
}: {
|
||||
expiryChunk: Array<string>;
|
||||
} & WithShortenOrExtend & { groupedBySameExpiry: GroupedBySameExpiry }) {
|
||||
const expiryDetails: Array<ExpireMessageWithExpiryOnSnodeProps> = expiryChunk.map(expiryStr => {
|
||||
const expiryMs = parseInt(expiryStr, 10);
|
||||
const msgHashesForThisExpiry = groupedBySameExpiry[expiryStr];
|
||||
|
||||
return {
|
||||
expiryMs,
|
||||
messageHashes: msgHashesForThisExpiry,
|
||||
shortenOrExtend,
|
||||
};
|
||||
});
|
||||
|
||||
return buildExpireRequestBatchExpiry(expiryDetails);
|
||||
}
|
||||
|
||||
function groupMsgByExpiry(expiringDetails: ExpiringDetails) {
|
||||
const hashesWithExpiry = uniqBy(
|
||||
expiringDetails.map(m => ({
|
||||
messageHash: m.messageHash,
|
||||
expiry: m.expireTimerMs + m.readAt,
|
||||
})),
|
||||
n => n.messageHash
|
||||
);
|
||||
|
||||
const groupedBySameExpiry: GroupedBySameExpiry = {};
|
||||
for (let index = 0; index < hashesWithExpiry.length; index++) {
|
||||
const { expiry, messageHash } = hashesWithExpiry[index];
|
||||
const expiryStr = `${expiry}`;
|
||||
if (!groupedBySameExpiry[expiryStr]) {
|
||||
groupedBySameExpiry[expiryStr] = [];
|
||||
}
|
||||
groupedBySameExpiry[expiryStr].push(messageHash);
|
||||
}
|
||||
return groupedBySameExpiry;
|
||||
}
|
||||
|
||||
export type ExpiringDetails = Array<
|
||||
{ messageHash: string } & Pick<ExpireMessageWithTimerOnSnodeProps, 'readAt' | 'expireTimerMs'>
|
||||
>;
|
||||
|
||||
/**
|
||||
* Sends an 'expire' request to the user's swarm for a specific message.
|
||||
* This supports both extending and shortening a message's TTL.
|
||||
* The returned TTL should be assigned to the message to expire.
|
||||
* @param messageHash the hash of the message to expire
|
||||
* @param readAt when that message was read on this device (network timestamp offset is removed later)
|
||||
* @param expireTimer amount of time until we expire the message from now in milliseconds
|
||||
* @param extend whether to extend the message's TTL
|
||||
* @param shorten whether to shorten the message's TTL
|
||||
* @returns the TTL of the message as set by the server
|
||||
*/
|
||||
export async function expireMessageOnSnode(
|
||||
props: ExpireMessageOnSnodeProps
|
||||
): Promise<number | null> {
|
||||
const { messageHash } = props;
|
||||
|
||||
export async function expireMessagesOnSnode(
|
||||
expiringDetails: ExpiringDetails,
|
||||
options: WithShortenOrExtend
|
||||
): Promise<Array<{ messageHash: string; updatedExpiryMs: number }>> {
|
||||
const ourPubKey = UserUtils.getOurPubKeyStrFromCache();
|
||||
if (!ourPubKey) {
|
||||
window.log.error('[expireMessageOnSnode] No pubkey found', messageHash);
|
||||
return null;
|
||||
throw new Error('[expireMessageOnSnode] No pubkey found');
|
||||
}
|
||||
|
||||
let snode: Snode | undefined;
|
||||
|
||||
try {
|
||||
const expireRequestParams = await buildExpireRequest(props);
|
||||
if (!expireRequestParams) {
|
||||
throw new Error(`Failed to build expire request ${JSON.stringify(props)}`);
|
||||
}
|
||||
let newTTL = null;
|
||||
// key is a string even if it is really a number because Object.keys only knows strings...
|
||||
const groupedBySameExpiry = groupMsgByExpiry(expiringDetails);
|
||||
const chunkedExpiries = chunk(Object.keys(groupedBySameExpiry), MAX_SUBREQUESTS_COUNT); // chunking because the batch endpoint only allow MAX_SUBREQUESTS_COUNT subrequests per requests
|
||||
|
||||
await pRetry(
|
||||
async () => {
|
||||
const swarm = await getSwarmFor(ourPubKey);
|
||||
snode = sample(swarm);
|
||||
if (!snode) {
|
||||
throw new EmptySwarmError(ourPubKey, 'Ran out of swarm nodes to query');
|
||||
}
|
||||
newTTL = await expireOnNodes(snode, expireRequestParams);
|
||||
},
|
||||
{
|
||||
retries: 3,
|
||||
factor: 2,
|
||||
minTimeout: SeedNodeAPI.getMinTimeout(),
|
||||
onFailedAttempt: e => {
|
||||
window?.log?.warn(
|
||||
`[expireMessageOnSnode] expire message on snode attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left... Error: ${e.message}`
|
||||
);
|
||||
},
|
||||
}
|
||||
// TODO after the next storage server fork we will get a new endpoint allowing to batch
|
||||
// update expiries even when they are * not * the same for all the message hashes.
|
||||
// But currently we can't access it that endpoint, so we need to keep this hacky way for now.
|
||||
// groupby expiries ( expireTimer+ readAt), then batch them with a limit of MAX_SUBREQUESTS_COUNT batch calls per batch requests, then do those in parralel, for now.
|
||||
const expireRequestsParams = await Promise.all(
|
||||
chunkedExpiries.map(chk =>
|
||||
getBatchExpiryChunk({
|
||||
expiryChunk: chk,
|
||||
groupedBySameExpiry,
|
||||
shortenOrExtend: options.shortenOrExtend,
|
||||
})
|
||||
)
|
||||
);
|
||||
if (!expireRequestsParams || isEmpty(expireRequestsParams)) {
|
||||
throw new Error(`Failed to build expire request`);
|
||||
}
|
||||
|
||||
// we most likely will only have a single chunk, so this is a bit of over engineered.
|
||||
// if any of those requests fails, make sure to not consider
|
||||
const allSettled = await Promise.allSettled(
|
||||
expireRequestsParams.map(chunkRequest =>
|
||||
pRetry(
|
||||
async () => {
|
||||
const swarm = await getSwarmFor(ourPubKey);
|
||||
snode = sample(swarm);
|
||||
if (!snode) {
|
||||
throw new EmptySwarmError(ourPubKey, 'Ran out of swarm nodes to query');
|
||||
}
|
||||
return updateExpiryOnNodes(snode, ourPubKey, chunkRequest);
|
||||
},
|
||||
{
|
||||
retries: 3,
|
||||
factor: 2,
|
||||
minTimeout: SeedNodeAPI.getMinTimeout(),
|
||||
onFailedAttempt: e => {
|
||||
window?.log?.warn(
|
||||
`[expireMessageOnSnode] expire message on snode attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left... Error: ${e.message}`
|
||||
);
|
||||
},
|
||||
}
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
return newTTL;
|
||||
return flatten(compact(allSettled.map(m => (m.status === 'fulfilled' ? m.value : null))));
|
||||
} catch (e) {
|
||||
const snodeStr = snode ? `${snode.ip}:${snode.port}` : 'null';
|
||||
window?.log?.warn(
|
||||
`[expireMessageOnSnode] ${e.code ? `${e.code} ` : ''}${e.message ||
|
||||
e} by ${ourPubKey} for ${messageHash} via snode:${snodeStr}`
|
||||
`[expireMessageOnSnode] ${e.code || ''}${e.message ||
|
||||
e} by ${ourPubKey} via snode:${snodeStr}`
|
||||
);
|
||||
throw e;
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/* eslint-disable no-restricted-syntax */
|
||||
import { isEmpty, sample } from 'lodash';
|
||||
import { isFinite, isNil, isNumber, sample } from 'lodash';
|
||||
import pRetry from 'p-retry';
|
||||
import { Snode } from '../../../data/data';
|
||||
import { UserUtils } from '../../utils';
|
||||
|
@ -7,6 +7,7 @@ import { EmptySwarmError } from '../../utils/errors';
|
|||
import { SeedNodeAPI } from '../seed_node_api';
|
||||
import { GetExpiriesFromNodeSubRequest } from './SnodeRequestTypes';
|
||||
import { doSnodeBatchRequest } from './batchRequest';
|
||||
import { GetNetworkTime } from './getNetworkTime';
|
||||
import { getSwarmFor } from './snodePool';
|
||||
import { SnodeSignature } from './snodeSignatures';
|
||||
import { GetExpiriesResultsContent } from './types';
|
||||
|
@ -14,44 +15,27 @@ import { GetExpiriesResultsContent } from './types';
|
|||
export type GetExpiriesRequestResponseResults = Record<string, number>;
|
||||
|
||||
export async function processGetExpiriesRequestResponse(
|
||||
targetNode: Snode,
|
||||
_targetNode: Snode,
|
||||
expiries: GetExpiriesResultsContent,
|
||||
messageHashes: Array<string>
|
||||
): Promise<GetExpiriesRequestResponseResults> {
|
||||
if (isEmpty(expiries)) {
|
||||
if (isNil(expiries)) {
|
||||
throw Error(
|
||||
`[processGetExpiriesRequestResponse] Expiries are missing! ${JSON.stringify(messageHashes)}`
|
||||
`[processGetExpiriesRequestResponse] Expiries are nul/undefined! ${JSON.stringify(
|
||||
messageHashes
|
||||
)}`
|
||||
);
|
||||
}
|
||||
|
||||
const results: GetExpiriesRequestResponseResults = {};
|
||||
|
||||
for (const messageHash of Object.keys(expiries)) {
|
||||
if (!expiries[messageHash]) {
|
||||
window.log.warn(
|
||||
`[processGetExpiriesRequestResponse] Expiries result failure on ${
|
||||
targetNode.pubkey_ed25519
|
||||
} for messageHash ${messageHash}\n${JSON.stringify(
|
||||
expiries[messageHash]
|
||||
)} moving to the next node`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Note: we iterate over the hash we've requested and not the one we received,
|
||||
// because a message which expired already is not in the result at all (and we need to force it to be expired)
|
||||
for (const messageHash of messageHashes) {
|
||||
const expiryMs = expiries[messageHash];
|
||||
|
||||
if (!expiryMs) {
|
||||
window.log.warn(
|
||||
`[processGetExpiriesRequestResponse] Missing expiry value on ${
|
||||
targetNode.pubkey_ed25519
|
||||
} so we will ignore this result (${messageHash}) and move onto the next node.\n${JSON.stringify(
|
||||
expiries[messageHash]
|
||||
)}`
|
||||
);
|
||||
results[messageHash] = -1; // explicit failure value
|
||||
} else {
|
||||
if (expiries[messageHash] && isNumber(expiryMs) && isFinite(expiryMs)) {
|
||||
results[messageHash] = expiryMs;
|
||||
}
|
||||
} // not adding the Date.now() fallback here as it is done in the caller of this function
|
||||
}
|
||||
|
||||
return results;
|
||||
|
@ -60,7 +44,7 @@ export async function processGetExpiriesRequestResponse(
|
|||
async function getExpiriesFromNodes(
|
||||
targetNode: Snode,
|
||||
expireRequest: GetExpiriesFromNodeSubRequest
|
||||
): Promise<Array<number>> {
|
||||
) {
|
||||
try {
|
||||
const result = await doSnodeBatchRequest(
|
||||
[expireRequest],
|
||||
|
@ -83,22 +67,27 @@ async function getExpiriesFromNodes(
|
|||
const firstResult = result[0];
|
||||
|
||||
if (firstResult.code !== 200) {
|
||||
throw Error(`result is not 200 but ${firstResult.code}`);
|
||||
throw Error(`getExpiriesFromNodes result is not 200 but ${firstResult.code}`);
|
||||
}
|
||||
|
||||
const bodyFirstResult = firstResult.body;
|
||||
debugger;
|
||||
// expirationResults is a record of {messageHash: currentExpiry}
|
||||
const expirationResults = await processGetExpiriesRequestResponse(
|
||||
targetNode,
|
||||
bodyFirstResult.expiries as GetExpiriesResultsContent,
|
||||
firstResult.body.expiries as GetExpiriesResultsContent,
|
||||
expireRequest.params.messages
|
||||
);
|
||||
|
||||
if (!Object.keys(expirationResults).length) {
|
||||
throw new Error('expirationResults is empty');
|
||||
}
|
||||
// Note: even if expirationResults is empty we need to process the results.
|
||||
// The status code is 200, so if the results is empty, it means all those messages already expired.
|
||||
|
||||
const expiryTimestamps: Array<number> = Object.values(expirationResults);
|
||||
return expiryTimestamps;
|
||||
// Note: a hash which already expired on the server is not going to be returned. So we force it's fetchedExpiry to be now() to make it expire asap
|
||||
const expiriesWithForcedExpiried = expireRequest.params.messages.map(messageHash => ({
|
||||
messageHash,
|
||||
fetchedExpiry: expirationResults?.[messageHash] || Date.now(),
|
||||
}));
|
||||
|
||||
return expiriesWithForcedExpiried;
|
||||
} catch (err) {
|
||||
// NOTE batch requests have their own retry logic which includes abort errors that will break our retry logic so we need to catch them and throw regular errors
|
||||
if (err instanceof pRetry.AbortError) {
|
||||
|
@ -111,13 +100,12 @@ async function getExpiriesFromNodes(
|
|||
|
||||
export type GetExpiriesFromSnodeProps = {
|
||||
messageHashes: Array<string>;
|
||||
timestamp: number;
|
||||
};
|
||||
|
||||
export async function buildGetExpiriesRequest(
|
||||
props: GetExpiriesFromSnodeProps
|
||||
): Promise<GetExpiriesFromNodeSubRequest | null> {
|
||||
const { messageHashes, timestamp } = props;
|
||||
export async function buildGetExpiriesRequest({
|
||||
messageHashes,
|
||||
}: GetExpiriesFromSnodeProps): Promise<GetExpiriesFromNodeSubRequest | null> {
|
||||
const timestamp = GetNetworkTime.getNowWithNetworkOffset();
|
||||
|
||||
const ourPubKey = UserUtils.getOurPubKeyStrFromCache();
|
||||
if (!ourPubKey) {
|
||||
|
@ -159,11 +147,7 @@ export async function buildGetExpiriesRequest(
|
|||
* @param timestamp the time (ms) the request was initiated, must be within ±60s of the current time so using the server time is recommended.
|
||||
* @returns an arrray of the expiry timestamps (TTL) for the given messages
|
||||
*/
|
||||
export async function getExpiriesFromSnode(
|
||||
props: GetExpiriesFromSnodeProps
|
||||
): Promise<Array<number>> {
|
||||
const { messageHashes } = props;
|
||||
|
||||
export async function getExpiriesFromSnode({ messageHashes }: GetExpiriesFromSnodeProps) {
|
||||
// FIXME There is a bug in the snode code that requires at least 2 messages to be requested. Will be fixed in next storage server release
|
||||
if (messageHashes.length === 1) {
|
||||
messageHashes.push('fakehash');
|
||||
|
@ -178,21 +162,19 @@ export async function getExpiriesFromSnode(
|
|||
let snode: Snode | undefined;
|
||||
|
||||
try {
|
||||
const expireRequestParams = await buildGetExpiriesRequest(props);
|
||||
const expireRequestParams = await buildGetExpiriesRequest({ messageHashes });
|
||||
if (!expireRequestParams) {
|
||||
throw new Error(`Failed to build get_expiries request ${JSON.stringify(props)}`);
|
||||
throw new Error(`Failed to build get_expiries request ${JSON.stringify({ messageHashes })}`);
|
||||
}
|
||||
|
||||
let expiryTimestamps: Array<number> = [];
|
||||
|
||||
await pRetry(
|
||||
const fetchedExpiries = await pRetry(
|
||||
async () => {
|
||||
const swarm = await getSwarmFor(ourPubKey);
|
||||
snode = sample(swarm);
|
||||
if (!snode) {
|
||||
throw new EmptySwarmError(ourPubKey, 'Ran out of swarm nodes to query');
|
||||
}
|
||||
expiryTimestamps = await getExpiriesFromNodes(snode, expireRequestParams);
|
||||
return getExpiriesFromNodes(snode, expireRequestParams);
|
||||
},
|
||||
{
|
||||
retries: 3,
|
||||
|
@ -206,7 +188,7 @@ export async function getExpiriesFromSnode(
|
|||
}
|
||||
);
|
||||
|
||||
return expiryTimestamps;
|
||||
return fetchedExpiries;
|
||||
} catch (e) {
|
||||
const snodeStr = snode ? `${snode.ip}:${snode.port}` : 'null';
|
||||
window?.log?.warn(
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import { getSodiumRenderer } from '../../crypto';
|
||||
import { StringUtils, UserUtils } from '../../utils';
|
||||
import { fromHexToArray, fromUInt8ArrayToBase64 } from '../../utils/String';
|
||||
import { WithShortenOrExtend } from './SnodeRequestTypes';
|
||||
import { GetNetworkTime } from './getNetworkTime';
|
||||
|
||||
export type SnodeSignatureResult = {
|
||||
|
@ -104,10 +105,9 @@ async function generateUpdateExpirySignature({
|
|||
timestamp,
|
||||
messageHashes,
|
||||
}: {
|
||||
shortenOrExtend: 'extend' | 'shorten' | '';
|
||||
timestamp: number;
|
||||
messageHashes: Array<string>;
|
||||
}): Promise<{ signature: string; pubkey_ed25519: string } | null> {
|
||||
} & WithShortenOrExtend): Promise<{ signature: string; pubkey_ed25519: string } | null> {
|
||||
const ourEd25519Key = await UserUtils.getUserED25519KeyPair();
|
||||
|
||||
if (!ourEd25519Key) {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import { isNumber, throttle, uniq } from 'lodash';
|
||||
import { isEmpty, isNumber, throttle, uniq } from 'lodash';
|
||||
import { messagesExpired } from '../../state/ducks/conversations';
|
||||
import { initWallClockListener } from '../../util/wallClockListener';
|
||||
|
||||
|
@ -8,7 +8,7 @@ import { READ_MESSAGE_STATE } from '../../models/conversationAttributes';
|
|||
import { MessageModel } from '../../models/message';
|
||||
import { SignalService } from '../../protobuf';
|
||||
import { ReleasedFeatures } from '../../util/releaseFeature';
|
||||
import { expireMessageOnSnode } from '../apis/snode_api/expireRequest';
|
||||
import { ExpiringDetails, expireMessagesOnSnode } from '../apis/snode_api/expireRequest';
|
||||
import { GetNetworkTime } from '../apis/snode_api/getNetworkTime';
|
||||
import { getConversationController } from '../conversations';
|
||||
import { isValidUnixTimestamp } from '../utils/Timestamps';
|
||||
|
@ -40,6 +40,8 @@ async function destroyMessagesAndUpdateRedux(
|
|||
// Delete any attachments
|
||||
for (let i = 0; i < messageIds.length; i++) {
|
||||
/* eslint-disable no-await-in-loop */
|
||||
// TODO make this use getMessagesById and not getMessageById
|
||||
|
||||
const message = await Data.getMessageById(messageIds[i]);
|
||||
await message?.cleanup();
|
||||
/* eslint-enable no-await-in-loop */
|
||||
|
@ -77,10 +79,22 @@ async function destroyExpiredMessages() {
|
|||
messages.forEach(expired => {
|
||||
window.log.info('Message expired', {
|
||||
sentAt: expired.get('sent_at'),
|
||||
hash: expired.getMessageHash(),
|
||||
});
|
||||
});
|
||||
|
||||
await destroyMessagesAndUpdateRedux(messagesExpiredDetails);
|
||||
const convosToRefresh = uniq(messagesExpiredDetails.map(m => m.conversationKey));
|
||||
await Promise.all(
|
||||
convosToRefresh.map(async c => {
|
||||
getConversationController()
|
||||
.get(c)
|
||||
?.updateLastMessage();
|
||||
return getConversationController()
|
||||
.get(c)
|
||||
?.refreshInMemoryDetails();
|
||||
})
|
||||
);
|
||||
} catch (error) {
|
||||
window.log.error(
|
||||
'destroyExpiredMessages: Error deleting expired messages',
|
||||
|
@ -108,7 +122,11 @@ async function checkExpiringMessages() {
|
|||
}
|
||||
|
||||
const ms = expiresAt - Date.now();
|
||||
window.log.info(`message expires in ${ms}ms, or ${ms / 1000}s, or ${ms / (3600 * 1000)}h`);
|
||||
window.log.info(
|
||||
`message with hash:${next.getMessageHash()} expires in ${ms}ms, or ${Math.floor(
|
||||
ms / 1000
|
||||
)}s, or ${Math.floor(ms / (3600 * 1000))}h`
|
||||
);
|
||||
|
||||
let wait = expiresAt - Date.now();
|
||||
|
||||
|
@ -516,60 +534,72 @@ async function checkHasOutdatedDisappearingMessageClient(
|
|||
}
|
||||
}
|
||||
|
||||
async function updateMessageExpiryOnSwarm(
|
||||
message: MessageModel,
|
||||
callLocation?: string, // this is for debugging purposes
|
||||
shouldCommit?: boolean
|
||||
) {
|
||||
if (callLocation) {
|
||||
// window.log.debug(`[updateMessageExpiryOnSwarm] called from: ${callLocation} `);
|
||||
}
|
||||
async function updateMessageExpiriesOnSwarm(messages: Array<MessageModel>) {
|
||||
const expiringDetails: ExpiringDetails = [];
|
||||
|
||||
if (!message.getExpirationType() || !message.getExpireTimer()) {
|
||||
window.log.debug(
|
||||
`[updateMessageExpiryOnSwarm] Message ${message.get(
|
||||
'messageHash'
|
||||
)} has no expirationType or expireTimer set. Ignoring`
|
||||
);
|
||||
return message;
|
||||
}
|
||||
|
||||
const messageHash = message.get('messageHash');
|
||||
if (!messageHash) {
|
||||
window.log.debug(
|
||||
`[updateMessageExpiryOnSwarm] Missing messageHash messageId: ${message.get('id')}`
|
||||
);
|
||||
return message;
|
||||
}
|
||||
|
||||
const newTTL = await expireMessageOnSnode({
|
||||
messageHash,
|
||||
expireTimer: message.getExpireTimer() * 1000,
|
||||
shorten: true,
|
||||
});
|
||||
const expiresAt = message.getExpiresAt();
|
||||
|
||||
if (newTTL && newTTL !== expiresAt) {
|
||||
message.set({
|
||||
expires_at: newTTL,
|
||||
});
|
||||
|
||||
if (shouldCommit) {
|
||||
await message.commit();
|
||||
messages.forEach(msg => {
|
||||
const hash = msg.getMessageHash();
|
||||
const timestampStarted = msg.getExpirationStartTimestamp();
|
||||
const timerSeconds = msg.getExpireTimer();
|
||||
const disappearingType = msg.getExpirationType();
|
||||
if (
|
||||
!hash ||
|
||||
!timestampStarted ||
|
||||
timestampStarted <= 0 ||
|
||||
!timerSeconds ||
|
||||
timerSeconds <= 0 ||
|
||||
disappearingType !== 'deleteAfterRead' || // this is very important as a message not stored on the swarm will be assumed expired, and so deleted locally!
|
||||
!msg.isIncoming() // this is very important as a message not stored on the swarm will be assumed expired, and so deleted locally!
|
||||
) {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
window.log.debug(
|
||||
`[updateMessageExpiryOnSwarm]\nmessageHash ${messageHash} has no new TTL.${
|
||||
expiresAt
|
||||
? `\nKeeping the old one ${expiresAt}which expires at ${new Date(
|
||||
expiresAt
|
||||
).toUTCString()}`
|
||||
: ''
|
||||
}`
|
||||
);
|
||||
}
|
||||
expiringDetails.push({
|
||||
messageHash: hash,
|
||||
expireTimerMs: timerSeconds * 1000,
|
||||
readAt: timestampStarted,
|
||||
});
|
||||
});
|
||||
|
||||
return message;
|
||||
if (isEmpty(expiringDetails)) {
|
||||
window.log.debug(`[updateMessageExpiriesOnSwarm] no expiringDetails to update`);
|
||||
return;
|
||||
}
|
||||
window.log.debug('updateMessageExpiriesOnSwarm: expiringDetails', expiringDetails);
|
||||
|
||||
const newTTLs = await expireMessagesOnSnode(expiringDetails, { shortenOrExtend: 'shorten' });
|
||||
const updatedMsgModels: Array<MessageModel> = [];
|
||||
newTTLs.forEach(m => {
|
||||
const message = messages.find(model => model.getMessageHash() === m.messageHash);
|
||||
if (!message) {
|
||||
return;
|
||||
}
|
||||
|
||||
const newTTLms = m.updatedExpiryMs;
|
||||
const realReadAt = newTTLms - message.getExpireTimer() * 1000;
|
||||
if (
|
||||
newTTLms &&
|
||||
(newTTLms !== message.getExpiresAt() ||
|
||||
message.get('expirationStartTimestamp') !== realReadAt) &&
|
||||
message.getExpireTimer()
|
||||
) {
|
||||
window.log.debug(`updateMessageExpiriesOnSwarm: setting for msg hash ${m.messageHash}:`, {
|
||||
expires_at: newTTLms,
|
||||
expirationStartTimestamp: realReadAt,
|
||||
unread: READ_MESSAGE_STATE.read,
|
||||
});
|
||||
message.set({
|
||||
expires_at: newTTLms,
|
||||
expirationStartTimestamp: realReadAt,
|
||||
unread: READ_MESSAGE_STATE.read,
|
||||
});
|
||||
|
||||
updatedMsgModels.push(message);
|
||||
}
|
||||
});
|
||||
|
||||
if (!isEmpty(updatedMsgModels)) {
|
||||
await Promise.all(updatedMsgModels.map(m => m.commit()));
|
||||
}
|
||||
}
|
||||
|
||||
export const DisappearingMessages = {
|
||||
|
@ -583,5 +613,6 @@ export const DisappearingMessages = {
|
|||
checkForExpiringOutgoingMessage,
|
||||
getMessageReadyToDisappear,
|
||||
checkHasOutdatedDisappearingMessageClient,
|
||||
updateMessageExpiryOnSwarm,
|
||||
updateMessageExpiriesOnSwarm,
|
||||
destroyExpiredMessages,
|
||||
};
|
||||
|
|
|
@ -192,7 +192,7 @@ async function send(
|
|||
canBeDeleteAfterRead &&
|
||||
(expirationMode === 'legacy' || expirationMode === 'deleteAfterRead')
|
||||
) {
|
||||
await DisappearingMessages.updateMessageExpiryOnSwarm(foundMessage, 'send()');
|
||||
await DisappearingMessages.updateMessageExpiriesOnSwarm([foundMessage]);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,16 +1,18 @@
|
|||
import { cloneDeep, compact, isArray, isString } from 'lodash';
|
||||
import { Data } from '../../../data/data';
|
||||
import { Storage } from '../../../util/storage';
|
||||
import { timeout } from '../Promise';
|
||||
import { persistedJobFromData } from './JobDeserialization';
|
||||
import { JobRunnerType } from './jobs/JobRunnerType';
|
||||
import {
|
||||
AvatarDownloadPersistedData,
|
||||
ConfigurationSyncPersistedData,
|
||||
FetchMsgExpirySwarmPersistedData,
|
||||
PersistedJob,
|
||||
RunJobResult,
|
||||
TypeOfPersistedData,
|
||||
UpdateMsgExpirySwarmPersistedData,
|
||||
} from './PersistedJob';
|
||||
import { Storage } from '../../../util/storage';
|
||||
import { JobRunnerType } from './jobs/JobRunnerType';
|
||||
|
||||
/**
|
||||
* 'job_in_progress' if there is already a job in progress
|
||||
|
@ -100,7 +102,7 @@ export class PersistedJobRunner<T extends TypeOfPersistedData> {
|
|||
|
||||
// make sure there is no job with that same identifier already .
|
||||
|
||||
window.log.info(`job runner adding type :"${job.persistedData.jobType}" `);
|
||||
window.log.debug(`job runner adding type:"${job.persistedData.jobType}"`);
|
||||
return this.addJobUnchecked(job);
|
||||
}
|
||||
|
||||
|
@ -166,7 +168,7 @@ export class PersistedJobRunner<T extends TypeOfPersistedData> {
|
|||
|
||||
private async writeJobsToDB() {
|
||||
const serialized = this.getSerializedJobs();
|
||||
window.log.debug(`writing to db for "${this.jobRunnerType}": `, serialized);
|
||||
|
||||
await Storage.put(this.getJobRunnerItemId(), JSON.stringify(serialized));
|
||||
}
|
||||
|
||||
|
@ -357,7 +359,19 @@ const avatarDownloadRunner = new PersistedJobRunner<AvatarDownloadPersistedData>
|
|||
null
|
||||
);
|
||||
|
||||
const updateMsgExpiryRunner = new PersistedJobRunner<UpdateMsgExpirySwarmPersistedData>(
|
||||
'UpdateMsgExpirySwarmJob',
|
||||
null
|
||||
);
|
||||
|
||||
const fetchSwarmMsgExpiryRunner = new PersistedJobRunner<FetchMsgExpirySwarmPersistedData>(
|
||||
'FetchMsgExpirySwarmJob',
|
||||
null
|
||||
);
|
||||
|
||||
export const runners = {
|
||||
configurationSyncRunner,
|
||||
updateMsgExpiryRunner,
|
||||
fetchSwarmMsgExpiryRunner,
|
||||
avatarDownloadRunner,
|
||||
};
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
import { cloneDeep, isEmpty } from 'lodash';
|
||||
import { cloneDeep, flatten, isEmpty, isNil, uniq } from 'lodash';
|
||||
|
||||
export type PersistedJobType =
|
||||
| 'ConfigurationSyncJobType'
|
||||
| 'AvatarDownloadJobType'
|
||||
| 'FetchMsgExpirySwarmJobType'
|
||||
| 'UpdateMsgExpirySwarmJobType'
|
||||
| 'FakeSleepForJobType'
|
||||
| 'FakeSleepForJobMultiType';
|
||||
|
||||
|
@ -31,23 +33,40 @@ export interface AvatarDownloadPersistedData extends PersistedJobData {
|
|||
conversationId: string;
|
||||
}
|
||||
|
||||
interface PersitedDataWithMsgIds extends PersistedJobData {
|
||||
msgIds: Array<string>;
|
||||
}
|
||||
|
||||
export interface ConfigurationSyncPersistedData extends PersistedJobData {
|
||||
jobType: 'ConfigurationSyncJobType';
|
||||
}
|
||||
|
||||
export interface FetchMsgExpirySwarmPersistedData extends PersitedDataWithMsgIds {
|
||||
jobType: 'FetchMsgExpirySwarmJobType';
|
||||
}
|
||||
|
||||
export interface UpdateMsgExpirySwarmPersistedData extends PersitedDataWithMsgIds {
|
||||
jobType: 'UpdateMsgExpirySwarmJobType';
|
||||
}
|
||||
|
||||
export type TypeOfPersistedData =
|
||||
| ConfigurationSyncPersistedData
|
||||
| AvatarDownloadPersistedData
|
||||
| FetchMsgExpirySwarmPersistedData
|
||||
| UpdateMsgExpirySwarmPersistedData
|
||||
| FakeSleepJobData
|
||||
| FakeSleepForMultiJobData;
|
||||
|
||||
export type AddJobCheckReturn = 'skipAddSameJobPresent' | 'sameJobDataAlreadyInQueue' | null;
|
||||
export type AddJobCheckReturn = 'skipAddSameJobPresent' | null;
|
||||
|
||||
export enum RunJobResult {
|
||||
Success = 1,
|
||||
RetryJobIfPossible = 2,
|
||||
PermanentFailure = 3,
|
||||
}
|
||||
function isDataWithMsgIds(data: PersistedJobData): data is PersitedDataWithMsgIds {
|
||||
return !isNil((data as PersitedDataWithMsgIds)?.msgIds);
|
||||
}
|
||||
|
||||
/**
|
||||
* This class can be used to save and run jobs from the database.
|
||||
|
@ -122,6 +141,34 @@ export abstract class PersistedJob<T extends PersistedJobData> {
|
|||
: null;
|
||||
}
|
||||
|
||||
public addJobCheckEveryMsgIdsAlreadyPresent(jobs: Array<T>): 'skipAddSameJobPresent' | null {
|
||||
if (!jobs.length) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!isDataWithMsgIds(this.persistedData)) {
|
||||
throw new Error(`${this.persistedData.jobType} does not have a msgIds field`);
|
||||
}
|
||||
|
||||
const allIdsAlreadyScheduled = uniq(
|
||||
flatten(
|
||||
jobs.map(m => {
|
||||
if (!isDataWithMsgIds(m)) {
|
||||
throw new Error(`${this.persistedData.jobType} does not have a msgIds field`);
|
||||
}
|
||||
return m.msgIds;
|
||||
})
|
||||
)
|
||||
);
|
||||
|
||||
// if all ids we are trying to add are already tracked as other jobs in the job runner,
|
||||
// there is no need to add this job at all.
|
||||
if (this.persistedData.msgIds.every(m => allIdsAlreadyScheduled.includes(m))) {
|
||||
return 'skipAddSameJobPresent';
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public abstract getJobTimeoutMs(): number;
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,143 @@
|
|||
/* eslint-disable no-await-in-loop */
|
||||
import { compact, isEmpty, isNumber, uniq } from 'lodash';
|
||||
import { v4 } from 'uuid';
|
||||
import { Data } from '../../../../data/data';
|
||||
import { READ_MESSAGE_STATE } from '../../../../models/conversationAttributes';
|
||||
import { MessageModel } from '../../../../models/message';
|
||||
import { isSignInByLinking } from '../../../../util/storage';
|
||||
import { getExpiriesFromSnode } from '../../../apis/snode_api/getExpiriesRequest';
|
||||
import { DisappearingMessages } from '../../../disappearing_messages';
|
||||
import { runners } from '../JobRunner';
|
||||
import {
|
||||
AddJobCheckReturn,
|
||||
FetchMsgExpirySwarmPersistedData,
|
||||
PersistedJob,
|
||||
RunJobResult,
|
||||
} from '../PersistedJob';
|
||||
|
||||
class FetchMsgExpirySwarmJob extends PersistedJob<FetchMsgExpirySwarmPersistedData> {
|
||||
constructor({
|
||||
identifier,
|
||||
nextAttemptTimestamp,
|
||||
maxAttempts,
|
||||
currentRetry,
|
||||
msgIds,
|
||||
}: Partial<
|
||||
Pick<
|
||||
FetchMsgExpirySwarmPersistedData,
|
||||
'identifier' | 'nextAttemptTimestamp' | 'currentRetry' | 'maxAttempts'
|
||||
>
|
||||
> &
|
||||
Pick<FetchMsgExpirySwarmPersistedData, 'msgIds'>) {
|
||||
super({
|
||||
jobType: 'FetchMsgExpirySwarmJobType',
|
||||
identifier: identifier || v4(),
|
||||
delayBetweenRetries: 2000,
|
||||
maxAttempts: isNumber(maxAttempts) ? maxAttempts : 2,
|
||||
currentRetry: isNumber(currentRetry) ? currentRetry : 0,
|
||||
nextAttemptTimestamp: nextAttemptTimestamp || Date.now(),
|
||||
msgIds: uniq(msgIds),
|
||||
});
|
||||
}
|
||||
|
||||
public async run(): Promise<RunJobResult> {
|
||||
const start = Date.now();
|
||||
|
||||
try {
|
||||
if (!this.persistedData.msgIds || isEmpty(this.persistedData.msgIds)) {
|
||||
return RunJobResult.Success;
|
||||
}
|
||||
let msgModels = await Data.getMessagesById(this.persistedData.msgIds);
|
||||
const messageHashes = compact(msgModels.map(m => m.getMessageHash()));
|
||||
|
||||
if (isEmpty(msgModels) || isEmpty(messageHashes)) {
|
||||
return RunJobResult.Success;
|
||||
}
|
||||
|
||||
const fetchedExpiries = await getExpiriesFromSnode({
|
||||
messageHashes,
|
||||
});
|
||||
const updatedMsgModels: Array<MessageModel> = [];
|
||||
|
||||
if (fetchedExpiries.length) {
|
||||
// get a fresh list of attributes for those message models
|
||||
msgModels = await Data.getMessagesById(this.persistedData.msgIds);
|
||||
for (let index = 0; index < fetchedExpiries.length; index++) {
|
||||
const expiry = fetchedExpiries[index];
|
||||
if (expiry.fetchedExpiry <= 0) {
|
||||
continue;
|
||||
}
|
||||
const message = msgModels.find(m => m.getMessageHash() === expiry.messageHash);
|
||||
if (!message) {
|
||||
continue;
|
||||
}
|
||||
const realReadAt = expiry.fetchedExpiry - message.getExpireTimer() * 1000;
|
||||
|
||||
if (
|
||||
(message.get('expirationStartTimestamp') !== realReadAt ||
|
||||
message.get('expires_at') !== expiry.fetchedExpiry) &&
|
||||
message.getExpireTimer()
|
||||
) {
|
||||
window.log.debug(
|
||||
`FetchMsgExpirySwarmJob: setting for msg hash ${message.getMessageHash()}:`,
|
||||
{
|
||||
expires_at: expiry.fetchedExpiry,
|
||||
unread: READ_MESSAGE_STATE.read,
|
||||
expirationStartTimestamp: realReadAt,
|
||||
}
|
||||
);
|
||||
message.set({
|
||||
expires_at: expiry.fetchedExpiry,
|
||||
unread: READ_MESSAGE_STATE.read,
|
||||
expirationStartTimestamp: realReadAt,
|
||||
});
|
||||
updatedMsgModels.push(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
await Promise.all(updatedMsgModels.map(m => m.commit()));
|
||||
await DisappearingMessages.destroyExpiredMessages();
|
||||
|
||||
return RunJobResult.Success;
|
||||
} finally {
|
||||
window.log.debug(`FetchMsgExpirySwarmJob run() took ${Date.now() - start}ms`);
|
||||
}
|
||||
}
|
||||
|
||||
public serializeJob(): FetchMsgExpirySwarmPersistedData {
|
||||
const fromParent = super.serializeBase();
|
||||
return fromParent;
|
||||
}
|
||||
|
||||
public addJobCheck(jobs: Array<FetchMsgExpirySwarmPersistedData>): AddJobCheckReturn {
|
||||
return this.addJobCheckEveryMsgIdsAlreadyPresent(jobs);
|
||||
}
|
||||
|
||||
public nonRunningJobsToRemove(_jobs: Array<FetchMsgExpirySwarmPersistedData>) {
|
||||
return [];
|
||||
}
|
||||
|
||||
public getJobTimeoutMs(): number {
|
||||
return 20000;
|
||||
}
|
||||
}
|
||||
|
||||
async function queueNewJobIfNeeded(msgIds: Array<string>) {
|
||||
if (isSignInByLinking()) {
|
||||
window.log.info('NOT Scheduling FetchMsgExpirySwarmJob: as we are linking a device');
|
||||
|
||||
return;
|
||||
}
|
||||
if (isEmpty(msgIds)) {
|
||||
return;
|
||||
}
|
||||
|
||||
await runners.fetchSwarmMsgExpiryRunner.addJob(
|
||||
new FetchMsgExpirySwarmJob({ nextAttemptTimestamp: Date.now() + 1000, msgIds })
|
||||
);
|
||||
}
|
||||
|
||||
export const FetchMsgExpirySwarm = {
|
||||
FetchMsgExpirySwarmJob,
|
||||
queueNewJobIfNeeded,
|
||||
};
|
|
@ -1,5 +1,7 @@
|
|||
export type JobRunnerType =
|
||||
| 'ConfigurationSyncJob'
|
||||
| 'FetchMsgExpirySwarmJob'
|
||||
| 'UpdateMsgExpirySwarmJob'
|
||||
| 'FakeSleepForJob'
|
||||
| 'FakeSleepForMultiJob'
|
||||
| 'AvatarDownloadJob';
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
/* eslint-disable no-await-in-loop */
|
||||
import { isEmpty, isNumber, uniq } from 'lodash';
|
||||
import { v4 } from 'uuid';
|
||||
import { Data } from '../../../../data/data';
|
||||
import { isSignInByLinking } from '../../../../util/storage';
|
||||
import { DisappearingMessages } from '../../../disappearing_messages';
|
||||
import { runners } from '../JobRunner';
|
||||
import {
|
||||
AddJobCheckReturn,
|
||||
PersistedJob,
|
||||
RunJobResult,
|
||||
UpdateMsgExpirySwarmPersistedData,
|
||||
} from '../PersistedJob';
|
||||
|
||||
class UpdateMsgExpirySwarmJob extends PersistedJob<UpdateMsgExpirySwarmPersistedData> {
|
||||
constructor({
|
||||
identifier,
|
||||
nextAttemptTimestamp,
|
||||
maxAttempts,
|
||||
currentRetry,
|
||||
msgIds,
|
||||
}: Partial<
|
||||
Pick<
|
||||
UpdateMsgExpirySwarmPersistedData,
|
||||
'identifier' | 'nextAttemptTimestamp' | 'currentRetry' | 'maxAttempts'
|
||||
>
|
||||
> &
|
||||
Pick<UpdateMsgExpirySwarmPersistedData, 'msgIds'>) {
|
||||
super({
|
||||
jobType: 'UpdateMsgExpirySwarmJobType',
|
||||
identifier: identifier || v4(),
|
||||
delayBetweenRetries: 2000,
|
||||
maxAttempts: isNumber(maxAttempts) ? maxAttempts : 2,
|
||||
currentRetry: isNumber(currentRetry) ? currentRetry : 0,
|
||||
nextAttemptTimestamp: nextAttemptTimestamp || Date.now(),
|
||||
msgIds: uniq(msgIds),
|
||||
});
|
||||
}
|
||||
|
||||
public async run(): Promise<RunJobResult> {
|
||||
const start = Date.now();
|
||||
|
||||
try {
|
||||
if (!this.persistedData.msgIds || isEmpty(this.persistedData.msgIds)) {
|
||||
return RunJobResult.Success;
|
||||
}
|
||||
const msgModels = await Data.getMessagesById(this.persistedData.msgIds);
|
||||
if (isEmpty(msgModels)) {
|
||||
return RunJobResult.Success;
|
||||
}
|
||||
await DisappearingMessages.updateMessageExpiriesOnSwarm(msgModels);
|
||||
await DisappearingMessages.destroyExpiredMessages();
|
||||
|
||||
return RunJobResult.Success;
|
||||
} finally {
|
||||
window.log.debug(`UpdateMsgExpirySwarmJob run() took ${Date.now() - start}ms`);
|
||||
}
|
||||
}
|
||||
|
||||
public serializeJob(): UpdateMsgExpirySwarmPersistedData {
|
||||
const fromParent = super.serializeBase();
|
||||
return fromParent;
|
||||
}
|
||||
|
||||
public addJobCheck(jobs: Array<UpdateMsgExpirySwarmPersistedData>): AddJobCheckReturn {
|
||||
// if all ids we are trying to add are already tracked as other jobs in the job runner,
|
||||
// there is no need to add this job at all.
|
||||
return this.addJobCheckEveryMsgIdsAlreadyPresent(jobs);
|
||||
}
|
||||
|
||||
public nonRunningJobsToRemove(_jobs: Array<UpdateMsgExpirySwarmPersistedData>) {
|
||||
return [];
|
||||
}
|
||||
|
||||
public getJobTimeoutMs(): number {
|
||||
return 20000;
|
||||
}
|
||||
}
|
||||
|
||||
async function queueNewJobIfNeeded(msgIds: Array<string>) {
|
||||
if (isSignInByLinking()) {
|
||||
window.log.info('NOT Scheduling UpdateMsgExpirySwarmJob: as we are linking a device');
|
||||
|
||||
return;
|
||||
}
|
||||
if (isEmpty(msgIds)) {
|
||||
return;
|
||||
}
|
||||
|
||||
await runners.updateMsgExpiryRunner.addJob(
|
||||
new UpdateMsgExpirySwarmJob({ nextAttemptTimestamp: Date.now() + 1000, msgIds })
|
||||
);
|
||||
}
|
||||
|
||||
export const UpdateMsgExpirySwarm = {
|
||||
UpdateMsgExpirySwarmJob,
|
||||
queueNewJobIfNeeded,
|
||||
};
|
|
@ -3,9 +3,9 @@ import chaiAsPromised from 'chai-as-promised';
|
|||
import Sinon from 'sinon';
|
||||
import { UpdateExpiryOnNodeSubRequest } from '../../../../session/apis/snode_api/SnodeRequestTypes';
|
||||
import {
|
||||
ExpireMessageOnSnodeProps,
|
||||
ExpireMessageWithExpiryOnSnodeProps,
|
||||
ExpireRequestResponseResults,
|
||||
buildExpireRequest,
|
||||
buildExpireRequestSingleExpiry,
|
||||
processExpireRequestResponse,
|
||||
verifyExpireMsgsResponseSignature,
|
||||
verifyExpireMsgsResponseSignatureProps,
|
||||
|
@ -37,13 +37,16 @@ describe('ExpireRequest', () => {
|
|||
});
|
||||
|
||||
describe('buildExpireRequest', () => {
|
||||
const props: ExpireMessageOnSnodeProps = {
|
||||
messageHash: 'messageHash',
|
||||
expireTimer: 60,
|
||||
const props: ExpireMessageWithExpiryOnSnodeProps = {
|
||||
messageHashes: ['messageHash'],
|
||||
expiryMs: 12340000 + 60 * 1000,
|
||||
shortenOrExtend: '',
|
||||
};
|
||||
|
||||
it('builds a request with just the messageHash and expireTimer of 1 minute', async () => {
|
||||
const request: UpdateExpiryOnNodeSubRequest | null = await buildExpireRequest(props);
|
||||
const request: UpdateExpiryOnNodeSubRequest | null = await buildExpireRequestSingleExpiry(
|
||||
props
|
||||
);
|
||||
|
||||
expect(request, 'should not return null').to.not.be.null;
|
||||
expect(request, 'should not return undefined').to.not.be.undefined;
|
||||
|
@ -53,8 +56,8 @@ describe('ExpireRequest', () => {
|
|||
|
||||
expect(request, "method should be 'expire'").to.have.property('method', 'expire');
|
||||
expect(request.params.pubkey, 'should have a matching pubkey').to.equal(ourNumber);
|
||||
expect(request.params.messages[0], 'messageHash should be in messages array').to.equal(
|
||||
props.messageHash
|
||||
expect(request.params.messages, 'messageHash should be in messages array').to.deep.equal(
|
||||
props.messageHashes
|
||||
);
|
||||
expect(
|
||||
request.params.expiry && isValidUnixTimestamp(request?.params.expiry),
|
||||
|
@ -65,9 +68,9 @@ describe('ExpireRequest', () => {
|
|||
expect(request.params.signature, 'signature should not be empty').to.not.be.empty;
|
||||
});
|
||||
it('builds a request with extend enabled', async () => {
|
||||
const request: UpdateExpiryOnNodeSubRequest | null = await buildExpireRequest({
|
||||
const request: UpdateExpiryOnNodeSubRequest | null = await buildExpireRequestSingleExpiry({
|
||||
...props,
|
||||
extend: true,
|
||||
shortenOrExtend: 'extend',
|
||||
});
|
||||
|
||||
expect(request, 'should not return null').to.not.be.null;
|
||||
|
@ -78,8 +81,8 @@ describe('ExpireRequest', () => {
|
|||
|
||||
expect(request, "method should be 'expire'").to.have.property('method', 'expire');
|
||||
expect(request.params.pubkey, 'should have a matching pubkey').to.equal(ourNumber);
|
||||
expect(request.params.messages[0], 'messageHash should be in messages array').to.equal(
|
||||
props.messageHash
|
||||
expect(request.params.messages, 'messageHash should be in messages array').to.equal(
|
||||
props.messageHashes
|
||||
);
|
||||
expect(
|
||||
request.params.expiry && isValidUnixTimestamp(request?.params.expiry),
|
||||
|
@ -90,9 +93,9 @@ describe('ExpireRequest', () => {
|
|||
expect(request.params.signature, 'signature should not be empty').to.not.be.empty;
|
||||
});
|
||||
it('builds a request with shorten enabled', async () => {
|
||||
const request: UpdateExpiryOnNodeSubRequest | null = await buildExpireRequest({
|
||||
const request: UpdateExpiryOnNodeSubRequest | null = await buildExpireRequestSingleExpiry({
|
||||
...props,
|
||||
shorten: true,
|
||||
shortenOrExtend: 'shorten',
|
||||
});
|
||||
|
||||
expect(request, 'should not return null').to.not.be.null;
|
||||
|
@ -103,8 +106,8 @@ describe('ExpireRequest', () => {
|
|||
|
||||
expect(request, "method should be 'expire'").to.have.property('method', 'expire');
|
||||
expect(request.params.pubkey, 'should have a matching pubkey').to.equal(ourNumber);
|
||||
expect(request.params.messages[0], 'messageHash should be in messages array').to.equal(
|
||||
props.messageHash
|
||||
expect(request.params.messages, 'messageHash should be in messages array').to.equal(
|
||||
props.messageHashes
|
||||
);
|
||||
expect(
|
||||
request.params.expiry && isValidUnixTimestamp(request?.params.expiry),
|
||||
|
@ -114,15 +117,6 @@ describe('ExpireRequest', () => {
|
|||
expect(request.params.shorten, 'shorten should be true').to.be.true;
|
||||
expect(request.params.signature, 'signature should not be empty').to.not.be.empty;
|
||||
});
|
||||
it('fails to build a request if extend and shorten are both enabled', async () => {
|
||||
const request: UpdateExpiryOnNodeSubRequest | null = await buildExpireRequest({
|
||||
...props,
|
||||
extend: true,
|
||||
shorten: true,
|
||||
});
|
||||
|
||||
expect(request, 'should return null').to.be.null;
|
||||
});
|
||||
});
|
||||
|
||||
describe('verifyExpireMsgsResponseSignature', () => {
|
||||
|
|
|
@ -44,7 +44,6 @@ describe('GetExpiriesRequest', () => {
|
|||
describe('buildGetExpiriesRequest', () => {
|
||||
const props: GetExpiriesFromSnodeProps = {
|
||||
messageHashes: ['messageHash'],
|
||||
timestamp: GetNetworkTime.getNowWithNetworkOffset(),
|
||||
};
|
||||
|
||||
it('builds a valid request given the messageHashes and valid timestamp for now', async () => {
|
||||
|
|
Loading…
Reference in New Issue