feat: addConfSyncJob start

This commit is contained in:
Audric Ackermann 2023-02-08 10:50:23 +11:00
parent 4bfcf91193
commit 7c56310e69
34 changed files with 1117 additions and 622 deletions

View file

@ -16,6 +16,9 @@ export const ConfigDumpData: AsyncObjectWrapper<ConfigDumpDataNode> = {
},
saveConfigDump: (dump: ConfigDumpRow) => {
console.warn('saveConfigDump', dump);
if (dump.combinedMessageHashes.some(m => m && m.length < 5)) {
throw new Error('saveConfigDump combinedMessageHashes have invalid size');
}
return channels.saveConfigDump(dump);
},
saveCombinedMessageHashesForMatching: (dump: ConfigDumpRowWithoutData) => {
@ -28,4 +31,7 @@ export const ConfigDumpData: AsyncObjectWrapper<ConfigDumpDataNode> = {
getAllDumpsWithoutData: () => {
return channels.getAllDumpsWithoutData();
},
getCombinedHashesByVariantAndPubkey: (variant: ConfigWrapperObjectTypes, pubkey: string) => {
return channels.getCombinedHashesByVariantAndPubkey(variant, pubkey);
},
};

View file

@ -4,11 +4,10 @@
import _ from 'lodash';
import { MessageResultProps } from '../components/search/MessageSearchResults';
import { ConversationCollection, ConversationModel } from '../models/conversation';
import { ConversationAttributes, ConversationTypeEnum } from '../models/conversationAttributes';
import { ConversationAttributes } from '../models/conversationAttributes';
import { MessageCollection, MessageModel } from '../models/message';
import { MessageAttributes, MessageDirection } from '../models/messageType';
import { HexKeyPair } from '../receiver/keypairs';
import { getConversationController } from '../session/conversations';
import { getSodiumRenderer } from '../session/crypto';
import { PubKey } from '../session/types';
import {
@ -702,40 +701,6 @@ async function updateSnodePoolOnDb(snodesAsJsonString: string): Promise<void> {
await Data.createOrUpdateItem({ id: SNODE_POOL_ITEM_ID, value: snodesAsJsonString });
}
/**
* Generates fake conversations and distributes messages amongst the conversations randomly
* @param numConvosToAdd Amount of fake conversations to generate
* @param numMsgsToAdd Number of fake messages to generate
*/
async function fillWithTestData(convs: number, msgs: number) {
const newConvos = [];
for (let convsAddedCount = 0; convsAddedCount < convs; convsAddedCount++) {
const convoId = `${Date.now()} + ${convsAddedCount}`;
const newConvo = await getConversationController().getOrCreateAndWait(
convoId,
ConversationTypeEnum.PRIVATE
);
newConvos.push(newConvo);
}
for (let msgsAddedCount = 0; msgsAddedCount < msgs; msgsAddedCount++) {
// tslint:disable: insecure-random
const convoToChoose = newConvos[Math.floor(Math.random() * newConvos.length)];
const direction = Math.random() > 0.5 ? 'outgoing' : 'incoming';
const body = `spongebob ${new Date().toString()}`;
if (direction === 'outgoing') {
await convoToChoose.addSingleOutgoingMessage({
body,
});
} else {
await convoToChoose.addSingleIncomingMessage({
source: convoToChoose.id,
body,
});
}
}
}
function keysToArrayBuffer(keys: any, data: any) {
const updated = _.cloneDeep(data);
// tslint:disable: one-variable-per-declaration
@ -908,5 +873,4 @@ export const Data = {
getMessagesWithFileAttachments,
getSnodePoolFromDb,
updateSnodePoolOnDb,
fillWithTestData,
};

View file

@ -90,7 +90,6 @@ const channelsToMake = new Set([
'getLatestClosedGroupEncryptionKeyPair',
'addClosedGroupEncryptionKeyPair',
'removeAllClosedGroupEncryptionKeyPairs',
'fillWithTestData',
...channelsToMakeForOpengroupV2,
...channelsToMakeForConfigDumps,
]);

View file

@ -88,6 +88,8 @@ export interface ConversationAttributes {
/** The open group chat this conversation originated from (if from closed group) */
conversationIdOrigin?: string;
markedAsUnread: boolean;
/**
* When we create a closed group v3 or get promoted to admim, we need to save the private key of that closed group.
*/

View file

@ -42,6 +42,7 @@ export function start(
console.log(`config/set: Saving ${name} config to disk`);
const text = JSON.stringify(cachedValue, null, ' ');
writeFileSync(targetPath, text, ENCODING);
console.log(`config/set: Saved ${name} config to disk`);
}
function remove() {

View file

@ -77,6 +77,7 @@ const allowedKeysFormatRowOfConversation = [
'displayNameInProfile',
'conversationIdOrigin',
'identityPrivateKey',
'markedAsUnread',
];
// tslint:disable: cyclomatic-complexity
export function formatRowOfConversation(row?: Record<string, any>): ConversationAttributes | null {
@ -132,6 +133,7 @@ export function formatRowOfConversation(row?: Record<string, any>): Conversation
convo.readCapability = Boolean(convo.readCapability);
convo.writeCapability = Boolean(convo.writeCapability);
convo.uploadCapability = Boolean(convo.uploadCapability);
convo.markedAsUnread = Boolean(convo.markedAsUnread);
if (!convo.conversationIdOrigin) {
convo.conversationIdOrigin = undefined;
@ -211,6 +213,7 @@ const allowedKeysOfConversationAttributes = [
'displayNameInProfile',
'conversationIdOrigin',
'identityPrivateKey',
'markedAsUnread',
];
/**

View file

@ -1239,7 +1239,9 @@ function updateToSessionSchemaVersion31(currentVersion: number, db: BetterSqlite
variant TEXT NOT NULL,
publicKey TEXT NOT NULL,
data BLOB,
combinedMessageHashes TEXT);
combinedMessageHashes TEXT,
PRIMARY KEY (publicKey, variant)
);
`);
// db.exec(`ALTER TABLE conversations

View file

@ -23,7 +23,7 @@ import { redactAll } from '../util/privacy'; // checked - only node
import { LocaleMessagesType } from './locale'; // checked - only node
import { PubKey } from '../session/types/PubKey'; // checked - only node
import { StorageItem } from './storage_item'; // checked - only node
import { ConversationAttributes, ConversationTypeEnum } from '../models/conversationAttributes';
import { ConversationAttributes } from '../models/conversationAttributes';
import {
arrayStrToJson,
assertValidConversationAttributes,
@ -447,7 +447,7 @@ function saveConversation(data: ConversationAttributes, instance?: BetterSqlite3
//FIXME
console.warn('FIXME omit(formatted, identityPrivateKey);');
const omited = omit(formatted, 'identityPrivateKey');
const omited = omit(formatted, 'identityPrivateKey', 'markedAsUnread');
const keys = Object.keys(omited);
const columnsCommaSeparated = keys.join(', ');
const valuesArgs = keys.map(k => `$${k}`).join(', ');
@ -2187,161 +2187,6 @@ function cleanUpOldOpengroupsOnStart() {
})();
}
// tslint:disable: binary-expression-operand-order insecure-random
/**
* Only using this for development. Populate conversation and message tables.
*/
function fillWithTestData(numConvosToAdd: number, numMsgsToAdd: number) {
const convoBeforeCount = assertGlobalInstance()
.prepare(`SELECT count(*) from ${CONVERSATIONS_TABLE};`)
.get()['count(*)'];
const lipsum =
// eslint:disable-next-line max-line-length
`Lorem ipsum dolor sit amet, consectetur adipiscing elit. Duis ac ornare lorem,
non suscipit purus. Lorem ipsum dolor sit amet, consectetur adipiscing elit.
Suspendisse cursus aliquet velit a dignissim. Integer at nisi sed velit consequat
dictum. Phasellus congue tellus ante. Ut rutrum hendrerit dapibus. Fusce
luctus, ante nec interdum molestie, purus urna volutpat turpis, eget mattis
lectus velit at velit. Praesent vel tellus turpis. Praesent eget purus at
nisl blandit pharetra. Cras dapibus sem vitae rutrum dapibus. Vivamus vitae mi
ante. Donec aliquam porta nibh, vel scelerisque orci condimentum sed.
Proin in mattis ipsum, ac euismod sem. Donec malesuada sem nisl, at
vehicula ante efficitur sed. Curabitur in sapien eros. Morbi tempor ante ut
metus scelerisque condimentum. Integer sit amet tempus nulla. Vivamus
imperdiet dui ac luctus vulputate. Sed a accumsan risus. Nulla facilisi.
Nulla mauris dui, luctus in sagittis at, sodales id mauris. Integer efficitur
viverra ex, ut dignissim eros tincidunt placerat. Sed facilisis gravida
mauris in luctus . Fusce dapibus, est vitae tincidunt eleifend, justo
odio porta dui, sed ultrices mi arcu vitae ante. Mauris ut libero
erat. Nam ut mi quis ante tincidunt facilisis sit amet id enim.
Vestibulum in molestie mi. In ac felis est. Vestibulum vel blandit ex. Morbi vitae
viverra augue . Ut turpis quam, cursus quis ex a, convallis
ullamcorper purus. Nam eget libero arcu. Integer fermentum enim nunc, non consequat urna
fermentum condimentum. Nulla vitae malesuada est. Donec imperdiet tortor interdum
malesuada feugiat. Integer pulvinar dui ex, eget tristique arcu mattis at. Nam eu neque
eget mauris varius suscipit. Quisque ac enim vitae mauris laoreet congue nec sed
justo. Curabitur fermentum quam eget est tincidunt, at faucibus lacus maximus. Donec
auctor enim dolor, faucibus egestas diam consectetur sed. Donec eget rutrum arcu, at
tempus mi. Fusce quis volutpat sapien. In aliquet fringilla purus. Ut eu nunc non
augue lacinia ultrices at eget tortor. Maecenas pulvinar odio sit amet purus
elementum, a vehicula lorem maximus. Pellentesque eu lorem magna. Vestibulum ut facilisis
lorem. Proin et enim cursus, vulputate neque sit amet, posuere enim. Praesent
faucibus tellus vel mi tincidunt, nec malesuada nibh malesuada. In laoreet sapien vitae
aliquet sollicitudin.
`;
const msgBeforeCount = assertGlobalInstance()
.prepare(`SELECT count(*) from ${MESSAGES_TABLE};`)
.get()['count(*)'];
console.info('==== fillWithTestData ====');
console.info({
convoBeforeCount,
msgBeforeCount,
convoToAdd: numConvosToAdd,
msgToAdd: numMsgsToAdd,
});
const convosIdsAdded = [];
// eslint-disable-next-line no-plusplus
for (let index = 0; index < numConvosToAdd; index++) {
const activeAt = Date.now() - index;
const id = Date.now() - 1000 * index;
const convoObjToAdd: ConversationAttributes = {
active_at: activeAt,
members: [],
displayNameInProfile: `${activeAt}`,
id: `05${id}`,
type: ConversationTypeEnum.GROUPV3,
didApproveMe: false,
expireTimer: 0,
groupAdmins: [],
groupModerators: [],
isApproved: false,
isKickedFromGroup: false,
isPinned: false,
isTrustedForAttachmentDownload: false,
is_medium_group: false,
lastJoinedTimestamp: 0,
lastMessage: null,
lastMessageStatus: undefined,
left: false,
mentionedUs: false,
subscriberCount: 0,
readCapability: true,
writeCapability: true,
uploadCapability: true,
triggerNotificationsFor: 'all',
unreadCount: 0,
zombies: [],
};
convosIdsAdded.push(id);
try {
saveConversation(convoObjToAdd);
// eslint-disable-next-line no-empty
} catch (e) {
console.error(e);
}
}
// eslint-disable-next-line no-plusplus
for (let index = 0; index < numMsgsToAdd; index++) {
const activeAt = Date.now() - index;
const id = Date.now() - 1000 * index;
const lipsumStartIdx = Math.floor(Math.random() * lipsum.length);
const lipsumLength = Math.floor(Math.random() * lipsum.length - lipsumStartIdx);
const fakeBodyText = lipsum.substring(lipsumStartIdx, lipsumStartIdx + lipsumLength);
const convoId = convosIdsAdded[Math.floor(Math.random() * convosIdsAdded.length)];
const msgObjToAdd = {
// body: `fake body ${activeAt}`,
body: `fakeMsgIdx-spongebob-${index} ${fakeBodyText} ${activeAt}`,
conversationId: `05${id}`,
// eslint-disable-next-line camelcase
expires_at: 0,
hasAttachments: 0,
hasFileAttachments: 0,
hasVisualMediaAttachments: 0,
id: `${id}`,
serverId: 0,
serverTimestamp: 0,
// eslint-disable-next-line camelcase
received_at: Date.now(),
sent: 0,
// eslint-disable-next-line camelcase
sent_at: Date.now(),
source: `${convoId}`,
type: 'outgoing',
unread: 1,
expireTimer: 0,
expirationStartTimestamp: 0,
};
if (convoId % 10 === 0) {
console.info('uyo , convoId ', { index, convoId });
}
try {
saveMessage(msgObjToAdd);
// eslint-disable-next-line no-empty
} catch (e) {
console.error(e);
}
}
const convoAfterCount = assertGlobalInstance()
.prepare(`SELECT count(*) from ${CONVERSATIONS_TABLE};`)
.get()['count(*)'];
const msgAfterCount = assertGlobalInstance()
.prepare(`SELECT count(*) from ${MESSAGES_TABLE};`)
.get()['count(*)'];
console.info({ convoAfterCount, msgAfterCount });
return convosIdsAdded;
}
export type SqlNodeType = typeof sqlNode;
export function close() {
@ -2417,7 +2262,6 @@ export const sqlNode = {
getFirstUnreadMessageIdInConversation,
getFirstUnreadMessageWithMention,
hasConversationOutgoingMessage,
fillWithTestData,
// add all the calls related to the unprocessed cache of incoming messages
...unprocessed,

View file

@ -144,4 +144,29 @@ export const configDumpData: ConfigDumpDataNode = {
combinedMessageHashes: JSON.stringify(combinedMessageHashes || []),
});
},
getCombinedHashesByVariantAndPubkey: (variant: ConfigWrapperObjectTypes, publicKey: string) => {
const rows = assertGlobalInstance()
.prepare(
'SELECT combinedMessageHashes from configDump WHERE variant = $variant AND publicKey = $publicKey;'
)
.all({
publicKey,
variant,
});
if (!rows) {
return new Set();
}
const asArrays = compact(
rows.map(t => {
try {
return JSON.parse(t.combinedMessageHashes);
} catch {
return null;
}
})
);
return new Set(asArrays.flat(1));
},
};

View file

@ -1,4 +1,4 @@
import _, { compact, flattenDeep, isEmpty, isEqual } from 'lodash';
import _, { isEmpty, isEqual } from 'lodash';
import { ConfigDumpData } from '../data/configDump/configDump';
import { Data, hasSyncedInitialConfigurationItem } from '../data/data';
import { ConversationInteraction } from '../interactions';
@ -14,11 +14,11 @@ import { IncomingMessage } from '../session/messages/incoming/IncomingMessage';
import { ProfileManager } from '../session/profile_manager/ProfileManager';
import { UserUtils } from '../session/utils';
import { ConfigurationSync } from '../session/utils/job_runners/jobs/ConfigurationSyncJob';
import { IncomingConfResult, LibSessionUtil } from '../session/utils/libsession/libsession_utils';
import { toHex } from '../session/utils/String';
import { configurationMessageReceived, trigger } from '../shims/events';
import { BlockedNumberController } from '../util';
import { getLastProfileUpdateTimestamp, setLastProfileUpdateTimestamp } from '../util/storage';
import { ConfigWrapperObjectTypes } from '../webworker/workers/browser/libsession_worker_functions';
import {
ContactsWrapperActions,
GenericWrapperActions,
@ -28,26 +28,6 @@ import { removeFromCache } from './cache';
import { handleNewClosedGroup } from './closedGroups';
import { EnvelopePlus } from './types';
type IncomingConfResult = {
needsPush: boolean;
needsDump: boolean;
messageHashes: Array<string>;
latestSentTimestamp: number;
};
function pbKindToVariant(
kind: SignalService.SharedConfigMessage.Kind
): ConfigWrapperObjectTypes | null {
switch (kind) {
case SignalService.SharedConfigMessage.Kind.USER_PROFILE:
return 'UserConfig';
case SignalService.SharedConfigMessage.Kind.CONTACTS:
return 'ContactsConfig';
default:
return null;
}
}
async function mergeConfigsWithIncomingUpdates(
incomingConfig: IncomingMessage<SignalService.ISharedConfigMessage>
): Promise<{ kind: SignalService.SharedConfigMessage.Kind; result: IncomingConfResult }> {
@ -55,11 +35,7 @@ async function mergeConfigsWithIncomingUpdates(
const toMerge = [incomingConfig.message.data];
const wrapperId = pbKindToVariant(kind);
if (!wrapperId) {
throw new Error(`Invalid kind: ${kind}`);
}
const wrapperId = LibSessionUtil.kindToVariant(kind);
await GenericWrapperActions.merge(wrapperId, toMerge);
const needsPush = await GenericWrapperActions.needsPush(wrapperId);
const needsDump = await GenericWrapperActions.needsDump(wrapperId);
@ -78,19 +54,13 @@ async function mergeConfigsWithIncomingUpdates(
async function handleUserProfileUpdate(result: IncomingConfResult): Promise<IncomingConfResult> {
const updatedUserName = await UserConfigWrapperActions.getName();
console.warn('got', updatedUserName);
if (result.needsDump) {
if (!result.needsDump) {
return result;
}
if (!updatedUserName) {
debugger;
}
const updatedProfilePicture = await UserConfigWrapperActions.getProfilePicture();
debugger;
const picUpdate = !isEmpty(updatedProfilePicture.key) && !isEmpty(updatedProfilePicture.url);
await ProfileManager.updateOurProfileSync(
updatedUserName,
picUpdate ? updatedProfilePicture.url : null,
@ -100,7 +70,7 @@ async function handleUserProfileUpdate(result: IncomingConfResult): Promise<Inco
}
async function handleContactsUpdate(result: IncomingConfResult): Promise<IncomingConfResult> {
if (result.needsDump) {
if (!result.needsDump) {
return result;
}
@ -188,21 +158,19 @@ async function processMergingResults(
default:
throw new Error(`processMergingResults unknown kind of contact : ${kind}`);
}
const variant = pbKindToVariant(kind);
if (!variant) {
throw new Error('unknown variant');
}
const variant = LibSessionUtil.kindToVariant(kind);
// We need to get the existing message hashes and combine them with the latest from the
// service node to ensure the next push will properly clean up old messages
const oldMessageHashesWithDup = (
await ConfigDumpData.getByVariantAndPubkey(variant, envelope.source)
).map(m => m.combinedMessageHashes);
const oldMessageHashes = new Set(...flattenDeep(compact(oldMessageHashesWithDup)));
const allMessageHashes = new Set([...oldMessageHashes, ...finalResult.messageHashes]);
const finalResultsHashes = new Set([...finalResult.messageHashes]);
const oldMessagesHashesSet = await ConfigDumpData.getCombinedHashesByVariantAndPubkey(
variant,
envelope.source
);
const allMessageHashes = new Set([...oldMessagesHashesSet, ...finalResult.messageHashes]);
const finalResultsHashes = new Set(finalResult.messageHashes);
// lodash does deep compare of Sets
const messageHashesChanged = !isEqual(oldMessageHashes, finalResultsHashes);
const messageHashesChanged = !isEqual(oldMessagesHashesSet, finalResultsHashes);
if (finalResult.needsDump) {
// The config data had changes so regenerate the dump and save it
@ -236,6 +204,7 @@ async function processMergingResults(
// Now that the local state has been updated, trigger a config sync (this will push any
// pending updates and properly update the state)
if (result.result.needsPush) {
console.warn(`processMergingResults ${LibSessionUtil.kindToVariant(result.kind)} needs push`);
await ConfigurationSync.queueNewJobIfNeeded();
}
}

View file

@ -1,18 +1,19 @@
import { snodeRpc } from './sessionRpc';
import { getSwarmFor } from './snodePool';
import { getSodiumRenderer } from '../../crypto';
import _, { compact, sample } from 'lodash';
import { compact, sample } from 'lodash';
import pRetry from 'p-retry';
import { fromBase64ToArray, fromHexToArray, fromUInt8ArrayToBase64 } from '../../utils/String';
import { Snode } from '../../../data/data';
import { getSodiumRenderer } from '../../crypto';
import { ed25519Str } from '../../onions/onionPath';
import { StringUtils, UserUtils } from '../../utils';
import { GetNetworkTime } from './getNetworkTime';
import { fromBase64ToArray, fromHexToArray } from '../../utils/String';
import { doSnodeBatchRequest } from './batchRequest';
import { getSwarmFor } from './snodePool';
import { SnodeSignature } from './snodeSignatures';
export const ERROR_CODE_NO_CONNECT = 'ENETUNREACH: No network connection.';
// tslint:disable-next-line: max-func-body-length
// tslint:disable: max-func-body-length
// TODO we should merge those two functions together as they are almost exactly the same
const forceNetworkDeletion = async (): Promise<Array<string> | null> => {
const sodium = await getSodiumRenderer();
const userX25519PublicKey = UserUtils.getOurPubKeyStrFromCache();
@ -23,14 +24,13 @@ const forceNetworkDeletion = async (): Promise<Array<string> | null> => {
window?.log?.warn('Cannot forceNetworkDeletion, did not find user ed25519 key.');
return null;
}
const edKeyPriv = userED25519KeyPair.privKey;
const method = 'delete_all' as const;
try {
const maliciousSnodes = await pRetry(
async () => {
const userSwarm = await getSwarmFor(userX25519PublicKey);
const snodeToMakeRequestTo: Snode | undefined = sample(userSwarm);
const edKeyPrivBytes = fromHexToArray(edKeyPriv);
if (!snodeToMakeRequestTo) {
window?.log?.warn('Cannot forceNetworkDeletion, without a valid swarm node.');
@ -39,51 +39,44 @@ const forceNetworkDeletion = async (): Promise<Array<string> | null> => {
return pRetry(
async () => {
const timestamp = await GetNetworkTime.getNetworkTime(snodeToMakeRequestTo);
const verificationData = StringUtils.encode(`delete_all${timestamp}`, 'utf8');
const message = new Uint8Array(verificationData);
const signature = sodium.crypto_sign_detached(message, edKeyPrivBytes);
const signatureBase64 = fromUInt8ArrayToBase64(signature);
const deleteMessageParams = {
const signOpts = await SnodeSignature.getSnodeSignatureParams({
method,
namespace: null,
pubkey: userX25519PublicKey,
pubkey_ed25519: userED25519KeyPair.pubKey.toUpperCase(),
timestamp,
signature: signatureBase64,
};
const ret = await snodeRpc({
method: 'delete_all',
params: deleteMessageParams,
targetNode: snodeToMakeRequestTo,
associatedWith: userX25519PublicKey,
});
if (!ret) {
const ret = await doSnodeBatchRequest(
[{ method, params: signOpts }],
snodeToMakeRequestTo,
10000,
userX25519PublicKey
);
if (!ret || !ret?.[0].body || ret[0].code !== 200) {
throw new Error(
`Empty response got for delete_all on snode ${ed25519Str(
`Empty response got for ${method} on snode ${ed25519Str(
snodeToMakeRequestTo.pubkey_ed25519
)}`
);
}
try {
const parsedResponse = JSON.parse(ret.body);
const { swarm } = parsedResponse;
const firstResultParsedBody = ret[0].body;
const { swarm } = firstResultParsedBody;
if (!swarm) {
throw new Error(
`Invalid JSON swarm response got for delete_all on snode ${ed25519Str(
`Invalid JSON swarm response got for ${method} on snode ${ed25519Str(
snodeToMakeRequestTo.pubkey_ed25519
)}, ${ret?.body}`
)}, ${firstResultParsedBody}`
);
}
const swarmAsArray = Object.entries(swarm) as Array<Array<any>>;
if (!swarmAsArray.length) {
throw new Error(
`Invalid JSON swarmAsArray response got for delete_all on snode ${ed25519Str(
`Invalid JSON swarmAsArray response got for ${method} on snode ${ed25519Str(
snodeToMakeRequestTo.pubkey_ed25519
)}, ${ret?.body}`
)}, ${firstResultParsedBody}`
);
}
// results will only contains the snode pubkeys which returned invalid/empty results
@ -99,19 +92,19 @@ const forceNetworkDeletion = async (): Promise<Array<string> | null> => {
const statusCode = snodeJson.code;
if (reason && statusCode) {
window?.log?.warn(
`Could not delete data from ${ed25519Str(
`Could not ${method} from ${ed25519Str(
snodeToMakeRequestTo.pubkey_ed25519
)} due to error: ${reason}: ${statusCode}`
);
// if we tried to make the delete on a snode not in our swarm, just trigger a pRetry error so the outer block here finds new snodes to make the request to.
if (statusCode === 421) {
throw new pRetry.AbortError(
'421 error on network delete_all. Retrying with a new snode'
`421 error on network ${method}. Retrying with a new snode`
);
}
} else {
window?.log?.warn(
`Could not delete data from ${ed25519Str(
`Could not ${method} from ${ed25519Str(
snodeToMakeRequestTo.pubkey_ed25519
)}`
);
@ -122,7 +115,9 @@ const forceNetworkDeletion = async (): Promise<Array<string> | null> => {
const hashes = snodeJson.deleted as Array<string>;
const signatureSnode = snodeJson.signature as string;
// The signature format is ( PUBKEY_HEX || TIMESTAMP || DELETEDHASH[0] || ... || DELETEDHASH[N] )
const dataToVerify = `${userX25519PublicKey}${timestamp}${hashes.join('')}`;
const dataToVerify = `${userX25519PublicKey}${signOpts.timestamp}${hashes.join(
''
)}`;
const dataToVerifyUtf8 = StringUtils.encode(dataToVerify, 'utf8');
const isValid = sodium.crypto_sign_verify_detached(
fromBase64ToArray(signatureSnode),
@ -139,9 +134,9 @@ const forceNetworkDeletion = async (): Promise<Array<string> | null> => {
return results;
} catch (e) {
throw new Error(
`Invalid JSON response got for delete_all on snode ${ed25519Str(
`Invalid JSON response got for ${method} on snode ${ed25519Str(
snodeToMakeRequestTo.pubkey_ed25519
)}, ${ret?.body}`
)}, ${ret}`
);
}
},
@ -150,7 +145,7 @@ const forceNetworkDeletion = async (): Promise<Array<string> | null> => {
minTimeout: SnodeAPI.TEST_getMinTimeout(),
onFailedAttempt: e => {
window?.log?.warn(
`delete_all INNER request attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...`
`${method} INNER request attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...`
);
},
}
@ -161,7 +156,7 @@ const forceNetworkDeletion = async (): Promise<Array<string> | null> => {
minTimeout: SnodeAPI.TEST_getMinTimeout(),
onFailedAttempt: e => {
window?.log?.warn(
`delete_all OUTER request attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left... ${e.message}`
`${method} OUTER request attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left... ${e.message}`
);
},
}
@ -169,7 +164,7 @@ const forceNetworkDeletion = async (): Promise<Array<string> | null> => {
return maliciousSnodes;
} catch (e) {
window?.log?.warn('failed to delete everything on network:', e);
window?.log?.warn(`failed to ${method} everything on network:`, e);
return null;
}
};
@ -180,7 +175,6 @@ const TEST_getMinTimeout = () => 500;
/**
* Locally deletes message and deletes message on the network (all nodes that contain the message)
*/
// tslint:disable-next-line: max-func-body-length
const networkDeleteMessages = async (hashes: Array<string>): Promise<Array<string> | null> => {
const sodium = await getSodiumRenderer();
const userX25519PublicKey = UserUtils.getOurPubKeyStrFromCache();
@ -191,14 +185,13 @@ const networkDeleteMessages = async (hashes: Array<string>): Promise<Array<strin
window?.log?.warn('Cannot networkDeleteMessages, did not find user ed25519 key.');
return null;
}
const edKeyPriv = userED25519KeyPair.privKey;
const method = 'delete' as const;
try {
const maliciousSnodes = await pRetry(
async () => {
const userSwarm = await getSwarmFor(userX25519PublicKey);
const snodeToMakeRequestTo: Snode | undefined = sample(userSwarm);
const edKeyPrivBytes = fromHexToArray(edKeyPriv);
if (!snodeToMakeRequestTo) {
window?.log?.warn('Cannot networkDeleteMessages, without a valid swarm node.');
@ -207,48 +200,44 @@ const networkDeleteMessages = async (hashes: Array<string>): Promise<Array<strin
return pRetry(
async () => {
const verificationData = StringUtils.encode(`delete${hashes.join('')}`, 'utf8');
const message = new Uint8Array(verificationData);
const signature = sodium.crypto_sign_detached(message, edKeyPrivBytes);
const signatureBase64 = fromUInt8ArrayToBase64(signature);
const deleteMessageParams = {
pubkey: userX25519PublicKey,
pubkey_ed25519: userED25519KeyPair.pubKey.toUpperCase(),
const signOpts = await SnodeSignature.getSnodeSignatureByHashesParams({
messages: hashes,
signature: signatureBase64,
};
const ret = await snodeRpc({
method: 'delete',
params: deleteMessageParams,
targetNode: snodeToMakeRequestTo,
associatedWith: userX25519PublicKey,
method,
pubkey: userX25519PublicKey,
});
if (!ret) {
const ret = await doSnodeBatchRequest(
[{ method, params: signOpts }],
snodeToMakeRequestTo,
10000,
userX25519PublicKey
);
if (!ret || !ret?.[0].body || ret[0].code !== 200) {
throw new Error(
`Empty response got for delete on snode ${ed25519Str(
`Empty response got for ${method} on snode ${ed25519Str(
snodeToMakeRequestTo.pubkey_ed25519
)}`
);
}
try {
const parsedResponse = JSON.parse(ret.body);
const { swarm } = parsedResponse;
const firstResultParsedBody = ret[0].body;
const { swarm } = firstResultParsedBody;
if (!swarm) {
throw new Error(
`Invalid JSON swarm response got for delete on snode ${ed25519Str(
`Invalid JSON swarm response got for ${method} on snode ${ed25519Str(
snodeToMakeRequestTo.pubkey_ed25519
)}, ${ret?.body}`
)}, ${firstResultParsedBody}`
);
}
const swarmAsArray = Object.entries(swarm) as Array<Array<any>>;
if (!swarmAsArray.length) {
throw new Error(
`Invalid JSON swarmAsArray response got for delete on snode ${ed25519Str(
`Invalid JSON swarmAsArray response got for ${method} on snode ${ed25519Str(
snodeToMakeRequestTo.pubkey_ed25519
)}, ${ret?.body}`
)}, ${firstResultParsedBody}`
);
}
// results will only contains the snode pubkeys which returned invalid/empty results
@ -257,7 +246,6 @@ const networkDeleteMessages = async (hashes: Array<string>): Promise<Array<strin
const snodePubkey = snode[0];
const snodeJson = snode[1];
//#region failure handling
const isFailed = snodeJson.failed || false;
if (isFailed) {
@ -265,28 +253,26 @@ const networkDeleteMessages = async (hashes: Array<string>): Promise<Array<strin
const statusCode = snodeJson.code;
if (reason && statusCode) {
window?.log?.warn(
`Could not delete msgs from ${ed25519Str(
`Could not ${method} from ${ed25519Str(
snodeToMakeRequestTo.pubkey_ed25519
)} due to error: ${reason}: ${statusCode}`
);
// if we tried to make the delete on a snode not in our swarm, just trigger a pRetry error so the outer block here finds new snodes to make the request to.
if (statusCode === 421) {
throw new pRetry.AbortError(
'421 error on network delete_all. Retrying with a new snode'
`421 error on network ${method}. Retrying with a new snode`
);
}
} else {
window?.log?.info(
`Could not delete msgs from ${ed25519Str(
window?.log?.warn(
`Could not ${method} from ${ed25519Str(
snodeToMakeRequestTo.pubkey_ed25519
)}`
);
}
return snodePubkey;
}
//#endregion
//#region verification
const responseHashes = snodeJson.deleted as Array<string>;
const signatureSnode = snodeJson.signature as string;
// The signature looks like ( PUBKEY_HEX || RMSG[0] || ... || RMSG[N] || DMSG[0] || ... || DMSG[M] )
@ -303,16 +289,15 @@ const networkDeleteMessages = async (hashes: Array<string>): Promise<Array<strin
return snodePubkey;
}
return null;
//#endregion
})
);
return results;
} catch (e) {
throw new Error(
`Invalid JSON response got for delete on snode ${ed25519Str(
`Invalid JSON response got for ${method} on snode ${ed25519Str(
snodeToMakeRequestTo.pubkey_ed25519
)}, ${ret?.body}`
)}, ${ret}`
);
}
},
@ -321,7 +306,7 @@ const networkDeleteMessages = async (hashes: Array<string>): Promise<Array<strin
minTimeout: SnodeAPI.TEST_getMinTimeout(),
onFailedAttempt: e => {
window?.log?.warn(
`delete INNER request attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...`
`${method} INNER request attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...`
);
},
}
@ -332,7 +317,7 @@ const networkDeleteMessages = async (hashes: Array<string>): Promise<Array<strin
minTimeout: SnodeAPI.TEST_getMinTimeout(),
onFailedAttempt: e => {
window?.log?.warn(
`delete OUTER request attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...`
`${method} OUTER request attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left... ${e.message}`
);
},
}
@ -340,7 +325,7 @@ const networkDeleteMessages = async (hashes: Array<string>): Promise<Array<strin
return maliciousSnodes;
} catch (e) {
window?.log?.warn('failed to delete message on network:', e);
window?.log?.warn(`failed to ${method} message on network:`, e);
return null;
}
};

View file

@ -1,3 +1,4 @@
import { SharedConfigMessage } from '../../messages/outgoing/controlMessage/SharedConfigMessage';
import { SnodeNamespaces } from './namespaces';
export type SwarmForSubRequest = { method: 'get_swarm'; params: { pubkey: string } };
@ -51,30 +52,6 @@ export type RetrieveSubRequestType =
| RetrievePubkeySubRequestType
| RetrieveSubKeySubRequestType;
// FIXME those store types are not right
// type StoreAlwaysNeeded = { pubkey: string; timestamp: number; data: string };
// type StoreUnauthenticatedSubRequest = {
// method: 'store';
// params: {
// ttl?: string; // required, unless expiry is given
// expiry?: number; // required, unless ttl is given
// namespace: UnauthenticatedNamespaces; // we can only store without authentication on namespaces divisible by 10 (...-60...0...60...)
// } & StoreAlwaysNeeded;
// };
// type StoreAuthenticatedSubRequest = {
// method: 'store';
// params: {
// ttl?: string; // required, unless expiry is given
// expiry?: number; // required, unless ttl is given
// subkey?: string;
// signature: string; // base64 encoded
// pubkey_ed25519: string;
// sig_timestamp?: number;
// } & StoreAlwaysNeeded;
// };
/**
* OXEND_REQUESTS
*/
@ -105,19 +82,54 @@ export type GetServiceNodesSubRequest = {
};
};
export type StoreOnNodeParamsNoSig = {
pubkey: string;
ttl: number;
timestamp: number;
data64: string;
namespace: number;
};
export type StoreOnNodeParams = {
pubkey: string;
ttl: string;
timestamp: string;
ttl: number;
timestamp: number;
data: string;
namespace: number;
signature?: string;
pubkey_ed25519?: string;
};
export type DeleteFromNodeWithTimestampParams = {
timestamp: string | number;
} & DeleteSigParameters;
export type DeleteByHashesFromNodeParams = { messages: Array<string> } & DeleteSigParameters;
export type StoreOnNodeMessage = {
pubkey: string;
timestamp: number;
namespace: number;
message: SharedConfigMessage;
};
export type StoreOnNodeSubRequest = { method: 'store'; params: StoreOnNodeParams };
export type NetworkTimeSubRequest = { method: 'info'; params: {} };
type DeleteSigParameters = {
pubkey: string;
pubkey_ed25519: string;
signature: string;
};
export type DeleteAllFromNodeSubRequest = {
method: 'delete_all';
params: DeleteFromNodeWithTimestampParams;
};
export type DeleteFromNodeSubRequest = {
method: 'delete';
params: DeleteByHashesFromNodeParams;
};
export type OxendSubRequest = OnsResolveSubRequest | GetServiceNodesSubRequest;
export type SnodeApiSubRequests =
@ -125,7 +137,9 @@ export type SnodeApiSubRequests =
| SwarmForSubRequest
| OxendSubRequest
| StoreOnNodeSubRequest
| NetworkTimeSubRequest;
| NetworkTimeSubRequest
| DeleteFromNodeSubRequest
| DeleteAllFromNodeSubRequest;
// tslint:disable: array-type
export type NonEmptyArray<T> = [T, ...T[]];

View file

@ -5,22 +5,29 @@ import { snodeRpc } from './sessionRpc';
import { NotEmptyArrayOfBatchResults, SnodeApiSubRequests } from './SnodeRequestTypes';
/**
* This is the equivalent to the batch send on sogs. The target node runs each sub request and returns a list of all the sub status and bodies.
* This is the equivalent to the batch send on sogs. The target node runs each sub request and returns a list of all the sub status and bodies.
* If the global status code is not 200, an exception is thrown.
* The body is already parsed from json and is enforced to be an Array of at least one element
* @param subRequests the list of requests to do
* @param targetNode the node to do the request to, once all the onion routing is done
* @param timeout the timeout at which we should cancel this request.
* @param associatedWith used mostly for handling 421 errors, we need the pubkey the change is associated to
* @param method can be either batch or sequence. A batch call will run all calls even if one of them fails. A sequence call will stop as soon as the first one fails
*/
export async function doSnodeBatchRequest(
subRequests: Array<SnodeApiSubRequests>,
targetNode: Snode,
timeout: number,
associatedWith?: string
associatedWith?: string,
method: 'batch' | 'sequence' = 'batch'
): Promise<NotEmptyArrayOfBatchResults> {
console.warn(
`doSnodeBatchRequest "${method}":`,
subRequests.map(m => m.method),
subRequests
);
const result = await snodeRpc({
method: 'batch',
method,
params: { requests: subRequests },
targetNode,
associatedWith,

View file

@ -0,0 +1,82 @@
// import { isEmpty, sample } from 'lodash';
// import pRetry from 'p-retry';
// import { Snode } from '../../../data/data';
// import { ed25519Str } from '../../onions/onionPath';
// import { SingleDestinationChanges } from '../../utils/job_runners/jobs/ConfigurationSyncJob';
// import { doSnodeBatchRequest } from './batchRequest';
// import { SnodeAPI } from './SNodeAPI';
// import { getSwarmFor } from './snodePool';
// import { StoreOnNodeSubRequest } from './SnodeRequestTypes';
// function prepareRequest(singleDestChange: SingleDestinationChanges): Array<StoreOnNodeSubRequest> {
// if (isEmpty(singleDestChange) || isEmpty(singleDestChange.messages)) {
// return [];
// }
// return singleDestChange.messages.map(message => {
// return { method: 'store', params: {} };
// });
// }
/**
* Locally deletes message and deletes message on the network (all nodes that contain the message)
*/
// const sendConfigMessages = async (
// singleDestChange: SingleDestinationChanges
// ): Promise<Array<string> | null> => {
// if (isEmpty(singleDestChange) || isEmpty(singleDestChange.messages)) {
// return true;
// }
// try {
// const result = await pRetry(
// async () => {
// const swarmToSendTo = await getSwarmFor(singleDestChange.destination);
// const snodeToMakeRequestTo: Snode | undefined = sample(swarmToSendTo);
// if (!snodeToMakeRequestTo) {
// window?.log?.warn('Cannot networkDeleteMessages, without a valid swarm node.');
// return null;
// }
// return pRetry(
// async () => {
// const ret = await doSnodeBatchRequest([{method: 'store', params: {}}]);
// if (!ret) {
// throw new Error(
// `Empty response got for delete on snode ${ed25519Str(
// snodeToMakeRequestTo.pubkey_ed25519
// )}`
// );
// }
// return results;
// }
// },
// {
// retries: 3,
// minTimeout: SnodeAPI.TEST_getMinTimeout(),
// onFailedAttempt: e => {
// window?.log?.warn(
// `delete INNER request attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...`
// );
// },
// }
// );
// },
// {
// retries: 3,
// minTimeout: SnodeAPI.TEST_getMinTimeout(),
// onFailedAttempt: e => {
// window?.log?.warn(
// `delete OUTER request attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...`
// );
// },
// }
// );
// return maliciousSnodes;
// } catch (e) {
// window?.log?.warn('failed to delete message on network:', e);
// return null;
// }
// };

View file

@ -56,7 +56,7 @@ async function buildRetrieveRequest(
if (pubkey !== ourPubkey) {
throw new Error('not a legacy closed group. pubkey can only be ours');
}
const signatureArgs = { ...retrieveParam, method: 'retrieve' as 'retrieve', ourPubkey };
const signatureArgs = { ...retrieveParam, method: 'retrieve' as const, ourPubkey };
const signatureBuilt = await SnodeSignature.getSnodeSignatureParams(signatureArgs);
const retrieve: RetrieveSubRequestType = {
method: 'retrieve',

View file

@ -19,7 +19,7 @@ export interface LokiFetchOptions {
* A small wrapper around node-fetch which deserializes response
* returns insecureNodeFetch response or false
*/
async function lokiFetch({
async function doRequest({
options,
url,
associatedWith,
@ -61,7 +61,7 @@ async function lokiFetch({
}
if (url.match(/https:\/\//)) {
// import that this does not get set in lokiFetch fetchOptions
// import that this does not get set in doRequest fetchOptions
fetchOptions.agent = snodeHttpsAgent;
}
@ -71,7 +71,7 @@ async function lokiFetch({
'Content-Type': APPLICATION_JSON,
};
window?.log?.warn(`insecureNodeFetch => lokiFetch of ${url}`);
window?.log?.warn(`insecureNodeFetch => doRequest of ${url}`);
const response = await insecureNodeFetch(url, {
...fetchOptions,
@ -136,7 +136,7 @@ export async function snodeRpc(
agent: null,
};
return lokiFetch({
return doRequest({
url,
options: fetchOptions,
targetNode,

View file

@ -1,5 +1,5 @@
import { getSodiumRenderer } from '../../crypto';
import { UserUtils, StringUtils } from '../../utils';
import { StringUtils, UserUtils } from '../../utils';
import { fromHexToArray, fromUInt8ArrayToBase64 } from '../../utils/String';
import { GetNetworkTime } from './getNetworkTime';
@ -7,14 +7,54 @@ export type SnodeSignatureResult = {
timestamp: number;
signature: string;
pubkey_ed25519: string;
namespace: number;
pubkey: string; // this is the x25519 key of the pubkey we are doing the request to (ourself for our swarm usually)
};
async function getSnodeSignatureByHashesParams({
messages,
method,
pubkey,
}: {
pubkey: string;
messages: Array<string>;
method: 'delete';
}): Promise<
Pick<SnodeSignatureResult, 'pubkey_ed25519' | 'signature' | 'pubkey'> & {
messages: Array<string>;
}
> {
const ourEd25519Key = await UserUtils.getUserED25519KeyPair();
if (!ourEd25519Key) {
const err = `getSnodeSignatureParams "${method}": User has no getUserED25519KeyPair()`;
window.log.warn(err);
throw new Error(err);
}
const edKeyPrivBytes = fromHexToArray(ourEd25519Key?.privKey);
const verificationData = StringUtils.encode(`${method}${messages.join('')}`, 'utf8');
const message = new Uint8Array(verificationData);
const sodium = await getSodiumRenderer();
try {
const signature = sodium.crypto_sign_detached(message, edKeyPrivBytes);
const signatureBase64 = fromUInt8ArrayToBase64(signature);
return {
signature: signatureBase64,
pubkey_ed25519: ourEd25519Key.pubKey,
pubkey,
messages,
};
} catch (e) {
window.log.warn('getSnodeSignatureParams failed with: ', e.message);
throw e;
}
}
async function getSnodeSignatureParams(params: {
pubkey: string;
namespace: number;
ourPubkey: string;
method: 'retrieve' | 'store';
namespace: number | null;
method: 'retrieve' | 'store' | 'delete_all';
}): Promise<SnodeSignatureResult> {
const ourEd25519Key = await UserUtils.getUserED25519KeyPair();
@ -44,7 +84,7 @@ async function getSnodeSignatureParams(params: {
timestamp: signatureTimestamp,
signature: signatureBase64,
pubkey_ed25519: ourEd25519Key.pubKey,
namespace,
pubkey: params.pubkey,
};
} catch (e) {
window.log.warn('getSnodeSignatureParams failed with: ', e.message);
@ -52,4 +92,4 @@ async function getSnodeSignatureParams(params: {
}
}
export const SnodeSignature = { getSnodeSignatureParams };
export const SnodeSignature = { getSnodeSignatureParams, getSnodeSignatureByHashesParams };

View file

@ -1,23 +1,63 @@
import { isEmpty } from 'lodash';
import { Snode } from '../../../data/data';
import { doSnodeBatchRequest } from './batchRequest';
import { GetNetworkTime } from './getNetworkTime';
import { StoreOnNodeParams, StoreOnNodeSubRequest } from './SnodeRequestTypes';
import {
DeleteByHashesFromNodeParams,
DeleteFromNodeSubRequest,
NotEmptyArrayOfBatchResults,
StoreOnNodeParams,
StoreOnNodeSubRequest,
} from './SnodeRequestTypes';
function buildStoreRequests(params: StoreOnNodeParams): Array<StoreOnNodeSubRequest> {
const request: StoreOnNodeSubRequest = {
method: 'store',
params,
};
return [request];
function justStores(params: Array<StoreOnNodeParams>) {
return params.map(p => {
return {
method: 'store',
params: p,
} as StoreOnNodeSubRequest;
});
}
function buildStoreRequests(
params: Array<StoreOnNodeParams>,
toDeleteOnSequence: DeleteByHashesFromNodeParams | null
): Array<StoreOnNodeSubRequest | DeleteFromNodeSubRequest> {
if (!toDeleteOnSequence || isEmpty(toDeleteOnSequence)) {
return justStores(params);
}
return [...justStores(params), ...buildDeleteByHashesSubRequest(toDeleteOnSequence)];
}
function buildDeleteByHashesSubRequest(
params: DeleteByHashesFromNodeParams
): Array<DeleteFromNodeSubRequest> {
return [
{
method: 'delete',
params,
},
];
}
/**
* Send a 'store' request to the specified targetNode, using params as argument
* @returns the Array of stored hashes if it is a success, or null
*/
async function storeOnNode(
targetNode: Snode,
params: StoreOnNodeParams
): Promise<string | null | boolean> {
params: Array<StoreOnNodeParams>,
toDeleteOnSequence: DeleteByHashesFromNodeParams | null
): Promise<NotEmptyArrayOfBatchResults> {
try {
const subRequests = buildStoreRequests(params);
const result = await doSnodeBatchRequest(subRequests, targetNode, 4000, params.pubkey);
const subRequests = buildStoreRequests(params, toDeleteOnSequence);
const result = await doSnodeBatchRequest(
subRequests,
targetNode,
4000,
params[0].pubkey,
toDeleteOnSequence ? 'sequence' : 'batch'
);
if (!result || !result.length) {
window?.log?.warn(
@ -28,24 +68,16 @@ async function storeOnNode(
}
const firstResult = result[0];
console.warn('we should probably check other results code');
if (firstResult.code !== 200) {
window?.log?.warn('first result status is not 200 for storeOnNode but: ', firstResult.code);
throw new Error('storeOnNode: Invalid status code');
}
// no retry here. If an issue is with the path this is handled in lokiOnionFetch
// if there is an issue with the targetNode, we still send a few times this request to a few snodes in // already so it's handled
GetNetworkTime.handleTimestampOffsetFromNetwork('store', firstResult.body.t);
const parsed = firstResult.body;
GetNetworkTime.handleTimestampOffsetFromNetwork('store', parsed.t);
const messageHash = parsed.hash;
if (messageHash) {
return messageHash;
}
return true;
return result;
} catch (e) {
window?.log?.warn('store - send error:', e, `destination ${targetNode.ip}:${targetNode.port}`);
throw e;

View file

@ -16,6 +16,7 @@ export const TTL_DEFAULT = {
TYPING_MESSAGE: 20 * DURATION.SECONDS,
CALL_MESSAGE: 5 * 60 * DURATION.SECONDS,
TTL_MAX: 14 * DURATION.DAYS,
TTL_CONFIG: 30 * DURATION.DAYS,
};
export const SWARM_POLLING_TIMEOUT = {

View file

@ -4,6 +4,7 @@ import { SignalService } from '../../../../protobuf';
import { MessageParams } from '../Message';
import { ContentMessage } from '..';
import Long from 'long';
import { TTL_DEFAULT } from '../../../constants';
interface SharedConfigParams extends MessageParams {
seqno: Long;
@ -29,6 +30,10 @@ export class SharedConfigMessage extends ContentMessage {
});
}
public ttl(): number {
return TTL_DEFAULT.TTL_CONFIG;
}
protected sharedConfigProto(): SignalService.SharedConfigMessage {
return new SignalService.SharedConfigMessage({
data: this.data,

View file

@ -1,8 +1,8 @@
import { to_hex } from 'libsodium-wrappers-sumo';
import { isEmpty } from 'lodash';
import { getConversationController } from '../conversations';
import { UserUtils } from '../utils';
import { AvatarDownload } from '../utils/job_runners/jobs/AvatarDownloadJob';
import { toHex } from '../utils/String';
/**
* This can be used to update our conversation display name with the given name right away, and plan an AvatarDownloadJob to retrieve the new avatar if needed to download it
@ -47,7 +47,8 @@ async function updateProfileOfContact(
}
// add an avatar download job only if needed
const profileKeyHex = !profileKey || isEmpty(profileKey) ? null : to_hex(profileKey);
debugger;
const profileKeyHex = !profileKey || isEmpty(profileKey) ? null : toHex(profileKey);
await AvatarDownload.addAvatarDownloadJobIfNeeded({
profileKeyHex,

View file

@ -1,7 +1,7 @@
import { PendingMessageCache } from './PendingMessageCache';
import { JobQueue, MessageUtils, UserUtils } from '../utils';
import { PubKey, RawMessage } from '../types';
import { MessageSender } from '.';
import { MessageSender } from './';
import { ClosedGroupMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupMessage';
import { ConfigurationMessage } from '../messages/outgoing/controlMessage/ConfigurationMessage';
import { ClosedGroupNameChangeMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupNameChangeMessage';
@ -321,19 +321,13 @@ export class MessageQueue {
isGroup = false
): Promise<void> {
// Don't send to ourselves
const currentDevice = UserUtils.getOurPubKeyFromCache();
const us = UserUtils.getOurPubKeyFromCache();
let isSyncMessage = false;
if (currentDevice && destinationPk.isEqual(currentDevice)) {
if (us && destinationPk.isEqual(us)) {
// We allow a message for ourselve only if it's a ConfigurationMessage, a ClosedGroupNewMessage,
// or a message with a syncTarget set.
if (
message instanceof ConfigurationMessage ||
message instanceof ClosedGroupNewMessage ||
message instanceof UnsendMessage ||
message instanceof SharedConfigMessage ||
(message as any).syncTarget?.length > 0
) {
if (MessageSender.isSyncMessage(message)) {
window?.log?.warn('OutgoingMessageQueue: Processing sync message');
isSyncMessage = true;
} else {

View file

@ -1,38 +1,46 @@
// REMOVE COMMENT AFTER: This can just export pure functions as it doesn't need state
import { RawMessage } from '../types/RawMessage';
import { SignalService } from '../../protobuf';
import { MessageEncrypter } from '../crypto';
import { AbortController } from 'abort-controller';
import ByteBuffer from 'bytebuffer';
import _, { isEmpty, isNil, isString, sample } from 'lodash';
import pRetry from 'p-retry';
import { PubKey } from '../types';
import { Data } from '../../../ts/data/data';
import { SignalService } from '../../protobuf';
import { OpenGroupRequestCommonType } from '../apis/open_group_api/opengroupV2/ApiUtil';
import { OpenGroupMessageV2 } from '../apis/open_group_api/opengroupV2/OpenGroupMessageV2';
import { fromUInt8ArrayToBase64 } from '../utils/String';
import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/OpenGroupVisibleMessage';
import { addMessagePadding } from '../crypto/BufferPadding';
import _ from 'lodash';
import { getSwarmFor } from '../apis/snode_api/snodePool';
import { MessageSender } from '.';
import { Data, Snode } from '../../../ts/data/data';
import { getConversationController } from '../conversations';
import { ed25519Str } from '../onions/onionPath';
import { EmptySwarmError } from '../utils/errors';
import ByteBuffer from 'bytebuffer';
import {
sendMessageOnionV4BlindedRequest,
sendSogsMessageOnionV4,
} from '../apis/open_group_api/sogsv3/sogsV3SendMessage';
import { AbortController } from 'abort-controller';
import { SnodeAPIStore } from '../apis/snode_api/storeMessage';
import { StoreOnNodeParams } from '../apis/snode_api/SnodeRequestTypes';
import { GetNetworkTime } from '../apis/snode_api/getNetworkTime';
import { SnodeNamespace, SnodeNamespaces } from '../apis/snode_api/namespaces';
import { getSwarmFor } from '../apis/snode_api/snodePool';
import {
NotEmptyArrayOfBatchResults,
StoreOnNodeMessage,
StoreOnNodeParams,
StoreOnNodeParamsNoSig,
} from '../apis/snode_api/SnodeRequestTypes';
import { SnodeSignature, SnodeSignatureResult } from '../apis/snode_api/snodeSignatures';
import { UserUtils } from '../utils';
import { SnodeAPIStore } from '../apis/snode_api/storeMessage';
import { getConversationController } from '../conversations';
import { MessageEncrypter } from '../crypto';
import { addMessagePadding } from '../crypto/BufferPadding';
import { ContentMessage } from '../messages/outgoing';
import { ConfigurationMessage } from '../messages/outgoing/controlMessage/ConfigurationMessage';
import { ClosedGroupNewMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupNewMessage';
import { SharedConfigMessage } from '../messages/outgoing/controlMessage/SharedConfigMessage';
import { UnsendMessage } from '../messages/outgoing/controlMessage/UnsendMessage';
import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/OpenGroupVisibleMessage';
import { ed25519Str } from '../onions/onionPath';
import { PubKey } from '../types';
import { RawMessage } from '../types/RawMessage';
import { EmptySwarmError } from '../utils/errors';
import { fromUInt8ArrayToBase64 } from '../utils/String';
// ================ SNODE STORE ================
function overwriteOutgoingTimestampWithNetworkTimestamp(message: RawMessage) {
function overwriteOutgoingTimestampWithNetworkTimestamp(message: { plainTextBuffer: Uint8Array }) {
const networkTimestamp = GetNetworkTime.getNowWithNetworkOffset();
const { plainTextBuffer } = message;
@ -63,89 +71,106 @@ function overwriteOutgoingTimestampWithNetworkTimestamp(message: RawMessage) {
return { overRiddenTimestampBuffer, networkTimestamp };
}
export function getMinRetryTimeout() {
function getMinRetryTimeout() {
return 1000;
}
function isSyncMessage(message: ContentMessage) {
if (
message instanceof ConfigurationMessage ||
message instanceof ClosedGroupNewMessage ||
message instanceof UnsendMessage ||
message instanceof SharedConfigMessage ||
(message as any).syncTarget?.length > 0
) {
return true;
} else {
return false;
}
}
/**
* Send a message via service nodes.
* Send a single message via service nodes.
*
* @param message The message to send.
* @param attempts The amount of times to attempt sending. Minimum value is 1.
*/
export async function send(
async function send(
message: RawMessage,
attempts: number = 3,
retryMinTimeout?: number, // in ms
isSyncMessage?: boolean
isASyncMessage?: boolean
): Promise<{ wrappedEnvelope: Uint8Array; effectiveTimestamp: number }> {
return pRetry(
async () => {
const recipient = PubKey.cast(message.device);
const { encryption, ttl } = message;
let namespace = message.namespace;
const { ttl } = message;
const {
overRiddenTimestampBuffer,
networkTimestamp,
} = overwriteOutgoingTimestampWithNetworkTimestamp(message);
// we can only have a single message in this send function for now
const [encryptedAndWrapped] = await encryptMessagesAndWrap([
{
destination: message.device,
plainTextBuffer: message.plainTextBuffer,
namespace: message.namespace,
ttl,
identifier: message.identifier,
isSyncMessage: Boolean(isASyncMessage),
},
]);
const { envelopeType, cipherText } = await MessageEncrypter.encrypt(
recipient,
overRiddenTimestampBuffer,
encryption
);
const envelope = await buildEnvelope(
envelopeType,
recipient.key,
networkTimestamp,
cipherText
);
const data = wrapEnvelope(envelope);
// make sure to update the local sent_at timestamp, because sometimes, we will get the just pushed message in the receiver side
// before we return from the await below.
// and the isDuplicate messages relies on sent_at timestamp to be valid.
const found = await Data.getMessageById(message.identifier);
const found = await Data.getMessageById(encryptedAndWrapped.identifier);
// make sure to not update the sent timestamp if this a currently syncing message
if (found && !found.get('sentSync')) {
found.set({ sent_at: networkTimestamp });
found.set({ sent_at: encryptedAndWrapped.networkTimestamp });
await found.commit();
}
// right when we upgrade from not having namespaces stored in the outgoing cached messages our messages won't have a namespace associated.
// So we need to keep doing the lookup of where they should go if the namespace is not set.
if (namespace === null || namespace === undefined) {
namespace = getConversationController()
.get(recipient.key)
?.isClosedGroup()
? SnodeNamespaces.ClosedGroupMessage
: SnodeNamespaces.UserMessages;
const batchResult = await MessageSender.sendMessagesDataToSnode(
[
{
pubkey: recipient.key,
data64: encryptedAndWrapped.data64,
ttl,
timestamp: encryptedAndWrapped.networkTimestamp,
namespace: encryptedAndWrapped.namespace,
},
],
recipient.key,
null
);
const isDestinationClosedGroup = getConversationController()
.get(recipient.key)
?.isClosedGroup();
// If message also has a sync message, save that hash. Otherwise save the hash from the regular message send i.e. only closed groups in this case.
if (
encryptedAndWrapped.identifier &&
(encryptedAndWrapped.isSyncMessage || isDestinationClosedGroup) &&
batchResult &&
!isEmpty(batchResult) &&
batchResult[0].code === 200 &&
!isEmpty(batchResult[0].body.hash)
) {
const messageSendHash = batchResult[0].body.hash;
const foundMessage = await Data.getMessageById(encryptedAndWrapped.identifier);
if (foundMessage) {
await foundMessage.updateMessageHash(messageSendHash);
await foundMessage.commit();
window?.log?.info(
`updated message ${foundMessage.get('id')} with hash: ${foundMessage.get(
'messageHash'
)}`
);
}
}
const timestamp = networkTimestamp;
// the user config namespaces requires a signature to be added
let signOpts: SnodeSignatureResult | undefined;
if (SnodeNamespace.isUserConfigNamespace(namespace)) {
signOpts = await SnodeSignature.getSnodeSignatureParams({
method: 'store' as 'store',
namespace,
ourPubkey: UserUtils.getOurPubKeyStrFromCache(),
pubkey: recipient.key,
});
}
await MessageSender.sendMessageToSnode({
pubKey: recipient.key,
data,
ttl,
timestamp,
isSyncMessage,
messageId: message.identifier,
namespace,
...signOpts,
});
return { wrappedEnvelope: data, effectiveTimestamp: networkTimestamp };
return {
wrappedEnvelope: encryptedAndWrapped.data,
effectiveTimestamp: encryptedAndWrapped.networkTimestamp,
};
},
{
retries: Math.max(attempts - 1, 0),
@ -155,100 +180,267 @@ export async function send(
);
}
export type SendMessageSignatureOpts = {
signature?: string; // needed for some namespaces
namespace: SnodeNamespaces;
pubkey_ed25519?: string;
timestamp: number;
};
async function sendMessagesDataToSnode(
params: Array<StoreOnNodeParamsNoSig>,
destination: string,
oldMessageHashes: Array<string> | null
): Promise<NotEmptyArrayOfBatchResults> {
const rightDestination = params.filter(m => m.pubkey === destination);
const swarm = await getSwarmFor(destination);
// tslint:disable-next-line: function-name
export async function sendMessageToSnode({
data,
namespace,
pubKey,
timestamp,
ttl,
isSyncMessage,
signature,
messageId,
pubkey_ed25519,
}: {
pubKey: string;
data: Uint8Array;
ttl: number;
namespace: SnodeNamespaces;
isSyncMessage?: boolean;
messageId?: string;
} & SendMessageSignatureOpts): Promise<void> {
const data64 = ByteBuffer.wrap(data).toString('base64');
const swarm = await getSwarmFor(pubKey);
const withSigWhenRequired: Array<StoreOnNodeParams> = await Promise.all(
rightDestination.map(async item => {
// some namespaces require a signature to be added
let signOpts: SnodeSignatureResult | undefined;
if (SnodeNamespace.isUserConfigNamespace(item.namespace)) {
signOpts = await SnodeSignature.getSnodeSignatureParams({
method: 'store' as const,
namespace: item.namespace,
pubkey: destination,
});
}
const store: StoreOnNodeParams = {
data: item.data64,
namespace: item.namespace,
pubkey: item.pubkey,
timestamp: item.timestamp,
ttl: item.ttl,
...signOpts,
};
return store;
})
);
const conversation = getConversationController().get(pubKey);
const isClosedGroup = conversation?.isClosedGroup();
debugger;
const signedDeleteOldHashesRequest = oldMessageHashes?.length
? await SnodeSignature.getSnodeSignatureByHashesParams({
method: 'delete' as const,
messages: oldMessageHashes,
pubkey: destination,
})
: null;
// send parameters
const params: StoreOnNodeParams = {
pubkey: pubKey,
ttl: `${ttl}`,
timestamp: `${timestamp}`,
data: data64,
namespace,
};
if (signature && pubkey_ed25519) {
params.signature = signature;
params.pubkey_ed25519 = pubkey_ed25519;
const snode = sample(swarm);
if (!snode) {
throw new EmptySwarmError(destination, 'Ran out of swarm nodes to query');
}
const usedNodes = _.slice(swarm, 0, 1);
if (!usedNodes || usedNodes.length === 0) {
throw new EmptySwarmError(pubKey, 'Ran out of swarm nodes to query');
}
let successfulSendHash: string | undefined;
let snode: Snode | undefined;
const snodeTried = usedNodes[0];
try {
// No pRetry here as if this is a bad path it will be handled and retried in lokiOnionFetch.
// the only case we could care about a retry would be when the usedNode is not correct,
// but considering we trigger this request with a few snode in //, this should be fine.
const successfulSend = await SnodeAPIStore.storeOnNode(snodeTried, params);
const storeResults = await SnodeAPIStore.storeOnNode(
snode,
withSigWhenRequired,
signedDeleteOldHashesRequest
);
if (successfulSend) {
if (_.isString(successfulSend)) {
successfulSendHash = successfulSend;
}
snode = snodeTried;
if (snode) {
window?.log?.info(
`sendMessagesToSnode - Successfully stored messages to ${ed25519Str(destination)} via ${
snode.ip
}:${snode.port} on namespaces: ${rightDestination.map(m => m.namespace).join(',')}`
);
}
return storeResults;
} catch (e) {
const snodeStr = snodeTried ? `${snodeTried.ip}:${snodeTried.port}` : 'null';
const snodeStr = snode ? `${snode.ip}:${snode.port}` : 'null';
window?.log?.warn(
`loki_message:::sendMessage - "${e.code}:${e.message}" to ${pubKey} via snode:${snodeStr}`
`sendMessagesToSnode - "${e.code}:${e.message}" to ${destination} via snode:${snodeStr}`
);
throw e;
}
}
// If message also has a sync message, save that hash. Otherwise save the hash from the regular message send i.e. only closed groups in this case.
if (messageId && (isSyncMessage || isClosedGroup) && successfulSendHash) {
const message = await Data.getMessageById(messageId);
if (message) {
await message.updateMessageHash(successfulSendHash);
await message.commit();
window?.log?.info(
`updated message ${message.get('id')} with hash: ${message.get('messageHash')}`
);
}
function encryptionBasedOnConversation(destination: PubKey) {
if (
getConversationController()
.get(destination.key)
?.isClosedGroup()
) {
return SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE;
} else {
return SignalService.Envelope.Type.SESSION_MESSAGE;
}
}
if (snode) {
window?.log?.info(
`loki_message:::sendMessage - Successfully stored message to ${ed25519Str(pubKey)} via ${
snode.ip
}:${snode.port} on namespace: ${namespace}`
type SharedEncryptAndWrap = {
ttl: number;
identifier: string;
isSyncMessage: boolean;
};
type EncryptAndWrapMessage = {
plainTextBuffer: Uint8Array;
destination: string;
namespace: number | null;
} & SharedEncryptAndWrap;
type EncryptAndWrapMessageResults = {
data64: string;
networkTimestamp: number;
data: Uint8Array;
namespace: number;
} & SharedEncryptAndWrap;
async function encryptMessageAndWrap(
params: EncryptAndWrapMessage
): Promise<EncryptAndWrapMessageResults> {
const {
destination,
identifier,
isSyncMessage: syncMessage,
namespace,
plainTextBuffer,
ttl,
} = params;
const {
overRiddenTimestampBuffer,
networkTimestamp,
} = overwriteOutgoingTimestampWithNetworkTimestamp({ plainTextBuffer });
const recipient = PubKey.cast(destination);
const { envelopeType, cipherText } = await MessageEncrypter.encrypt(
recipient,
overRiddenTimestampBuffer,
encryptionBasedOnConversation(recipient)
);
const envelope = await buildEnvelope(envelopeType, recipient.key, networkTimestamp, cipherText);
const data = wrapEnvelope(envelope);
const data64 = ByteBuffer.wrap(data).toString('base64');
// override the namespaces if those are unset in the incoming messages
// right when we upgrade from not having namespaces stored in the outgoing cached messages our messages won't have a namespace associated.
// So we need to keep doing the lookup of where they should go if the namespace is not set.
const overridenNamespace = !isNil(namespace)
? namespace
: getConversationController()
.get(recipient.key)
?.isClosedGroup()
? SnodeNamespaces.ClosedGroupMessage
: SnodeNamespaces.UserMessages;
return {
data64,
networkTimestamp,
data,
namespace: overridenNamespace,
ttl,
identifier,
isSyncMessage: syncMessage,
};
}
async function encryptMessagesAndWrap(
messages: Array<EncryptAndWrapMessage>
): Promise<Array<EncryptAndWrapMessageResults>> {
return Promise.all(messages.map(encryptMessageAndWrap));
}
/**
* Send a list of messages to a single service node.
* Used currently only for sending SharedConfigMessage to multiple messages at a time.
*
* @param params the messages to deposit
* @param destination the pubkey we should deposit those message for
* @returns the hashes of successful deposit
*/
async function sendMessagesToSnode(
params: Array<StoreOnNodeMessage>,
destination: string,
oldMessageHashes: Array<string> | null
): Promise<NotEmptyArrayOfBatchResults | null> {
try {
const recipient = PubKey.cast(destination);
const encryptedAndWrapped = await encryptMessagesAndWrap(
params.map(m => ({
destination: m.pubkey,
plainTextBuffer: m.message.plainTextBuffer(),
namespace: m.namespace,
ttl: m.message.ttl(),
identifier: m.message.identifier,
isSyncMessage: MessageSender.isSyncMessage(m.message),
}))
);
// first update all the associated timestamps of our messages in DB, if the outgoing messages are associated with one.
await Promise.all(
encryptedAndWrapped.map(async (m, index) => {
// make sure to update the local sent_at timestamp, because sometimes, we will get the just pushed message in the receiver side
// before we return from the await below.
// and the isDuplicate messages relies on sent_at timestamp to be valid.
const found = await Data.getMessageById(m.identifier);
// make sure to not update the sent timestamp if this a currently syncing message
if (found && !found.get('sentSync')) {
found.set({ sent_at: encryptedAndWrapped[index].networkTimestamp });
await found.commit();
}
})
);
const batchResults = await pRetry(
async () => {
return MessageSender.sendMessagesDataToSnode(
encryptedAndWrapped.map(wrapped => ({
pubkey: recipient.key,
data64: wrapped.data64,
ttl: wrapped.ttl,
timestamp: wrapped.networkTimestamp,
namespace: wrapped.namespace,
})),
recipient.key,
oldMessageHashes
);
},
{
retries: 2,
factor: 1,
minTimeout: MessageSender.getMinRetryTimeout(),
}
);
if (!batchResults || isEmpty(batchResults)) {
throw new Error('result is empty for sendMessagesToSnode');
}
const isDestinationClosedGroup = getConversationController()
.get(recipient.key)
?.isClosedGroup();
await Promise.all(
encryptedAndWrapped.map(async (message, index) => {
// If message also has a sync message, save that hash. Otherwise save the hash from the regular message send i.e. only closed groups in this case.
if (
message.identifier &&
(message.isSyncMessage || isDestinationClosedGroup) &&
batchResults[index] &&
!isEmpty(batchResults[index]) &&
isString(batchResults[index].body.hash)
) {
const hashFoundInResponse = batchResults[index].body.hash;
const foundMessage = await Data.getMessageById(message.identifier);
if (foundMessage) {
await foundMessage.updateMessageHash(hashFoundInResponse);
await foundMessage.commit();
window?.log?.info(
`updated message ${foundMessage.get('id')} with hash: ${foundMessage.get(
'messageHash'
)}`
);
}
}
})
);
return batchResults;
} catch (e) {
window.log.warn(`sendMessagesToSnode failed with ${e.message}`);
return null;
}
}
@ -296,7 +488,7 @@ function wrapEnvelope(envelope: SignalService.Envelope): Uint8Array {
* Send a message to an open group v2.
* @param message The open group message.
*/
export async function sendToOpenGroupV2(
async function sendToOpenGroupV2(
rawMessage: OpenGroupVisibleMessage,
roomInfos: OpenGroupRequestCommonType,
blinded: boolean,
@ -324,7 +516,7 @@ export async function sendToOpenGroupV2(
* Send a message to an open group v2.
* @param message The open group message.
*/
export async function sendToOpenGroupV2BlindedRequest(
async function sendToOpenGroupV2BlindedRequest(
encryptedContent: Uint8Array,
roomInfos: OpenGroupRequestCommonType,
recipientBlindedId: string
@ -344,3 +536,13 @@ export async function sendToOpenGroupV2BlindedRequest(
);
return msg;
}
export const MessageSender = {
sendToOpenGroupV2BlindedRequest,
sendMessagesDataToSnode,
sendMessagesToSnode,
getMinRetryTimeout,
sendToOpenGroupV2,
send,
isSyncMessage,
};

View file

@ -1,6 +1,3 @@
// TS 3.8 supports export * as X from 'Y'
import * as MessageSender from './MessageSender';
export { MessageSender };
export * from './PendingMessageCache';
export * from './MessageQueue';
export * from './MessageSender';

View file

@ -188,7 +188,7 @@ export async function waitUntil(check: () => Return<boolean>, timeoutMs: number
);
}
export async function timeout<T>(promise: Promise<T>, timeoutMs: number = 2000): Promise<T> {
export async function timeout<T>(promise: Promise<T>, timeoutMs: number): Promise<T> {
const timeoutPromise = new Promise<T>((_, rej) => {
const wait = setTimeout(() => {
clearTimeout(wait);

View file

@ -1,5 +1,6 @@
import { cloneDeep, compact, isArray, isString } from 'lodash';
import { Data } from '../../../data/data';
import { timeout } from '../Promise';
import { persistedJobFromData } from './JobDeserialization';
import { JobRunnerType } from './jobs/JobRunnerType';
import {
@ -286,7 +287,7 @@ export class PersistedJobRunner<T extends TypeOfPersistedData> {
}
this.currentJob = nextJob;
success = await this.currentJob.runJob();
success = await timeout(this.currentJob.runJob(), this.currentJob.getJobTimeoutMs());
if (success !== RunJobResult.Success) {
throw new Error(`job ${nextJob.persistedData.identifier} failed`);

View file

@ -129,6 +129,8 @@ export abstract class PersistedJob<T extends PersistedJobData> {
: null;
}
public abstract getJobTimeoutMs(): number;
/**
* This function will be called by the runner do run the logic of that job.
* It **must** return true if that job is a success and doesn't need to be retried.

View file

@ -60,6 +60,7 @@ async function addAvatarDownloadJobIfNeeded({
profileKeyHex: string | null | undefined;
}) {
if (profileKeyHex && shouldAddAvatarDownloadJob({ pubkey, profileUrl, profileKeyHex })) {
debugger;
const avatarDownloadJob = new AvatarDownloadJob({
conversationId: pubkey,
profileKeyHex,
@ -278,6 +279,10 @@ class AvatarDownloadJob extends PersistedJob<AvatarDownloadPersistedData> {
}
return null;
}
public getJobTimeoutMs(): number {
return 10000;
}
}
export const AvatarDownload = {

View file

@ -1,16 +1,17 @@
import { from_string } from 'libsodium-wrappers-sumo';
import { isNumber } from 'lodash';
import Long from 'long';
import { compact, groupBy, isArray, isEmpty, isNumber, isString, uniq } from 'lodash';
import { v4 } from 'uuid';
import { UserUtils } from '../..';
import { SignalService } from '../../../../protobuf';
import { UserConfigWrapperActions } from '../../../../webworker/workers/browser/libsession_worker_interface';
import { GetNetworkTime } from '../../../apis/snode_api/getNetworkTime';
import { SnodeNamespaces } from '../../../apis/snode_api/namespaces';
import { ConfigDumpData } from '../../../../data/configDump/configDump';
import {
GenericWrapperActions,
UserConfigWrapperActions,
} from '../../../../webworker/workers/browser/libsession_worker_interface';
import { NotEmptyArrayOfBatchResults } from '../../../apis/snode_api/SnodeRequestTypes';
import { getConversationController } from '../../../conversations';
import { SharedConfigMessage } from '../../../messages/outgoing/controlMessage/SharedConfigMessage';
import { getMessageQueue } from '../../../sending';
import { PubKey } from '../../../types';
import { MessageSender } from '../../../sending/MessageSender';
import { LibSessionUtil, OutgoingConfResult } from '../../libsession/libsession_utils';
import { runners } from '../JobRunner';
import {
AddJobCheckReturn,
@ -22,6 +23,148 @@ import {
const defaultMsBetweenRetries = 3000;
const defaultMaxAttempts = 3;
export type SingleDestinationChanges = {
destination: string;
messages: Array<OutgoingConfResult>;
allOldHashes: Array<string>;
};
type SuccessfulChange = {
message: SharedConfigMessage;
publicKey: string;
updatedHash: Array<string>;
};
/**
* Later in the syncing logic, we want to batch-send all the updates for a pubkey in a single batch call.
* To make this easier, this function prebuilds and merges together all the changes for each pubkey.
*/
async function retrieveSingleDestinationChanges(): Promise<Array<SingleDestinationChanges>> {
const outgoingConfResults = await LibSessionUtil.pendingChangesForPubkey(
UserUtils.getOurPubKeyStrFromCache()
);
const groupedByDestination = groupBy(outgoingConfResults, m => m.destination);
const singleDestChanges: Array<SingleDestinationChanges> = Object.keys(groupedByDestination).map(
destination => {
const messages = groupedByDestination[destination];
const uniqHashes = compact(
uniq(messages.filter(m => m.oldMessageHashes).map(m => m.oldMessageHashes)).flat()
);
return { allOldHashes: uniqHashes, destination, messages };
}
);
return singleDestChanges;
}
/**
* This function is run once we get the results from the multiple batch-send.
* For each results, it checks wha
*/
function resultsToSuccessfulChange(
allResults: Array<PromiseSettledResult<NotEmptyArrayOfBatchResults | null>>,
requests: Array<SingleDestinationChanges>
): Array<SuccessfulChange> {
const successfulChanges: Array<SuccessfulChange> = [];
/**
* For each batch request, we get as result
* - status code + hash of the new config message
* - status code of the delete of all messages as given by the request hashes.
*
* As it is a sequence, the delete might have failed but the new config message might still be posted.
* So we need to check which request failed, and if it is the delete by hashes, we need to add the hash of the posted message to the list of hashes
*/
debugger;
try {
for (let i = 0; i < allResults.length; i++) {
const result = allResults[i];
// the batch send was rejected. Let's skip handling those results altogether. Another job will handle the retry logic.
if (result.status !== 'fulfilled') {
continue;
}
const resultValue = result.value;
if (!resultValue) {
continue;
}
const request = requests?.[i];
if (!result) {
continue;
}
const didDeleteOldConfigMessages = Boolean(
!isEmpty(request.allOldHashes) &&
resultValue &&
resultValue?.length &&
request &&
resultValue[resultValue.length - 1].code === 200
);
for (let j = 0; j < resultValue.length; j++) {
const batchResult = resultValue[j];
const messagePostedHashes = batchResult?.body?.hash;
if (
batchResult.code === 200 &&
isString(messagePostedHashes) &&
request.messages?.[j].message &&
request.destination
) {
// a message was posted. We need to add it to the tracked list of hashes
console.warn(
`messagePostedHashes for j:${j}; didDeleteOldConfigMessages:${didDeleteOldConfigMessages}: `,
messagePostedHashes
);
successfulChanges.push({
publicKey: request.destination,
updatedHash: didDeleteOldConfigMessages
? [messagePostedHashes]
: [...request.allOldHashes, messagePostedHashes],
message: request.messages?.[j].message,
});
}
}
}
} catch (e) {
console.warn('eeee', e);
debugger;
throw e;
}
return successfulChanges;
}
async function buildAndSaveDumpsToDB(changes: Array<SuccessfulChange>): Promise<void> {
for (let i = 0; i < changes.length; i++) {
const change = changes[i];
const variant = LibSessionUtil.kindToVariant(change.message.kind);
const needsDump = await LibSessionUtil.markAsPushed(
variant,
change.publicKey,
change.message.seqno.toNumber()
);
if (!needsDump) {
continue;
}
const dump = await GenericWrapperActions.dump(variant);
console.warn('change.updatedHash', change.updatedHash);
await ConfigDumpData.saveConfigDump({
data: dump,
publicKey: change.publicKey,
variant,
combinedMessageHashes: change.updatedHash,
});
}
}
class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersistedData> {
constructor({
identifier,
@ -45,11 +188,16 @@ class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersistedData>
}
public async run(): Promise<RunJobResult> {
if (!window.sessionFeatureFlags.useSharedUtilForUserConfig) {
return RunJobResult.Success;
}
window.log.debug(`ConfigurationSyncJob starting ${this.persistedData.identifier}`);
const us = UserUtils.getOurPubKeyStrFromCache();
const ed25519Key = await UserUtils.getUserED25519KeyPairBytes();
const conversation = getConversationController().get(us);
if (!us || !conversation) {
if (!us || !conversation || !ed25519Key) {
// we check for ed25519Key because it is needed for authenticated requests
window.log.warn('did not find our own conversation');
return RunJobResult.PermanentFailure;
}
@ -64,33 +212,49 @@ class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersistedData>
await UserConfigWrapperActions.setProfilePicture('', new Uint8Array());
}
const data = await UserConfigWrapperActions.push();
const singleDestChanges = await retrieveSingleDestinationChanges();
const message = new SharedConfigMessage({
data: data.data,
kind: SignalService.SharedConfigMessage.Kind.USER_PROFILE,
seqno: Long.fromNumber(data.seqno),
timestamp: GetNetworkTime.getNowWithNetworkOffset(),
});
// If there are no pending changes then the job can just complete (next time something
// is updated we want to try and run immediately so don't scuedule another run in this case)
const result = await getMessageQueue().sendToPubKeyNonDurably({
message,
namespace: SnodeNamespaces.UserProfile,
pubkey: PubKey.cast(us),
});
console.warn(
`ConfigurationSyncJob sendToPubKeyNonDurably ${this.persistedData.identifier} returned: "${result}"`
);
if (isNumber(result)) {
// try {
// markAsPushed
// }
debugger;
if (isEmpty(singleDestChanges)) {
return RunJobResult.Success;
}
return RunJobResult.RetryJobIfPossible;
const allResults = await Promise.allSettled(
singleDestChanges.map(async dest => {
const msgs = dest.messages.map(item => {
return {
namespace: item.namespace,
pubkey: item.destination,
timestamp: item.message.timestamp,
ttl: item.message.ttl(),
message: item.message,
};
});
return MessageSender.sendMessagesToSnode(msgs, dest.destination, dest.allOldHashes);
})
);
console.warn(
`ConfigurationSyncJob sendToPubKeyNonDurably ${this.persistedData.identifier} returned: "${allResults}"`
);
// we do a sequence call here. If we do not have the right expected number of results, consider it
debugger;
if (!isArray(allResults) || allResults.length !== singleDestChanges.length) {
return RunJobResult.RetryJobIfPossible;
}
const changes = resultsToSuccessfulChange(allResults, singleDestChanges);
if (isEmpty(changes)) {
return RunJobResult.RetryJobIfPossible;
}
// Now that we have the successful changes, we need to mark them as pushed and
// generate any config dumps which need to be stored
await buildAndSaveDumpsToDB(changes);
return RunJobResult.Success;
}
public serializeJob(): ConfigurationSyncPersistedData {
@ -110,6 +274,10 @@ class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersistedData>
public nonRunningJobsToRemove(_jobs: Array<ConfigurationSyncPersistedData>) {
return [];
}
public getJobTimeoutMs(): number {
return 20000;
}
}
/**

View file

@ -1,50 +1,90 @@
import { difference, isEqual } from 'lodash';
import { from_hex } from 'libsodium-wrappers-sumo';
import { difference, isEqual, omit } from 'lodash';
import Long from 'long';
import { UserUtils } from '..';
import { ConfigDumpData } from '../../../data/configDump/configDump';
import { SignalService } from '../../../protobuf';
import { ConfigWrapperObjectTypes } from '../../../webworker/workers/browser/libsession_worker_functions';
import {
GenericWrapperActions,
UserConfigWrapperActions,
} from '../../../webworker/workers/browser/libsession_worker_interface';
import { GetNetworkTime } from '../../apis/snode_api/getNetworkTime';
import { SnodeNamespaces } from '../../apis/snode_api/namespaces';
import { getConversationController } from '../../conversations';
import { SharedConfigMessage } from '../../messages/outgoing/controlMessage/SharedConfigMessage';
import { ConfigurationSync } from '../job_runners/jobs/ConfigurationSyncJob';
// TODO complete this list
const requiredVariants: Array<ConfigWrapperObjectTypes> = ['UserConfig', 'ContactsConfig']; // 'conversations'
const requiredUserDumpVariants: Array<ConfigWrapperObjectTypes> = ['UserConfig', 'ContactsConfig']; // 'conversations'
async function insertUserProfileIntoWrapper() {
export type IncomingConfResult = {
needsPush: boolean;
needsDump: boolean;
messageHashes: Array<string>;
latestSentTimestamp: number;
};
export type OutgoingConfResult = {
message: SharedConfigMessage;
namespace: SnodeNamespaces;
destination: string;
oldMessageHashes?: Array<string>;
};
async function insertUserProfileIntoWrapperIfChanged() {
const us = UserUtils.getOurPubKeyStrFromCache();
const ourConvo = getConversationController().get(us);
if (!ourConvo) {
throw new Error('insertUserProfileIntoWrapper needs a ourConvo to exist');
}
const currentWrapperName = await UserConfigWrapperActions.getName();
const currentWrapperProfileUrl = await UserConfigWrapperActions.getProfilePicture();
const wrapperName = await UserConfigWrapperActions.getName();
const wrapperProfilePicture = await UserConfigWrapperActions.getProfilePicture();
const wrapperProfileUrl = wrapperProfilePicture.url || '';
const wrapperProfileKey = wrapperProfilePicture.key || '';
const currentDbName = ourConvo.get('displayNameInProfile') || '';
if (!isEqual(currentDbName, currentWrapperName)) {
await UserConfigWrapperActions.setName(currentDbName);
const dbName = ourConvo.get('displayNameInProfile') || '';
const dbProfileUrl = ourConvo.get('avatarPointer') || '';
const dbProfileKey = from_hex(ourConvo.get('profileKey') || '');
if (!isEqual(dbName, wrapperName)) {
await UserConfigWrapperActions.setName(dbName);
}
if (!isEqual(dbProfileUrl, wrapperProfileUrl) || !isEqual(dbProfileKey, wrapperProfileKey)) {
await UserConfigWrapperActions.setProfilePicture(wrapperProfileUrl, dbProfileKey);
}
}
/**
* Right after we migrated, we won't have any dumps in DB. We must create them from our database state,
*/
async function createConfigDumpsFromDbFirstStart(privateKeyEd25519: Uint8Array) {
let countCreated = 0;
async function createConfigDumpsFromDbFirstStart(
privateKeyEd25519: Uint8Array
): Promise<Array<ConfigWrapperObjectTypes>> {
const justCreated: Array<ConfigWrapperObjectTypes> = [];
try {
console.warn('createConfigDumpsFromDbFirstStart');
// build the userconfig
await UserConfigWrapperActions.init(privateKeyEd25519, null);
countCreated++;
await insertUserProfileIntoWrapperIfChanged();
const data = await UserConfigWrapperActions.dump();
// save it to the DB
await ConfigDumpData.saveConfigDump({
combinedMessageHashes: [],
data,
publicKey: UserUtils.getOurPubKeyStrFromCache(),
variant: 'UserConfig',
});
justCreated.push('UserConfig');
} catch (e) {
window.log.warn('Failed to init the UserConfig with createConfigDumpsFromDbFirstStart');
}
if (countCreated > 0) {
await ConfigurationSync.queueNewJobIfNeeded();
if (justCreated.length > 0) {
setTimeout(() => ConfigurationSync.queueNewJobIfNeeded, 3000);
}
return justCreated;
}
async function initializeLibSessionUtilWrappers() {
@ -55,24 +95,31 @@ async function initializeLibSessionUtilWrappers() {
const privateKeyEd25519 = keypair.privKeyBytes;
let dumps = await ConfigDumpData.getAllDumpsWithData();
let createdDuringFirstStart: Array<ConfigWrapperObjectTypes> = [];
if (!dumps?.length) {
await createConfigDumpsFromDbFirstStart(privateKeyEd25519);
createdDuringFirstStart = await createConfigDumpsFromDbFirstStart(privateKeyEd25519);
}
// refetch them as the createConfigDumpsFromDb might have created them
dumps = await ConfigDumpData.getAllDumpsWithData();
console.warn(
'dumps',
dumps.map(m => omit(m, 'data'))
);
const userVariantsBuildWithoutErrors = new Set<ConfigWrapperObjectTypes>();
for (let index = 0; index < dumps.length; index++) {
const dump = dumps[index];
console.warn('forl oop init', dump.variant);
try {
await GenericWrapperActions.init(
dump.variant,
privateKeyEd25519,
dump.data.length ? dump.data : null
);
if (!createdDuringFirstStart.includes(dump.variant)) {
await GenericWrapperActions.init(
dump.variant,
privateKeyEd25519,
dump.data.length ? dump.data : null
);
}
userVariantsBuildWithoutErrors.add(dump.variant);
} catch (e) {
window.log.warn(`init of UserConfig failed with ${e.message} `);
@ -82,9 +129,10 @@ async function initializeLibSessionUtilWrappers() {
console.warn('requiredVariants: FIXME add conversation volatile wrapper as required ');
const missingRequiredVariants: Array<ConfigWrapperObjectTypes> = difference(requiredVariants, [
...userVariantsBuildWithoutErrors.values(),
]);
const missingRequiredVariants: Array<ConfigWrapperObjectTypes> = difference(
requiredUserDumpVariants,
[...userVariantsBuildWithoutErrors.values()]
);
for (let index = 0; index < missingRequiredVariants.length; index++) {
const missingVariant = missingRequiredVariants[index];
@ -92,4 +140,87 @@ async function initializeLibSessionUtilWrappers() {
}
}
export const LibSessionUtil = { initializeLibSessionUtilWrappers };
async function pendingChangesForPubkey(pubkey: string): Promise<Array<OutgoingConfResult>> {
const dumps = await ConfigDumpData.getAllDumpsWithoutData();
const us = UserUtils.getOurPubKeyStrFromCache();
// Ensure we always check the required user config types for changes even if there is no dump
// data yet (to deal with first launch cases)
if (pubkey === us) {
LibSessionUtil.requiredUserDumpVariants.forEach(requiredVariant => {
if (!dumps.find(m => m.publicKey === us && m.variant === requiredVariant)) {
dumps.push({ publicKey: us, variant: requiredVariant, combinedMessageHashes: [] });
}
});
}
const results: Array<OutgoingConfResult> = [];
for (let index = 0; index < dumps.length; index++) {
const dump = dumps[index];
const variant = dump.variant;
const needsPush = await GenericWrapperActions.needsPush(variant);
if (!needsPush) {
continue;
}
const { data, seqno } = await GenericWrapperActions.push(variant);
const kind = variantToKind(variant);
const namespace = await GenericWrapperActions.storageNamespace(variant);
results.push({
destination: pubkey,
message: new SharedConfigMessage({
data,
kind,
seqno: Long.fromNumber(seqno),
timestamp: GetNetworkTime.getNowWithNetworkOffset(),
}),
oldMessageHashes: dump.combinedMessageHashes,
namespace,
});
}
return results;
}
function kindToVariant(kind: SignalService.SharedConfigMessage.Kind): ConfigWrapperObjectTypes {
switch (kind) {
case SignalService.SharedConfigMessage.Kind.USER_PROFILE:
return 'UserConfig';
case SignalService.SharedConfigMessage.Kind.CONTACTS:
return 'ContactsConfig';
default:
throw new Error(`kindToVariant: Unsupported variant: "${kind}"`);
}
}
function variantToKind(variant: ConfigWrapperObjectTypes): SignalService.SharedConfigMessage.Kind {
switch (variant) {
case 'UserConfig':
return SignalService.SharedConfigMessage.Kind.USER_PROFILE;
case 'ContactsConfig':
return SignalService.SharedConfigMessage.Kind.CONTACTS;
default:
throw new Error(`variantToKind: Unsupported variant: "${variant}"`);
}
}
/**
* Returns true if the config needs to be dumped afterwards
*/
async function markAsPushed(variant: ConfigWrapperObjectTypes, pubkey: string, seqno: number) {
if (pubkey !== UserUtils.getOurPubKeyStrFromCache()) {
//FIXME libsession closed group
throw new Error('FIXME, generic case is to be done');
}
await GenericWrapperActions.confirmPushed(variant, seqno);
return GenericWrapperActions.needsDump(variant);
}
export const LibSessionUtil = {
initializeLibSessionUtilWrappers,
requiredUserDumpVariants,
pendingChangesForPubkey,
kindToVariant,
markAsPushed,
};

View file

@ -35,7 +35,7 @@ describe('MessageSender', () => {
let encryptStub: sinon.SinonStub<[PubKey, Uint8Array, SignalService.Envelope.Type]>;
beforeEach(() => {
sessionMessageAPISendStub = Sinon.stub(MessageSender, 'sendMessageToSnode').resolves();
sessionMessageAPISendStub = Sinon.stub(MessageSender, 'sendMessagesToSnode').resolves();
stubData('getMessageById').resolves();

View file

@ -67,6 +67,10 @@ export class FakeSleepForMultiJob extends PersistedJob<FakeSleepForMultiJobData>
public nonRunningJobsToRemove(_jobs: Array<FakeSleepForMultiJobData>) {
return [];
}
public getJobTimeoutMs(): number {
return 10000;
}
}
export class FakeSleepForJob extends PersistedJob<FakeSleepJobData> {
@ -119,4 +123,8 @@ export class FakeSleepForJob extends PersistedJob<FakeSleepJobData> {
public nonRunningJobsToRemove(_jobs: Array<FakeSleepJobData>) {
return [];
}
public getJobTimeoutMs(): number {
return 10000;
}
}

View file

@ -59,6 +59,10 @@ export type ConfigDumpDataNode = {
getAllDumpsWithData: () => Array<ConfigDumpRow>;
getAllDumpsWithoutData: () => Array<ConfigDumpRowWithoutData>;
getCombinedHashesByVariantAndPubkey: (
variant: ConfigWrapperObjectTypes,
pubkey: string
) => Set<string>;
};
// ========== unprocessed

View file

@ -55,6 +55,7 @@ function assertUserWrapperType(wrapperType: ConfigWrapperObjectTypes): UserWrapp
*/
function initUserWrapper(options: Array<any>, wrapperType: UserWrapperType): BaseConfigWrapper {
const wrapper = getUserWrapper(wrapperType);
console.warn('initUserWrapper: ', wrapperType);
if (wrapper) {
throw new Error(`${wrapperType} already init`);
}
@ -68,7 +69,7 @@ function initUserWrapper(options: Array<any>, wrapperType: UserWrapperType): Bas
}
if (!isNull(dump) && !isUInt8Array(dump)) {
throw new Error('${wrapperType} init needs a valid dump');
throw new Error(`${wrapperType} init needs a valid dump`);
}
const userType = assertUserWrapperType(wrapperType);
switch (userType) {