use timestamp offset in for messages storage requests (#1892)

* add using timestamp offset from network for sending part1

* remove MessageController as we now rely on the database

* fix tests for message sending overriding timestamp
This commit is contained in:
Audric Ackermann 2021-09-16 06:48:46 +02:00 committed by GitHub
parent ab75f945ff
commit b45109985c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 302 additions and 341 deletions

View File

@ -2,7 +2,6 @@
_,
Backbone,
i18n,
getMessageController,
moment,
Whisper
*/
@ -19,9 +18,7 @@
const messages = await window.Signal.Data.getExpiredMessages();
await Promise.all(
messages.map(async fromDB => {
const message = getMessageController().register(fromDB.id, fromDB);
messages.map(async message => {
window.log.info('Message expired', {
sentAt: message.get('sent_at'),
});

View File

@ -2,8 +2,7 @@
Whisper,
Backbone,
_,
getMessageController,
window
window
*/
/* eslint-disable more/no-then */
@ -57,7 +56,7 @@
return null;
}
return getMessageController().register(target.id, target);
return target;
},
async onReceipt(receipt) {
try {

View File

@ -1,7 +1,6 @@
/* global
Backbone,
Whisper,
getMessageController
*/
/* eslint-disable more/no-then */
@ -53,33 +52,32 @@
return;
}
const message = getMessageController().register(found.id, found);
const readAt = receipt.get('read_at');
// If message is unread, we mark it read. Otherwise, we update the expiration
// timer to the time specified by the read sync if it's earlier than
// the previous read time.
if (message.isUnread() && window.isFocused()) {
await message.markRead(readAt);
if (found.isUnread() && window.isFocused()) {
await found.markRead(readAt);
// onReadMessage may result in messages older than this one being
// marked read. We want those messages to have the same expire timer
// start time as this one, so we pass the readAt value through.
const conversation = message.getConversation();
const conversation = found.getConversation();
if (conversation) {
conversation.onReadMessage(message, readAt);
conversation.onReadMessage(found, readAt);
}
} else {
const now = Date.now();
const existingTimestamp = message.get('expirationStartTimestamp');
const existingTimestamp = found.get('expirationStartTimestamp');
const expirationStartTimestamp = Math.min(
now,
Math.min(existingTimestamp || now, readAt || now)
);
message.set({ expirationStartTimestamp });
found.set({ expirationStartTimestamp });
const force = true;
await message.setToExpire(force);
await found.setToExpire(force);
}
this.remove(receipt);

View File

@ -667,6 +667,8 @@ async function removeDB() {
await sql.removeDB(userDir);
try {
console.warn('Remove DB: removing.', userDir);
userConfig.remove();
ephemeralConfig.remove();
} catch (e) {

View File

@ -317,8 +317,6 @@ window.models = require('./ts/models');
window.Signal = window.Signal || {};
window.Signal.Data = require('./ts/data/data');
window.getMessageController = () => window.libsession.Messages.getMessageController();
window.Signal.Logs = require('./js/modules/logs');
window.addEventListener('contextmenu', e => {

View File

@ -6,7 +6,6 @@ import { ClosedGroupVisibleMessage } from '../session/messages/outgoing/visibleM
import { PubKey } from '../session/types';
import { UserUtils } from '../session/utils';
import { BlockedNumberController } from '../util';
import { getMessageController } from '../session/messages';
import { leaveClosedGroup } from '../session/group';
import { SignalService } from '../protobuf';
import { MessageModel } from './message';
@ -50,6 +49,7 @@ import {
import { ed25519Str } from '../session/onions/onionPath';
import { getDecryptedMediaUrl } from '../session/crypto/DecryptedAttachmentsManager';
import { IMAGE_JPEG } from '../types/MIME';
import { getLatestTimestampOffset } from '../session/snode_api/SNodeAPI';
export enum ConversationTypeEnum {
GROUP = 'group',
@ -734,6 +734,8 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
const editedQuote = _.isEmpty(quote) ? undefined : quote;
const { upgradeMessageSchema } = window.Signal.Migrations;
const diffTimestamp = Date.now() - getLatestTimestampOffset();
const messageWithSchema = await upgradeMessageSchema({
type: 'outgoing',
body,
@ -741,7 +743,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
quote: editedQuote,
preview,
attachments,
sent_at: now,
sent_at: diffTimestamp,
received_at: now,
expireTimer,
recipients,
@ -943,7 +945,6 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
if (setToExpire) {
await model.setToExpire();
}
getMessageController().register(messageId, model);
window.inboxStore?.dispatch(
conversationActions.messageAdded({
conversationKey: this.id,

View File

@ -47,7 +47,6 @@ import {
} from '../session/utils/AttachmentsV2';
import { OpenGroupVisibleMessage } from '../session/messages/outgoing/visibleMessage/OpenGroupVisibleMessage';
import { getV2OpenGroupRoom } from '../data/opengroups';
import { getMessageController } from '../session/messages';
import { isUsFromCache } from '../session/utils/User';
import { perfEnd, perfStart } from '../session/utils/Performance';
import { AttachmentTypeWithPath } from '../types/Attachment';
@ -268,7 +267,6 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
}
public async cleanup() {
getMessageController().unregister(this.id);
await window.Signal.Migrations.deleteExternalMessageFiles(this.attributes);
}

View File

@ -28,7 +28,6 @@ import { UserUtils } from '../session/utils';
import { ConversationModel, ConversationTypeEnum } from '../models/conversation';
import _ from 'lodash';
import { forceSyncConfigurationNowIfNeeded } from '../session/utils/syncUtils';
import { getMessageController } from '../session/messages';
import { ClosedGroupEncryptionPairReplyMessage } from '../session/messages/outgoing/controlMessage/group/ClosedGroupEncryptionPairReplyMessage';
import { queueAllCachedFromSource } from './receiver';
import { openConversationWithMessages } from '../state/ducks/conversations';
@ -916,7 +915,6 @@ export async function createClosedGroup(groupName: string, members: Array<string
};
const dbMessage = await ClosedGroup.addUpdateMessage(convo, groupDiff, 'outgoing', Date.now());
getMessageController().register(dbMessage.id, dbMessage);
// be sure to call this before sending the message.
// the sending pipeline needs to know from GroupUtils when a message is for a medium group

View File

@ -393,6 +393,7 @@ export async function isMessageDuplicate({
sentAt: timestamp,
});
}
if (!result) {
return false;
}

View File

@ -1,7 +1,6 @@
import { initIncomingMessage } from './dataMessage';
import { toNumber } from 'lodash';
import { getConversationController } from '../session/conversations';
import { getMessageController } from '../session/messages';
import { actions as conversationActions } from '../state/ducks/conversations';
import { ConversationTypeEnum } from '../models/conversation';
@ -34,7 +33,6 @@ export async function onError(ev: any) {
conversation.updateLastMessage();
await conversation.notify(message);
getMessageController().register(message.id, message);
window.inboxStore?.dispatch(
conversationActions.messageAdded({
conversationKey: conversation.id,

View File

@ -8,7 +8,6 @@ import { UserUtils } from '../session/utils';
import { getConversationController } from '../session/conversations';
import { ConversationModel, ConversationTypeEnum } from '../models/conversation';
import { MessageModel } from '../models/message';
import { getMessageController } from '../session/messages';
import { getMessageById, getMessagesBySentAt } from '../../ts/data/data';
import { MessageModelPropsWithoutConvoProps, messagesAdded } from '../state/ducks/conversations';
import { updateProfileOneAtATime } from './dataMessage';
@ -118,8 +117,7 @@ async function copyFromQuotedMessage(msg: MessageModel, quote?: Quote): Promise<
window?.log?.info(`Found quoted message id: ${id}`);
quote.referencedMessageNotFound = false;
const queryMessage = getMessageController().register(found.id, found);
quote.text = queryMessage.get('body') || '';
quote.text = found.get('body') || '';
if (!firstAttachment || !contentTypeSupported(firstAttachment.contentType)) {
return;
@ -128,9 +126,9 @@ async function copyFromQuotedMessage(msg: MessageModel, quote?: Quote): Promise<
firstAttachment.thumbnail = null;
try {
if ((queryMessage.get('schemaVersion') || 0) < TypedMessage.VERSION_NEEDED_FOR_DISPLAY) {
const upgradedMessage = await upgradeMessageSchema(queryMessage.attributes);
queryMessage.set(upgradedMessage);
if ((found.get('schemaVersion') || 0) < TypedMessage.VERSION_NEEDED_FOR_DISPLAY) {
const upgradedMessage = await upgradeMessageSchema(found.attributes);
found.set(upgradedMessage);
await upgradedMessage.commit();
}
} catch (error) {
@ -141,7 +139,7 @@ async function copyFromQuotedMessage(msg: MessageModel, quote?: Quote): Promise<
return;
}
const queryAttachments = queryMessage.get('attachments') || [];
const queryAttachments = found.get('attachments') || [];
if (queryAttachments.length > 0) {
const queryFirst = queryAttachments[0];
@ -155,7 +153,7 @@ async function copyFromQuotedMessage(msg: MessageModel, quote?: Quote): Promise<
}
}
const queryPreview = queryMessage.get('preview') || [];
const queryPreview = found.get('preview') || [];
if (queryPreview.length > 0) {
const queryFirst = queryPreview[0];
const { image } = queryFirst;
@ -412,8 +410,6 @@ export async function handleMessageJob(
message.set({ id });
getMessageController().register(message.id, message);
// Note that this can save the message again, if jobs were queued. We need to
// call it after we have an id for this message, because the jobs refer back
// to their source message.

View File

@ -1,7 +1,7 @@
import { EncryptionType } from '../types/EncryptionType';
import { SignalService } from '../../protobuf';
import { PubKey } from '../types';
import { concatUInt8Array, getSodium } from '.';
import { concatUInt8Array, getSodium, MessageEncrypter } from '.';
import { fromHexToArray } from '../utils/String';
export { concatUInt8Array, getSodium };
import { getLatestClosedGroupEncryptionKeyPair } from '../../../ts/data/data';
@ -27,9 +27,11 @@ export async function encrypt(
encryptionType: EncryptionType
): Promise<EncryptResult> {
const { CLOSED_GROUP_MESSAGE, SESSION_MESSAGE } = SignalService.Envelope.Type;
if (encryptionType !== EncryptionType.ClosedGroup && encryptionType !== EncryptionType.Fallback) {
throw new Error(`Invalid encryption type:${encryptionType}`);
}
const encryptForClosedGroup = encryptionType === EncryptionType.ClosedGroup;
const plainText = addMessagePadding(plainTextBuffer);
@ -44,9 +46,7 @@ export async function encrypt(
}
const hexPubFromECKeyPair = PubKey.cast(hexEncryptionKeyPair.publicHex);
// the exports is to reference the exported function, so when we stub it during test, we stub the one called here
const cipherTextClosedGroup = await exports.encryptUsingSessionProtocol(
const cipherTextClosedGroup = await MessageEncrypter.encryptUsingSessionProtocol(
hexPubFromECKeyPair,
plainText
);
@ -56,8 +56,8 @@ export async function encrypt(
cipherText: cipherTextClosedGroup,
};
}
const cipherText = await MessageEncrypter.encryptUsingSessionProtocol(device, plainText);
const cipherText = await exports.encryptUsingSessionProtocol(device, plainText);
return { envelopeType: SESSION_MESSAGE, cipherText };
}

View File

@ -16,7 +16,6 @@ import { ClosedGroupMemberLeftMessage } from '../messages/outgoing/controlMessag
import { ConversationModel, ConversationTypeEnum } from '../../models/conversation';
import { MessageModel } from '../../models/message';
import { MessageModelType } from '../../models/messageType';
import { getMessageController } from '../messages';
import {
addKeyPairToCacheAndDBIfNeeded,
distributingClosedGroupEncryptionKeyPairs,
@ -30,6 +29,7 @@ import { ClosedGroupNewMessage } from '../messages/outgoing/controlMessage/group
import { ClosedGroupRemovedMembersMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupRemovedMembersMessage';
import { updateOpenGroupV2 } from '../../opengroup/opengroupV2/OpenGroupUpdate';
import { getSwarmPollingInstance } from '../snode_api';
import { getLatestTimestampOffset } from '../snode_api/SNodeAPI';
export type GroupInfo = {
id: string;
@ -109,21 +109,18 @@ export async function initiateGroupUpdate(
if (diff.newName?.length) {
const nameOnlyDiff: GroupDiff = { newName: diff.newName };
const dbMessageName = await addUpdateMessage(convo, nameOnlyDiff, 'outgoing', Date.now());
getMessageController().register(dbMessageName.id as string, dbMessageName);
await sendNewName(convo, diff.newName, dbMessageName.id as string);
}
if (diff.joiningMembers?.length) {
const joiningOnlyDiff: GroupDiff = { joiningMembers: diff.joiningMembers };
const dbMessageAdded = await addUpdateMessage(convo, joiningOnlyDiff, 'outgoing', Date.now());
getMessageController().register(dbMessageAdded.id as string, dbMessageAdded);
await sendAddedMembers(convo, diff.joiningMembers, dbMessageAdded.id as string, updateObj);
}
if (diff.leavingMembers?.length) {
const leavingOnlyDiff: GroupDiff = { leavingMembers: diff.leavingMembers };
const dbMessageLeaving = await addUpdateMessage(convo, leavingOnlyDiff, 'outgoing', Date.now());
getMessageController().register(dbMessageLeaving.id as string, dbMessageLeaving);
const stillMembers = members;
await sendRemovedMembers(
convo,
@ -312,17 +309,17 @@ export async function leaveClosedGroup(groupId: string) {
await convo.commit();
const source = UserUtils.getOurPubKeyStrFromCache();
const diffTimestamp = Date.now() - getLatestTimestampOffset();
const dbMessage = await convo.addSingleMessage({
group_update: { left: 'You' },
conversationId: groupId,
source,
type: 'outgoing',
sent_at: now,
sent_at: diffTimestamp,
received_at: now,
expireTimer: 0,
});
getMessageController().register(dbMessage.id as string, dbMessage);
// Send the update to the group
const ourLeavingMessage = new ClosedGroupMemberLeftMessage({
timestamp: Date.now(),

View File

@ -1,75 +0,0 @@
// You can see MessageController for in memory registered messages.
// Ee register messages to it everytime we send one, so that when an event happens we can find which message it was based on this id.
import { MessageModel } from '../../models/message';
type MessageControllerEntry = {
message: MessageModel;
timestamp: number;
};
let messageControllerInstance: MessageController | null;
export const getMessageController = () => {
if (messageControllerInstance) {
return messageControllerInstance;
}
messageControllerInstance = new MessageController();
return messageControllerInstance;
};
// It's not only data from the db which is stored on the MessageController entries, we could fetch this again. What we cannot fetch from the db and which is stored here is all listeners a particular messages is linked to for instance. We will be able to get rid of this once we don't use backbone models at all
export class MessageController {
private readonly messageLookup: Map<string, MessageControllerEntry>;
/**
* Not to be used directly. Instead call getMessageController()
*/
constructor() {
this.messageLookup = new Map();
// cleanup every hour the cache
setInterval(this.cleanup, 3600 * 1000);
}
public register(id: string, message: MessageModel) {
if (!(message instanceof MessageModel)) {
throw new Error('Only MessageModels can be registered to the MessageController.');
}
const existing = this.messageLookup.get(id);
if (existing) {
this.messageLookup.set(id, {
message: existing.message,
timestamp: Date.now(),
});
return existing.message;
}
this.messageLookup.set(id, {
message,
timestamp: Date.now(),
});
return message;
}
public unregister(id: string) {
this.messageLookup.delete(id);
}
public cleanup() {
window?.log?.warn('Cleaning up MessageController singleton oldest messages...');
const now = Date.now();
(this.messageLookup || []).forEach(messageEntry => {
const { message, timestamp } = messageEntry;
const conversation = message.getConversation();
if (now - timestamp > 5 * 60 * 1000 && !conversation) {
this.unregister(message.id);
}
});
}
public get(identifier: string) {
return this.messageLookup.get(identifier);
}
}

View File

@ -1,4 +1,3 @@
import * as Outgoing from './outgoing';
import { getMessageController } from './MessageController';
export { Outgoing, getMessageController };
export { Outgoing };

View File

@ -1,80 +0,0 @@
import _ from 'lodash';
import { storeOnNode } from '../snode_api/SNodeAPI';
import { getSwarmFor } from '../snode_api/snodePool';
import { firstTrue } from '../utils/Promise';
const DEFAULT_CONNECTIONS = 3;
/**
* Refactor note: We should really clean this up ... it's very messy
*
* We need to split it into 2 sends:
* - Snodes
* - Open Groups
*
* Mikunj:
* Temporarily i've made it so `MessageSender` handles open group sends and calls this function for regular sends.
*/
export async function sendMessage(
pubKey: string,
data: Uint8Array,
messageTimeStamp: number,
ttl: number,
options: {
isPublic?: boolean;
} = {}
): Promise<void> {
const { isPublic = false } = options;
if (isPublic) {
window?.log?.warn('this sendMessage() should not be called anymore with an open group message');
return;
}
const data64 = window.dcodeIO.ByteBuffer.wrap(data).toString('base64');
// Using timestamp as a unique identifier
const swarm = await getSwarmFor(pubKey);
// send parameters
const params = {
pubKey,
ttl: `${ttl}`,
timestamp: `${messageTimeStamp}`,
data: data64,
};
const usedNodes = _.slice(swarm, 0, DEFAULT_CONNECTIONS);
const promises = usedNodes.map(async usedNode => {
// TODO: Revert back to using snode address instead of IP
// No pRetry here as if this is a bad path it will be handled and retried in lokiOnionFetch.
// the only case we could care about a retry would be when the usedNode is not correct,
// but considering we trigger this request with a few snode in //, this should be fine.
const successfulSend = await storeOnNode(usedNode, params);
if (successfulSend) {
return usedNode;
}
// should we mark snode as bad if it can't store our message?
return undefined;
});
let snode;
try {
snode = await firstTrue(promises);
} catch (e) {
const snodeStr = snode ? `${snode.ip}:${snode.port}` : 'null';
window?.log?.warn(
`loki_message:::sendMessage - ${e.code} ${e.message} to ${pubKey} via snode:${snodeStr}`
);
throw e;
}
if (!usedNodes || usedNodes.length === 0) {
throw new window.textsecure.EmptySwarmError(pubKey, 'Ran out of swarm nodes to query');
}
window?.log?.info(
`loki_message:::sendMessage - Successfully stored message to ${pubKey} via ${snode.ip}:${snode.port}`
);
}

View File

@ -129,8 +129,12 @@ export class MessageQueue {
let rawMessage;
try {
rawMessage = await MessageUtils.toRawMessage(user, message);
const wrappedEnvelope = await MessageSender.send(rawMessage);
await MessageSentHandler.handleMessageSentSuccess(rawMessage, wrappedEnvelope);
const { wrappedEnvelope, effectiveTimestamp } = await MessageSender.send(rawMessage);
await MessageSentHandler.handleMessageSentSuccess(
rawMessage,
effectiveTimestamp,
wrappedEnvelope
);
return !!wrappedEnvelope;
} catch (error) {
if (rawMessage) {
@ -145,14 +149,19 @@ export class MessageQueue {
const jobQueue = this.getJobQueue(device);
messages.forEach(async message => {
const messageId = String(message.timestamp);
const messageId = message.identifier;
if (!jobQueue.has(messageId)) {
// We put the event handling inside this job to avoid sending duplicate events
const job = async () => {
try {
const wrappedEnvelope = await MessageSender.send(message);
await MessageSentHandler.handleMessageSentSuccess(message, wrappedEnvelope);
const { wrappedEnvelope, effectiveTimestamp } = await MessageSender.send(message);
await MessageSentHandler.handleMessageSentSuccess(
message,
effectiveTimestamp,
wrappedEnvelope
);
const cb = this.pendingMessageCache.callbacks.get(message.identifier);

View File

@ -11,10 +11,45 @@ import { postMessage } from '../../opengroup/opengroupV2/OpenGroupAPIV2';
import { OpenGroupMessageV2 } from '../../opengroup/opengroupV2/OpenGroupMessageV2';
import { fromUInt8ArrayToBase64 } from '../utils/String';
import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/OpenGroupVisibleMessage';
import * as LokiMessageApi from './LokiMessageApi';
import { addMessagePadding } from '../crypto/BufferPadding';
import _ from 'lodash';
import { storeOnNode } from '../snode_api/SNodeAPI';
import { getSwarmFor } from '../snode_api/snodePool';
import { firstTrue } from '../utils/Promise';
import { MessageSender } from '.';
import * as Data from '../../../ts/data/data';
import { SNodeAPI } from '../snode_api';
// ================ Regular ================
const DEFAULT_CONNECTIONS = 3;
// ================ SNODE STORE ================
function overwriteOutgoingTimestampWithNetworkTimestamp(message: RawMessage) {
const diffTimestamp = Date.now() - SNodeAPI.getLatestTimestampOffset();
const { plainTextBuffer } = message;
const contentDecoded = SignalService.Content.decode(plainTextBuffer);
const { dataMessage, dataExtractionNotification, typingMessage } = contentDecoded;
if (dataMessage && dataMessage.timestamp && dataMessage.timestamp > 0) {
dataMessage.timestamp = diffTimestamp;
}
if (
dataExtractionNotification &&
dataExtractionNotification.timestamp &&
dataExtractionNotification.timestamp > 0
) {
dataExtractionNotification.timestamp = diffTimestamp;
}
if (typingMessage && typingMessage.timestamp && typingMessage.timestamp > 0) {
typingMessage.timestamp = diffTimestamp;
}
const overRiddenTimestampBuffer = SignalService.Content.encode(contentDecoded).finish();
return { overRiddenTimestampBuffer, diffTimestamp };
}
export function getMinRetryTimeout() {
return 1000;
}
/**
* Send a message via service nodes.
@ -26,31 +61,98 @@ export async function send(
message: RawMessage,
attempts: number = 3,
retryMinTimeout?: number // in ms
): Promise<Uint8Array> {
const device = PubKey.cast(message.device);
const { plainTextBuffer, encryption, timestamp, ttl } = message;
const { envelopeType, cipherText } = await MessageEncrypter.encrypt(
device,
plainTextBuffer,
encryption
);
const envelope = await buildEnvelope(envelopeType, device.key, timestamp, cipherText);
window?.log?.debug('Sending envelope with timestamp: ', envelope.timestamp, ' to ', device.key);
const data = wrapEnvelope(envelope);
): Promise<{ wrappedEnvelope: Uint8Array; effectiveTimestamp: number }> {
return pRetry(
async () => {
await LokiMessageApi.sendMessage(device.key, data, timestamp, ttl);
return data;
const device = PubKey.cast(message.device);
const { encryption, ttl } = message;
const {
overRiddenTimestampBuffer,
diffTimestamp,
} = overwriteOutgoingTimestampWithNetworkTimestamp(message);
const { envelopeType, cipherText } = await MessageEncrypter.encrypt(
device,
overRiddenTimestampBuffer,
encryption
);
const envelope = await buildEnvelope(envelopeType, device.key, diffTimestamp, cipherText);
const data = wrapEnvelope(envelope);
// make sure to update the local sent_at timestamp, because sometimes, we will get the just pushed message in the receiver side
// before we return from the await below.
// and the isDuplicate messages relies on sent_at timestamp to be valid.
const found = await Data.getMessageById(message.identifier);
if (found) {
found.set({ sent_at: diffTimestamp });
await found.commit();
}
await MessageSender.TEST_sendMessageToSnode(device.key, data, ttl, diffTimestamp);
return { wrappedEnvelope: data, effectiveTimestamp: diffTimestamp };
},
{
retries: Math.max(attempts - 1, 0),
factor: 1,
minTimeout: retryMinTimeout || 1000,
minTimeout: retryMinTimeout || MessageSender.getMinRetryTimeout(),
}
);
}
export async function TEST_sendMessageToSnode(
pubKey: string,
data: Uint8Array,
ttl: number,
timestamp: number
): Promise<void> {
const data64 = window.dcodeIO.ByteBuffer.wrap(data).toString('base64');
const swarm = await getSwarmFor(pubKey);
window?.log?.debug('Sending envelope with timestamp: ', timestamp, ' to ', pubKey);
// send parameters
const params = {
pubKey,
ttl: `${ttl}`,
timestamp: `${timestamp}`,
data: data64,
};
const usedNodes = _.slice(swarm, 0, DEFAULT_CONNECTIONS);
const promises = usedNodes.map(async usedNode => {
// TODO: Revert back to using snode address instead of IP
// No pRetry here as if this is a bad path it will be handled and retried in lokiOnionFetch.
// the only case we could care about a retry would be when the usedNode is not correct,
// but considering we trigger this request with a few snode in //, this should be fine.
const successfulSend = await storeOnNode(usedNode, params);
if (successfulSend) {
return usedNode;
}
// should we mark snode as bad if it can't store our message?
return undefined;
});
let snode;
try {
snode = await firstTrue(promises);
} catch (e) {
const snodeStr = snode ? `${snode.ip}:${snode.port}` : 'null';
window?.log?.warn(
`loki_message:::sendMessage - ${e.code} ${e.message} to ${pubKey} via snode:${snodeStr}`
);
throw e;
}
if (!usedNodes || usedNodes.length === 0) {
throw new window.textsecure.EmptySwarmError(pubKey, 'Ran out of swarm nodes to query');
}
window?.log?.info(
`loki_message:::sendMessage - Successfully stored message to ${pubKey} via ${snode.ip}:${snode.port}`
);
}
async function buildEnvelope(
type: SignalService.Envelope.Type,
sskSource: string | undefined,

View File

@ -2,7 +2,6 @@ import _ from 'lodash';
import { getMessageById } from '../../data/data';
import { SignalService } from '../../protobuf';
import { PnServer } from '../../pushnotification';
import { getMessageController } from '../messages';
import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/OpenGroupVisibleMessage';
import { EncryptionType, RawMessage } from '../types';
import { UserUtils } from '../utils';
@ -42,6 +41,7 @@ export class MessageSentHandler {
public static async handleMessageSentSuccess(
sentMessage: RawMessage,
effectiveTimestamp: number,
wrappedEnvelope?: Uint8Array
) {
// The wrappedEnvelope will be set only if the message is not one of OpenGroupV2Message type.
@ -107,7 +107,7 @@ export class MessageSentHandler {
try {
await fetchedMessage.sendSyncMessage(
dataMessage as SignalService.DataMessage,
sentMessage.timestamp
effectiveTimestamp
);
} catch (e) {
window?.log?.warn('Got an error while trying to sendSyncMessage():', e);
@ -123,7 +123,7 @@ export class MessageSentHandler {
sent_to: sentTo,
sent: true,
expirationStartTimestamp: Date.now(),
sent_at: sentMessage.timestamp,
sent_at: effectiveTimestamp,
});
await fetchedMessage.commit();
@ -177,23 +177,11 @@ export class MessageSentHandler {
* If the message is found on the db, it will also register it to the MessageController so our subsequent calls are quicker.
*/
private static async fetchHandleMessageSentData(m: RawMessage | OpenGroupVisibleMessage) {
// if a message was sent and this message was sent after the last app restart,
// this message is still in memory in the MessageController
const msg = getMessageController().get(m.identifier);
const dbMessage = await getMessageById(m.identifier);
if (!msg || !msg.message) {
// otherwise, look for it in the database
// nobody is listening to this freshly fetched message .trigger calls
// so we can just update the fields on the database
const dbMessage = await getMessageById(m.identifier);
if (!dbMessage) {
return null;
}
getMessageController().register(m.identifier, dbMessage);
return dbMessage;
if (!dbMessage) {
return null;
}
return msg.message;
return dbMessage;
}
}

View File

@ -20,8 +20,8 @@ export class PendingMessageCache {
public async getAllPending(): Promise<Array<RawMessage>> {
await this.loadFromDBIfNeeded();
// Get all pending from cache, sorted with oldest first
return [...this.cache].sort((a, b) => a.timestamp - b.timestamp);
// Get all pending from cache
return [...this.cache];
}
public async getForDevice(device: PubKey): Promise<Array<RawMessage>> {
@ -71,7 +71,7 @@ export class PendingMessageCache {
// Remove item from cache and sync with database
const updatedCache = this.cache.filter(
cached => !(cached.device === message.device && cached.timestamp === message.timestamp)
cached => !(cached.device === message.device && cached.identifier === message.identifier)
);
this.cache = updatedCache;
this.callbacks.delete(message.identifier);
@ -82,7 +82,7 @@ export class PendingMessageCache {
public find(message: RawMessage): RawMessage | undefined {
// Find a message in the cache
return this.cache.find(m => m.device === message.device && m.timestamp === message.timestamp);
return this.cache.find(m => m.device === message.device && m.identifier === message.identifier);
}
public async clear() {

View File

@ -1,7 +1,6 @@
// TS 3.8 supports export * as X from 'Y'
import * as MessageSender from './MessageSender';
import * as LokiMessageApi from './LokiMessageApi';
export { MessageSender, LokiMessageApi };
export { MessageSender };
export * from './PendingMessageCache';
export * from './MessageQueue';

View File

@ -28,6 +28,27 @@ export const onsNameRegex = '^\\w([\\w-]*[\\w])?$';
export const ERROR_CODE_NO_CONNECT = 'ENETUNREACH: No network connection.';
let latestTimestampOffset = Number.MAX_SAFE_INTEGER;
function handleTimestampOffset(request: string, snodeTimestamp: number) {
if (snodeTimestamp && _.isNumber(snodeTimestamp) && snodeTimestamp > 1609419600 * 1000) {
// first january 2021. Arbitrary, just want to make sure the return timestamp is somehow valid and not some crazy low value
const now = Date.now();
window?.log?.info(`timestamp offset from request ${request}: ${now - snodeTimestamp}ms`);
latestTimestampOffset = now - snodeTimestamp;
}
}
export function getLatestTimestampOffset() {
if (latestTimestampOffset === Number.MAX_SAFE_INTEGER) {
window.log.warn('latestTimestampOffset is not set yet');
return 0;
}
window.log.info('latestTimestampOffset is ', latestTimestampOffset);
return latestTimestampOffset;
}
export type SendParams = {
pubKey: string;
ttl: string;
@ -44,13 +65,13 @@ async function requestSnodesForPubkeyWithTargetNodeRetryable(
const params = {
pubKey,
};
const result = await snodeRpc({
method: 'get_snodes_for_pubkey',
params,
targetNode,
associatedWith: pubKey,
});
if (!result) {
window?.log?.warn(
`LokiSnodeAPI::requestSnodesForPubkeyWithTargetNodeRetryable - lokiRpc on ${targetNode.ip}:${targetNode.port} returned falsish value`,
@ -77,6 +98,7 @@ async function requestSnodesForPubkeyWithTargetNodeRetryable(
}
const snodes = json.snodes.filter((tSnode: any) => tSnode.ip !== '0.0.0.0');
handleTimestampOffset('get_snodes_for_pubkey', json.t);
return snodes;
} catch (e) {
throw new Error('Invalid json');
@ -175,6 +197,7 @@ export async function getSessionIDForOnsName(onsNameCase: string) {
try {
parsedBody = JSON.parse(result.body);
handleTimestampOffset('ons_resolve', parsedBody.t);
} catch (e) {
window?.log?.warn('ONSresolve: failed to parse ons result body', result.body);
throw new Error('ONSresolve: json ONS resovle');
@ -374,6 +397,7 @@ export async function TEST_getSnodePoolFromSnode(targetNode: Snode): Promise<Arr
pubkey_x25519: snode.pubkey_x25519,
pubkey_ed25519: snode.pubkey_ed25519,
})) as Array<Snode>;
handleTimestampOffset('get_service_nodes', json.t);
// we the return list by the snode is already made of uniq snodes
return _.compact(snodes);
@ -387,6 +411,7 @@ export async function storeOnNode(targetNode: Snode, params: SendParams): Promis
try {
// no retry here. If an issue is with the path this is handled in lokiOnionFetch
// if there is an issue with the targetNode, we still send a few times this request to a few snodes in // already so it's handled
const result = await snodeRpc({
method: 'store',
params,
@ -394,11 +419,18 @@ export async function storeOnNode(targetNode: Snode, params: SendParams): Promis
associatedWith: params.pubKey,
});
if (!result || result.status !== 200) {
if (!result || result.status !== 200 || !result.body) {
return false;
}
return true;
try {
const parsed = JSON.parse(result.body);
handleTimestampOffset('store', parsed.t);
return true;
} catch (e) {
window?.log?.warn('Failed to parse "store" result: ', e.msg);
}
return false;
} catch (e) {
window?.log?.warn(
'loki_message:::store - send error:',
@ -446,6 +478,8 @@ export async function retrieveNextMessages(
window.inboxStore?.dispatch(updateIsOnline(true));
}
handleTimestampOffset('retrieve', json.t);
return json.messages || [];
} catch (e) {
window?.log?.warn('exception while parsing json of nextMessage:', e);

View File

@ -3,7 +3,6 @@ import { EncryptionType } from './EncryptionType';
export type RawMessage = {
identifier: string;
plainTextBuffer: Uint8Array;
timestamp: number;
device: string;
ttl: number;
encryption: EncryptionType;
@ -13,7 +12,6 @@ export type RawMessage = {
export interface PartialRawMessage {
identifier: string;
plainTextBuffer: any;
timestamp: number;
device: string;
ttl: number;
encryption: number;

View File

@ -12,7 +12,6 @@ import {
} from '../../../ts/data/data';
import { MessageModel } from '../../models/message';
import { downloadAttachment, downloadAttachmentOpenGroupV2 } from '../../receiver/attachments';
import { getMessageController } from '../messages';
const MAX_ATTACHMENT_JOB_PARALLELISM = 3;
@ -131,14 +130,13 @@ async function _maybeStartJob() {
async function _runJob(job: any) {
const { id, messageId, attachment, type, index, attempts, isOpenGroupV2, openGroupV2Details } =
job || {};
let message;
let found: MessageModel | undefined | null;
try {
if (!job || !attachment || !messageId) {
throw new Error(`_runJob: Key information required for job was missing. Job id: ${id}`);
}
const found = await getMessageById(messageId);
found = await getMessageById(messageId);
if (!found) {
logger.error('_runJob: Source message not found, deleting job');
await _finishJob(null, id);
@ -161,8 +159,6 @@ async function _runJob(job: any) {
return;
}
message = getMessageController().register(found.id, found);
const pending = true;
await setAttachmentDownloadJobPending(id, pending);
@ -180,11 +176,11 @@ async function _runJob(job: any) {
logger.warn(
`_runJob: Got 404 from server, marking attachment ${
attachment.id
} from message ${message.idForLogging()} as permanent error`
} from message ${found.idForLogging()} as permanent error`
);
await _finishJob(message, id);
await _addAttachmentToMessage(message, _markAttachmentAsError(attachment), { type, index });
await _finishJob(found, id);
await _addAttachmentToMessage(found, _markAttachmentAsError(attachment), { type, index });
return;
}
@ -193,27 +189,27 @@ async function _runJob(job: any) {
const upgradedAttachment = await window.Signal.Migrations.processNewAttachment(downloaded);
await _addAttachmentToMessage(message, upgradedAttachment, { type, index });
await _addAttachmentToMessage(found, upgradedAttachment, { type, index });
await _finishJob(message, id);
await _finishJob(found, id);
} catch (error) {
// tslint:disable: restrict-plus-operands
const currentAttempt: 1 | 2 | 3 = (attempts || 0) + 1;
if (currentAttempt >= 3) {
logger.error(
`_runJob: ${currentAttempt} failed attempts, marking attachment ${id} from message ${message?.idForLogging()} as permament error:`,
`_runJob: ${currentAttempt} failed attempts, marking attachment ${id} from message ${found?.idForLogging()} as permament error:`,
error && error.stack ? error.stack : error
);
await _finishJob(message || null, id);
await _addAttachmentToMessage(message, _markAttachmentAsError(attachment), { type, index });
await _finishJob(found || null, id);
await _addAttachmentToMessage(found, _markAttachmentAsError(attachment), { type, index });
return;
}
logger.error(
`_runJob: Failed to download attachment type ${type} for message ${message?.idForLogging()}, attempt ${currentAttempt}:`,
`_runJob: Failed to download attachment type ${type} for message ${found?.idForLogging()}, attempt ${currentAttempt}:`,
error && error.stack ? error.stack : error
);

View File

@ -29,7 +29,6 @@ function getEncryptionTypeFromMessageType(message: ContentMessage): EncryptionTy
}
export async function toRawMessage(device: PubKey, message: ContentMessage): Promise<RawMessage> {
const timestamp = message.timestamp;
const ttl = message.ttl();
const plainTextBuffer = message.plainTextBuffer();
@ -39,7 +38,6 @@ export async function toRawMessage(device: PubKey, message: ContentMessage): Pro
const rawMessage: RawMessage = {
identifier: message.identifier,
plainTextBuffer,
timestamp,
device: device.key,
ttl,
encryption,

View File

@ -3,6 +3,8 @@
import chai from 'chai';
import * as sinon from 'sinon';
import { describe } from 'mocha';
import { randomBytes } from 'crypto';
import * as Data from '../../../../../ts/data/data';
import { GroupUtils, PromiseUtils, UserUtils } from '../../../../session/utils';
import { TestUtils } from '../../../../test/test-utils';
@ -15,6 +17,7 @@ import { ClosedGroupMessage } from '../../../../session/messages/outgoing/contro
import chaiAsPromised from 'chai-as-promised';
import { MessageSentHandler } from '../../../../session/sending/MessageSentHandler';
chai.use(chaiAsPromised as any);
chai.should();
@ -42,7 +45,7 @@ describe('MessageQueue', () => {
sandbox.stub(UserUtils, 'getOurPubKeyStrFromCache').returns(ourNumber);
// Message Sender Stubs
sendStub = sandbox.stub(MessageSender, 'send').resolves();
sendStub = sandbox.stub(MessageSender, 'send');
messageSentHandlerFailedStub = sandbox
.stub(MessageSentHandler as any, 'handleMessageSentFailure')
.resolves();
@ -56,6 +59,7 @@ describe('MessageQueue', () => {
// Init Queue
pendingMessageCache = new PendingMessageCacheStub();
messageQueueStub = new MessageQueue(pendingMessageCache);
TestUtils.stubWindowLog();
});
afterEach(() => {
@ -108,8 +112,12 @@ describe('MessageQueue', () => {
describe('events', () => {
it('should send a success event if message was sent', done => {
const device = TestUtils.generateFakePubKey();
sandbox.stub(Data, 'getMessageById').resolves();
const message = TestUtils.generateVisibleMessage();
sendStub.resolves({ effectiveTimestamp: Date.now(), wrappedEnvelope: randomBytes(10) });
const device = TestUtils.generateFakePubKey();
sandbox.stub(MessageSender, 'getMinRetryTimeout').returns(10);
const waitForMessageSentEvent = async () =>
new Promise<void>(resolve => {
resolve();

View File

@ -1,15 +1,17 @@
import { expect } from 'chai';
import * as crypto from 'crypto';
import * as sinon from 'sinon';
import { toNumber } from 'lodash';
import { LokiMessageApi, MessageSender } from '../../../../session/sending';
import { MessageSender } from '../../../../session/sending';
import { TestUtils } from '../../../test-utils';
import { MessageEncrypter } from '../../../../session/crypto';
import { SignalService } from '../../../../protobuf';
import { EncryptionType } from '../../../../session/types/EncryptionType';
import { PubKey } from '../../../../session/types';
import { UserUtils } from '../../../../session/utils';
import { PubKey, RawMessage } from '../../../../session/types';
import { MessageUtils, UserUtils } from '../../../../session/utils';
import { ApiV2 } from '../../../../opengroup/opengroupV2';
import * as Data from '../../../../../ts/data/data';
import { SNodeAPI } from '../../../../session/snode_api';
import _ from 'lodash';
describe('MessageSender', () => {
const sandbox = sinon.createSandbox();
@ -19,6 +21,10 @@ describe('MessageSender', () => {
TestUtils.restoreStubs();
});
beforeEach(() => {
TestUtils.stubWindowLog();
});
// tslint:disable-next-line: max-func-body-length
describe('send', () => {
const ourNumber = '0123456789abcdef';
@ -26,7 +32,9 @@ describe('MessageSender', () => {
let encryptStub: sinon.SinonStub<[PubKey, Uint8Array, EncryptionType]>;
beforeEach(() => {
lokiMessageAPISendStub = sandbox.stub(LokiMessageApi, 'sendMessage').resolves();
lokiMessageAPISendStub = sandbox.stub(MessageSender, 'TEST_sendMessageToSnode').resolves();
sandbox.stub(Data, 'getMessageById').resolves();
encryptStub = sandbox.stub(MessageEncrypter, 'encrypt').resolves({
envelopeType: SignalService.Envelope.Type.SESSION_MESSAGE,
@ -37,24 +45,24 @@ describe('MessageSender', () => {
});
describe('retry', () => {
const rawMessage = {
identifier: '1',
device: TestUtils.generateFakePubKey().key,
plainTextBuffer: crypto.randomBytes(10),
encryption: EncryptionType.Fallback,
timestamp: Date.now(),
ttl: 100,
};
let rawMessage: RawMessage;
beforeEach(async () => {
rawMessage = await MessageUtils.toRawMessage(
TestUtils.generateFakePubKey(),
TestUtils.generateVisibleMessage()
);
});
it('should not retry if an error occurred during encryption', async () => {
encryptStub.throws(new Error('Failed to encrypt.'));
const promise = MessageSender.send(rawMessage);
const promise = MessageSender.send(rawMessage, 3, 10);
await expect(promise).is.rejectedWith('Failed to encrypt.');
expect(lokiMessageAPISendStub.callCount).to.equal(0);
});
it('should only call lokiMessageAPI once if no errors occured', async () => {
await MessageSender.send(rawMessage);
await MessageSender.send(rawMessage, 3, 10);
expect(lokiMessageAPISendStub.callCount).to.equal(1);
});
@ -87,41 +95,30 @@ describe('MessageSender', () => {
});
it('should pass the correct values to lokiMessageAPI', async () => {
const device = TestUtils.generateFakePubKey().key;
const timestamp = Date.now();
const ttl = 100;
const device = TestUtils.generateFakePubKey();
const visibleMessage = TestUtils.generateVisibleMessage();
await MessageSender.send({
identifier: '1',
device,
plainTextBuffer: crypto.randomBytes(10),
encryption: EncryptionType.Fallback,
timestamp,
ttl,
});
const rawMessage = await MessageUtils.toRawMessage(device, visibleMessage);
await MessageSender.send(rawMessage, 3, 10);
const args = lokiMessageAPISendStub.getCall(0).args;
expect(args[0]).to.equal(device);
expect(args[2]).to.equal(timestamp);
expect(args[3]).to.equal(ttl);
expect(args[0]).to.equal(device.key);
// expect(args[3]).to.equal(visibleMessage.timestamp); the timestamp is overwritten on sending by the network clock offset
expect(args[2]).to.equal(visibleMessage.ttl());
});
it('should correctly build the envelope', async () => {
it('should correctly build the envelope and override the timestamp', async () => {
messageEncyrptReturnEnvelopeType = SignalService.Envelope.Type.SESSION_MESSAGE;
// This test assumes the encryption stub returns the plainText passed into it.
const device = TestUtils.generateFakePubKey().key;
const plainTextBuffer = crypto.randomBytes(10);
const timestamp = Date.now();
const device = TestUtils.generateFakePubKey();
await MessageSender.send({
identifier: '1',
device,
plainTextBuffer,
encryption: EncryptionType.Fallback,
timestamp,
ttl: 1,
});
const visibleMessage = TestUtils.generateVisibleMessage();
const rawMessage = await MessageUtils.toRawMessage(device, visibleMessage);
const offset = 200000;
sandbox.stub(SNodeAPI, 'getLatestTimestampOffset').returns(offset);
await MessageSender.send(rawMessage, 3, 10);
const data = lokiMessageAPISendStub.getCall(0).args[1];
const webSocketMessage = SignalService.WebSocketMessage.decode(data);
@ -139,8 +136,20 @@ describe('MessageSender', () => {
);
expect(envelope.type).to.equal(SignalService.Envelope.Type.SESSION_MESSAGE);
expect(envelope.source).to.equal('');
expect(toNumber(envelope.timestamp)).to.equal(timestamp);
expect(envelope.content).to.deep.equal(plainTextBuffer);
// the timestamp is overridden on sending with the network offset
const expectedTimestamp = Date.now() - offset;
const decodedTimestampFromSending = _.toNumber(envelope.timestamp);
expect(decodedTimestampFromSending).to.be.above(expectedTimestamp - 10);
expect(decodedTimestampFromSending).to.be.below(expectedTimestamp + 10);
// then make sure the plaintextBuffer was overriden too
const visibleMessageExpected = TestUtils.generateVisibleMessage({
timestamp: decodedTimestampFromSending,
});
const rawMessageExpected = await MessageUtils.toRawMessage(device, visibleMessageExpected);
expect(envelope.content).to.deep.equal(rawMessageExpected.plainTextBuffer);
});
describe('SESSION_MESSAGE', () => {
@ -148,18 +157,11 @@ describe('MessageSender', () => {
messageEncyrptReturnEnvelopeType = SignalService.Envelope.Type.SESSION_MESSAGE;
// This test assumes the encryption stub returns the plainText passed into it.
const device = TestUtils.generateFakePubKey().key;
const plainTextBuffer = crypto.randomBytes(10);
const timestamp = Date.now();
const device = TestUtils.generateFakePubKey();
await MessageSender.send({
identifier: '1',
device,
plainTextBuffer,
encryption: EncryptionType.Fallback,
timestamp,
ttl: 1,
});
const visibleMessage = TestUtils.generateVisibleMessage();
const rawMessage = await MessageUtils.toRawMessage(device, visibleMessage);
await MessageSender.send(rawMessage, 3, 10);
const data = lokiMessageAPISendStub.getCall(0).args[1];
const webSocketMessage = SignalService.WebSocketMessage.decode(data);

View File

@ -68,7 +68,6 @@ describe('PendingMessageCache', () => {
const addedMessage = finalCache[0];
expect(addedMessage.device).to.deep.equal(rawMessage.device);
expect(addedMessage.timestamp).to.deep.equal(rawMessage.timestamp);
});
it('can add multiple messages belonging to the same user', async () => {
@ -106,17 +105,14 @@ describe('PendingMessageCache', () => {
expect(finalCache).to.have.length(0);
});
it('should only remove messages with different timestamp and device', async () => {
it('should only remove messages with different identifier and device', async () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateVisibleMessage();
const rawMessage = await MessageUtils.toRawMessage(device, message);
await pendingMessageCacheStub.add(device, message);
await TestUtils.timeout(5);
const one = await pendingMessageCacheStub.add(
device,
TestUtils.generateVisibleMessage(message.identifier)
);
const one = await pendingMessageCacheStub.add(device, TestUtils.generateVisibleMessage());
const two = await pendingMessageCacheStub.add(TestUtils.generateFakePubKey(), message);
const initialCache = await pendingMessageCacheStub.getAllPending();

View File

@ -35,18 +35,16 @@ describe('Message Utils', () => {
const rawMessage = await MessageUtils.toRawMessage(device, message);
expect(Object.keys(rawMessage)).to.have.length(6);
expect(Object.keys(rawMessage)).to.have.length(5);
expect(rawMessage.identifier).to.exist;
expect(rawMessage.device).to.exist;
expect(rawMessage.encryption).to.exist;
expect(rawMessage.plainTextBuffer).to.exist;
expect(rawMessage.timestamp).to.exist;
expect(rawMessage.ttl).to.exist;
expect(rawMessage.identifier).to.equal(message.identifier);
expect(rawMessage.device).to.equal(device.key);
expect(rawMessage.plainTextBuffer).to.deep.equal(message.plainTextBuffer());
expect(rawMessage.timestamp).to.equal(message.timestamp);
expect(rawMessage.ttl).to.equal(message.ttl());
});

View File

@ -9,11 +9,17 @@ import { TestUtils } from '..';
import { OpenGroupRequestCommonType } from '../../../opengroup/opengroupV2/ApiUtil';
import { OpenGroupVisibleMessage } from '../../../session/messages/outgoing/visibleMessage/OpenGroupVisibleMessage';
export function generateVisibleMessage(identifier?: string): VisibleMessage {
export function generateVisibleMessage({
identifier,
timestamp,
}: {
identifier?: string;
timestamp?: number;
} = {}): VisibleMessage {
return new VisibleMessage({
body: 'Lorem ipsum dolor sit amet, consectetur adipiscing elit',
identifier: identifier ?? uuid(),
timestamp: Date.now(),
timestamp: timestamp || Date.now(),
attachments: undefined,
quote: undefined,
expireTimer: undefined,