Compare commits

...

2 Commits

Author SHA1 Message Date
Audric Ackermann 5cfbb8405c fix: use expiry from swarm to update readAt & expiresAt for msg 2023-12-12 17:11:00 +11:00
Audric Ackermann 82c6f0897b fix: add jobs for expiry update & expiry fetch 2023-12-12 16:00:23 +11:00
25 changed files with 844 additions and 395 deletions

View File

@ -127,7 +127,7 @@ export const ReadableMessage = (props: ReadableMessageProps) => {
dispatch(showScrollToBottomButton(false));
getConversationController()
.get(selectedConversationKey)
?.markConversationRead(receivedAt || 0); // TODOLATER this should be `sentAt || serverTimestamp` I think
?.markConversationRead({ newestUnreadDate: receivedAt || 0, fromConfigMessage: false }); // TODOLATER this should be `sentAt || serverTimestamp` I think
dispatch(markConversationFullyRead(selectedConversationKey));
} else if (inView === false) {
@ -172,7 +172,7 @@ export const ReadableMessage = (props: ReadableMessageProps) => {
if (foundSentAt) {
getConversationController()
.get(selectedConversationKey)
?.markConversationRead(foundSentAt, Date.now());
?.markConversationRead({ newestUnreadDate: foundSentAt, fromConfigMessage: false });
}
}
}

View File

@ -1,6 +1,6 @@
// eslint:disable: no-require-imports no-var-requires one-variable-per-declaration no-void-expression function-name
import _ from 'lodash';
import _, { isEmpty } from 'lodash';
import { MessageResultProps } from '../components/search/MessageSearchResults';
import { ConversationModel } from '../models/conversation';
import { ConversationAttributes } from '../models/conversationAttributes';
@ -307,6 +307,14 @@ async function getMessageById(
return new MessageModel(message);
}
async function getMessagesById(ids: Array<string>): Promise<Array<MessageModel>> {
const messages = await channels.getMessagesById(ids);
if (!messages || isEmpty(messages)) {
return [];
}
return messages.map((msg: any) => new MessageModel(msg));
}
async function getMessageByServerId(
conversationId: string,
serverId: number,
@ -361,12 +369,12 @@ async function getUnreadByConversation(
async function getUnreadDisappearingByConversation(
conversationId: string,
sentBeforeTimestamp: number
): Promise<MessageCollection> {
): Promise<Array<MessageModel>> {
const messages = await channels.getUnreadDisappearingByConversation(
conversationId,
sentBeforeTimestamp
);
return new MessageCollection(messages);
return new MessageCollection(messages).models;
}
async function markAllAsReadByConversationNoExpiration(
@ -816,6 +824,7 @@ export const Data = {
cleanUpExpirationTimerUpdateHistory,
getMessageIdsFromServerIds,
getMessageById,
getMessagesById,
getMessagesBySenderAndSentAt,
getMessageByServerId,
filterAlreadyFetchedOpengroupMessage,

View File

@ -53,6 +53,7 @@ const channelsToMake = new Set([
'getMessagesBySenderAndSentAt',
'getMessageIdsFromServerIds',
'getMessageById',
'getMessagesById',
'getMessagesBySentAt',
'getMessageByServerId',
'getExpiredMessages',

View File

@ -1,19 +1,19 @@
import { compact } from 'lodash';
import { SessionButtonColor } from '../../components/basic/SessionButton';
import { Data } from '../../data/data';
import { ConversationModel } from '../../models/conversation';
import { MessageModel } from '../../models/message';
import { getMessageQueue } from '../../session';
import { deleteSogsMessageByServerIds } from '../../session/apis/open_group_api/sogsv3/sogsV3DeleteMessages';
import { SnodeAPI } from '../../session/apis/snode_api/SNodeAPI';
import { SnodeNamespaces } from '../../session/apis/snode_api/namespaces';
import { getConversationController } from '../../session/conversations';
import { UnsendMessage } from '../../session/messages/outgoing/controlMessage/UnsendMessage';
import { ed25519Str } from '../../session/onions/onionPath';
import { SnodeAPI } from '../../session/apis/snode_api/SNodeAPI';
import { PubKey } from '../../session/types';
import { ToastUtils, UserUtils } from '../../session/utils';
import { resetSelectedMessageIds } from '../../state/ducks/conversations';
import { updateConfirmModal } from '../../state/ducks/modalDialog';
import { SessionButtonColor } from '../../components/basic/SessionButton';
import { deleteSogsMessageByServerIds } from '../../session/apis/open_group_api/sogsv3/sogsV3DeleteMessages';
import { SnodeNamespaces } from '../../session/apis/snode_api/namespaces';
/**
* Deletes messages for everyone in a 1-1 or everyone in a closed group conversation.
@ -204,7 +204,6 @@ export async function deleteMessageLocallyOnly({
} else {
// just mark the message as deleted but still show in conversation
await message.markAsDeleted();
await message.markMessageAsRead(Date.now());
}
conversation.updateLastMessage();
}

View File

@ -116,6 +116,10 @@ async function startJobRunners() {
runners.avatarDownloadRunner.startProcessing();
await runners.configurationSyncRunner.loadJobsFromDb();
runners.configurationSyncRunner.startProcessing();
await runners.updateMsgExpiryRunner.loadJobsFromDb();
runners.updateMsgExpiryRunner.startProcessing();
await runners.fetchSwarmMsgExpiryRunner.loadJobsFromDb();
runners.fetchSwarmMsgExpiryRunner.startProcessing();
}
// We need this 'first' check because we don't want to start the app up any other time

View File

@ -47,7 +47,6 @@ import {
actions as conversationActions,
conversationsChanged,
markConversationFullyRead,
MessageModelPropsWithoutConvoProps,
messagesDeleted,
ReduxConversationType,
} from '../state/ducks/conversations';
@ -120,6 +119,8 @@ import {
import { DisappearingMessages } from '../session/disappearing_messages';
import { DisappearingMessageConversationModeType } from '../session/disappearing_messages/types';
import { FetchMsgExpirySwarm } from '../session/utils/job_runners/jobs/FetchMsgExpirySwarmJob';
import { UpdateMsgExpirySwarm } from '../session/utils/job_runners/jobs/UpdateMsgExpirySwarmJob';
import { ReleasedFeatures } from '../util/releaseFeature';
import { markAttributesAsReadIfNeeded } from './messageFactory';
@ -138,7 +139,10 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
public updateLastMessage: () => any;
public throttledBumpTyping: () => void;
public throttledNotify: (message: MessageModel) => void;
public markConversationRead: (newestUnreadDate: number, readAt?: number) => void;
public markConversationRead: (opts: {
newestUnreadDate: number;
fromConfigMessage?: boolean;
}) => void;
public initialPromise: any;
private typingRefreshTimer?: NodeJS.Timeout | null;
@ -1143,7 +1147,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
}
// otherwise, do it the slow and expensive way
await this.markConversationReadBouncy(Date.now());
await this.markConversationReadBouncy({ newestUnreadDate: Date.now() });
}
public getUsInThatConversation() {
@ -1814,7 +1818,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
* This call is not debounced and can be quite heavy, so only call it when handling config messages updates
*/
public async markReadFromConfigMessage(newestUnreadDate: number) {
return this.markConversationReadBouncy(newestUnreadDate);
return this.markConversationReadBouncy({ newestUnreadDate, fromConfigMessage: true });
}
private async sendMessageJob(message: MessageModel) {
@ -2066,7 +2070,14 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
}
}
private async markConversationReadBouncy(newestUnreadDate: number, readAt: number = Date.now()) {
private async markConversationReadBouncy({
newestUnreadDate,
fromConfigMessage = false,
}: {
newestUnreadDate: number;
fromConfigMessage?: boolean;
}) {
const readAt = Date.now();
const conversationId = this.id;
Notifications.clearByConversationID(conversationId);
@ -2079,32 +2090,39 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
// Build the list of updated message models so we can mark them all as read on a single sqlite call
const readDetails = [];
const msgsIdsToUpdateExpireOnSwarm: Array<string> = [];
// eslint-disable-next-line no-restricted-syntax
for (const nowRead of oldUnreadNowRead) {
nowRead.markMessageReadNoCommit(readAt);
const shouldUpdateSwarmExpiry = nowRead.markMessageReadNoCommit(readAt);
if (shouldUpdateSwarmExpiry) {
msgsIdsToUpdateExpireOnSwarm.push(nowRead.get('id') as string);
}
const validTimestamp = nowRead.get('sent_at') || nowRead.get('serverTimestamp');
if (nowRead.get('source') && validTimestamp && isFinite(validTimestamp)) {
const sentAt = nowRead.get('sent_at') || nowRead.get('serverTimestamp');
if (nowRead.get('source') && sentAt && isFinite(sentAt)) {
readDetails.push({
sender: nowRead.get('source'),
timestamp: validTimestamp,
timestamp: sentAt,
});
}
}
const oldUnreadNowReadAttrs = oldUnreadNowRead.map(m => m.attributes);
if (oldUnreadNowReadAttrs?.length) {
await Data.saveMessages(oldUnreadNowReadAttrs);
}
const allProps: Array<MessageModelPropsWithoutConvoProps> = [];
// eslint-disable-next-line no-restricted-syntax
for (const nowRead of oldUnreadNowRead) {
allProps.push(nowRead.getMessageModelProps());
}
if (allProps.length) {
window.inboxStore?.dispatch(conversationActions.messagesChanged(allProps));
if (!isEmpty(msgsIdsToUpdateExpireOnSwarm)) {
if (fromConfigMessage) {
// when we mark a message as read through a convo volatile update,
// it means those messages have already an up to date expiry on the server side
// so we can just fetch those expiries for all the hashes we are marking as read, and trust it.
await FetchMsgExpirySwarm.queueNewJobIfNeeded(msgsIdsToUpdateExpireOnSwarm);
} else {
await UpdateMsgExpirySwarm.queueNewJobIfNeeded(msgsIdsToUpdateExpireOnSwarm);
}
}
// save all the attributes in a single call
await Data.saveMessages(oldUnreadNowRead.map(m => m.attributes));
// trigger all the ui updates in a single call
window.inboxStore?.dispatch(
conversationActions.messagesChanged(oldUnreadNowRead.map(m => m.getMessageModelProps()))
);
await this.commit();
@ -2527,6 +2545,7 @@ async function cleanUpExpireHistoryFromConvo(conversationId: string, isPrivate:
conversationId,
isPrivate
);
console.warn('cleanUpExpirationTimerUpdateHistory', conversationId, isPrivate, updateIdsRemoved);
window.inboxStore.dispatch(
messagesDeleted(updateIdsRemoved.map(m => ({ conversationKey: conversationId, messageId: m })))

View File

@ -539,8 +539,8 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
props.isDeleted = this.get('isDeleted');
}
if (this.get('messageHash')) {
props.messageHash = this.get('messageHash');
if (this.getMessageHash()) {
props.messageHash = this.getMessageHash();
}
if (this.get('received_at')) {
props.receivedAt = this.get('received_at');
@ -824,8 +824,14 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
reacts: undefined,
reactsIndex: undefined,
});
await this.markMessageAsRead(Date.now());
// we can ignore the result of that markMessageReadNoCommit as it would only be used
// to refresh the expiry of it(but it is already marked as "deleted", so we don't care)
this.markMessageReadNoCommit(Date.now());
await this.commit();
// the line below makes sure that getNextExpiringMessage will find this message as expiring.
// getNextExpiringMessage is used on app start to clean already expired messages which should have been removed already, but are not
await this.setToExpire();
await this.getConversation()?.refreshInMemoryDetails();
}
// One caller today: event handler for the 'Retry Send' entry on right click of a failed send message
@ -1096,17 +1102,16 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
return id;
}
public async markMessageAsRead(readAt: number) {
this.markMessageReadNoCommit(readAt);
await this.commit();
// the line below makes sure that getNextExpiringMessage will find this message as expiring.
// getNextExpiringMessage is used on app start to clean already expired messages which should have been removed already, but are not
await this.setToExpire();
/**
* Mark a message as read if it was not already read.
* @param readAt the timestamp at which this message was read
* @returns true if the message was marked as read, and if its expiry should be updated on the swarm, false otherwise
*/
public markMessageReadNoCommit(readAt: number): boolean {
if (!this.isUnread()) {
return false;
}
await this.getConversation()?.refreshInMemoryDetails();
}
public markMessageReadNoCommit(readAt: number) {
this.set({ unread: READ_MESSAGE_STATE.read });
const convo = this.getConversation();
@ -1123,27 +1128,28 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
if (expirationMode === 'legacy' || expirationMode === 'deleteAfterRead') {
if (this.isIncoming() && !this.isExpiring()) {
// NOTE We want to trigger disappearing now and then the TTL can update itself while it is running. Otherwise the UI is blocked until the request is completed.
void DisappearingMessages.updateMessageExpiryOnSwarm(
this,
'markMessageReadNoCommit()',
true
);
}
if (!this.getExpirationStartTimestamp()) {
this.set({
expirationStartTimestamp: DisappearingMessages.setExpirationStartTimestamp(
expirationMode,
readAt,
'markMessageReadNoCommit',
this.get('id')
),
});
// only if that message has not started to expire already, set its "start expiry".
// this is because a message can have a expire start timestamp set when receiving it, if the convo volatile said that the message was read by another device.
if (!this.getExpirationStartTimestamp()) {
this.set({
expirationStartTimestamp: DisappearingMessages.setExpirationStartTimestamp(
expirationMode,
readAt,
'markMessageReadNoCommit',
this.get('id')
),
});
// return true, we want to update/refresh the real expiry of this message from the swarm
return true;
}
// return true, we want to update/refresh the real expiry of this message from the swarm
return true;
}
}
}
Notifications.clearByMessageId(this.id);
return false;
}
public isExpiring() {
@ -1151,26 +1157,18 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
}
public isExpired() {
return this.msTilExpire() <= 0;
}
public msTilExpire() {
if (!this.isExpiring()) {
return Infinity;
return false;
}
const now = Date.now();
const start = this.getExpirationStartTimestamp();
if (!start) {
return Infinity;
return false;
}
const delta = this.getExpireTimer() * 1000;
let msFromNow = start + delta - now;
if (msFromNow < 0) {
msFromNow = 0;
}
return msFromNow;
const msFromNow = start + delta - now;
return msFromNow < 0;
}
public async setToExpire() {
if (this.isExpiring() && !this.getExpiresAt()) {
const start = this.getExpirationStartTimestamp();
@ -1185,8 +1183,8 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
this.set({
expires_at: expiresAt,
});
const id = this.get('id');
if (id) {
if (this.get('id')) {
await this.commit();
}
@ -1378,6 +1376,10 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
return this.get('expires_at');
}
public getMessageHash() {
return this.get('messageHash');
}
public getExpirationTimerUpdate() {
return this.get('expirationTimerUpdate');
}

View File

@ -31,6 +31,7 @@ export interface MessageAttributes {
expirationType?: DisappearingMessageType;
/** in seconds, 0 means no expiration */
expireTimer: number;
/** in milliseconds */
expirationStartTimestamp: number;
expires_at?: number;
expirationTimerUpdate?: ExpirationTimerUpdate;

View File

@ -1091,6 +1091,19 @@ function getMessageById(id: string) {
return jsonToObject(row.json);
}
function getMessagesById(ids: Array<string>) {
if (!isArray(ids)) {
throw new Error('getMessagesById expect an array of strings');
}
const rows = assertGlobalInstance()
.prepare(`SELECT json FROM ${MESSAGES_TABLE} WHERE id IN ( ${ids.map(() => '?').join(', ')} );`)
.all(ids);
if (!rows || isEmpty(rows)) {
return null;
}
return map(rows, row => jsonToObject(row.json));
}
// serverIds are not unique so we need the conversationId
function getMessageByServerId(conversationId: string, serverId: number) {
const row = assertGlobalInstance()
@ -2469,6 +2482,7 @@ export const sqlNode = {
getMessagesBySenderAndSentAt,
getMessageIdsFromServerIds,
getMessageById,
getMessagesById,
getMessagesBySentAt,
getMessageByServerId,
getSeenMessagesByHashList,

View File

@ -17,7 +17,6 @@ import { getOpenGroupManager } from '../session/apis/open_group_api/opengroupV2/
import { OpenGroupUtils } from '../session/apis/open_group_api/utils';
import { getOpenGroupV2ConversationId } from '../session/apis/open_group_api/utils/OpenGroupUtils';
import { getSwarmPollingInstance } from '../session/apis/snode_api';
import { getExpiriesFromSnode } from '../session/apis/snode_api/getExpiriesRequest';
import { getConversationController } from '../session/conversations';
import { IncomingMessage } from '../session/messages/incoming/IncomingMessage';
import { Profile, ProfileManager } from '../session/profile_manager/ProfileManager';
@ -42,6 +41,7 @@ import {
setLastProfileUpdateTimestamp,
} from '../util/storage';
// eslint-disable-next-line import/no-unresolved, import/extensions
import { FetchMsgExpirySwarm } from '../session/utils/job_runners/jobs/FetchMsgExpirySwarmJob';
import { ConfigWrapperObjectTypes } from '../webworker/workers/browser/libsession_worker_functions';
import {
ContactsWrapperActions,
@ -700,55 +700,25 @@ async function applyConvoVolatileUpdateFromWrapper(
}
try {
const canBeDeleteAfterRead = foundConvo && !foundConvo.isMe() && foundConvo.isPrivate();
// TODO legacy messages support will be removed in a future release
if (
canBeDeleteAfterRead &&
(foundConvo.getExpirationMode() === 'deleteAfterRead' ||
foundConvo.getExpirationMode() === 'legacy') &&
foundConvo.getExpireTimer() > 0
) {
const messages2Expire = await Data.getUnreadDisappearingByConversation(
if (foundConvo.isPrivate() && !foundConvo.isMe() && foundConvo.getExpireTimer() > 0) {
const messagesExpiring = await Data.getUnreadDisappearingByConversation(
convoId,
lastReadMessageTimestamp
);
if (messages2Expire.length) {
const messageHashes = compact(
messages2Expire
.filter(
m =>
m.getExpirationType() !== undefined &&
m.getExpirationType() !== 'deleteAfterSend' &&
m.getExpireTimer() > 0
)
.map(m => m.get('messageHash'))
);
const currentExpiryTimestamps = await getExpiriesFromSnode({
messageHashes,
timestamp: lastReadMessageTimestamp,
});
const messagesExpiringAfterRead = messagesExpiring.filter(
m => m.getExpirationType() === 'deleteAfterRead' && m.getExpireTimer() > 0
);
if (currentExpiryTimestamps.length) {
for (let index = 0; index < messages2Expire.length; index++) {
if (currentExpiryTimestamps[index] === -1) {
window.log.debug(
`[applyConvoVolatileUpdateFromWrapper] invalid expiry value returned from snode. We will keep the local value of ${messages2Expire
.get(index)
.get('id')}.\nmessageHash: ${messageHashes[index]},`
);
continue;
}
messages2Expire.at(index).set('expires_at', currentExpiryTimestamps[index]);
}
}
const messageIdsToFetchExpiriesFor = compact(messagesExpiringAfterRead.map(m => m.id));
if (messageIdsToFetchExpiriesFor.length) {
await FetchMsgExpirySwarm.queueNewJobIfNeeded(messageIdsToFetchExpiriesFor);
}
}
// window.log.debug(
// `applyConvoVolatileUpdateFromWrapper: ${convoId}: forcedUnread:${forcedUnread}, lastReadMessage:${lastReadMessageTimestamp}`
// );
// this should mark all the messages sent before fromWrapper.lastRead as read and update the unreadCount
// this mark all the messages sent before fromWrapper.lastRead as read and update the unreadCount
await foundConvo.markReadFromConfigMessage(lastReadMessageTimestamp);
// this commits to the DB, if needed
await foundConvo.markAsUnread(forcedUnread, true);

View File

@ -19,7 +19,6 @@ import { showMessageRequestBannerOutsideRedux } from '../state/ducks/userConfig'
import { getHideMessageRequestBannerOutsideRedux } from '../state/selectors/userConfig';
import { GoogleChrome } from '../util';
import { LinkPreviews } from '../util/linkPreviews';
import { ReleasedFeatures } from '../util/releaseFeature';
function contentTypeSupported(type: string): boolean {
const Chrome = GoogleChrome;
@ -174,14 +173,6 @@ async function processProfileKeyNoCommit(
}
}
function handleSyncedReceiptsNoCommit(message: MessageModel, conversation: ConversationModel) {
// If the newly received message is from us, we assume that we've seen the messages up until that point
const sentTimestamp = message.get('sent_at');
if (sentTimestamp) {
conversation.markConversationRead(sentTimestamp);
}
}
export type RegularMessageType = Pick<
SignalService.DataMessage,
| 'attachments'
@ -285,19 +276,7 @@ async function handleRegularMessage(
// should only occur after isOutgoing request as it relies on didApproveMe being false.
await conversation.setDidApproveMe(true);
}
} else if (type === 'outgoing') {
const userConfigLibsession = await ReleasedFeatures.checkIsUserConfigFeatureReleased();
if (!userConfigLibsession) {
// we want to do this for all types of conversations, not just private chats
handleSyncedReceiptsNoCommit(message, conversation);
if (conversation.isPrivate()) {
await conversation.setIsApproved(true);
}
}
}
const conversationActiveAt = conversation.get('active_at');
if (
!conversationActiveAt ||
@ -362,7 +341,10 @@ async function markConvoAsReadIfOutgoingMessage(
await message.commit();
}
}
conversation.markConversationRead(sentAt);
conversation.markConversationRead({
newestUnreadDate: sentAt,
fromConfigMessage: false,
});
}
}
}

View File

@ -179,3 +179,7 @@ export type NotEmptyArrayOfBatchResults = NonEmptyArray<{
code: number;
body: Record<string, any>;
}>;
export type WithShortenOrExtend = { shortenOrExtend: 'shorten' | 'extend' | '' };
export const MAX_SUBREQUESTS_COUNT = 20;

View File

@ -2,7 +2,11 @@ import { isArray } from 'lodash';
import { Snode } from '../../../data/data';
import { processOnionRequestErrorAtDestination, SnodeResponse } from './onions';
import { snodeRpc } from './sessionRpc';
import { NotEmptyArrayOfBatchResults, SnodeApiSubRequests } from './SnodeRequestTypes';
import {
MAX_SUBREQUESTS_COUNT,
NotEmptyArrayOfBatchResults,
SnodeApiSubRequests,
} from './SnodeRequestTypes';
/**
* This is the equivalent to the batch send on sogs. The target node runs each sub request and returns a list of all the sub status and bodies.
@ -21,6 +25,14 @@ export async function doSnodeBatchRequest(
associatedWith: string | null,
method: 'batch' | 'sequence' = 'batch'
): Promise<NotEmptyArrayOfBatchResults> {
if (subRequests.length > MAX_SUBREQUESTS_COUNT) {
window.log.error(
`batch subRequests count cannot be more than ${MAX_SUBREQUESTS_COUNT}. Got ${subRequests.length}`
);
throw new Error(
`batch subRequests count cannot be more than ${MAX_SUBREQUESTS_COUNT}. Got ${subRequests.length}`
);
}
const result = await snodeRpc({
method,
params: { requests: subRequests },

View File

@ -1,5 +1,15 @@
/* eslint-disable no-restricted-syntax */
import { isEmpty, sample } from 'lodash';
import {
chunk,
compact,
difference,
flatten,
isArray,
isEmpty,
isNumber,
sample,
uniqBy,
} from 'lodash';
import pRetry from 'p-retry';
import { Snode } from '../../../data/data';
import { getSodiumRenderer } from '../../crypto';
@ -7,16 +17,19 @@ import { StringUtils, UserUtils } from '../../utils';
import { fromBase64ToArray, fromHexToArray } from '../../utils/String';
import { EmptySwarmError } from '../../utils/errors';
import { SeedNodeAPI } from '../seed_node_api';
import { UpdateExpiryOnNodeSubRequest } from './SnodeRequestTypes';
import {
MAX_SUBREQUESTS_COUNT,
UpdateExpiryOnNodeSubRequest,
WithShortenOrExtend,
} from './SnodeRequestTypes';
import { doSnodeBatchRequest } from './batchRequest';
import { GetNetworkTime } from './getNetworkTime';
import { getSwarmFor } from './snodePool';
import { SnodeSignature } from './snodeSignatures';
import { ExpireMessageResultItem, ExpireMessagesResultsContent } from './types';
export type verifyExpireMsgsResponseSignatureProps = ExpireMessageResultItem & {
pubkey: string;
snodePubkey: any;
snodePubkey: string;
messageHashes: Array<string>;
};
@ -99,13 +112,14 @@ export async function processExpireRequestResponse(
const signature = swarm[nodeKey].signature;
if (!updatedHashes || !expiry || !signature) {
window.log.warn(
`[processExpireRequestResponse] Missing arguments on ${
targetNode.pubkey_ed25519
} so we will ignore this result on (${nodeKey}) and move onto the next node.\n${JSON.stringify(
swarm[nodeKey]
)}`
);
// most likely just a timeout from one of the swarm members
// window.log.debug(
// `[processExpireRequestResponse] Missing arguments on ${
// targetNode.pubkey_ed25519
// } so we will ignore this result on (${nodeKey}) and move onto the next node.\n${JSON.stringify(
// swarm[nodeKey]
// )}`
// );
continue;
}
@ -134,24 +148,23 @@ export async function processExpireRequestResponse(
return results;
}
async function expireOnNodes(
targetNode: Snode,
expireRequest: UpdateExpiryOnNodeSubRequest
): Promise<number> {
try {
const result = await doSnodeBatchRequest(
[expireRequest],
targetNode,
4000,
expireRequest.params.pubkey,
'batch'
);
type UpdatedExpiryWithHashes = { messageHashes: Array<string>; updatedExpiryMs: number };
type UpdatedExpiryWithHash = { messageHash: string; updatedExpiryMs: number };
if (!result || result.length !== 1) {
async function updateExpiryOnNodes(
targetNode: Snode,
ourPubKey: string,
expireRequests: Array<UpdateExpiryOnNodeSubRequest>
): Promise<Array<UpdatedExpiryWithHash>> {
try {
const result = await doSnodeBatchRequest(expireRequests, targetNode, 4000, ourPubKey, 'batch');
if (!result || result.length !== expireRequests.length) {
window.log.error(
`There was an issue with the results. updateExpiryOnNodes ${targetNode.ip}:${targetNode.port}. expected length or results ${expireRequests.length} but got ${result.length}`
);
throw Error(
`There was an issue with the results. sessionRpc ${targetNode.ip}:${
targetNode.port
} expireRequest ${JSON.stringify(expireRequest)}`
`There was an issue with the results. updateExpiryOnNodes ${targetNode.ip}:${targetNode.port}`
);
}
@ -163,30 +176,63 @@ async function expireOnNodes(
throw Error(`result is not 200 but ${firstResult.code}`);
}
const bodyFirstResult = firstResult.body;
const expirationResults = await processExpireRequestResponse(
expireRequest.params.pubkey,
targetNode,
bodyFirstResult.swarm as ExpireMessagesResultsContent,
expireRequest.params.messages
// Note: expirationResults is an array of `Map<snode pubkeys, {msgHashes,expiry}>` changed/unchanged which have a valid signature
const expirationResults: Array<ExpireRequestResponseResults> = await Promise.all(
expireRequests.map((request, index) => {
const bodyIndex = result[index]?.body?.swarm;
if (!bodyIndex || isEmpty(bodyIndex)) {
return {};
}
return processExpireRequestResponse(
ourPubKey,
targetNode,
bodyIndex as ExpireMessagesResultsContent,
request.params.messages
);
})
);
const firstExpirationResult = Object.entries(expirationResults).at(0);
if (!firstExpirationResult) {
throw new Error('firstExpirationResult is null');
const changesValid: Array<UpdatedExpiryWithHashes> = [];
// then we need to iterate over each subrequests result to find the snodes which reporting a valid update of the expiry
for (let index = 0; index < expirationResults.length; index++) {
// the 0 gets the first snode of the swarm (they all report the same *sig verified* changes).
// the 1 discard the snode_pk entry and access the request result (i.e. a record with hashes: [string] and the expiry)
const expirationResult = Object.entries(expirationResults?.[index])?.[0]?.[1];
if (
!expirationResult ||
isEmpty(expirationResult) ||
!isArray(expirationResult.hashes) ||
!isNumber(expirationResult.expiry)
) {
continue;
}
changesValid.push({
messageHashes: expirationResult.hashes,
updatedExpiryMs: expirationResult.expiry,
});
}
const messageHash = firstExpirationResult[0];
const expiry = firstExpirationResult[1].expiry;
if (!expiry || !messageHash) {
throw new Error(
`Something is wrong with the firstExpirationResult: ${JSON.stringify(
JSON.stringify(firstExpirationResult)
)}`
);
const hashesRequestedButNotInResults = difference(
flatten(expireRequests.map(m => m.params.messages)),
flatten(changesValid.map(c => c.messageHashes))
);
if (!isEmpty(hashesRequestedButNotInResults)) {
// we requested hashes which are not part of the result. They most likely expired already so let's mark those messages as expiring now.
changesValid.push({
messageHashes: hashesRequestedButNotInResults,
updatedExpiryMs: Date.now(),
});
}
return expiry;
const expiryWithIndividualHash: Array<UpdatedExpiryWithHash> = flatten(
changesValid.map(change =>
change.messageHashes.map(h => ({ messageHash: h, updatedExpiryMs: change.updatedExpiryMs }))
)
);
window.log.debug('update expiry expiryWithIndividualHash: ', expiryWithIndividualHash);
return expiryWithIndividualHash;
} catch (err) {
// NOTE batch requests have their own retry logic which includes abort errors that will break our retry logic so we need to catch them and throw regular errors
if (err instanceof pRetry.AbortError) {
@ -197,122 +243,198 @@ async function expireOnNodes(
}
}
export type ExpireMessageOnSnodeProps = {
messageHash: string;
expireTimer: number;
extend?: boolean;
shorten?: boolean;
};
export type ExpireMessageWithTimerOnSnodeProps = {
messageHashes: Array<string>;
expireTimerMs: number;
readAt: number;
} & WithShortenOrExtend;
export async function buildExpireRequest(
props: ExpireMessageOnSnodeProps
): Promise<UpdateExpiryOnNodeSubRequest | null> {
const { messageHash, expireTimer, extend, shorten } = props;
export type ExpireMessageWithExpiryOnSnodeProps = Pick<
ExpireMessageWithTimerOnSnodeProps,
'messageHashes'
> &
WithShortenOrExtend & {
expiryMs: number;
};
if (extend && shorten) {
window.log.error(
'[buildExpireRequest] We cannot extend and shorten a message at the same time',
messageHash
);
return null;
/**
* Exported for testing for testing only. Used to shorten/extend expiries of an array of array of messagehashes.
* @param expireDetails the subrequest to do
* @returns
*/
export async function buildExpireRequestBatchExpiry(
expireDetails: Array<ExpireMessageWithExpiryOnSnodeProps>
) {
if (expireDetails.length > MAX_SUBREQUESTS_COUNT) {
throw new Error(`batch request can only have ${MAX_SUBREQUESTS_COUNT} subrequests at most`);
}
const results = await Promise.all(expireDetails.map(m => buildExpireRequestSingleExpiry(m)));
return compact(results);
}
// NOTE empty string means we want to hardcode the expiry to a TTL value, otherwise it's a shorten or extension of the TTL
const shortenOrExtend = shorten ? 'shorten' : extend ? 'extend' : ('' as const);
export async function buildExpireRequestSingleExpiry(
expireDetails: ExpireMessageWithExpiryOnSnodeProps
): Promise<UpdateExpiryOnNodeSubRequest | null> {
const ourPubKey = UserUtils.getOurPubKeyStrFromCache();
if (!ourPubKey) {
window.log.error('[buildExpireRequest] No pubkey found', messageHash);
window.log.error('[buildExpireRequestSingleExpiry] No user pubkey');
return null;
}
const { messageHashes, expiryMs, shortenOrExtend } = expireDetails;
// NOTE for shortenOrExtend, '' means we want to hardcode the expiry to a TTL value, otherwise it's a shorten or extension of the TTL
const expiry = GetNetworkTime.getNowWithNetworkOffset() + expireTimer;
const signResult = await SnodeSignature.generateUpdateExpirySignature({
shortenOrExtend,
timestamp: expiry,
messageHashes: [messageHash],
timestamp: expiryMs,
messageHashes,
});
if (!signResult) {
window.log.error(
`[buildExpireRequest] SnodeSignature.generateUpdateExpirySignature returned an empty result ${messageHash}`
`[buildExpireRequestSingleExpiry] SnodeSignature.generateUpdateExpirySignature returned an empty result`
);
return null;
}
const expireParams: UpdateExpiryOnNodeSubRequest = {
method: 'expire',
return {
method: 'expire' as const,
params: {
pubkey: ourPubKey,
pubkey_ed25519: signResult.pubkey_ed25519.toUpperCase(),
messages: [messageHash],
expiry,
extend: extend || undefined,
shorten: shorten || undefined,
messages: messageHashes,
expiry: expiryMs,
extend: shortenOrExtend === 'extend' || undefined,
shorten: shortenOrExtend === 'shorten' || undefined,
signature: signResult?.signature,
},
};
return expireParams;
}
type GroupedBySameExpiry = Record<string, Array<string>>;
function getBatchExpiryChunk({
expiryChunk,
groupedBySameExpiry,
shortenOrExtend,
}: {
expiryChunk: Array<string>;
} & WithShortenOrExtend & { groupedBySameExpiry: GroupedBySameExpiry }) {
const expiryDetails: Array<ExpireMessageWithExpiryOnSnodeProps> = expiryChunk.map(expiryStr => {
const expiryMs = parseInt(expiryStr, 10);
const msgHashesForThisExpiry = groupedBySameExpiry[expiryStr];
return {
expiryMs,
messageHashes: msgHashesForThisExpiry,
shortenOrExtend,
};
});
return buildExpireRequestBatchExpiry(expiryDetails);
}
function groupMsgByExpiry(expiringDetails: ExpiringDetails) {
const hashesWithExpiry = uniqBy(
expiringDetails.map(m => ({
messageHash: m.messageHash,
expiry: m.expireTimerMs + m.readAt,
})),
n => n.messageHash
);
const groupedBySameExpiry: GroupedBySameExpiry = {};
for (let index = 0; index < hashesWithExpiry.length; index++) {
const { expiry, messageHash } = hashesWithExpiry[index];
const expiryStr = `${expiry}`;
if (!groupedBySameExpiry[expiryStr]) {
groupedBySameExpiry[expiryStr] = [];
}
groupedBySameExpiry[expiryStr].push(messageHash);
}
return groupedBySameExpiry;
}
export type ExpiringDetails = Array<
{ messageHash: string } & Pick<ExpireMessageWithTimerOnSnodeProps, 'readAt' | 'expireTimerMs'>
>;
/**
* Sends an 'expire' request to the user's swarm for a specific message.
* This supports both extending and shortening a message's TTL.
* The returned TTL should be assigned to the message to expire.
* @param messageHash the hash of the message to expire
* @param readAt when that message was read on this device (network timestamp offset is removed later)
* @param expireTimer amount of time until we expire the message from now in milliseconds
* @param extend whether to extend the message's TTL
* @param shorten whether to shorten the message's TTL
* @returns the TTL of the message as set by the server
*/
export async function expireMessageOnSnode(
props: ExpireMessageOnSnodeProps
): Promise<number | null> {
const { messageHash } = props;
export async function expireMessagesOnSnode(
expiringDetails: ExpiringDetails,
options: WithShortenOrExtend
): Promise<Array<{ messageHash: string; updatedExpiryMs: number }>> {
const ourPubKey = UserUtils.getOurPubKeyStrFromCache();
if (!ourPubKey) {
window.log.error('[expireMessageOnSnode] No pubkey found', messageHash);
return null;
throw new Error('[expireMessageOnSnode] No pubkey found');
}
let snode: Snode | undefined;
try {
const expireRequestParams = await buildExpireRequest(props);
if (!expireRequestParams) {
throw new Error(`Failed to build expire request ${JSON.stringify(props)}`);
}
let newTTL = null;
// key is a string even if it is really a number because Object.keys only knows strings...
const groupedBySameExpiry = groupMsgByExpiry(expiringDetails);
const chunkedExpiries = chunk(Object.keys(groupedBySameExpiry), MAX_SUBREQUESTS_COUNT); // chunking because the batch endpoint only allow MAX_SUBREQUESTS_COUNT subrequests per requests
await pRetry(
async () => {
const swarm = await getSwarmFor(ourPubKey);
snode = sample(swarm);
if (!snode) {
throw new EmptySwarmError(ourPubKey, 'Ran out of swarm nodes to query');
}
newTTL = await expireOnNodes(snode, expireRequestParams);
},
{
retries: 3,
factor: 2,
minTimeout: SeedNodeAPI.getMinTimeout(),
onFailedAttempt: e => {
window?.log?.warn(
`[expireMessageOnSnode] expire message on snode attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left... Error: ${e.message}`
);
},
}
// TODO after the next storage server fork we will get a new endpoint allowing to batch
// update expiries even when they are * not * the same for all the message hashes.
// But currently we can't access it that endpoint, so we need to keep this hacky way for now.
// groupby expiries ( expireTimer+ readAt), then batch them with a limit of MAX_SUBREQUESTS_COUNT batch calls per batch requests, then do those in parralel, for now.
const expireRequestsParams = await Promise.all(
chunkedExpiries.map(chk =>
getBatchExpiryChunk({
expiryChunk: chk,
groupedBySameExpiry,
shortenOrExtend: options.shortenOrExtend,
})
)
);
if (!expireRequestsParams || isEmpty(expireRequestsParams)) {
throw new Error(`Failed to build expire request`);
}
// we most likely will only have a single chunk, so this is a bit of over engineered.
// if any of those requests fails, make sure to not consider
const allSettled = await Promise.allSettled(
expireRequestsParams.map(chunkRequest =>
pRetry(
async () => {
const swarm = await getSwarmFor(ourPubKey);
snode = sample(swarm);
if (!snode) {
throw new EmptySwarmError(ourPubKey, 'Ran out of swarm nodes to query');
}
return updateExpiryOnNodes(snode, ourPubKey, chunkRequest);
},
{
retries: 3,
factor: 2,
minTimeout: SeedNodeAPI.getMinTimeout(),
onFailedAttempt: e => {
window?.log?.warn(
`[expireMessageOnSnode] expire message on snode attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left... Error: ${e.message}`
);
},
}
)
)
);
return newTTL;
return flatten(compact(allSettled.map(m => (m.status === 'fulfilled' ? m.value : null))));
} catch (e) {
const snodeStr = snode ? `${snode.ip}:${snode.port}` : 'null';
window?.log?.warn(
`[expireMessageOnSnode] ${e.code ? `${e.code} ` : ''}${e.message ||
e} by ${ourPubKey} for ${messageHash} via snode:${snodeStr}`
`[expireMessageOnSnode] ${e.code || ''}${e.message ||
e} by ${ourPubKey} via snode:${snodeStr}`
);
throw e;
}

View File

@ -1,5 +1,5 @@
/* eslint-disable no-restricted-syntax */
import { isEmpty, sample } from 'lodash';
import { isFinite, isNil, isNumber, sample } from 'lodash';
import pRetry from 'p-retry';
import { Snode } from '../../../data/data';
import { UserUtils } from '../../utils';
@ -7,6 +7,7 @@ import { EmptySwarmError } from '../../utils/errors';
import { SeedNodeAPI } from '../seed_node_api';
import { GetExpiriesFromNodeSubRequest } from './SnodeRequestTypes';
import { doSnodeBatchRequest } from './batchRequest';
import { GetNetworkTime } from './getNetworkTime';
import { getSwarmFor } from './snodePool';
import { SnodeSignature } from './snodeSignatures';
import { GetExpiriesResultsContent } from './types';
@ -14,44 +15,27 @@ import { GetExpiriesResultsContent } from './types';
export type GetExpiriesRequestResponseResults = Record<string, number>;
export async function processGetExpiriesRequestResponse(
targetNode: Snode,
_targetNode: Snode,
expiries: GetExpiriesResultsContent,
messageHashes: Array<string>
): Promise<GetExpiriesRequestResponseResults> {
if (isEmpty(expiries)) {
if (isNil(expiries)) {
throw Error(
`[processGetExpiriesRequestResponse] Expiries are missing! ${JSON.stringify(messageHashes)}`
`[processGetExpiriesRequestResponse] Expiries are nul/undefined! ${JSON.stringify(
messageHashes
)}`
);
}
const results: GetExpiriesRequestResponseResults = {};
for (const messageHash of Object.keys(expiries)) {
if (!expiries[messageHash]) {
window.log.warn(
`[processGetExpiriesRequestResponse] Expiries result failure on ${
targetNode.pubkey_ed25519
} for messageHash ${messageHash}\n${JSON.stringify(
expiries[messageHash]
)} moving to the next node`
);
continue;
}
// Note: we iterate over the hash we've requested and not the one we received,
// because a message which expired already is not in the result at all (and we need to force it to be expired)
for (const messageHash of messageHashes) {
const expiryMs = expiries[messageHash];
if (!expiryMs) {
window.log.warn(
`[processGetExpiriesRequestResponse] Missing expiry value on ${
targetNode.pubkey_ed25519
} so we will ignore this result (${messageHash}) and move onto the next node.\n${JSON.stringify(
expiries[messageHash]
)}`
);
results[messageHash] = -1; // explicit failure value
} else {
if (expiries[messageHash] && isNumber(expiryMs) && isFinite(expiryMs)) {
results[messageHash] = expiryMs;
}
} // not adding the Date.now() fallback here as it is done in the caller of this function
}
return results;
@ -60,7 +44,7 @@ export async function processGetExpiriesRequestResponse(
async function getExpiriesFromNodes(
targetNode: Snode,
expireRequest: GetExpiriesFromNodeSubRequest
): Promise<Array<number>> {
) {
try {
const result = await doSnodeBatchRequest(
[expireRequest],
@ -83,22 +67,27 @@ async function getExpiriesFromNodes(
const firstResult = result[0];
if (firstResult.code !== 200) {
throw Error(`result is not 200 but ${firstResult.code}`);
throw Error(`getExpiriesFromNodes result is not 200 but ${firstResult.code}`);
}
const bodyFirstResult = firstResult.body;
debugger;
// expirationResults is a record of {messageHash: currentExpiry}
const expirationResults = await processGetExpiriesRequestResponse(
targetNode,
bodyFirstResult.expiries as GetExpiriesResultsContent,
firstResult.body.expiries as GetExpiriesResultsContent,
expireRequest.params.messages
);
if (!Object.keys(expirationResults).length) {
throw new Error('expirationResults is empty');
}
// Note: even if expirationResults is empty we need to process the results.
// The status code is 200, so if the results is empty, it means all those messages already expired.
const expiryTimestamps: Array<number> = Object.values(expirationResults);
return expiryTimestamps;
// Note: a hash which already expired on the server is not going to be returned. So we force it's fetchedExpiry to be now() to make it expire asap
const expiriesWithForcedExpiried = expireRequest.params.messages.map(messageHash => ({
messageHash,
fetchedExpiry: expirationResults?.[messageHash] || Date.now(),
}));
return expiriesWithForcedExpiried;
} catch (err) {
// NOTE batch requests have their own retry logic which includes abort errors that will break our retry logic so we need to catch them and throw regular errors
if (err instanceof pRetry.AbortError) {
@ -111,13 +100,12 @@ async function getExpiriesFromNodes(
export type GetExpiriesFromSnodeProps = {
messageHashes: Array<string>;
timestamp: number;
};
export async function buildGetExpiriesRequest(
props: GetExpiriesFromSnodeProps
): Promise<GetExpiriesFromNodeSubRequest | null> {
const { messageHashes, timestamp } = props;
export async function buildGetExpiriesRequest({
messageHashes,
}: GetExpiriesFromSnodeProps): Promise<GetExpiriesFromNodeSubRequest | null> {
const timestamp = GetNetworkTime.getNowWithNetworkOffset();
const ourPubKey = UserUtils.getOurPubKeyStrFromCache();
if (!ourPubKey) {
@ -159,11 +147,7 @@ export async function buildGetExpiriesRequest(
* @param timestamp the time (ms) the request was initiated, must be within ±60s of the current time so using the server time is recommended.
* @returns an arrray of the expiry timestamps (TTL) for the given messages
*/
export async function getExpiriesFromSnode(
props: GetExpiriesFromSnodeProps
): Promise<Array<number>> {
const { messageHashes } = props;
export async function getExpiriesFromSnode({ messageHashes }: GetExpiriesFromSnodeProps) {
// FIXME There is a bug in the snode code that requires at least 2 messages to be requested. Will be fixed in next storage server release
if (messageHashes.length === 1) {
messageHashes.push('fakehash');
@ -178,21 +162,19 @@ export async function getExpiriesFromSnode(
let snode: Snode | undefined;
try {
const expireRequestParams = await buildGetExpiriesRequest(props);
const expireRequestParams = await buildGetExpiriesRequest({ messageHashes });
if (!expireRequestParams) {
throw new Error(`Failed to build get_expiries request ${JSON.stringify(props)}`);
throw new Error(`Failed to build get_expiries request ${JSON.stringify({ messageHashes })}`);
}
let expiryTimestamps: Array<number> = [];
await pRetry(
const fetchedExpiries = await pRetry(
async () => {
const swarm = await getSwarmFor(ourPubKey);
snode = sample(swarm);
if (!snode) {
throw new EmptySwarmError(ourPubKey, 'Ran out of swarm nodes to query');
}
expiryTimestamps = await getExpiriesFromNodes(snode, expireRequestParams);
return getExpiriesFromNodes(snode, expireRequestParams);
},
{
retries: 3,
@ -206,7 +188,7 @@ export async function getExpiriesFromSnode(
}
);
return expiryTimestamps;
return fetchedExpiries;
} catch (e) {
const snodeStr = snode ? `${snode.ip}:${snode.port}` : 'null';
window?.log?.warn(

View File

@ -1,6 +1,7 @@
import { getSodiumRenderer } from '../../crypto';
import { StringUtils, UserUtils } from '../../utils';
import { fromHexToArray, fromUInt8ArrayToBase64 } from '../../utils/String';
import { WithShortenOrExtend } from './SnodeRequestTypes';
import { GetNetworkTime } from './getNetworkTime';
export type SnodeSignatureResult = {
@ -104,10 +105,9 @@ async function generateUpdateExpirySignature({
timestamp,
messageHashes,
}: {
shortenOrExtend: 'extend' | 'shorten' | '';
timestamp: number;
messageHashes: Array<string>;
}): Promise<{ signature: string; pubkey_ed25519: string } | null> {
} & WithShortenOrExtend): Promise<{ signature: string; pubkey_ed25519: string } | null> {
const ourEd25519Key = await UserUtils.getUserED25519KeyPair();
if (!ourEd25519Key) {

View File

@ -1,4 +1,4 @@
import { isNumber, throttle, uniq } from 'lodash';
import { isEmpty, isNumber, throttle, uniq } from 'lodash';
import { messagesExpired } from '../../state/ducks/conversations';
import { initWallClockListener } from '../../util/wallClockListener';
@ -8,7 +8,7 @@ import { READ_MESSAGE_STATE } from '../../models/conversationAttributes';
import { MessageModel } from '../../models/message';
import { SignalService } from '../../protobuf';
import { ReleasedFeatures } from '../../util/releaseFeature';
import { expireMessageOnSnode } from '../apis/snode_api/expireRequest';
import { ExpiringDetails, expireMessagesOnSnode } from '../apis/snode_api/expireRequest';
import { GetNetworkTime } from '../apis/snode_api/getNetworkTime';
import { getConversationController } from '../conversations';
import { isValidUnixTimestamp } from '../utils/Timestamps';
@ -40,6 +40,8 @@ async function destroyMessagesAndUpdateRedux(
// Delete any attachments
for (let i = 0; i < messageIds.length; i++) {
/* eslint-disable no-await-in-loop */
// TODO make this use getMessagesById and not getMessageById
const message = await Data.getMessageById(messageIds[i]);
await message?.cleanup();
/* eslint-enable no-await-in-loop */
@ -77,10 +79,22 @@ async function destroyExpiredMessages() {
messages.forEach(expired => {
window.log.info('Message expired', {
sentAt: expired.get('sent_at'),
hash: expired.getMessageHash(),
});
});
await destroyMessagesAndUpdateRedux(messagesExpiredDetails);
const convosToRefresh = uniq(messagesExpiredDetails.map(m => m.conversationKey));
await Promise.all(
convosToRefresh.map(async c => {
getConversationController()
.get(c)
?.updateLastMessage();
return getConversationController()
.get(c)
?.refreshInMemoryDetails();
})
);
} catch (error) {
window.log.error(
'destroyExpiredMessages: Error deleting expired messages',
@ -108,7 +122,11 @@ async function checkExpiringMessages() {
}
const ms = expiresAt - Date.now();
window.log.info(`message expires in ${ms}ms, or ${ms / 1000}s, or ${ms / (3600 * 1000)}h`);
window.log.info(
`message with hash:${next.getMessageHash()} expires in ${ms}ms, or ${Math.floor(
ms / 1000
)}s, or ${Math.floor(ms / (3600 * 1000))}h`
);
let wait = expiresAt - Date.now();
@ -516,60 +534,72 @@ async function checkHasOutdatedDisappearingMessageClient(
}
}
async function updateMessageExpiryOnSwarm(
message: MessageModel,
callLocation?: string, // this is for debugging purposes
shouldCommit?: boolean
) {
if (callLocation) {
// window.log.debug(`[updateMessageExpiryOnSwarm] called from: ${callLocation} `);
}
async function updateMessageExpiriesOnSwarm(messages: Array<MessageModel>) {
const expiringDetails: ExpiringDetails = [];
if (!message.getExpirationType() || !message.getExpireTimer()) {
window.log.debug(
`[updateMessageExpiryOnSwarm] Message ${message.get(
'messageHash'
)} has no expirationType or expireTimer set. Ignoring`
);
return message;
}
const messageHash = message.get('messageHash');
if (!messageHash) {
window.log.debug(
`[updateMessageExpiryOnSwarm] Missing messageHash messageId: ${message.get('id')}`
);
return message;
}
const newTTL = await expireMessageOnSnode({
messageHash,
expireTimer: message.getExpireTimer() * 1000,
shorten: true,
});
const expiresAt = message.getExpiresAt();
if (newTTL && newTTL !== expiresAt) {
message.set({
expires_at: newTTL,
});
if (shouldCommit) {
await message.commit();
messages.forEach(msg => {
const hash = msg.getMessageHash();
const timestampStarted = msg.getExpirationStartTimestamp();
const timerSeconds = msg.getExpireTimer();
const disappearingType = msg.getExpirationType();
if (
!hash ||
!timestampStarted ||
timestampStarted <= 0 ||
!timerSeconds ||
timerSeconds <= 0 ||
disappearingType !== 'deleteAfterRead' || // this is very important as a message not stored on the swarm will be assumed expired, and so deleted locally!
!msg.isIncoming() // this is very important as a message not stored on the swarm will be assumed expired, and so deleted locally!
) {
return;
}
} else {
window.log.debug(
`[updateMessageExpiryOnSwarm]\nmessageHash ${messageHash} has no new TTL.${
expiresAt
? `\nKeeping the old one ${expiresAt}which expires at ${new Date(
expiresAt
).toUTCString()}`
: ''
}`
);
}
expiringDetails.push({
messageHash: hash,
expireTimerMs: timerSeconds * 1000,
readAt: timestampStarted,
});
});
return message;
if (isEmpty(expiringDetails)) {
window.log.debug(`[updateMessageExpiriesOnSwarm] no expiringDetails to update`);
return;
}
window.log.debug('updateMessageExpiriesOnSwarm: expiringDetails', expiringDetails);
const newTTLs = await expireMessagesOnSnode(expiringDetails, { shortenOrExtend: 'shorten' });
const updatedMsgModels: Array<MessageModel> = [];
newTTLs.forEach(m => {
const message = messages.find(model => model.getMessageHash() === m.messageHash);
if (!message) {
return;
}
const newTTLms = m.updatedExpiryMs;
const realReadAt = newTTLms - message.getExpireTimer() * 1000;
if (
newTTLms &&
(newTTLms !== message.getExpiresAt() ||
message.get('expirationStartTimestamp') !== realReadAt) &&
message.getExpireTimer()
) {
window.log.debug(`updateMessageExpiriesOnSwarm: setting for msg hash ${m.messageHash}:`, {
expires_at: newTTLms,
expirationStartTimestamp: realReadAt,
unread: READ_MESSAGE_STATE.read,
});
message.set({
expires_at: newTTLms,
expirationStartTimestamp: realReadAt,
unread: READ_MESSAGE_STATE.read,
});
updatedMsgModels.push(message);
}
});
if (!isEmpty(updatedMsgModels)) {
await Promise.all(updatedMsgModels.map(m => m.commit()));
}
}
export const DisappearingMessages = {
@ -583,5 +613,6 @@ export const DisappearingMessages = {
checkForExpiringOutgoingMessage,
getMessageReadyToDisappear,
checkHasOutdatedDisappearingMessageClient,
updateMessageExpiryOnSwarm,
updateMessageExpiriesOnSwarm,
destroyExpiredMessages,
};

View File

@ -192,7 +192,7 @@ async function send(
canBeDeleteAfterRead &&
(expirationMode === 'legacy' || expirationMode === 'deleteAfterRead')
) {
await DisappearingMessages.updateMessageExpiryOnSwarm(foundMessage, 'send()');
await DisappearingMessages.updateMessageExpiriesOnSwarm([foundMessage]);
}
}

View File

@ -1,16 +1,18 @@
import { cloneDeep, compact, isArray, isString } from 'lodash';
import { Data } from '../../../data/data';
import { Storage } from '../../../util/storage';
import { timeout } from '../Promise';
import { persistedJobFromData } from './JobDeserialization';
import { JobRunnerType } from './jobs/JobRunnerType';
import {
AvatarDownloadPersistedData,
ConfigurationSyncPersistedData,
FetchMsgExpirySwarmPersistedData,
PersistedJob,
RunJobResult,
TypeOfPersistedData,
UpdateMsgExpirySwarmPersistedData,
} from './PersistedJob';
import { Storage } from '../../../util/storage';
import { JobRunnerType } from './jobs/JobRunnerType';
/**
* 'job_in_progress' if there is already a job in progress
@ -100,7 +102,7 @@ export class PersistedJobRunner<T extends TypeOfPersistedData> {
// make sure there is no job with that same identifier already .
window.log.info(`job runner adding type :"${job.persistedData.jobType}" `);
window.log.debug(`job runner adding type:"${job.persistedData.jobType}"`);
return this.addJobUnchecked(job);
}
@ -166,7 +168,7 @@ export class PersistedJobRunner<T extends TypeOfPersistedData> {
private async writeJobsToDB() {
const serialized = this.getSerializedJobs();
window.log.debug(`writing to db for "${this.jobRunnerType}": `, serialized);
await Storage.put(this.getJobRunnerItemId(), JSON.stringify(serialized));
}
@ -357,7 +359,19 @@ const avatarDownloadRunner = new PersistedJobRunner<AvatarDownloadPersistedData>
null
);
const updateMsgExpiryRunner = new PersistedJobRunner<UpdateMsgExpirySwarmPersistedData>(
'UpdateMsgExpirySwarmJob',
null
);
const fetchSwarmMsgExpiryRunner = new PersistedJobRunner<FetchMsgExpirySwarmPersistedData>(
'FetchMsgExpirySwarmJob',
null
);
export const runners = {
configurationSyncRunner,
updateMsgExpiryRunner,
fetchSwarmMsgExpiryRunner,
avatarDownloadRunner,
};

View File

@ -1,8 +1,10 @@
import { cloneDeep, isEmpty } from 'lodash';
import { cloneDeep, flatten, isEmpty, isNil, uniq } from 'lodash';
export type PersistedJobType =
| 'ConfigurationSyncJobType'
| 'AvatarDownloadJobType'
| 'FetchMsgExpirySwarmJobType'
| 'UpdateMsgExpirySwarmJobType'
| 'FakeSleepForJobType'
| 'FakeSleepForJobMultiType';
@ -31,23 +33,40 @@ export interface AvatarDownloadPersistedData extends PersistedJobData {
conversationId: string;
}
interface PersitedDataWithMsgIds extends PersistedJobData {
msgIds: Array<string>;
}
export interface ConfigurationSyncPersistedData extends PersistedJobData {
jobType: 'ConfigurationSyncJobType';
}
export interface FetchMsgExpirySwarmPersistedData extends PersitedDataWithMsgIds {
jobType: 'FetchMsgExpirySwarmJobType';
}
export interface UpdateMsgExpirySwarmPersistedData extends PersitedDataWithMsgIds {
jobType: 'UpdateMsgExpirySwarmJobType';
}
export type TypeOfPersistedData =
| ConfigurationSyncPersistedData
| AvatarDownloadPersistedData
| FetchMsgExpirySwarmPersistedData
| UpdateMsgExpirySwarmPersistedData
| FakeSleepJobData
| FakeSleepForMultiJobData;
export type AddJobCheckReturn = 'skipAddSameJobPresent' | 'sameJobDataAlreadyInQueue' | null;
export type AddJobCheckReturn = 'skipAddSameJobPresent' | null;
export enum RunJobResult {
Success = 1,
RetryJobIfPossible = 2,
PermanentFailure = 3,
}
function isDataWithMsgIds(data: PersistedJobData): data is PersitedDataWithMsgIds {
return !isNil((data as PersitedDataWithMsgIds)?.msgIds);
}
/**
* This class can be used to save and run jobs from the database.
@ -122,6 +141,34 @@ export abstract class PersistedJob<T extends PersistedJobData> {
: null;
}
public addJobCheckEveryMsgIdsAlreadyPresent(jobs: Array<T>): 'skipAddSameJobPresent' | null {
if (!jobs.length) {
return null;
}
if (!isDataWithMsgIds(this.persistedData)) {
throw new Error(`${this.persistedData.jobType} does not have a msgIds field`);
}
const allIdsAlreadyScheduled = uniq(
flatten(
jobs.map(m => {
if (!isDataWithMsgIds(m)) {
throw new Error(`${this.persistedData.jobType} does not have a msgIds field`);
}
return m.msgIds;
})
)
);
// if all ids we are trying to add are already tracked as other jobs in the job runner,
// there is no need to add this job at all.
if (this.persistedData.msgIds.every(m => allIdsAlreadyScheduled.includes(m))) {
return 'skipAddSameJobPresent';
}
return null;
}
public abstract getJobTimeoutMs(): number;
/**

View File

@ -0,0 +1,143 @@
/* eslint-disable no-await-in-loop */
import { compact, isEmpty, isNumber, uniq } from 'lodash';
import { v4 } from 'uuid';
import { Data } from '../../../../data/data';
import { READ_MESSAGE_STATE } from '../../../../models/conversationAttributes';
import { MessageModel } from '../../../../models/message';
import { isSignInByLinking } from '../../../../util/storage';
import { getExpiriesFromSnode } from '../../../apis/snode_api/getExpiriesRequest';
import { DisappearingMessages } from '../../../disappearing_messages';
import { runners } from '../JobRunner';
import {
AddJobCheckReturn,
FetchMsgExpirySwarmPersistedData,
PersistedJob,
RunJobResult,
} from '../PersistedJob';
class FetchMsgExpirySwarmJob extends PersistedJob<FetchMsgExpirySwarmPersistedData> {
constructor({
identifier,
nextAttemptTimestamp,
maxAttempts,
currentRetry,
msgIds,
}: Partial<
Pick<
FetchMsgExpirySwarmPersistedData,
'identifier' | 'nextAttemptTimestamp' | 'currentRetry' | 'maxAttempts'
>
> &
Pick<FetchMsgExpirySwarmPersistedData, 'msgIds'>) {
super({
jobType: 'FetchMsgExpirySwarmJobType',
identifier: identifier || v4(),
delayBetweenRetries: 2000,
maxAttempts: isNumber(maxAttempts) ? maxAttempts : 2,
currentRetry: isNumber(currentRetry) ? currentRetry : 0,
nextAttemptTimestamp: nextAttemptTimestamp || Date.now(),
msgIds: uniq(msgIds),
});
}
public async run(): Promise<RunJobResult> {
const start = Date.now();
try {
if (!this.persistedData.msgIds || isEmpty(this.persistedData.msgIds)) {
return RunJobResult.Success;
}
let msgModels = await Data.getMessagesById(this.persistedData.msgIds);
const messageHashes = compact(msgModels.map(m => m.getMessageHash()));
if (isEmpty(msgModels) || isEmpty(messageHashes)) {
return RunJobResult.Success;
}
const fetchedExpiries = await getExpiriesFromSnode({
messageHashes,
});
const updatedMsgModels: Array<MessageModel> = [];
if (fetchedExpiries.length) {
// get a fresh list of attributes for those message models
msgModels = await Data.getMessagesById(this.persistedData.msgIds);
for (let index = 0; index < fetchedExpiries.length; index++) {
const expiry = fetchedExpiries[index];
if (expiry.fetchedExpiry <= 0) {
continue;
}
const message = msgModels.find(m => m.getMessageHash() === expiry.messageHash);
if (!message) {
continue;
}
const realReadAt = expiry.fetchedExpiry - message.getExpireTimer() * 1000;
if (
(message.get('expirationStartTimestamp') !== realReadAt ||
message.get('expires_at') !== expiry.fetchedExpiry) &&
message.getExpireTimer()
) {
window.log.debug(
`FetchMsgExpirySwarmJob: setting for msg hash ${message.getMessageHash()}:`,
{
expires_at: expiry.fetchedExpiry,
unread: READ_MESSAGE_STATE.read,
expirationStartTimestamp: realReadAt,
}
);
message.set({
expires_at: expiry.fetchedExpiry,
unread: READ_MESSAGE_STATE.read,
expirationStartTimestamp: realReadAt,
});
updatedMsgModels.push(message);
}
}
}
await Promise.all(updatedMsgModels.map(m => m.commit()));
await DisappearingMessages.destroyExpiredMessages();
return RunJobResult.Success;
} finally {
window.log.debug(`FetchMsgExpirySwarmJob run() took ${Date.now() - start}ms`);
}
}
public serializeJob(): FetchMsgExpirySwarmPersistedData {
const fromParent = super.serializeBase();
return fromParent;
}
public addJobCheck(jobs: Array<FetchMsgExpirySwarmPersistedData>): AddJobCheckReturn {
return this.addJobCheckEveryMsgIdsAlreadyPresent(jobs);
}
public nonRunningJobsToRemove(_jobs: Array<FetchMsgExpirySwarmPersistedData>) {
return [];
}
public getJobTimeoutMs(): number {
return 20000;
}
}
async function queueNewJobIfNeeded(msgIds: Array<string>) {
if (isSignInByLinking()) {
window.log.info('NOT Scheduling FetchMsgExpirySwarmJob: as we are linking a device');
return;
}
if (isEmpty(msgIds)) {
return;
}
await runners.fetchSwarmMsgExpiryRunner.addJob(
new FetchMsgExpirySwarmJob({ nextAttemptTimestamp: Date.now() + 1000, msgIds })
);
}
export const FetchMsgExpirySwarm = {
FetchMsgExpirySwarmJob,
queueNewJobIfNeeded,
};

View File

@ -1,5 +1,7 @@
export type JobRunnerType =
| 'ConfigurationSyncJob'
| 'FetchMsgExpirySwarmJob'
| 'UpdateMsgExpirySwarmJob'
| 'FakeSleepForJob'
| 'FakeSleepForMultiJob'
| 'AvatarDownloadJob';

View File

@ -0,0 +1,98 @@
/* eslint-disable no-await-in-loop */
import { isEmpty, isNumber, uniq } from 'lodash';
import { v4 } from 'uuid';
import { Data } from '../../../../data/data';
import { isSignInByLinking } from '../../../../util/storage';
import { DisappearingMessages } from '../../../disappearing_messages';
import { runners } from '../JobRunner';
import {
AddJobCheckReturn,
PersistedJob,
RunJobResult,
UpdateMsgExpirySwarmPersistedData,
} from '../PersistedJob';
class UpdateMsgExpirySwarmJob extends PersistedJob<UpdateMsgExpirySwarmPersistedData> {
constructor({
identifier,
nextAttemptTimestamp,
maxAttempts,
currentRetry,
msgIds,
}: Partial<
Pick<
UpdateMsgExpirySwarmPersistedData,
'identifier' | 'nextAttemptTimestamp' | 'currentRetry' | 'maxAttempts'
>
> &
Pick<UpdateMsgExpirySwarmPersistedData, 'msgIds'>) {
super({
jobType: 'UpdateMsgExpirySwarmJobType',
identifier: identifier || v4(),
delayBetweenRetries: 2000,
maxAttempts: isNumber(maxAttempts) ? maxAttempts : 2,
currentRetry: isNumber(currentRetry) ? currentRetry : 0,
nextAttemptTimestamp: nextAttemptTimestamp || Date.now(),
msgIds: uniq(msgIds),
});
}
public async run(): Promise<RunJobResult> {
const start = Date.now();
try {
if (!this.persistedData.msgIds || isEmpty(this.persistedData.msgIds)) {
return RunJobResult.Success;
}
const msgModels = await Data.getMessagesById(this.persistedData.msgIds);
if (isEmpty(msgModels)) {
return RunJobResult.Success;
}
await DisappearingMessages.updateMessageExpiriesOnSwarm(msgModels);
await DisappearingMessages.destroyExpiredMessages();
return RunJobResult.Success;
} finally {
window.log.debug(`UpdateMsgExpirySwarmJob run() took ${Date.now() - start}ms`);
}
}
public serializeJob(): UpdateMsgExpirySwarmPersistedData {
const fromParent = super.serializeBase();
return fromParent;
}
public addJobCheck(jobs: Array<UpdateMsgExpirySwarmPersistedData>): AddJobCheckReturn {
// if all ids we are trying to add are already tracked as other jobs in the job runner,
// there is no need to add this job at all.
return this.addJobCheckEveryMsgIdsAlreadyPresent(jobs);
}
public nonRunningJobsToRemove(_jobs: Array<UpdateMsgExpirySwarmPersistedData>) {
return [];
}
public getJobTimeoutMs(): number {
return 20000;
}
}
async function queueNewJobIfNeeded(msgIds: Array<string>) {
if (isSignInByLinking()) {
window.log.info('NOT Scheduling UpdateMsgExpirySwarmJob: as we are linking a device');
return;
}
if (isEmpty(msgIds)) {
return;
}
await runners.updateMsgExpiryRunner.addJob(
new UpdateMsgExpirySwarmJob({ nextAttemptTimestamp: Date.now() + 1000, msgIds })
);
}
export const UpdateMsgExpirySwarm = {
UpdateMsgExpirySwarmJob,
queueNewJobIfNeeded,
};

View File

@ -3,9 +3,9 @@ import chaiAsPromised from 'chai-as-promised';
import Sinon from 'sinon';
import { UpdateExpiryOnNodeSubRequest } from '../../../../session/apis/snode_api/SnodeRequestTypes';
import {
ExpireMessageOnSnodeProps,
ExpireMessageWithExpiryOnSnodeProps,
ExpireRequestResponseResults,
buildExpireRequest,
buildExpireRequestSingleExpiry,
processExpireRequestResponse,
verifyExpireMsgsResponseSignature,
verifyExpireMsgsResponseSignatureProps,
@ -37,13 +37,16 @@ describe('ExpireRequest', () => {
});
describe('buildExpireRequest', () => {
const props: ExpireMessageOnSnodeProps = {
messageHash: 'messageHash',
expireTimer: 60,
const props: ExpireMessageWithExpiryOnSnodeProps = {
messageHashes: ['messageHash'],
expiryMs: 12340000 + 60 * 1000,
shortenOrExtend: '',
};
it('builds a request with just the messageHash and expireTimer of 1 minute', async () => {
const request: UpdateExpiryOnNodeSubRequest | null = await buildExpireRequest(props);
const request: UpdateExpiryOnNodeSubRequest | null = await buildExpireRequestSingleExpiry(
props
);
expect(request, 'should not return null').to.not.be.null;
expect(request, 'should not return undefined').to.not.be.undefined;
@ -53,8 +56,8 @@ describe('ExpireRequest', () => {
expect(request, "method should be 'expire'").to.have.property('method', 'expire');
expect(request.params.pubkey, 'should have a matching pubkey').to.equal(ourNumber);
expect(request.params.messages[0], 'messageHash should be in messages array').to.equal(
props.messageHash
expect(request.params.messages, 'messageHash should be in messages array').to.deep.equal(
props.messageHashes
);
expect(
request.params.expiry && isValidUnixTimestamp(request?.params.expiry),
@ -65,9 +68,9 @@ describe('ExpireRequest', () => {
expect(request.params.signature, 'signature should not be empty').to.not.be.empty;
});
it('builds a request with extend enabled', async () => {
const request: UpdateExpiryOnNodeSubRequest | null = await buildExpireRequest({
const request: UpdateExpiryOnNodeSubRequest | null = await buildExpireRequestSingleExpiry({
...props,
extend: true,
shortenOrExtend: 'extend',
});
expect(request, 'should not return null').to.not.be.null;
@ -78,8 +81,8 @@ describe('ExpireRequest', () => {
expect(request, "method should be 'expire'").to.have.property('method', 'expire');
expect(request.params.pubkey, 'should have a matching pubkey').to.equal(ourNumber);
expect(request.params.messages[0], 'messageHash should be in messages array').to.equal(
props.messageHash
expect(request.params.messages, 'messageHash should be in messages array').to.equal(
props.messageHashes
);
expect(
request.params.expiry && isValidUnixTimestamp(request?.params.expiry),
@ -90,9 +93,9 @@ describe('ExpireRequest', () => {
expect(request.params.signature, 'signature should not be empty').to.not.be.empty;
});
it('builds a request with shorten enabled', async () => {
const request: UpdateExpiryOnNodeSubRequest | null = await buildExpireRequest({
const request: UpdateExpiryOnNodeSubRequest | null = await buildExpireRequestSingleExpiry({
...props,
shorten: true,
shortenOrExtend: 'shorten',
});
expect(request, 'should not return null').to.not.be.null;
@ -103,8 +106,8 @@ describe('ExpireRequest', () => {
expect(request, "method should be 'expire'").to.have.property('method', 'expire');
expect(request.params.pubkey, 'should have a matching pubkey').to.equal(ourNumber);
expect(request.params.messages[0], 'messageHash should be in messages array').to.equal(
props.messageHash
expect(request.params.messages, 'messageHash should be in messages array').to.equal(
props.messageHashes
);
expect(
request.params.expiry && isValidUnixTimestamp(request?.params.expiry),
@ -114,15 +117,6 @@ describe('ExpireRequest', () => {
expect(request.params.shorten, 'shorten should be true').to.be.true;
expect(request.params.signature, 'signature should not be empty').to.not.be.empty;
});
it('fails to build a request if extend and shorten are both enabled', async () => {
const request: UpdateExpiryOnNodeSubRequest | null = await buildExpireRequest({
...props,
extend: true,
shorten: true,
});
expect(request, 'should return null').to.be.null;
});
});
describe('verifyExpireMsgsResponseSignature', () => {

View File

@ -44,7 +44,6 @@ describe('GetExpiriesRequest', () => {
describe('buildGetExpiriesRequest', () => {
const props: GetExpiriesFromSnodeProps = {
messageHashes: ['messageHash'],
timestamp: GetNetworkTime.getNowWithNetworkOffset(),
};
it('builds a valid request given the messageHashes and valid timestamp for now', async () => {