Remove swarm nodes from conversations

This commit is contained in:
Maxim Shishmarev 2020-07-06 15:28:22 +10:00
parent 26e3eca1a2
commit d5a98b8b47
14 changed files with 142 additions and 319 deletions

View File

@ -92,7 +92,8 @@ module.exports = {
removeAllSessions,
getAllSessions,
getSwarmNodesByPubkey,
getSwarmNodesForPubkey,
updateSwarmNodesForPubkey,
getGuardNodes,
updateGuardNodes,
@ -808,6 +809,7 @@ const LOKI_SCHEMA_VERSIONS = [
updateToLokiSchemaVersion2,
updateToLokiSchemaVersion3,
updateToLokiSchemaVersion4,
updateToLokiSchemaVersion5,
];
async function updateToLokiSchemaVersion1(currentVersion, instance) {
@ -967,6 +969,36 @@ async function updateToLokiSchemaVersion4(currentVersion, instance) {
console.log('updateToLokiSchemaVersion4: success!');
}
const NODES_FOR_PUBKEY_TABLE = 'nodesForPubkey';
async function updateToLokiSchemaVersion5(currentVersion, instance) {
if (currentVersion >= 5) {
return;
}
console.log('updateToLokiSchemaVersion5: starting...');
await instance.run('BEGIN TRANSACTION;');
await instance.run(
`CREATE TABLE ${NODES_FOR_PUBKEY_TABLE} (
pubkey TEXT PRIMARY KEY,
json TEXT
);`
);
await instance.run(
`INSERT INTO loki_schema (
version
) values (
5
);`
);
await instance.run('COMMIT TRANSACTION;');
console.log('updateToLokiSchemaVersion5: success!');
}
async function updateLokiSchema(instance) {
const result = await instance.get(
"SELECT name FROM sqlite_master WHERE type = 'table' AND name='loki_schema';"
@ -1643,16 +1675,35 @@ async function getAllFromTable(table) {
// Conversations
async function getSwarmNodesByPubkey(pubkey) {
const row = await db.get('SELECT * FROM conversations WHERE id = $pubkey;', {
$pubkey: pubkey,
});
async function getSwarmNodesForPubkey(pubkey) {
const row = await db.get(
`SELECT * FROM ${NODES_FOR_PUBKEY_TABLE} WHERE pubkey = $pubkey;`,
{
$pubkey: pubkey,
}
);
if (!row) {
return [];
}
return jsonToObject(row.json).swarmNodes;
return jsonToObject(row.json);
}
async function updateSwarmNodesForPubkey(pubkey, snodeEdKeys) {
await db.run(
`INSERT OR REPLACE INTO ${NODES_FOR_PUBKEY_TABLE} (
pubkey,
json
) values (
$pubkey,
$json
);`,
{
$pubkey: pubkey,
$json: objectToJSON(snodeEdKeys),
}
);
}
const CONVERSATIONS_TABLE = 'conversations';
@ -2682,6 +2733,7 @@ function getRemoveConfigurationPromises() {
db.run('DELETE FROM servers;'),
db.run('DELETE FROM lastHashes;'),
db.run(`DELETE FROM ${SENDER_KEYS_TABLE};`),
db.run(`DELETE FROM ${NODES_FOR_PUBKEY_TABLE};`),
db.run('DELETE FROM seenMessages;'),
];
}

View File

@ -1,10 +0,0 @@
/* global log */
class StubLokiSnodeAPI {
// eslint-disable-next-line class-methods-use-this
async refreshSwarmNodesForPubKey(pubKey) {
log.info('refreshSwarmNodesForPubkey: ', pubKey);
}
}
module.exports = StubLokiSnodeAPI;

View File

@ -1,9 +0,0 @@
/* eslint-disable class-methods-use-this */
class StubSnodeAPI {
async refreshSwarmNodesForPubKey() {
return [];
}
}
module.exports = StubSnodeAPI;

View File

@ -139,7 +139,8 @@
if (!conversation.isPublic() && !conversation.isRss()) {
Promise.all([
conversation.updateProfileAvatar(),
window.lokiSnodeAPI.refreshSwarmNodesForPubKey(id),
// NOTE: we request snodes updating the cache, but ignore the result
window.SnodePool.getSnodesFor(id),
]);
}
});

View File

@ -81,7 +81,6 @@
unreadCount: 0,
verified: textsecure.storage.protocol.VerifiedStatus.DEFAULT,
sessionResetStatus: SessionResetEnum.none,
swarmNodes: [],
groupAdmins: [],
isKickedFromGroup: false,
isOnline: false,
@ -1523,13 +1522,6 @@
})
);
},
async updateSwarmNodes(swarmNodes) {
this.set({ swarmNodes });
await window.Signal.Data.updateConversation(this.id, this.attributes, {
Conversation: Whisper.Conversation,
});
},
async updateLastMessage() {
if (!this.id) {
return;

View File

@ -1142,18 +1142,6 @@
return null;
}
},
isReplayableError(e) {
return (
e.name === 'MessageError' ||
e.name === 'OutgoingMessageError' ||
e.name === 'SendMessageNetworkError' ||
e.name === 'SignedPreKeyRotationError' ||
e.name === 'OutgoingIdentityKeyError' ||
e.name === 'DNSResolutionError' ||
e.name === 'EmptySwarmError' ||
e.name === 'PoWError'
);
},
// Called when the user ran into an error with a specific user, wants to send to them
// One caller today: ConversationView.forceSend()

View File

@ -233,6 +233,11 @@ export function saveSeenMessageHash(data: {
hash: string;
}): Promise<void>;
export function getSwarmNodesForPubkey(pubkey: string): Promise<Array<string>>;
export function updateSwarmNodesForPubkey(
pubkey: string,
snodeEdKeys: Array<string>
): Promise<void>;
// TODO: Strictly type the following
export function updateLastHash(data: any): Promise<any>;
export function saveSeenMessageHashes(data: any): Promise<any>;

View File

@ -110,8 +110,8 @@ module.exports = {
removeAllSessions,
getAllSessions,
// Doesn't look like this is used at all
getSwarmNodesByPubkey,
getSwarmNodesForPubkey,
updateSwarmNodesForPubkey,
getConversationCount,
saveConversation,
@ -736,12 +736,15 @@ async function getAllSessions(id) {
return sessions;
}
// Conversation
async function getSwarmNodesByPubkey(pubkey) {
return channels.getSwarmNodesByPubkey(pubkey);
async function getSwarmNodesForPubkey(pubkey) {
return channels.getSwarmNodesForPubkey(pubkey);
}
async function updateSwarmNodesForPubkey(pubkey, snodeEdKeys) {
await channels.updateSwarmNodesForPubkey(pubkey, snodeEdKeys);
}
// Conversation
async function getConversationCount() {
return channels.getConversationCount();
}
@ -773,7 +776,6 @@ async function updateConversation(id, data, { Conversation }) {
// it will take a union of old and new members and that's not
// what we want for member deletion, so:
merged.members = data.members;
merged.swarmNodes = data.swarmNodes;
// Don't save the online status of the object
const cleaned = omit(merged, 'isOnline');

View File

@ -1,6 +1,6 @@
/* eslint-disable no-await-in-loop */
/* eslint-disable no-loop-func */
/* global log, dcodeIO, window, callWorker, lokiSnodeAPI, textsecure */
/* global log, dcodeIO, window, callWorker, textsecure */
const _ = require('lodash');
const primitives = require('./loki_primitives');
@ -14,11 +14,18 @@ const calcNonce = (messageEventData, pubKey, data64, timestamp, ttl) => {
return callWorker('calcPoW', timestamp, ttl, pubKey, data64, difficulty);
};
class LokiMessageAPI {
constructor() {
this.sendingData = {};
async function _openSendConnection(snode, params) {
// TODO: Revert back to using snode address instead of IP
const successfulSend = await window.NewSnodeAPI.storeOnNode(snode, params);
if (successfulSend) {
return snode;
}
// should we mark snode as bad if it can't store our message?
return false;
}
class LokiMessageAPI {
/**
* Refactor note: We should really clean this up ... it's very messy
*
@ -69,14 +76,7 @@ class LokiMessageAPI {
ttl
);
// Using timestamp as a unique identifier
const swarm = await lokiSnodeAPI.getSwarmNodesForPubKey(pubKey);
this.sendingData[timestamp] = {
swarm,
hasFreshList: false,
};
if (this.sendingData[timestamp].swarm.length < numConnections) {
await this.refreshSendingSwarm(pubKey, timestamp);
}
const swarm = await window.SnodePool.getSnodesFor(pubKey);
// send parameters
const params = {
@ -86,17 +86,10 @@ class LokiMessageAPI {
timestamp: timestamp.toString(),
data: data64,
};
const promises = [];
let completedConnections = 0;
for (let i = 0; i < numConnections; i += 1) {
const connectionPromise = this._openSendConnection(params).finally(() => {
completedConnections += 1;
if (completedConnections >= numConnections) {
delete this.sendingData[timestamp];
}
});
promises.push(connectionPromise);
}
const promises = _.slice(swarm, 0, numConnections).map(snode =>
_openSendConnection(snode, params)
);
let snode;
try {
@ -122,50 +115,11 @@ class LokiMessageAPI {
pubKey,
'Ran out of swarm nodes to query'
);
}
log.info(
`loki_message:::sendMessage - Successfully stored message to ${pubKey} via ${snode.ip}:${snode.port}`
);
}
async refreshSendingSwarm(pubKey, timestamp) {
const freshNodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(pubKey);
this.sendingData[timestamp].swarm = freshNodes;
this.sendingData[timestamp].hasFreshList = true;
return true;
}
async _openSendConnection(params) {
// timestamp is likely the current second...
while (!_.isEmpty(this.sendingData[params.timestamp].swarm)) {
const snode = this.sendingData[params.timestamp].swarm.shift();
// TODO: Revert back to using snode address instead of IP
const successfulSend = await window.NewSnodeAPI.storeOnNode(
snode,
params
} else {
log.info(
`loki_message:::sendMessage - Successfully stored message to ${pubKey} via ${snode.ip}:${snode.port}`
);
if (successfulSend) {
return snode;
}
// should we mark snode as bad if it can't store our message?
}
if (!this.sendingData[params.timestamp].hasFreshList) {
// Ensure that there is only a single refresh per outgoing message
if (!this.sendingData[params.timestamp].refreshPromise) {
this.sendingData[
params.timestamp
].refreshPromise = this.refreshSendingSwarm(
params.pubKey,
params.timestamp
);
}
await this.sendingData[params.timestamp].refreshPromise;
// Retry with a fresh list again
return this._openSendConnection(params);
}
return false;
}
}

View File

@ -1,5 +1,5 @@
/* eslint-disable class-methods-use-this */
/* global window, textsecure, ConversationController, log, process, Buffer, StringView, dcodeIO */
/* global window, textsecure, log, process, Buffer, StringView, dcodeIO */
// not sure I like this name but it's been than util
const primitives = require('./loki_primitives');
@ -7,12 +7,8 @@ const primitives = require('./loki_primitives');
const is = require('@sindresorhus/is');
const nodeFetch = require('node-fetch');
const RANDOM_SNODES_TO_USE_FOR_PUBKEY_SWARM = 3;
const MIN_GUARD_COUNT = 2;
const compareSnodes = (current, search) =>
current.pubkey_ed25519 === search.pubkey_ed25519;
class LokiSnodeAPI {
constructor({ serverUrl, localUrl }) {
if (!is.string(serverUrl)) {
@ -316,103 +312,6 @@ class LokiSnodeAPI {
});
}
async updateLastHash(convoId, snodeAddress, hash, expiresAt) {
// FIXME: handle rejections
await window.Signal.Data.updateLastHash({
convoId,
snode: snodeAddress,
hash,
expiresAt,
});
}
// called by loki_message:::sendMessage & loki_message:::startLongPolling
async getSwarmNodesForPubKey(pubKey, options = {}) {
const { fetchHashes } = options;
try {
const conversation = ConversationController.get(pubKey);
if (!conversation) {
throw new Error('Could not find conversation ', pubKey);
}
const swarmNodes = [...conversation.get('swarmNodes')];
// always? include lashHash
if (fetchHashes) {
await Promise.all(
Object.keys(swarmNodes).map(async j => {
const node = swarmNodes[j];
// FIXME make a batch function call
const lastHash = await window.Signal.Data.getLastHashBySnode(
pubKey,
node.address
);
log.debug(
`LokiSnodeAPI::getSwarmNodesForPubKey - ${j} ${node.ip}:${node.port}`
);
swarmNodes[j] = {
...node,
lastHash,
};
})
);
}
return swarmNodes;
} catch (e) {
log.error('getSwarmNodesForPubKey expection: ', e);
throw new window.textsecure.ReplayableError({
message: 'Could not get conversation',
});
}
}
async updateSwarmNodes(pubKey, newNodes) {
try {
const filteredNodes = newNodes.filter(snode => snode.ip !== '0.0.0.0');
const conversation = ConversationController.get(pubKey);
await conversation.updateSwarmNodes(filteredNodes);
return filteredNodes;
} catch (e) {
log.error(
`LokiSnodeAPI::updateSwarmNodes - error ${e.code} ${e.message}`
);
throw new window.textsecure.ReplayableError({
message: 'Could not get conversation',
});
}
}
// FIXME: in it's own PR, reorder functions: put _getFreshSwarmNodes and it's callee
// only loki_message::startLongPolling calls this...
async refreshSwarmNodesForPubKey(pubKey) {
// FIXME: handle rejections
const newNodes = await this._getFreshSwarmNodes(pubKey);
log.debug(
'LokiSnodeAPI::refreshSwarmNodesForPubKey - newNodes',
newNodes.length
);
const filteredNodes = this.updateSwarmNodes(pubKey, newNodes);
return filteredNodes;
}
async _getFreshSwarmNodes(pubKey) {
return primitives.allowOnlyOneAtATime(`swarmRefresh${pubKey}`, async () => {
let newSwarmNodes = [];
try {
newSwarmNodes = await this._getSwarmNodes(pubKey);
} catch (e) {
log.error(
'LokiSnodeAPI::_getFreshSwarmNodes - error',
e.code,
e.message
);
// TODO: Handle these errors sensibly
newSwarmNodes = [];
}
return newSwarmNodes;
});
}
async getLnsMapping(lnsName, timeout) {
// Returns { pubkey, error }
// pubkey is
@ -530,33 +429,6 @@ class LokiSnodeAPI {
return { pubkey, error };
}
async _getSwarmNodes(pubKey) {
const snodes = [];
// creates a range: [0, 1, 2]
const questions = [...Array(RANDOM_SNODES_TO_USE_FOR_PUBKEY_SWARM).keys()];
// FIXME: handle rejections
await Promise.all(
questions.map(async qNum => {
// allow exceptions to pass through upwards
const resList = await window.NewSnodeAPI.getSnodesForPubkey(pubKey);
log.info(
`LokiSnodeAPI::_getSwarmNodes - question ${qNum} got`,
resList.length,
'snodes'
);
resList.map(item => {
const hasItem = snodes.some(n => compareSnodes(n, item));
if (!hasItem) {
snodes.push(item);
}
return true;
});
})
);
// should we only activate entries that are in all results? yes
return snodes;
}
}
module.exports = LokiSnodeAPI;

View File

@ -347,19 +347,7 @@ if (process.env.USE_STUBBED_NETWORK) {
const StubAppDotNetAPI = require('./integration_test/stubs/stub_app_dot_net_api');
window.LokiAppDotNetServerAPI = StubAppDotNetAPI;
const StubSnodeAPI = require('./integration_test/stubs/stub_snode_api');
window.lokiSnodeAPI = new StubSnodeAPI({
serverUrl: config.serverUrl,
localUrl: config.localUrl,
});
} else {
window.lokiSnodeAPI = new LokiSnodeAPI({
serverUrl: config.serverUrl,
localUrl: config.localUrl,
});
window.LokiMessageAPI = require('./js/modules/loki_message_api');
window.LokiAppDotNetServerAPI = require('./js/modules/loki_app_dot_net_api');
@ -495,9 +483,6 @@ if (
};
/* eslint-enable global-require, import/no-extraneous-dependencies */
window.lokiFeatureFlags = {};
// eslint-disable-next-line global-require
window.StubLokiSnodeAPI = require('./integration_test/stubs/stub_loki_snode_api');
window.lokiSnodeAPI = new window.StubLokiSnodeAPI(); // no need stub out each function here
}
if (config.environment.includes('test-integration')) {
window.lokiFeatureFlags = {

View File

@ -53,7 +53,7 @@ async function processProxyResponse(
'after 3 retries'
);
if (options.ourPubKey) {
void SnodePool.markUnreachableForPubkey(options.ourPubKey, targetNode);
SnodePool.markNodeUnreachable(targetNode);
}
return false;
}
@ -92,7 +92,7 @@ async function processProxyResponse(
`lokiRpc:::sendToProxy - Failing ${targetNode.ip}:${targetNode.port} after 5 retries`
);
if (options.ourPubKey) {
void SnodePool.markUnreachableForPubkey(options.ourPubKey, targetNode);
SnodePool.markNodeUnreachable(targetNode);
}
return false;
}

View File

@ -12,8 +12,8 @@ import { sleepFor } from '../../../js/modules/loki_primitives';
import {
getRandomSnodeAddress,
markNodeUnreachable,
markUnreachableForPubkey,
Snode,
updateSnodesFor,
} from './snodePool';
const snodeHttpsAgent = new https.Agent({
@ -143,9 +143,6 @@ export async function getSnodesFromSeedUrl(urlObj: URL): Promise<Array<any>> {
}
}
// Not entirely sure what this is used for
const sendingData: any = {};
interface SendParams {
pubKey: string;
ttl: string;
@ -319,9 +316,7 @@ export async function storeOnNode(
);
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
await lokiSnodeAPI.updateSwarmNodes(params.pubKey, newSwarm);
sendingData[params.timestamp].swarm = newSwarm;
sendingData[params.timestamp].hasFreshList = true;
await updateSnodesFor(params.pubKey, newSwarm);
return false;
} else if (e instanceof textsecure.WrongDifficultyError) {
const { newDifficulty } = e;
@ -342,12 +337,9 @@ export async function storeOnNode(
successiveFailures += 1;
}
}
const remainingSwarmSnodes = await markUnreachableForPubkey(
params.pubKey,
targetNode
);
markNodeUnreachable(targetNode);
log.error(
`loki_message:::storeOnNode - Too many successive failures trying to send to node ${targetNode.ip}:${targetNode.port}, ${remainingSwarmSnodes.length} remaining swarm nodes`
`loki_message:::storeOnNode - Too many successive failures trying to send to node ${targetNode.ip}:${targetNode.port}`
);
return false;
}

View File

@ -9,6 +9,8 @@ import {
getVersion,
} from './serviceNodeAPI';
import * as Data from '../../../js/modules/data';
import semver from 'semver';
import _ from 'lodash';
@ -29,7 +31,7 @@ let randomSnodePool: Array<Snode> = [];
let stopGetAllVersionPromiseControl: any = false;
// We only store nodes' identifiers here,
const nodesForPubkey: { [key: string]: Array<SnodeEdKey> } = {};
const nodesForPubkey: Map<string, Array<SnodeEdKey>> = new Map();
// just get the filtered list
async function tryGetSnodeListFromLokidSeednode(
@ -96,43 +98,16 @@ async function tryGetSnodeListFromLokidSeednode(
return [];
}
// This simply removes the node from the conversation, but not in the pool! (fix this?)
export async function markUnreachableForPubkey(
pubKey: string,
unreachableNode: Snode
): Promise<Array<Snode>> {
const { log, ConversationController } = window;
const conversation = ConversationController.get(pubKey);
const swarmNodes = [...conversation.get('swarmNodes')];
if (typeof unreachableNode === 'string') {
log.warn(
'LokiSnodeAPI::unreachableNode - String passed as unreachableNode to unreachableNode'
);
return swarmNodes;
}
const filteredNodes = swarmNodes.filter(node =>
compareSnodes(unreachableNode, node)
);
if (filteredNodes.length === swarmNodes.length) {
log.warn(
`LokiSnodeAPI::unreachableNode - snode ${unreachableNode.ip}:${unreachableNode.port} has already been marked as bad`
);
}
try {
await conversation.updateSwarmNodes(filteredNodes);
} catch (e) {
log.error(`LokiSnodeAPI::unreachableNode - error ${e.code} ${e.message}`);
throw e;
}
return filteredNodes;
}
export function markNodeUnreachable(snode: Snode): void {
const { log } = window;
randomSnodePool = _.without(randomSnodePool, snode);
_.remove(randomSnodePool, x => x.pubkey_ed25519 === snode.pubkey_ed25519);
for (const [pubkey, nodes] of nodesForPubkey) {
const edkeys = _.filter(nodes, edkey => edkey !== snode.pubkey_ed25519);
// tslint:disable-next-line no-floating-promises
internalUpdateSnodesFor(pubkey, edkeys);
}
log.warn(
`Marking ${snode.ip}:${snode.port} as unreachable, ${randomSnodePool.length} snodes remaining in randomPool`
@ -342,22 +317,46 @@ export async function refreshRandomPool(seedNodes?: Array<any>): Promise<void> {
});
}
export async function updateSnodesFor(
pubkey: string,
snodes: Array<Snode>
): Promise<void> {
const edkeys = snodes.map((sn: Snode) => sn.pubkey_ed25519);
await internalUpdateSnodesFor(pubkey, edkeys);
}
async function internalUpdateSnodesFor(pubkey: string, edkeys: Array<string>) {
nodesForPubkey.set(pubkey, edkeys);
await Data.updateSwarmNodesForPubkey(pubkey, edkeys);
}
export async function getSnodesFor(pubkey: string): Promise<Array<Snode>> {
const nodes = nodesForPubkey[pubkey];
let maybeNodes = nodesForPubkey.get(pubkey);
let nodes: Array<string>;
maybeNodes = [];
// NOTE: important that maybeNodes is not [] here
if (maybeNodes === undefined) {
// First time access, try the database:
nodes = await Data.getSwarmNodesForPubkey(pubkey);
nodesForPubkey.set(pubkey, nodes);
} else {
nodes = maybeNodes;
}
// See how many are actually still reachable
const goodNodes = nodes
? randomSnodePool.filter(
(n: Snode) => nodes.indexOf(n.pubkey_ed25519) !== -1
)
: [];
const goodNodes = randomSnodePool.filter(
(n: Snode) => nodes.indexOf(n.pubkey_ed25519) !== -1
);
if (goodNodes.length < MIN_NODES) {
// Request new node list from the network
const freshNodes = await getSnodesForPubkey(pubkey);
const edkeys = freshNodes.map((n: Snode) => n.pubkey_ed25519);
nodesForPubkey[pubkey] = edkeys;
// tslint:disable-next-line no-floating-promises
internalUpdateSnodesFor(pubkey, edkeys);
// TODO: We could probably check that the retuned sndoes are not "unreachable"
return freshNodes;