scoring system kind of working

This commit is contained in:
Audric Ackermann 2021-05-17 10:07:27 +10:00
parent c63d5a4581
commit a83a2afa4f
No known key found for this signature in database
GPG Key ID: 999F434D76324AD4
9 changed files with 532 additions and 170 deletions

View File

@ -11,7 +11,7 @@ import {
ConversationTypeEnum,
} from '../../models/conversation';
import { BlockedNumberController } from '../../util';
import { getSwarm } from '../snode_api/snodePool';
import { getSwarmFor } from '../snode_api/snodePool';
import { PubKey } from '../types';
import { actions as conversationActions } from '../../state/ducks/conversations';
import { getV2OpenGroupRoom, removeV2OpenGroupRoom } from '../../data/opengroups';
@ -120,7 +120,7 @@ export class ConversationController {
await Promise.all([
conversation.updateProfileAvatar(),
// NOTE: we request snodes updating the cache, but ignore the result
void getSwarm(id),
void getSwarmFor(id),
]);
}
});

View File

@ -3,19 +3,23 @@ import * as SnodePool from '../snode_api/snodePool';
import _ from 'lodash';
import { default as insecureNodeFetch } from 'node-fetch';
import { UserUtils } from '../utils';
import { snodeHttpsAgent } from '../snode_api/onions';
import { getPathString, incrementBadSnodeCountOrDrop, snodeHttpsAgent } from '../snode_api/onions';
import { allowOnlyOneAtATime } from '../utils/Promise';
const desiredGuardCount = 3;
const minimumGuardCount = 2;
interface SnodePath {
path: Array<SnodePool.Snode>;
bad: boolean;
}
type SnodePath = Array<SnodePool.Snode>;
const onionRequestHops = 3;
let onionPaths: Array<SnodePath> = [];
// hold the failure count of the path starting with the snode ed25519 pubkey
const pathFailureCount: Record<string, number> = {};
// The number of times a path can fail before it's replaced.
const pathFailureThreshold = 3;
// This array is meant to store nodes with full info,
// so using GuardNode would not be correct (there is
// some naming issue here it seems)
@ -29,15 +33,59 @@ export async function buildNewOnionPaths() {
});
}
/**
* Once a snode is causing too much trouble, we remove it from the path it is used in.
* If we can rebuild a new path right away (in sync) we do it, otherwise we throw an error.
*
* The process to rebuild a path is easy:
* 1. remove the snode causing issue in the path where it is used
* 2. get a random snode from the pool excluding all current snodes in use in all paths
* 3. append the random snode to the old path which was failing
* 4. you have rebuilt path
*
* @param snodeEd25519 the snode pubkey to drop
*/
export async function dropSnodeFromPath(snodeEd25519: string) {
const pathWithSnodeIndex = onionPaths.findIndex(path =>
path.some(snode => snode.pubkey_ed25519 === snodeEd25519)
);
if (pathWithSnodeIndex === -1) {
return;
}
// make a copy now so we don't alter the real one while doing stuff here
const oldPaths = _.cloneDeep(onionPaths);
let pathtoPatchUp = oldPaths[pathWithSnodeIndex];
// remove the snode causing issue from this path
const nodeToRemoveIndex = pathtoPatchUp.findIndex(snode => snode.pubkey_ed25519 === snodeEd25519);
// this should not happen, but well...
if (nodeToRemoveIndex === -1) {
return;
}
console.warn('removing ', snodeEd25519, ' from path ', getPathString(pathtoPatchUp));
pathtoPatchUp = pathtoPatchUp.filter(snode => snode.pubkey_ed25519 !== snodeEd25519);
console.warn('removed:', getPathString(pathtoPatchUp));
const pubKeyToExclude = _.flatten(oldPaths.map(p => p.map(m => m.pubkey_ed25519)));
// this call throws if it cannot return a valid snode.
const snodeToAppendToPath = await SnodePool.getRandomSnode(pubKeyToExclude);
// Don't test the new snode as this would reveal the user's IP
pathtoPatchUp.push(snodeToAppendToPath);
console.warn('Updated path:', getPathString(pathtoPatchUp));
onionPaths[pathWithSnodeIndex] = pathtoPatchUp;
}
export async function getOnionPath(toExclude?: SnodePool.Snode): Promise<Array<SnodePool.Snode>> {
const { log } = window;
let goodPaths = onionPaths.filter(x => !x.bad);
let attemptNumber = 0;
while (goodPaths.length < minimumGuardCount) {
while (onionPaths.length < minimumGuardCount) {
log.error(
`Must have at least 2 good onion paths, actual: ${goodPaths.length}, attempt #${attemptNumber} fetching more...`
`Must have at least 2 good onion paths, actual: ${onionPaths.length}, attempt #${attemptNumber} fetching more...`
);
// eslint-disable-next-line no-await-in-loop
await buildNewOnionPaths();
@ -45,26 +93,25 @@ export async function getOnionPath(toExclude?: SnodePool.Snode): Promise<Array<S
// reload goodPaths now
attemptNumber += 1;
goodPaths = onionPaths.filter(x => !x.bad);
}
const paths = _.shuffle(goodPaths);
const paths = _.shuffle(onionPaths);
if (!toExclude) {
if (!paths[0]) {
log.error('LokiSnodeAPI::getOnionPath - no path in', paths);
return [];
}
if (!paths[0].path) {
if (!paths[0]) {
log.error('LokiSnodeAPI::getOnionPath - no path in', paths[0]);
}
return paths[0].path;
return paths[0];
}
// Select a path that doesn't contain `toExclude`
const otherPaths = paths.filter(
path => !_.some(path.path, node => node.pubkey_ed25519 === toExclude.pubkey_ed25519)
path => !_.some(path, node => node.pubkey_ed25519 === toExclude.pubkey_ed25519)
);
if (otherPaths.length === 0) {
@ -78,31 +125,67 @@ export async function getOnionPath(toExclude?: SnodePool.Snode): Promise<Array<S
'path count',
paths.length,
'goodPath count',
goodPaths.length,
onionPaths.length,
'paths',
paths
);
throw new Error('No onion paths available after filtering');
}
if (!otherPaths[0].path) {
if (!otherPaths[0]) {
log.error('LokiSnodeAPI::getOnionPath - otherPaths no path in', otherPaths[0]);
}
return otherPaths[0].path;
return otherPaths[0];
}
export function markPathAsBad(path: Array<SnodePool.Snode>) {
// TODO: we might want to remove the nodes from the
// node pool (but we don't know which node on the path
// is causing issues)
/**
* If we don't know which nodes is causing trouble, increment the issue with this full path.
*/
export async function incrementBadPathCountOrDrop(guardNodeEd25519: string) {
const pathIndex = onionPaths.findIndex(p => p[0].pubkey_ed25519 === guardNodeEd25519);
onionPaths.forEach(p => {
if (_.isEqual(p.path, path)) {
// eslint-disable-next-line no-param-reassign
p.bad = true;
if (pathIndex === -1) {
window.log.info('Did not find path with this guard node');
return;
}
const pathFailing = onionPaths[pathIndex];
console.warn('handling bad path for path index', pathIndex);
const oldPathFailureCount = pathFailureCount[guardNodeEd25519] || 0;
const newPathFailureCount = oldPathFailureCount + 1;
if (newPathFailureCount >= pathFailureThreshold) {
// tslint:disable-next-line: prefer-for-of
for (let index = 0; index < pathFailing.length; index++) {
const snode = pathFailing[index];
await incrementBadSnodeCountOrDrop(snode.pubkey_ed25519);
}
});
return dropPathStartingWithGuardNode(guardNodeEd25519);
}
// the path is not yet THAT bad. keep it for now
pathFailureCount[guardNodeEd25519] = newPathFailureCount;
}
/**
* This function is used to drop a path and its corresponding guard node.
* It writes to the db the updated list of guardNodes.
* @param ed25519Key the guard node ed25519 pubkey
*/
async function dropPathStartingWithGuardNode(ed25519Key: string) {
// we are dropping it. Reset the counter in case this same guard gets used later
pathFailureCount[ed25519Key] = 0;
const failingPathIndex = onionPaths.findIndex(p => p[0].pubkey_ed25519 === ed25519Key);
if (failingPathIndex === -1) {
console.warn('No such path starts with this guard node ');
return;
}
onionPaths = onionPaths.filter(p => p[0].pubkey_ed25519 !== ed25519Key);
const edKeys = guardNodes.filter(g => g.pubkey_ed25519 !== ed25519Key).map(n => n.pubkey_ed25519);
await updateGuardNodes(edKeys);
}
export function assignOnionRequestNumber() {
@ -247,7 +330,7 @@ async function buildNewOnionPathsWorker() {
}
// TODO: select one guard node and 2 other nodes randomly
let otherNodes = _.difference(allNodes, guardNodes);
let otherNodes = _.differenceBy(allNodes, guardNodes, 'pubkey_ed25519');
if (otherNodes.length < 2) {
log.warn(
@ -280,7 +363,7 @@ async function buildNewOnionPathsWorker() {
for (let j = 0; j < nodesNeededPerPaths; j += 1) {
path.push(otherNodes[i * nodesNeededPerPaths + j]);
}
onionPaths.push({ path, bad: false });
onionPaths.push(path);
}
log.info(`Built ${onionPaths.length} onion paths`);

View File

@ -45,10 +45,7 @@ const handleSendViaOnionRetry = async (
fetchOptions: OnionFetchOptions,
abortSignal?: AbortSignal
) => {
window.log.error(
'sendOnionRequestLsrpcDest() returned a number indicating an error: ',
result === RequestError.BAD_PATH ? 'BAD_PATH' : 'OTHER'
);
window.log.error('sendOnionRequestLsrpcDest() returned a RequestError: ', result);
if (options.retry && options.retry >= MAX_SEND_ONION_RETRIES) {
window.log.error(`sendViaOnion too many retries: ${options.retry}. Stopping retries.`);
@ -192,6 +189,7 @@ export const sendViaOnion = async (
// RequestError return type is seen as number (as it is an enum)
if (typeof result === 'string') {
console.warn('above string type is not correct');
if (result === RequestError.ABORTED) {
window.log.info('sendViaOnion aborted. not retrying');
return null;

View File

@ -1,6 +1,6 @@
import _ from 'lodash';
import { SendParams, storeOnNode } from '../snode_api/SNodeAPI';
import { getSwarm, Snode } from '../snode_api/snodePool';
import { getSwarmFor, Snode } from '../snode_api/snodePool';
import { firstTrue } from '../utils/Promise';
const DEFAULT_CONNECTIONS = 3;
@ -45,7 +45,7 @@ export async function sendMessage(
const data64 = window.dcodeIO.ByteBuffer.wrap(data).toString('base64');
// Using timestamp as a unique identifier
const swarm = await getSwarm(pubKey);
const swarm = await getSwarmFor(pubKey);
// send parameters
const params = {

View File

@ -11,14 +11,14 @@ import Electron from 'electron';
const { remote } = Electron;
import { snodeRpc } from './lokiRpc';
import { sendOnionRequestLsrpcDest, snodeHttpsAgent, SnodeResponse } from './onions';
import { sendOnionRequestLsrpcDest, SnodeResponse } from './onions';
export { sendOnionRequestLsrpcDest };
import {
getRandomSnodeAddress,
getRandomSnode,
getRandomSnodePool,
getSwarm,
getSwarmFor,
markNodeUnreachable,
requiredSnodesForAgreement,
Snode,
@ -200,13 +200,14 @@ export async function requestSnodesForPubkey(pubKey: string): Promise<Array<Snod
let targetNode;
try {
targetNode = await getRandomSnodeAddress();
targetNode = await getRandomSnode();
const result = await snodeRpc(
'get_snodes_for_pubkey',
{
pubKey,
},
targetNode
targetNode,
pubKey
);
if (!result) {
@ -241,7 +242,7 @@ export async function requestSnodesForPubkey(pubKey: string): Promise<Array<Snod
return [];
}
} catch (e) {
log.error('LokiSnodeAPI::requestSnodesForPubkey - error', e.code, e.message);
log.error('LokiSnodeAPI::requestSnodesForPubkey - error', e);
if (targetNode) {
markNodeUnreachable(targetNode);
@ -407,13 +408,13 @@ export async function storeOnNode(targetNode: Snode, params: SendParams): Promis
// but that may chew up the bandwidth...
await sleepFor(successiveFailures * 500);
try {
const result = await snodeRpc('store', params, targetNode);
const result = await snodeRpc('store', params, targetNode, params.pubKey);
// do not return true if we get false here...
if (!result) {
// this means the node we asked for is likely down
log.warn(
`loki_message:::storeOnNode - Try #${successiveFailures}/${maxAcceptableFailuresStoreOnNode} ${targetNode.ip}:${targetNode.port} failed`
`loki_message:::store - Try #${successiveFailures}/${maxAcceptableFailuresStoreOnNode} ${targetNode.ip}:${targetNode.port} failed`
);
successiveFailures += 1;
// eslint-disable-next-line no-continue
@ -431,7 +432,7 @@ export async function storeOnNode(targetNode: Snode, params: SendParams): Promis
return true;
} catch (e) {
log.warn(
'loki_message:::storeOnNode - send error:',
'loki_message:::store - send error:',
e.code,
e.message,
`destination ${targetNode.ip}:${targetNode.port}`
@ -443,17 +444,17 @@ export async function storeOnNode(targetNode: Snode, params: SendParams): Promis
} else if (e instanceof textsecure.NotFoundError) {
// TODO: Handle resolution error
} else if (e instanceof textsecure.TimestampError) {
log.warn('loki_message:::storeOnNode - Timestamp is invalid');
log.warn('loki_message:::store - Timestamp is invalid');
throw e;
} else if (e instanceof textsecure.HTTPError) {
// TODO: Handle working connection but error response
const body = await e.response.text();
log.warn('loki_message:::storeOnNode - HTTPError body:', body);
log.warn('loki_message:::store - HTTPError body:', body);
} else if (e instanceof window.textsecure.InvalidateSwarm) {
window.log.warn(
'Got an `InvalidateSwarm` error, removing this node from this swarm of this pubkey'
);
const existingSwarm = await getSwarm(params.pubKey);
const existingSwarm = await getSwarmFor(params.pubKey);
const updatedSwarm = existingSwarm.filter(
node => node.pubkey_ed25519 !== targetNode.pubkey_ed25519
);
@ -465,7 +466,7 @@ export async function storeOnNode(targetNode: Snode, params: SendParams): Promis
}
markNodeUnreachable(targetNode);
log.error(
`loki_message:::storeOnNode - Too many successive failures trying to send to node ${targetNode.ip}:${targetNode.port}`
`loki_message:::store - Too many successive failures trying to send to node ${targetNode.ip}:${targetNode.port}`
);
return false;
}
@ -481,7 +482,7 @@ export async function retrieveNextMessages(
};
// let exceptions bubble up
const result = await snodeRpc('retrieve', params, targetNode);
const result = await snodeRpc('retrieve', params, targetNode, pubkey);
if (!result) {
window.log.warn(
@ -500,7 +501,7 @@ export async function retrieveNextMessages(
await updateSwarmFor(params.pubKey, newSwarm);
return [];
} else if (e instanceof window.textsecure.InvalidateSwarm) {
const existingSwarm = await getSwarm(params.pubKey);
const existingSwarm = await getSwarmFor(params.pubKey);
const updatedSwarm = existingSwarm.filter(
node => node.pubkey_ed25519 !== targetNode.pubkey_ed25519
);

View File

@ -15,7 +15,8 @@ interface FetchOptions {
async function lokiFetch(
url: string,
options: FetchOptions,
targetNode?: Snode
targetNode?: Snode,
associatedWith?: string
): Promise<undefined | SnodeResponse> {
const timeout = 10000;
const method = options.method || 'GET';
@ -30,7 +31,7 @@ async function lokiFetch(
// Absence of targetNode indicates that we want a direct connection
// (e.g. to connect to a seed node for the first time)
if (window.lokiFeatureFlags.useOnionRequests && targetNode) {
const fetchResult = await lokiOnionFetch(targetNode, fetchOptions.body);
const fetchResult = await lokiOnionFetch(targetNode, fetchOptions.body, associatedWith);
if (!fetchResult) {
return undefined;
}
@ -67,7 +68,8 @@ async function lokiFetch(
export async function snodeRpc(
method: string,
params: any,
targetNode: Snode
targetNode: Snode,
associatedWith?: string // the user pubkey this call is for. If the onion request fails, this is used to handle the error for this user's swarm, for instance
): Promise<undefined | SnodeResponse> {
const url = `https://${targetNode.ip}:${targetNode.port}/storage_rpc/v1`;
@ -95,5 +97,5 @@ export async function snodeRpc(
},
};
return lokiFetch(url, fetchOptions, targetNode);
return lokiFetch(url, fetchOptions, targetNode, associatedWith);
}

View File

@ -1,7 +1,12 @@
import { default as insecureNodeFetch, Response } from 'node-fetch';
import https from 'https';
import { Snode } from './snodePool';
import {
dropSnodeFromSnodePool,
dropSnodeFromSwarmIfNeeded,
Snode,
updateSwarmFor,
} from './snodePool';
import ByteBuffer from 'bytebuffer';
import { OnionPaths } from '../onions';
import { fromBase64ToArrayBuffer, toHex } from '../utils/String';
@ -12,6 +17,11 @@ export enum RequestError {
ABORTED = 'ABORTED',
}
// hold the ed25519 key of a snode against the time it fails. Used to remove a snode only after a few failures (snodeFailureThreshold failures)
const snodeFailureCount: Record<string, number> = {};
// The number of times a snode can fail before it's replaced.
const snodeFailureThreshold = 3;
/**
* When sending a request over onion, we might get two status.
* The first one, on the request itself, the other one in the json returned.
@ -187,15 +197,16 @@ async function buildOnionGuardNodePayload(
// Process a response as it arrives from `fetch`, handling
// http errors and attempting to decrypt the body with `sharedKey`
// May return false BAD_PATH, indicating that we should try a new path.
const processOnionResponse = async (
async function processOnionResponse(
reqIdx: number,
response: Response,
symmetricKey: ArrayBuffer,
debug: boolean,
abortSignal?: AbortSignal
): Promise<SnodeResponse | RequestError> => {
const { log, libloki } = window;
): Promise<
| SnodeResponse
| { requestError: RequestError; nodeInFault?: string; statusCode?: number; body?: string }
> {
let ciphertext = '';
try {
@ -205,8 +216,19 @@ const processOnionResponse = async (
}
if (abortSignal?.aborted) {
log.warn(`(${reqIdx}) [path] Call aborted`);
return RequestError.ABORTED;
window.log.warn(`(${reqIdx}) [path] Call aborted`);
return { requestError: RequestError.ABORTED };
}
console.warn('clocko ut of sync todo');
if (response.status === 406) {
// clock out of sync
console.warn('clocko ut of sync todo');
}
if (response.status === 421) {
// swarm changed for this pubkey (invalid swarm)
window.log.info('Invalidating swarm');
}
// detect SNode is deregisted, or SNode is not ready (not in swarm; not done syncing, ...)
@ -215,18 +237,29 @@ const processOnionResponse = async (
response.status === 503 ||
response.status === 504 ||
response.status === 404 ||
response.status !== 200
response.status !== 200 // this is pretty strong. a 400 (Oxen server error) will be handled as a bad path.
) {
log.warn(`(${reqIdx}) [path] Got status: ${response.status}`);
window.log.warn(`(${reqIdx}) [path] Got status: ${response.status}`);
const prefix = 'Next node not found: ';
let nodeNotFound;
if (ciphertext && ciphertext.startsWith(prefix)) {
nodeNotFound = ciphertext.substr(prefix.length);
console.warn('nodeNotFound', nodeNotFound);
}
return RequestError.BAD_PATH;
return {
requestError: RequestError.BAD_PATH,
nodeInFault: nodeNotFound,
statusCode: response.status,
body: ciphertext,
};
}
if (!ciphertext) {
log.warn(
window.log.warn(
`(${reqIdx}) [path] lokiRpc::processingOnionResponse - Target node return empty ciphertext`
);
return RequestError.OTHER;
return { requestError: RequestError.OTHER };
}
let plaintext;
@ -240,25 +273,28 @@ const processOnionResponse = async (
}
try {
ciphertextBuffer = fromBase64ToArrayBuffer(ciphertext);
const plaintextBuffer = await libloki.crypto.DecryptAESGCM(symmetricKey, ciphertextBuffer);
const plaintextBuffer = await window.libloki.crypto.DecryptAESGCM(
symmetricKey,
ciphertextBuffer
);
plaintext = new TextDecoder().decode(plaintextBuffer);
} catch (e) {
log.error(`(${reqIdx}) [path] lokiRpc::processingOnionResponse - decode error`, e);
log.error(
window.log.error(`(${reqIdx}) [path] lokiRpc::processingOnionResponse - decode error`, e);
window.log.error(
`(${reqIdx}) [path] lokiRpc::processingOnionResponse - symmetricKey`,
toHex(symmetricKey)
);
if (ciphertextBuffer) {
log.error(
window.log.error(
`(${reqIdx}) [path] lokiRpc::processingOnionResponse - ciphertextBuffer`,
toHex(ciphertextBuffer)
);
}
return RequestError.OTHER;
return { requestError: RequestError.OTHER };
}
if (debug) {
log.debug('lokiRpc::processingOnionResponse - plaintext', plaintext);
window.log.debug('lokiRpc::processingOnionResponse - plaintext', plaintext);
}
try {
@ -271,12 +307,12 @@ const processOnionResponse = async (
return jsonRes;
} catch (e) {
log.error(
window.log.error(
`(${reqIdx}) [path] lokiRpc::processingOnionResponse - parse error outer json ${e.code} ${e.message} json: '${plaintext}'`
);
return RequestError.OTHER;
return { requestError: RequestError.OTHER };
}
};
}
export const snodeHttpsAgent = new https.Agent({
rejectUnauthorized: false,
@ -300,6 +336,176 @@ export type FinalDestinationOptions = {
body?: string;
};
// Type guard: a successful SnodeResponse always carries a `status` field,
// while the RequestError wrapper objects never do.
function isSnodeResponse(arg: any): arg is SnodeResponse {
  return typeof arg.status !== 'undefined';
}
/**
 * Handle a 421. The body is supposed to be the new swarm nodes for this publickey.
 * @param snodeEd25519 the snode giving the reply
 * @param body the new swarm, not parsed. If an error happens while parsing this we will drop the snode.
 * @param associatedWith the specific publickey associated with this call
 */
async function handle421InvalidSwarm(snodeEd25519: string, body: string, associatedWith?: string) {
  // a 421 without a publicKey does not make much sense: there is no swarm to fix
  if (!associatedWith) {
    window.log.warn('Got a 421 without an associatedWith publickey');
    return;
  }
  try {
    const json = JSON.parse(body);
    if (json.snodes?.length) {
      // the snode gave us the new swarm. Save it for the next retry
      window.log.warn('Wrong swarm, now looking at snodes', json.snodes);
      return updateSwarmFor(associatedWith, json.snodes);
    }
    // no replacement swarm returned: this snode simply isn't associated
    // with the given public key anymore, remove it from that swarm
    return dropSnodeFromSwarmIfNeeded(associatedWith, snodeEd25519);
  } catch (e) {
    window.log.warn(
      'Got error while parsing 421 result. Dropping this snode from the swarm of this pubkey',
      e
    );
    // could not parse result. Consider that this snode as invalid
    return dropSnodeFromSwarmIfNeeded(associatedWith, snodeEd25519);
  }
}
/**
 * Dispatch on the status code of a failed onion request.
 *
 * 406 => clock out of sync
 * 421 => swarm changed for this associatedWith publicKey
 * 500, 502, 503, AND default => bad snode.
 */
export async function handleOnionRequestErrors(
  statusCode: number,
  snodeEd25519: string,
  body: string,
  associatedWith?: string
) {
  switch (statusCode) {
    case 406:
      // FIXME audric: proper clock-out-of-sync handling is still to be implemented
      window.log.warn('The users clock is out of sync with the service node network.');
      throw new Error('ClockOutOfSync TODO');
    // return ClockOutOfSync;
    case 421:
      // the snode is not in the swarm of `associatedWith` anymore; body holds the new swarm
      return handle421InvalidSwarm(snodeEd25519, body, associatedWith);
    default:
      // any other failure counts against this snode's failure budget
      return incrementBadSnodeCountOrDrop(snodeEd25519, associatedWith);
  }
}
/**
 * Handle a bad snode result.
 * The `snodeFailureCount` for that node is incremented. If it's more than `snodeFailureThreshold`,
 * we drop this node from the snode pool and from the associatedWith publicKey swarm if this is set.
 *
 * So after this call, if the snode keeps getting errors, we won't contact it again
 *
 * @param snodeEd25519 the snode ed25519 which cause issues
 * @param associatedWith if set, we will drop this snode from the swarm of the pubkey too
 */
export async function incrementBadSnodeCountOrDrop(snodeEd25519: string, associatedWith?: string) {
  const oldFailureCount = snodeFailureCount[snodeEd25519] || 0;
  const newFailureCount = oldFailureCount + 1;
  snodeFailureCount[snodeEd25519] = newFailureCount;
  window.log.warn(
    `Couldn't reach snode at: ${snodeEd25519}; setting failure count to ${newFailureCount}`
  );

  if (newFailureCount >= snodeFailureThreshold) {
    window.log.warn(`Failure threshold reached for: ${snodeEd25519}; dropping it.`);
    if (associatedWith) {
      window.log.warn(`Dropping ${snodeEd25519} from swarm of ${associatedWith}`);
      await dropSnodeFromSwarmIfNeeded(associatedWith, snodeEd25519);
    }
    window.log.warn(`Dropping ${snodeEd25519} from snodepool`);
    dropSnodeFromSnodePool(snodeEd25519);
    // the snode was ejected from the pool so it won't be used again.
    // in case of snode pool refresh, we need to be able to try to contact this node again so reset its failure count to 0.
    snodeFailureCount[snodeEd25519] = 0;

    try {
      await OnionPaths.dropSnodeFromPath(snodeEd25519);
    } catch (e) {
      window.log.warn('dropSnodeFromPath, patchingup', e);
      // if dropSnodeFromPath throws, it means there is an issue patching up the path, increment the whole path issues
      await OnionPaths.incrementBadPathCountOrDrop(snodeEd25519);
    }
  }
}
/**
 * This call tries to send the request via onion. If we get a bad path, it handles the snode removing of the swarm and snode pool.
 * But the caller needs to handle the retry (and rebuild the path on his side if needed)
 *
 * @param reqIdx request index, used only to tag log lines
 * @param nodePath the onion path (list of snodes) to route through
 * @param destX25519Any x25519 pubkey of the final destination
 * @param finalDestOptions payload/headers for the request from the last hop to the destination
 * @param finalRelayOptions options the last hop uses to reach a non-snode destination (host etc.)
 * @param associatedWith if set, the user pubkey whose swarm should be fixed up on a 421
 * @returns the decoded SnodeResponse on success, or the RequestError otherwise
 */
const sendOnionRequestHandlingSnodeEject = async ({
  reqIdx,
  destX25519Any,
  finalDestOptions,
  nodePath,
  abortSignal,
  associatedWith,
  finalRelayOptions,
  lsrpcIdx,
}: {
  reqIdx: number;
  nodePath: Array<Snode>;
  destX25519Any: string;
  finalDestOptions: {
    destination_ed25519_hex?: string;
    headers?: Record<string, string>;
    body?: string;
  };
  finalRelayOptions?: FinalRelayOptions;
  lsrpcIdx?: number;
  abortSignal?: AbortSignal;
  associatedWith?: string;
}): Promise<SnodeResponse | RequestError> => {
  // fire the actual onion request; we get back the raw fetch response plus the
  // symmetric key needed to decrypt it
  const { response, decodingSymmetricKey } = await sendOnionRequest({
    reqIdx,
    nodePath,
    destX25519Any,
    finalDestOptions,
    finalRelayOptions,
    lsrpcIdx,
    abortSignal,
  });
  // decrypt/decode: yields either a SnodeResponse or a { requestError, ... } wrapper
  const processed = await processOnionResponse(
    reqIdx,
    response,
    decodingSymmetricKey,
    false,
    abortSignal
  );

  if (isSnodeResponse(processed)) {
    return processed;
  } else {
    // If we get a bad path here, do what we gotta do to invalidate/increment the failure count of the node/path.
    // This does not retry, it just takes care of ejecting a node if needed. It is to the caller to do the retry
    const { nodeInFault: nodeInFaultEd25519, requestError, statusCode, body } = processed;
    if (requestError === RequestError.BAD_PATH) {
      if (nodeInFaultEd25519) {
        // we have a specific node in fault. This a `Next node not found :` suffix returned by a snode.
        // we can exclude just this node
        await handleOnionRequestErrors(
          statusCode || 0,
          nodeInFaultEd25519,
          body || '',
          associatedWith
        );
      }
      // NOTE(review): a BAD_PATH without a nodeInFault is returned without any
      // bookkeeping here — presumably the caller's retry path handles it; confirm.
    }
    // OTHER/ABORTED errors are returned as-is: there is no node to eject for those
    return requestError;
  }
};
/**
*
* Onion request looks like this
@ -311,19 +517,27 @@ export type FinalDestinationOptions = {
* @param finalDestOptions those are the options for the request from 3 to R. It contains for instance the payload and headers.
* @param finalRelayOptions those are the options 3 will use to make a request to R. It contains for instance the host to make the request to
*/
const sendOnionRequest = async (
reqIdx: number,
nodePath: Array<Snode>,
destX25519Any: string,
const sendOnionRequest = async ({
reqIdx,
nodePath,
destX25519Any,
finalDestOptions,
finalRelayOptions,
lsrpcIdx,
abortSignal,
}: {
reqIdx: number;
nodePath: Array<Snode>;
destX25519Any: string;
finalDestOptions: {
destination_ed25519_hex?: string;
headers?: Record<string, string>;
body?: string;
},
finalRelayOptions?: FinalRelayOptions,
lsrpcIdx?: number,
abortSignal?: AbortSignal
): Promise<SnodeResponse | RequestError> => {
};
finalRelayOptions?: FinalRelayOptions;
lsrpcIdx?: number;
abortSignal?: AbortSignal;
}) => {
const { log } = window;
let id = '';
@ -410,29 +624,32 @@ const sendOnionRequest = async (
// window.log.info('insecureNodeFetch => plaintext for sendOnionRequest');
const response = await insecureNodeFetch(guardUrl, guardFetchOptions);
return processOnionResponse(reqIdx, response, destCtx.symmetricKey, false, abortSignal);
return { response, decodingSymmetricKey: destCtx.symmetricKey };
};
async function sendOnionRequestSnodeDest(
reqIdx: any,
onionPath: Array<Snode>,
targetNode: Snode,
plaintext?: string
plaintext?: string,
associatedWith?: string
) {
return sendOnionRequest(
return sendOnionRequestHandlingSnodeEject({
reqIdx,
onionPath,
targetNode.pubkey_x25519,
{
nodePath: onionPath,
destX25519Any: targetNode.pubkey_x25519,
finalDestOptions: {
destination_ed25519_hex: targetNode.pubkey_ed25519,
body: plaintext,
},
undefined,
undefined
);
associatedWith,
});
}
// need relay node's pubkey_x25519_hex
/**
* This call tries to send the request via onion. If we get a bad path, it handles the snode removing of the swarm and snode pool.
* But the caller needs to handle the retry (and rebuild the path on his side if needed)
*/
export async function sendOnionRequestLsrpcDest(
reqIdx: number,
onionPath: Array<Snode>,
@ -442,70 +659,70 @@ export async function sendOnionRequestLsrpcDest(
lsrpcIdx: number,
abortSignal?: AbortSignal
): Promise<SnodeResponse | RequestError> {
return sendOnionRequest(
return sendOnionRequestHandlingSnodeEject({
reqIdx,
onionPath,
nodePath: onionPath,
destX25519Any,
payloadObj,
finalDestOptions: payloadObj,
finalRelayOptions,
lsrpcIdx,
abortSignal
);
abortSignal,
});
}
function getPathString(pathObjArr: Array<any>): string {
export function getPathString(pathObjArr: Array<{ ip: string; port: number }>): string {
return pathObjArr.map(node => `${node.ip}:${node.port}`).join(', ');
}
export async function lokiOnionFetch(
targetNode: Snode,
body?: string
body?: string,
associatedWith?: string
): Promise<SnodeResponse | false> {
const { log } = window;
// Loop until the result is not BAD_PATH
// tslint:disable no-constant-condition
while (true) {
// Get a path excluding `targetNode`:
// eslint-disable no-await-in-loop
const path = await OnionPaths.getOnionPath(targetNode);
const thisIdx = OnionPaths.assignOnionRequestNumber();
// Get a path excluding `targetNode`:
// eslint-disable no-await-in-loop
const path = await OnionPaths.getOnionPath(targetNode);
const thisIdx = OnionPaths.assignOnionRequestNumber();
// At this point I only care about BAD_PATH
console.warn('lokiOnionFetch with path', path);
path[2].pubkey_ed25519 = '11edd12a6f29011a1beb5b245a06b16548f2796eec4057a6c191700ffa780f5c';
const result = await sendOnionRequestSnodeDest(thisIdx, path, targetNode, body);
// At this point I only care about BAD_PATH
console.warn('lokiOnionFetch with path', path);
// FIXME audric to remove, just used to break onion routing
path[2].pubkey_ed25519 = '11edd12a6f29011a1beb5b245a06b16548f2796eec4057a6c191700ffa780f5c';
console.warn('FIXME audric to remove, just used to break onion routing');
if (result === RequestError.BAD_PATH) {
log.error(
`[path] Error on the path: ${getPathString(path)} to ${targetNode.ip}:${targetNode.port}`
);
OnionPaths.markPathAsBad(path);
return false;
} else if (result === RequestError.OTHER) {
// could mean, fail to parse results
// or status code wasn't 200
// or can't decrypt
// it's not a bad_path, so we don't need to mark the path as bad
log.error(
`[path] sendOnionRequest gave false for path: ${getPathString(path)} to ${targetNode.ip}:${
targetNode.port
}`
);
return false;
} else if (result === RequestError.ABORTED) {
// could mean, fail to parse results
// or status code wasn't 200
// or can't decrypt
// it's not a bad_path, so we don't need to mark the path as bad
log.error(
`[path] sendOnionRequest gave aborted for path: ${getPathString(path)} to ${
targetNode.ip
}:${targetNode.port}`
);
return false;
} else {
return result;
}
const result = await sendOnionRequestSnodeDest(thisIdx, path, targetNode, body, associatedWith);
if (result === RequestError.BAD_PATH) {
log.error(
`[path] Error on the path: ${getPathString(path)} to ${targetNode.ip}:${targetNode.port}`
);
// BAD_PATH are now handled in sendOnionRequest directly
return false;
} else if (result === RequestError.OTHER) {
// could mean, fail to parse results
// or status code wasn't 200
// or can't decrypt
// it's not a bad_path, so we don't need to mark the path as bad
log.error(
`[path] sendOnionRequest gave false for path: ${getPathString(path)} to ${targetNode.ip}:${
targetNode.port
}`
);
return false;
} else if (result === RequestError.ABORTED) {
// could mean, fail to parse results
// or status code wasn't 200
// or can't decrypt
// it's not a bad_path, so we don't need to mark the path as bad
log.error(
`[path] sendOnionRequest gave aborted for path: ${getPathString(path)} to ${targetNode.ip}:${
targetNode.port
}`
);
return false;
} else {
return result;
}
}

View File

@ -39,7 +39,7 @@ export interface Snode {
let randomSnodePool: Array<Snode> = [];
// We only store nodes' identifiers here,
const nodesForPubkey: Map<string, Array<string>> = new Map();
const swarmCache: Map<string, Array<string>> = new Map();
export type SeedNode = {
url: string;
@ -108,9 +108,11 @@ async function tryGetSnodeListFromLokidSeednode(seedNodes: Array<SeedNode>): Pro
export function markNodeUnreachable(snode: Snode): void {
const { log } = window;
debugger;
// we should probably get rid of this call
_.remove(randomSnodePool, x => x.pubkey_ed25519 === snode.pubkey_ed25519);
for (const [pubkey, nodes] of nodesForPubkey) {
for (const [pubkey, nodes] of swarmCache) {
const edkeys = _.filter(nodes, edkey => edkey !== snode.pubkey_ed25519);
void internalUpdateSwarmFor(pubkey, edkeys);
@ -121,7 +123,24 @@ export function markNodeUnreachable(snode: Snode): void {
);
}
export async function getRandomSnodeAddress(): Promise<Snode> {
/**
* Drop a snode from the snode pool. This does not update the swarm containing this snode.
* Use `dropSnodeFromSwarmIfNeeded` for that
* @param snodeEd25519 the snode ed25519 to drop from the snode pool
*/
export function dropSnodeFromSnodePool(snodeEd25519: string) {
_.remove(randomSnodePool, x => x.pubkey_ed25519 === snodeEd25519);
window.log.warn(
`Marking ${snodeEd25519} as unreachable, ${randomSnodePool.length} snodes remaining in randomPool`
);
}
/**
*
 * @param excludingEd25519Snode can be used to exclude some nodes from the random list. Useful to rebuild a path excluding nodes already in a path
*/
export async function getRandomSnode(excludingEd25519Snode?: Array<string>): Promise<Snode> {
// resolve random snode
if (randomSnodePool.length === 0) {
// TODO: ensure that we only call this once at a time
@ -132,9 +151,23 @@ export async function getRandomSnodeAddress(): Promise<Snode> {
throw new window.textsecure.SeedNodeError('Invalid seed node response');
}
}
// We know the pool can't be empty at this point
return _.sample(randomSnodePool) as Snode;
if (!excludingEd25519Snode) {
return _.sample(randomSnodePool) as Snode;
}
  // even after removing the excluded nodes, double-check that some nodes remain in the list
const snodePoolExcluding = randomSnodePool.filter(
e => !excludingEd25519Snode.includes(e.pubkey_ed25519)
);
if (!snodePoolExcluding) {
throw new window.textsecure.SeedNodeError(
'Not enough snodes with excluding length',
excludingEd25519Snode.length
);
}
return _.sample(snodePoolExcluding) as Snode;
}
/**
@ -283,6 +316,26 @@ export async function refreshRandomPool(): Promise<void> {
});
}
/**
 * Remove the given snode from the cached swarm of that specific publicKey, if it is part of it.
 * @param pubkey the associatedWith publicKey
 * @param snodeToDropEd25519 the snode pubkey to drop
 */
export async function dropSnodeFromSwarmIfNeeded(
  pubkey: string,
  snodeToDropEd25519: string
): Promise<void> {
  // this call either uses the cache or fetches the swarm from the db
  const currentSwarm = await getSwarmFromCacheOrDb(pubkey);
  if (currentSwarm.includes(snodeToDropEd25519)) {
    // persist the swarm without that snode, both in cache and db
    const remainingEdkeys = currentSwarm.filter(edkey => edkey !== snodeToDropEd25519);
    await internalUpdateSwarmFor(pubkey, remainingEdkeys);
  }
}
export async function updateSwarmFor(pubkey: string, snodes: Array<Snode>): Promise<void> {
const edkeys = snodes.map((sn: Snode) => sn.pubkey_ed25519);
await internalUpdateSwarmFor(pubkey, edkeys);
@ -290,36 +343,44 @@ export async function updateSwarmFor(pubkey: string, snodes: Array<Snode>): Prom
/**
 * Persist the swarm (as snode ed25519 keys) for the given pubkey, both in the
 * in-memory cache and in the database.
 *
 * NOTE(review): the diff rendering left both the removed `nodesForPubkey.set(...)`
 * line and its replacement `swarmCache.set(...)` in place; the map was renamed to
 * `swarmCache` (see the declaration hunk), so only the `swarmCache` write belongs here.
 *
 * @param pubkey the publicKey whose swarm is being updated
 * @param edkeys the ed25519 keys of the snodes making up that swarm
 */
async function internalUpdateSwarmFor(pubkey: string, edkeys: Array<string>) {
  // update our in-memory cache
  swarmCache.set(pubkey, edkeys);
  // write this change to the db
  await Data.updateSwarmNodesForPubkey(pubkey, edkeys);
}
export async function getSwarm(pubkey: string): Promise<Array<Snode>> {
const maybeNodes = nodesForPubkey.get(pubkey);
let nodes: Array<string>;
export async function getSwarmFromCacheOrDb(pubkey: string): Promise<Array<string>> {
// NOTE: important that maybeNodes is not [] here
if (maybeNodes === undefined) {
const existingCache = swarmCache.get(pubkey);
if (existingCache === undefined) {
// First time access, no cache yet, let's try the database.
nodes = await Data.getSwarmNodesForPubkey(pubkey);
nodesForPubkey.set(pubkey, nodes);
} else {
nodes = maybeNodes;
const nodes = await Data.getSwarmNodesForPubkey(pubkey);
// if no db entry, this returns []
swarmCache.set(pubkey, nodes);
return nodes;
}
// cache already set, use it
return existingCache;
}
/**
* This call fetch from cache or db the swarm and extract only the one currently reachable.
* If not enough snodes valid are in the swarm, if fetches new snodes for this pubkey from the network.
*/
export async function getSwarmFor(pubkey: string): Promise<Array<Snode>> {
const nodes = await getSwarmFromCacheOrDb(pubkey);
// See how many are actually still reachable
const goodNodes = randomSnodePool.filter((n: Snode) => nodes.indexOf(n.pubkey_ed25519) !== -1);
if (goodNodes.length < minSwarmSnodeCount) {
// Request new node list from the network
const freshNodes = _.shuffle(await requestSnodesForPubkey(pubkey));
const edkeys = freshNodes.map((n: Snode) => n.pubkey_ed25519);
await internalUpdateSwarmFor(pubkey, edkeys);
return freshNodes;
} else {
if (goodNodes.length >= minSwarmSnodeCount) {
return goodNodes;
}
// Request new node list from the network
const freshNodes = _.shuffle(await requestSnodesForPubkey(pubkey));
const edkeys = freshNodes.map((n: Snode) => n.pubkey_ed25519);
await internalUpdateSwarmFor(pubkey, edkeys);
return freshNodes;
}

View File

@ -1,5 +1,5 @@
import { PubKey } from '../types';
import { getSwarm, Snode } from './snodePool';
import { getSwarmFor, Snode } from './snodePool';
import { retrieveNextMessages } from './SNodeAPI';
import { SignalService } from '../../protobuf';
import * as Receiver from '../../receiver/receiver';
@ -91,7 +91,7 @@ export class SwarmPolling {
// accept both until this is fixed:
const pkStr = pubkey.key;
const snodes = await getSwarm(pkStr);
const snodes = await getSwarmFor(pkStr);
// Select nodes for which we already have lastHashes
const alreadyPolled = snodes.filter((n: Snode) => this.lastHashes[n.pubkey_ed25519]);