WIP
This commit is contained in:
parent
a83a2afa4f
commit
c599d0b629
|
@ -77,7 +77,6 @@
|
|||
window.log.info('background page reloaded');
|
||||
window.log.info('environment:', window.getEnvironment());
|
||||
const restartReason = localStorage.getItem('restart-reason');
|
||||
window.log.info('restartReason:', restartReason);
|
||||
|
||||
if (restartReason === 'unlink') {
|
||||
setTimeout(() => {
|
||||
|
|
|
@ -59,17 +59,6 @@
|
|||
}
|
||||
inherit(ReplayableError, EmptySwarmError);
|
||||
|
||||
function InvalidateSwarm(number, message) {
|
||||
// eslint-disable-next-line prefer-destructuring
|
||||
this.number = number.split('.')[0];
|
||||
|
||||
ReplayableError.call(this, {
|
||||
name: 'InvalidateSwarm',
|
||||
message,
|
||||
});
|
||||
}
|
||||
inherit(ReplayableError, InvalidateSwarm);
|
||||
|
||||
function NotFoundError(message, error) {
|
||||
this.name = 'NotFoundError';
|
||||
this.message = message;
|
||||
|
@ -112,19 +101,6 @@
|
|||
}
|
||||
}
|
||||
|
||||
function WrongSwarmError(newSwarm) {
|
||||
this.name = 'WrongSwarmError';
|
||||
this.newSwarm = newSwarm;
|
||||
|
||||
Error.call(this, this.name);
|
||||
|
||||
// Maintains proper stack trace, where our error was thrown (only available on V8)
|
||||
// via https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error
|
||||
if (Error.captureStackTrace) {
|
||||
Error.captureStackTrace(this);
|
||||
}
|
||||
}
|
||||
|
||||
function TimestampError(message) {
|
||||
this.name = 'TimeStampError';
|
||||
|
||||
|
@ -153,7 +129,6 @@
|
|||
window.textsecure.SeedNodeError = SeedNodeError;
|
||||
window.textsecure.HTTPError = HTTPError;
|
||||
window.textsecure.NotFoundError = NotFoundError;
|
||||
window.textsecure.WrongSwarmError = WrongSwarmError;
|
||||
window.textsecure.TimestampError = TimestampError;
|
||||
window.textsecure.PublicChatError = PublicChatError;
|
||||
})();
|
||||
|
|
|
@ -7,11 +7,9 @@ export interface LibTextsecure {
|
|||
SendMessageNetworkError: any;
|
||||
ReplayableError: any;
|
||||
EmptySwarmError: any;
|
||||
InvalidateSwarm: any;
|
||||
SeedNodeError: any;
|
||||
HTTPError: any;
|
||||
NotFoundError: any;
|
||||
WrongSwarmError: any;
|
||||
TimestampError: any;
|
||||
PublicChatError: any;
|
||||
createTaskWithTimeout(task: any, id: any, options?: any): Promise<any>;
|
||||
|
|
|
@ -399,7 +399,7 @@ window.addEventListener('contextmenu', e => {
|
|||
|
||||
window.NewReceiver = require('./ts/receiver/receiver');
|
||||
window.DataMessageReceiver = require('./ts/receiver/dataMessage');
|
||||
window.NewSnodeAPI = require('./ts/session/snode_api/serviceNodeAPI');
|
||||
window.NewSnodeAPI = require('./ts/session/snode_api/SNodeAPI');
|
||||
window.SnodePool = require('./ts/session/snode_api/snodePool');
|
||||
|
||||
// eslint-disable-next-line no-extend-native,func-names
|
||||
|
|
|
@ -800,7 +800,6 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
|
|||
return;
|
||||
}
|
||||
if (!this.get('active_at')) {
|
||||
window.log.info('Skipping update last message as active_at is falsy');
|
||||
return;
|
||||
}
|
||||
const messages = await getMessagesByConversation(this.id, {
|
||||
|
|
|
@ -342,7 +342,6 @@ async function handleRegularMessage(
|
|||
message.set({ expirationStartTimestamp: now });
|
||||
}
|
||||
|
||||
conversation.set({ active_at: now });
|
||||
// Expire timer updates are now explicit.
|
||||
// We don't handle an expire timer from a incoming message except if it is an ExpireTimerUpdate message.
|
||||
|
||||
|
@ -470,7 +469,7 @@ export async function handleMessageJob(
|
|||
// call it after we have an id for this message, because the jobs refer back
|
||||
// to their source message.
|
||||
|
||||
await queueAttachmentDownloads(message, conversation);
|
||||
void queueAttachmentDownloads(message, conversation);
|
||||
|
||||
const unreadCount = await conversation.getUnreadCount();
|
||||
conversation.set({ unreadCount });
|
||||
|
|
|
@ -34,9 +34,9 @@ export async function encrypt(
|
|||
const plainText = addMessagePadding(plainTextBuffer);
|
||||
|
||||
if (encryptForClosedGroup) {
|
||||
window?.log?.info(
|
||||
'Encrypting message with SessionProtocol and envelope type is CLOSED_GROUP_CIPHERTEXT'
|
||||
);
|
||||
// window?.log?.info(
|
||||
// 'Encrypting message with SessionProtocol and envelope type is CLOSED_GROUP_CIPHERTEXT'
|
||||
// );
|
||||
const hexEncryptionKeyPair = await getLatestClosedGroupEncryptionKeyPair(device.key);
|
||||
if (!hexEncryptionKeyPair) {
|
||||
window?.log?.warn("Couldn't get key pair for closed group during encryption");
|
||||
|
@ -75,7 +75,7 @@ export async function encryptUsingSessionProtocol(
|
|||
}
|
||||
const sodium = await getSodium();
|
||||
|
||||
window?.log?.info('encryptUsingSessionProtocol for ', recipientHexEncodedX25519PublicKey.key);
|
||||
// window?.log?.info('encryptUsingSessionProtocol for ', recipientHexEncodedX25519PublicKey.key);
|
||||
|
||||
const recipientX25519PublicKey = recipientHexEncodedX25519PublicKey.withoutPrefixToArray();
|
||||
const userED25519PubKeyBytes = fromHexToArray(userED25519KeyPairHex.pubKey);
|
||||
|
|
|
@ -3,7 +3,7 @@ import * as SnodePool from '../snode_api/snodePool';
|
|||
import _ from 'lodash';
|
||||
import { default as insecureNodeFetch } from 'node-fetch';
|
||||
import { UserUtils } from '../utils';
|
||||
import { getPathString, incrementBadSnodeCountOrDrop, snodeHttpsAgent } from '../snode_api/onions';
|
||||
import { incrementBadSnodeCountOrDrop, snodeHttpsAgent } from '../snode_api/onions';
|
||||
import { allowOnlyOneAtATime } from '../utils/Promise';
|
||||
|
||||
const desiredGuardCount = 3;
|
||||
|
@ -53,7 +53,9 @@ export async function dropSnodeFromPath(snodeEd25519: string) {
|
|||
if (pathWithSnodeIndex === -1) {
|
||||
return;
|
||||
}
|
||||
|
||||
window.log.info(
|
||||
`dropping snode (...${snodeEd25519.substr(58)}) from path index: ${pathWithSnodeIndex}`
|
||||
);
|
||||
// make a copy now so we don't alter the real one while doing stuff here
|
||||
const oldPaths = _.cloneDeep(onionPaths);
|
||||
|
||||
|
@ -65,17 +67,14 @@ export async function dropSnodeFromPath(snodeEd25519: string) {
|
|||
if (nodeToRemoveIndex === -1) {
|
||||
return;
|
||||
}
|
||||
console.warn('removing ', snodeEd25519, ' from path ', getPathString(pathtoPatchUp));
|
||||
|
||||
pathtoPatchUp = pathtoPatchUp.filter(snode => snode.pubkey_ed25519 !== snodeEd25519);
|
||||
console.warn('removed:', getPathString(pathtoPatchUp));
|
||||
|
||||
const pubKeyToExclude = _.flatten(oldPaths.map(p => p.map(m => m.pubkey_ed25519)));
|
||||
const ed25519KeysToExclude = _.flatten(oldPaths.map(p => p.map(m => m.pubkey_ed25519)));
|
||||
// this call throws if it cannot return a valid snode.
|
||||
const snodeToAppendToPath = await SnodePool.getRandomSnode(pubKeyToExclude);
|
||||
const snodeToAppendToPath = await SnodePool.getRandomSnode(ed25519KeysToExclude);
|
||||
// Don't test the new snode as this would reveal the user's IP
|
||||
pathtoPatchUp.push(snodeToAppendToPath);
|
||||
console.warn('Updated path:', getPathString(pathtoPatchUp));
|
||||
onionPaths[pathWithSnodeIndex] = pathtoPatchUp;
|
||||
}
|
||||
|
||||
|
@ -95,48 +94,24 @@ export async function getOnionPath(toExclude?: SnodePool.Snode): Promise<Array<S
|
|||
attemptNumber += 1;
|
||||
}
|
||||
|
||||
const paths = _.shuffle(onionPaths);
|
||||
const onionPathsWithoutExcluded = toExclude
|
||||
? onionPaths.filter(
|
||||
path => !_.some(path, node => node.pubkey_ed25519 === toExclude.pubkey_ed25519)
|
||||
)
|
||||
: onionPaths;
|
||||
|
||||
if (!toExclude) {
|
||||
if (!paths[0]) {
|
||||
log.error('LokiSnodeAPI::getOnionPath - no path in', paths);
|
||||
return [];
|
||||
}
|
||||
if (!paths[0]) {
|
||||
log.error('LokiSnodeAPI::getOnionPath - no path in', paths[0]);
|
||||
}
|
||||
|
||||
return paths[0];
|
||||
if (!onionPathsWithoutExcluded) {
|
||||
log.error('LokiSnodeAPI::getOnionPath - no path in', onionPathsWithoutExcluded);
|
||||
return [];
|
||||
}
|
||||
|
||||
// Select a path that doesn't contain `toExclude`
|
||||
const otherPaths = paths.filter(
|
||||
path => !_.some(path, node => node.pubkey_ed25519 === toExclude.pubkey_ed25519)
|
||||
);
|
||||
const randomPath = _.sample(onionPathsWithoutExcluded);
|
||||
|
||||
if (otherPaths.length === 0) {
|
||||
// This should never happen!
|
||||
// well it did happen, should we
|
||||
// await this.buildNewOnionPaths();
|
||||
// and restart call?
|
||||
log.error(
|
||||
'LokiSnodeAPI::getOnionPath - no paths without',
|
||||
toExclude.pubkey_ed25519,
|
||||
'path count',
|
||||
paths.length,
|
||||
'goodPath count',
|
||||
onionPaths.length,
|
||||
'paths',
|
||||
paths
|
||||
);
|
||||
if (!randomPath) {
|
||||
throw new Error('No onion paths available after filtering');
|
||||
}
|
||||
|
||||
if (!otherPaths[0]) {
|
||||
log.error('LokiSnodeAPI::getOnionPath - otherPaths no path in', otherPaths[0]);
|
||||
}
|
||||
|
||||
return otherPaths[0];
|
||||
return randomPath;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -144,24 +119,25 @@ export async function getOnionPath(toExclude?: SnodePool.Snode): Promise<Array<S
|
|||
*/
|
||||
export async function incrementBadPathCountOrDrop(guardNodeEd25519: string) {
|
||||
const pathIndex = onionPaths.findIndex(p => p[0].pubkey_ed25519 === guardNodeEd25519);
|
||||
window.log.info('\t\tincrementBadPathCountOrDrop starting with guard', guardNodeEd25519);
|
||||
|
||||
if (pathIndex === -1) {
|
||||
window.log.info('Did not find path with this guard node');
|
||||
return;
|
||||
}
|
||||
|
||||
const pathFailing = onionPaths[pathIndex];
|
||||
const pathWithIssues = onionPaths[pathIndex];
|
||||
|
||||
console.warn('handling bad path for path index', pathIndex);
|
||||
window.log.info('handling bad path for path index', pathIndex);
|
||||
const oldPathFailureCount = pathFailureCount[guardNodeEd25519] || 0;
|
||||
const newPathFailureCount = oldPathFailureCount + 1;
|
||||
if (newPathFailureCount >= pathFailureThreshold) {
|
||||
// tslint:disable-next-line: prefer-for-of
|
||||
for (let index = 0; index < pathFailing.length; index++) {
|
||||
const snode = pathFailing[index];
|
||||
await incrementBadSnodeCountOrDrop(snode.pubkey_ed25519);
|
||||
}
|
||||
// tslint:disable-next-line: prefer-for-of
|
||||
for (let index = 0; index < pathWithIssues.length; index++) {
|
||||
const snode = pathWithIssues[index];
|
||||
await incrementBadSnodeCountOrDrop(snode.pubkey_ed25519);
|
||||
}
|
||||
|
||||
if (newPathFailureCount >= pathFailureThreshold) {
|
||||
return dropPathStartingWithGuardNode(guardNodeEd25519);
|
||||
}
|
||||
// the path is not yet THAT bad. keep it for now
|
||||
|
@ -174,8 +150,7 @@ export async function incrementBadPathCountOrDrop(guardNodeEd25519: string) {
|
|||
* @param ed25519Key the guard node ed25519 pubkey
|
||||
*/
|
||||
async function dropPathStartingWithGuardNode(ed25519Key: string) {
|
||||
// we are dropping it. Reset the counter in case this same guard gets used later
|
||||
pathFailureCount[ed25519Key] = 0;
|
||||
// we are dropping it. Reset the counter in case this same guard gets choosen later
|
||||
const failingPathIndex = onionPaths.findIndex(p => p[0].pubkey_ed25519 === ed25519Key);
|
||||
if (failingPathIndex === -1) {
|
||||
console.warn('No such path starts with this guard node ');
|
||||
|
@ -185,6 +160,11 @@ async function dropPathStartingWithGuardNode(ed25519Key: string) {
|
|||
|
||||
const edKeys = guardNodes.filter(g => g.pubkey_ed25519 !== ed25519Key).map(n => n.pubkey_ed25519);
|
||||
|
||||
guardNodes = guardNodes.filter(g => g.pubkey_ed25519 !== ed25519Key);
|
||||
pathFailureCount[ed25519Key] = 0;
|
||||
|
||||
// write the updates guard nodes to the db.
|
||||
// the next call to getOnionPath will trigger a rebuild of the path
|
||||
await updateGuardNodes(edKeys);
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,6 @@ import {
|
|||
getRandomSnode,
|
||||
getRandomSnodePool,
|
||||
getSwarmFor,
|
||||
markNodeUnreachable,
|
||||
requiredSnodesForAgreement,
|
||||
Snode,
|
||||
updateSwarmFor,
|
||||
|
@ -244,10 +243,6 @@ export async function requestSnodesForPubkey(pubKey: string): Promise<Array<Snod
|
|||
} catch (e) {
|
||||
log.error('LokiSnodeAPI::requestSnodesForPubkey - error', e);
|
||||
|
||||
if (targetNode) {
|
||||
markNodeUnreachable(targetNode);
|
||||
}
|
||||
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
@ -369,105 +364,24 @@ async function getSnodePoolFromSnode(targetNode: Snode): Promise<Array<Snode>> {
|
|||
}
|
||||
}
|
||||
|
||||
function checkResponse(response: SnodeResponse): void {
|
||||
if (response.status === 406) {
|
||||
throw new window.textsecure.TimestampError('Invalid Timestamp (check your clock)');
|
||||
}
|
||||
|
||||
// Wrong/invalid swarm
|
||||
if (response.status === 421) {
|
||||
let json;
|
||||
try {
|
||||
json = JSON.parse(response.body);
|
||||
} catch (e) {
|
||||
// could not parse result. Consider that snode as invalid
|
||||
throw new window.textsecure.InvalidateSwarm();
|
||||
}
|
||||
|
||||
// The snode isn't associated with the given public key anymore
|
||||
window.log.warn('Wrong swarm, now looking at snodes', json.snodes);
|
||||
if (json.snodes?.length) {
|
||||
throw new window.textsecure.WrongSwarmError(json.snodes);
|
||||
}
|
||||
// remove this node from the swarm of this pubkey
|
||||
throw new window.textsecure.InvalidateSwarm();
|
||||
}
|
||||
}
|
||||
|
||||
export async function storeOnNode(targetNode: Snode, params: SendParams): Promise<boolean> {
|
||||
const { log, textsecure } = window;
|
||||
const { log } = window;
|
||||
|
||||
let successiveFailures = 0;
|
||||
try {
|
||||
const result = await snodeRpc('store', params, targetNode, params.pubKey);
|
||||
|
||||
while (successiveFailures < maxAcceptableFailuresStoreOnNode) {
|
||||
// the higher this is, the longer the user delay is
|
||||
// we don't want to burn through all our retries quickly
|
||||
// we need to give the node a chance to heal
|
||||
// also failed the user quickly, just means they pound the retry faster
|
||||
// this favors a lot more retries and lower delays
|
||||
// but that may chew up the bandwidth...
|
||||
await sleepFor(successiveFailures * 500);
|
||||
try {
|
||||
const result = await snodeRpc('store', params, targetNode, params.pubKey);
|
||||
|
||||
// do not return true if we get false here...
|
||||
if (!result) {
|
||||
// this means the node we asked for is likely down
|
||||
log.warn(
|
||||
`loki_message:::store - Try #${successiveFailures}/${maxAcceptableFailuresStoreOnNode} ${targetNode.ip}:${targetNode.port} failed`
|
||||
);
|
||||
successiveFailures += 1;
|
||||
// eslint-disable-next-line no-continue
|
||||
continue;
|
||||
}
|
||||
|
||||
const snodeRes = result;
|
||||
|
||||
checkResponse(snodeRes);
|
||||
|
||||
if (snodeRes.status !== 200) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (e) {
|
||||
log.warn(
|
||||
'loki_message:::store - send error:',
|
||||
e.code,
|
||||
e.message,
|
||||
`destination ${targetNode.ip}:${targetNode.port}`
|
||||
);
|
||||
if (e instanceof textsecure.WrongSwarmError) {
|
||||
const { newSwarm } = e;
|
||||
await updateSwarmFor(params.pubKey, newSwarm);
|
||||
return false;
|
||||
} else if (e instanceof textsecure.NotFoundError) {
|
||||
// TODO: Handle resolution error
|
||||
} else if (e instanceof textsecure.TimestampError) {
|
||||
log.warn('loki_message:::store - Timestamp is invalid');
|
||||
throw e;
|
||||
} else if (e instanceof textsecure.HTTPError) {
|
||||
// TODO: Handle working connection but error response
|
||||
const body = await e.response.text();
|
||||
log.warn('loki_message:::store - HTTPError body:', body);
|
||||
} else if (e instanceof window.textsecure.InvalidateSwarm) {
|
||||
window.log.warn(
|
||||
'Got an `InvalidateSwarm` error, removing this node from this swarm of this pubkey'
|
||||
);
|
||||
const existingSwarm = await getSwarmFor(params.pubKey);
|
||||
const updatedSwarm = existingSwarm.filter(
|
||||
node => node.pubkey_ed25519 !== targetNode.pubkey_ed25519
|
||||
);
|
||||
|
||||
await updateSwarmFor(params.pubKey, updatedSwarm);
|
||||
}
|
||||
successiveFailures += 1;
|
||||
if (!result || result.status !== 200) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (e) {
|
||||
log.warn(
|
||||
'loki_message:::store - send error:',
|
||||
e,
|
||||
`destination ${targetNode.ip}:${targetNode.port}`
|
||||
);
|
||||
}
|
||||
markNodeUnreachable(targetNode);
|
||||
log.error(
|
||||
`loki_message:::store - Too many successive failures trying to send to node ${targetNode.ip}:${targetNode.port}`
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -491,26 +405,6 @@ export async function retrieveNextMessages(
|
|||
return [];
|
||||
}
|
||||
|
||||
// NOTE: we call `checkResponse` to check for "wrong swarm"
|
||||
try {
|
||||
checkResponse(result);
|
||||
} catch (e) {
|
||||
window.log.warn('loki_message:::retrieveNextMessages - send error:', e.code, e.message);
|
||||
if (e instanceof window.textsecure.WrongSwarmError) {
|
||||
const { newSwarm } = e;
|
||||
await updateSwarmFor(params.pubKey, newSwarm);
|
||||
return [];
|
||||
} else if (e instanceof window.textsecure.InvalidateSwarm) {
|
||||
const existingSwarm = await getSwarmFor(params.pubKey);
|
||||
const updatedSwarm = existingSwarm.filter(
|
||||
node => node.pubkey_ed25519 !== targetNode.pubkey_ed25519
|
||||
);
|
||||
|
||||
await updateSwarmFor(params.pubKey, updatedSwarm);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
if (result.status !== 200) {
|
||||
window.log('retrieve result is not 200');
|
||||
return [];
|
||||
|
|
|
@ -10,6 +10,7 @@ import {
|
|||
import ByteBuffer from 'bytebuffer';
|
||||
import { OnionPaths } from '../onions';
|
||||
import { fromBase64ToArrayBuffer, toHex } from '../utils/String';
|
||||
import pRetry from 'p-retry';
|
||||
|
||||
export enum RequestError {
|
||||
BAD_PATH = 'BAD_PATH',
|
||||
|
@ -197,16 +198,15 @@ async function buildOnionGuardNodePayload(
|
|||
// Process a response as it arrives from `fetch`, handling
|
||||
// http errors and attempting to decrypt the body with `sharedKey`
|
||||
// May return false BAD_PATH, indicating that we should try a new path.
|
||||
// tslint:disable-next-line: cyclomatic-complexity
|
||||
async function processOnionResponse(
|
||||
reqIdx: number,
|
||||
response: Response,
|
||||
symmetricKey: ArrayBuffer,
|
||||
debug: boolean,
|
||||
abortSignal?: AbortSignal
|
||||
): Promise<
|
||||
| SnodeResponse
|
||||
| { requestError: RequestError; nodeInFault?: string; statusCode?: number; body?: string }
|
||||
> {
|
||||
abortSignal?: AbortSignal,
|
||||
associatedWith?: string
|
||||
): Promise<SnodeResponse> {
|
||||
let ciphertext = '';
|
||||
|
||||
try {
|
||||
|
@ -217,13 +217,15 @@ async function processOnionResponse(
|
|||
|
||||
if (abortSignal?.aborted) {
|
||||
window.log.warn(`(${reqIdx}) [path] Call aborted`);
|
||||
return { requestError: RequestError.ABORTED };
|
||||
// this will make the pRetry stop
|
||||
throw new pRetry.AbortError('Request got aborted');
|
||||
}
|
||||
console.warn('clocko ut of sync todo');
|
||||
|
||||
if (response.status === 406) {
|
||||
// clock out of sync
|
||||
console.warn('clocko ut of sync todo');
|
||||
// this will make the pRetry stop
|
||||
throw new pRetry.AbortError('You clock is out of sync with the network. Check your clock.');
|
||||
}
|
||||
|
||||
if (response.status === 421) {
|
||||
|
@ -231,35 +233,32 @@ async function processOnionResponse(
|
|||
window.log.info('Invalidating swarm');
|
||||
}
|
||||
|
||||
// detect SNode is deregisted, or SNode is not ready (not in swarm; not done syncing, ...)
|
||||
// this test checks for on error in your path.
|
||||
if (
|
||||
response.status === 502 ||
|
||||
response.status === 503 ||
|
||||
response.status === 504 ||
|
||||
response.status === 404 ||
|
||||
// response.status === 502 ||
|
||||
// response.status === 503 ||
|
||||
// response.status === 504 ||
|
||||
// response.status === 404 ||
|
||||
response.status !== 200 // this is pretty strong. a 400 (Oxen server error) will be handled as a bad path.
|
||||
) {
|
||||
window.log.warn(`(${reqIdx}) [path] Got status: ${response.status}`);
|
||||
//
|
||||
const prefix = 'Next node not found: ';
|
||||
let nodeNotFound;
|
||||
if (ciphertext && ciphertext.startsWith(prefix)) {
|
||||
if (ciphertext?.startsWith(prefix)) {
|
||||
nodeNotFound = ciphertext.substr(prefix.length);
|
||||
console.warn('nodeNotFound', nodeNotFound);
|
||||
}
|
||||
|
||||
return {
|
||||
requestError: RequestError.BAD_PATH,
|
||||
nodeInFault: nodeNotFound,
|
||||
statusCode: response.status,
|
||||
body: ciphertext,
|
||||
};
|
||||
// If we have a specific node in fault we can exclude just this node.
|
||||
// Otherwise we increment the whole path failure count
|
||||
await handleOnionRequestErrors(response.status, nodeNotFound, body || '', associatedWith);
|
||||
}
|
||||
|
||||
if (!ciphertext) {
|
||||
window.log.warn(
|
||||
`(${reqIdx}) [path] lokiRpc::processingOnionResponse - Target node return empty ciphertext`
|
||||
);
|
||||
return { requestError: RequestError.OTHER };
|
||||
throw new Error('Target node return empty ciphertext');
|
||||
}
|
||||
|
||||
let plaintext;
|
||||
|
@ -290,7 +289,7 @@ async function processOnionResponse(
|
|||
toHex(ciphertextBuffer)
|
||||
);
|
||||
}
|
||||
return { requestError: RequestError.OTHER };
|
||||
throw new Error('Ciphertext decode error');
|
||||
}
|
||||
|
||||
if (debug) {
|
||||
|
@ -298,19 +297,23 @@ async function processOnionResponse(
|
|||
}
|
||||
|
||||
try {
|
||||
const jsonRes: SnodeResponse = JSON.parse(plaintext, (key, value) => {
|
||||
const jsonRes = JSON.parse(plaintext, (key, value) => {
|
||||
if (typeof value === 'number' && value > Number.MAX_SAFE_INTEGER) {
|
||||
window.log.warn('Received an out of bounds js number');
|
||||
}
|
||||
return value;
|
||||
});
|
||||
|
||||
return jsonRes;
|
||||
if (jsonRes.status_code) {
|
||||
jsonRes.status = jsonRes.status_code;
|
||||
}
|
||||
|
||||
return jsonRes as SnodeResponse;
|
||||
} catch (e) {
|
||||
window.log.error(
|
||||
`(${reqIdx}) [path] lokiRpc::processingOnionResponse - parse error outer json ${e.code} ${e.message} json: '${plaintext}'`
|
||||
);
|
||||
return { requestError: RequestError.OTHER };
|
||||
throw new Error('Parsing error on outer json');
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -378,7 +381,7 @@ async function handle421InvalidSwarm(snodeEd25519: string, body: string, associa
|
|||
* 421 => swarm changed for this associatedWith publicKey
|
||||
* 500, 502, 503, AND default => bad snode.
|
||||
*/
|
||||
export async function handleOnionRequestErrors(
|
||||
async function handleOnionRequestErrors(
|
||||
statusCode: number,
|
||||
snodeEd25519: string,
|
||||
body: string,
|
||||
|
@ -389,7 +392,6 @@ export async function handleOnionRequestErrors(
|
|||
// FIXME audric
|
||||
console.warn('Clockoutofsync TODO');
|
||||
window.log.warn('The users clock is out of sync with the service node network.');
|
||||
debugger;
|
||||
throw new Error('ClockOutOfSync TODO');
|
||||
// return ClockOutOfSync;
|
||||
case 421:
|
||||
|
@ -414,7 +416,7 @@ export async function incrementBadSnodeCountOrDrop(snodeEd25519: string, associa
|
|||
const newFailureCount = oldFailureCount + 1;
|
||||
snodeFailureCount[snodeEd25519] = newFailureCount;
|
||||
window.log.warn(
|
||||
`Couldn't reach snode at: ${snodeEd25519}; setting failure count to ${newFailureCount}`
|
||||
`Couldn't reach snode at: ${snodeEd25519}; setting his failure count to ${newFailureCount}`
|
||||
);
|
||||
|
||||
if (newFailureCount >= snodeFailureThreshold) {
|
||||
|
@ -434,7 +436,7 @@ export async function incrementBadSnodeCountOrDrop(snodeEd25519: string, associa
|
|||
await OnionPaths.dropSnodeFromPath(snodeEd25519);
|
||||
} catch (e) {
|
||||
console.warn('dropSnodeFromPath, patchingup', e);
|
||||
// if dropSnodeFromPath throws, it means there is an issue patching up the path, increment the whole path issues
|
||||
// if dropSnodeFromPath throws, it means there is an issue patching up the path, increment the whole path issues count
|
||||
await OnionPaths.incrementBadPathCountOrDrop(snodeEd25519);
|
||||
}
|
||||
}
|
||||
|
@ -466,7 +468,7 @@ const sendOnionRequestHandlingSnodeEject = async ({
|
|||
lsrpcIdx?: number;
|
||||
abortSignal?: AbortSignal;
|
||||
associatedWith?: string;
|
||||
}): Promise<SnodeResponse | RequestError> => {
|
||||
}): Promise<SnodeResponse> => {
|
||||
const { response, decodingSymmetricKey } = await sendOnionRequest({
|
||||
reqIdx,
|
||||
nodePath,
|
||||
|
@ -476,34 +478,19 @@ const sendOnionRequestHandlingSnodeEject = async ({
|
|||
lsrpcIdx,
|
||||
abortSignal,
|
||||
});
|
||||
|
||||
// this call will handle the common onion failure logic.
|
||||
// if an error is not retryable a AbortError is triggered, which is handled by pRetry and retries are stopped
|
||||
const processed = await processOnionResponse(
|
||||
reqIdx,
|
||||
response,
|
||||
decodingSymmetricKey,
|
||||
false,
|
||||
abortSignal
|
||||
abortSignal,
|
||||
associatedWith
|
||||
);
|
||||
|
||||
if (isSnodeResponse(processed)) {
|
||||
return processed;
|
||||
} else {
|
||||
// If we get a bad path here, do what we gotta do to invalidate/increment the failure count of the node/path.
|
||||
// This does not retry, it just takes care of ejecting a node if needed. It is to the caller to do the retry
|
||||
const { nodeInFault: nodeInFaultEd25519, requestError, statusCode, body } = processed;
|
||||
if (requestError === RequestError.BAD_PATH) {
|
||||
if (nodeInFaultEd25519) {
|
||||
// we have a specific node in fault. This a `Next node not found :` suffix returned by a snode.
|
||||
// we can exclude just this node
|
||||
await handleOnionRequestErrors(
|
||||
statusCode || 0,
|
||||
nodeInFaultEd25519,
|
||||
body || '',
|
||||
associatedWith
|
||||
);
|
||||
}
|
||||
}
|
||||
return requestError;
|
||||
}
|
||||
return processed;
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -674,55 +661,47 @@ export function getPathString(pathObjArr: Array<{ ip: string; port: number }>):
|
|||
return pathObjArr.map(node => `${node.ip}:${node.port}`).join(', ');
|
||||
}
|
||||
|
||||
export async function lokiOnionFetch(
|
||||
async function onionFetchRetryable(
|
||||
targetNode: Snode,
|
||||
requestId: number,
|
||||
body?: string,
|
||||
associatedWith?: string
|
||||
): Promise<SnodeResponse | false> {
|
||||
): Promise<SnodeResponse> {
|
||||
const { log } = window;
|
||||
|
||||
// Get a path excluding `targetNode`:
|
||||
// eslint-disable no-await-in-loop
|
||||
const path = await OnionPaths.getOnionPath(targetNode);
|
||||
const result = await sendOnionRequestSnodeDest(requestId, path, targetNode, body, associatedWith);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the fetch returnes BAD_PATH we retry this call with a new path at most 3 times. If another error happens, we return it. If we have a result we just return it.
|
||||
*/
|
||||
export async function lokiOnionFetch(
|
||||
targetNode: Snode,
|
||||
body?: string,
|
||||
associatedWith?: string
|
||||
): Promise<SnodeResponse | undefined> {
|
||||
// Get a path excluding `targetNode`:
|
||||
const thisIdx = OnionPaths.assignOnionRequestNumber();
|
||||
|
||||
// At this point I only care about BAD_PATH
|
||||
console.warn('lokiOnionFetch with path', path);
|
||||
// FIXME audric to remove, just used to break onion routing
|
||||
path[2].pubkey_ed25519 = '11edd12a6f29011a1beb5b245a06b16548f2796eec4057a6c191700ffa780f5c';
|
||||
console.warn('FIXME audric to remove, just used to break onion routing');
|
||||
try {
|
||||
const retriedResult = await pRetry(
|
||||
async () => {
|
||||
return onionFetchRetryable(targetNode, thisIdx, body, associatedWith);
|
||||
},
|
||||
{
|
||||
retries: 5,
|
||||
factor: 1,
|
||||
minTimeout: 1000,
|
||||
}
|
||||
);
|
||||
|
||||
const result = await sendOnionRequestSnodeDest(thisIdx, path, targetNode, body, associatedWith);
|
||||
|
||||
if (result === RequestError.BAD_PATH) {
|
||||
log.error(
|
||||
`[path] Error on the path: ${getPathString(path)} to ${targetNode.ip}:${targetNode.port}`
|
||||
);
|
||||
// BAD_PATH are now handled in sendOnionRequest directly
|
||||
return false;
|
||||
} else if (result === RequestError.OTHER) {
|
||||
// could mean, fail to parse results
|
||||
// or status code wasn't 200
|
||||
// or can't decrypt
|
||||
// it's not a bad_path, so we don't need to mark the path as bad
|
||||
log.error(
|
||||
`[path] sendOnionRequest gave false for path: ${getPathString(path)} to ${targetNode.ip}:${
|
||||
targetNode.port
|
||||
}`
|
||||
);
|
||||
return false;
|
||||
} else if (result === RequestError.ABORTED) {
|
||||
// could mean, fail to parse results
|
||||
// or status code wasn't 200
|
||||
// or can't decrypt
|
||||
// it's not a bad_path, so we don't need to mark the path as bad
|
||||
log.error(
|
||||
`[path] sendOnionRequest gave aborted for path: ${getPathString(path)} to ${targetNode.ip}:${
|
||||
targetNode.port
|
||||
}`
|
||||
);
|
||||
return false;
|
||||
} else {
|
||||
return result;
|
||||
return retriedResult;
|
||||
} catch (e) {
|
||||
window.log.warn('onionFetchRetryable failed ');
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -106,23 +106,6 @@ async function tryGetSnodeListFromLokidSeednode(seedNodes: Array<SeedNode>): Pro
|
|||
return [];
|
||||
}
|
||||
|
||||
export function markNodeUnreachable(snode: Snode): void {
|
||||
const { log } = window;
|
||||
debugger;
|
||||
// we should probably get rid of this call
|
||||
_.remove(randomSnodePool, x => x.pubkey_ed25519 === snode.pubkey_ed25519);
|
||||
|
||||
for (const [pubkey, nodes] of swarmCache) {
|
||||
const edkeys = _.filter(nodes, edkey => edkey !== snode.pubkey_ed25519);
|
||||
|
||||
void internalUpdateSwarmFor(pubkey, edkeys);
|
||||
}
|
||||
|
||||
log.warn(
|
||||
`Marking ${snode.ip}:${snode.port} as unreachable, ${randomSnodePool.length} snodes remaining in randomPool`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Drop a snode from the snode pool. This does not update the swarm containing this snode.
|
||||
* Use `dropSnodeFromSwarmIfNeeded` for that
|
||||
|
@ -143,7 +126,6 @@ export function dropSnodeFromSnodePool(snodeEd25519: string) {
|
|||
export async function getRandomSnode(excludingEd25519Snode?: Array<string>): Promise<Snode> {
|
||||
// resolve random snode
|
||||
if (randomSnodePool.length === 0) {
|
||||
// TODO: ensure that we only call this once at a time
|
||||
// Should not this be saved to the database?
|
||||
await refreshRandomPool();
|
||||
|
||||
|
@ -152,7 +134,6 @@ export async function getRandomSnode(excludingEd25519Snode?: Array<string>): Pro
|
|||
}
|
||||
}
|
||||
// We know the pool can't be empty at this point
|
||||
|
||||
if (!excludingEd25519Snode) {
|
||||
return _.sample(randomSnodePool) as Snode;
|
||||
}
|
||||
|
@ -370,6 +351,7 @@ export async function getSwarmFor(pubkey: string): Promise<Array<Snode>> {
|
|||
const nodes = await getSwarmFromCacheOrDb(pubkey);
|
||||
|
||||
// See how many are actually still reachable
|
||||
// the nodes still reachable are the one still present in the snode pool
|
||||
const goodNodes = randomSnodePool.filter((n: Snode) => nodes.indexOf(n.pubkey_ed25519) !== -1);
|
||||
|
||||
if (goodNodes.length >= minSwarmSnodeCount) {
|
||||
|
|
|
@ -195,9 +195,6 @@ export class SwarmPolling {
|
|||
const groupPromises = this.groupPubkeys.map(async pk => {
|
||||
return this.pollOnceForKey(pk, true);
|
||||
});
|
||||
// if a WrongSwarmError has been triggered, we have to forward it (and in fact we must forward any errors)
|
||||
// but, we also need to make sure the next pollForAllKeys runs no matter if an error is triggered or not
|
||||
// the finally here will be invoked even if the catch is throwing an exception
|
||||
try {
|
||||
await Promise.all(_.concat(directPromises, groupPromises));
|
||||
} catch (e) {
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
// tslint:disable: no-implicit-dependencies max-func-body-length no-unused-expression
|
||||
|
||||
import chai from 'chai';
|
||||
import * as sinon from 'sinon';
|
||||
import _ from 'lodash';
|
||||
import { describe } from 'mocha';
|
||||
|
||||
import { GroupUtils, PromiseUtils, UserUtils } from '../../../../session/utils';
|
||||
import { TestUtils } from '../../../test-utils';
|
||||
import { MessageQueue } from '../../../../session/sending/MessageQueue';
|
||||
import { ContentMessage, OpenGroupMessage } from '../../../../session/messages/outgoing';
|
||||
import { PubKey, RawMessage } from '../../../../session/types';
|
||||
import { MessageSender } from '../../../../session/sending';
|
||||
import { PendingMessageCacheStub } from '../../../test-utils/stubs';
|
||||
import { ClosedGroupMessage } from '../../../../session/messages/outgoing/controlMessage/group/ClosedGroupMessage';
|
||||
|
||||
import chaiAsPromised from 'chai-as-promised';
|
||||
chai.use(chaiAsPromised as any);
|
||||
chai.should();
|
||||
|
||||
const { expect } = chai;
|
||||
|
||||
// tslint:disable-next-line: max-func-body-length
|
||||
describe('Onion', () => {
|
||||
// Initialize new stubbed cache
|
||||
const sandbox = sinon.createSandbox();
|
||||
const ourDevice = TestUtils.generateFakePubKey();
|
||||
const ourNumber = ourDevice.key;
|
||||
|
||||
beforeEach(() => {
|
||||
// Utils Stubs
|
||||
sandbox.stub(UserUtils, 'getOurPubKeyStrFromCache').returns(ourNumber);
|
||||
|
||||
TestUtils.stubWindow('libsignal', {
|
||||
SignalProtocolAddress: sandbox.stub(),
|
||||
} as any);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
TestUtils.restoreStubs();
|
||||
sandbox.restore();
|
||||
});
|
||||
|
||||
describe('processPending', () => {
|
||||
it('will send messages', done => {});
|
||||
});
|
||||
});
|
Loading…
Reference in New Issue