Merge pull request #1210 from msgmaxim/fix-some-tests

Stub out swarm polling
This commit is contained in:
Maxim Shishmarev 2020-07-03 15:18:13 +10:00 committed by GitHub
commit 6a41aa1d88
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 126 additions and 262 deletions

View file

@ -56,18 +56,15 @@ describe('Add friends', function() {
.should.eventually.equal(common.TEST_PUBKEY2);
await app.client.element(ConversationPage.nextButton).click();
await app.client.waitForExist(
ConversationPage.sendFriendRequestTextarea,
1000
);
await app.client.waitForExist(ConversationPage.sendMessageTextarea, 1000);
// send a text message to that user (will be a friend request)
await app.client
.element(ConversationPage.sendFriendRequestTextarea)
.element(ConversationPage.sendMessageTextarea)
.setValue(textMessage);
await app.client.keys('Enter');
await app.client.waitForExist(
ConversationPage.existingFriendRequestText(textMessage),
ConversationPage.existingSendMessageText(textMessage),
1000
);
@ -76,63 +73,13 @@ describe('Add friends', function() {
await app.client.isExisting(ConversationPage.retrySendButton).should
.eventually.be.false;
// wait for left notification Friend Request count to go to 1 and click it
await app2.client.waitForExist(
ConversationPage.oneNotificationFriendRequestLeft,
5000
);
await app2.client
.element(ConversationPage.oneNotificationFriendRequestLeft)
.click();
// open the dropdown from the top friend request count
await app2.client.isExisting(
ConversationPage.oneNotificationFriendRequestTop
).should.eventually.be.true;
await app2.client
.element(ConversationPage.oneNotificationFriendRequestTop)
.click();
await app2.client.waitForExist(ConversationPage.conversationItem, 5000);
// we should have our app1 friend request here
await app2.client.isExisting(
ConversationPage.friendRequestFromUser(
common.TEST_DISPLAY_NAME1,
common.TEST_PUBKEY1
)
).should.eventually.be.true;
await app2.client.isExisting(ConversationPage.acceptFriendRequestButton)
.should.eventually.be.true;
await app2.client.element(ConversationPage.conversationItem).click();
// accept the friend request and validate that on both side the "accepted FR" message is shown
await app2.client
.element(ConversationPage.acceptFriendRequestButton)
.click();
await app2.client.waitForExist(
ConversationPage.acceptedFriendRequestMessage,
ConversationPage.existingReceivedMessageText(textMessage),
1000
);
await app.client.waitForExist(
ConversationPage.acceptedFriendRequestMessage,
5000
);
// app trigger the friend request logic first
const aliceLogs = await app.client.getRenderProcessLogs();
const bobLogs = await app2.client.getRenderProcessLogs();
await common.logsContains(
aliceLogs,
`Sending undefined:friend-request message to ${common.TEST_PUBKEY2}`
);
await common.logsContains(
bobLogs,
`Received a NORMAL_FRIEND_REQUEST from source: ${common.TEST_PUBKEY1}, primarySource: ${common.TEST_PUBKEY1},`
);
await common.logsContains(
bobLogs,
`Sending incoming-friend-request-accept:onlineBroadcast message to ${common.TEST_PUBKEY1}`
);
await common.logsContains(
aliceLogs,
`Sending outgoing-friend-request-accepted:onlineBroadcast message to ${common.TEST_PUBKEY2}`
);
});
});

View file

@ -28,6 +28,8 @@ module.exports = {
`${number} members`
),
conversationItem: commonPage.divWithClass('module-conversation-list-item'),
attachmentInput: '//*[contains(@class, "choose-file")]/input[@type="file"]',
attachmentButton: '//*[contains(@class, "choose-file")]/button',

View file

@ -443,8 +443,13 @@ window.NewSnodeAPI = require('./ts/session/snode_api/serviceNodeAPI');
window.SnodePool = require('./ts/session/snode_api/snodePool');
const { SwarmPolling } = require('./ts/session/snode_api/swarmPolling');
const { SwarmPollingStub } = require('./ts/session/snode_api/swarmPollingStub');
window.SwarmPolling = new SwarmPolling();
if (process.env.USE_STUBBED_NETWORK) {
window.SwarmPolling = new SwarmPollingStub();
} else {
window.SwarmPolling = new SwarmPolling();
}
window.shortenPubkey = pubkey => {
const pk = pubkey.key ? pubkey.key : pubkey;

View file

@ -352,123 +352,6 @@ export async function storeOnNode(
return false;
}
// export async function openRetrieveConnection(pSwarmPool: any, stopPollingPromise: Promise<any>, onMessages: any) {
// const swarmPool = pSwarmPool; // lint
// let stopPollingResult = false;
// // When message_receiver restarts from onoffline/ononline events it closes
// // http-resources, which will then resolve the stopPollingPromise with true. We then
// // want to cancel these polling connections because new ones will be created
// // tslint:disable-next-line no-floating-promises
// stopPollingPromise.then((result: any) => {
// stopPollingResult = result;
// });
// while (!stopPollingResult && !_.isEmpty(swarmPool)) {
// const address = Object.keys(swarmPool)[0]; // X.snode hostname
// const nodeData = swarmPool[address];
// delete swarmPool[address];
// let successiveFailures = 0;
// while (
// !stopPollingResult &&
// successiveFailures < MAX_ACCEPTABLE_FAILURES
// ) {
// // TODO: Revert back to using snode address instead of IP
// try {
// // in general, I think we want exceptions to bubble up
// // so the user facing UI can report unhandled errors
// // except in this case of living inside http-resource pollServer
// // because it just restarts more connections...
// let messages = await retrieveNextMessages(
// nodeData,
// nodeData.lastHash,
// this.ourKey
// );
// // this only tracks retrieval failures
// // won't include parsing failures...
// successiveFailures = 0;
// if (messages.length) {
// const lastMessage = _.last(messages);
// nodeData.lastHash = lastMessage.hash;
// await lokiSnodeAPI.updateLastHash(
// this.ourKey,
// address,
// lastMessage.hash,
// lastMessage.expiration
// );
// messages = await this.jobQueue.add(() =>
// filterIncomingMessages(messages)
// );
// }
// // Execute callback even with empty array to signal online status
// onMessages(messages);
// } catch (e) {
// log.warn(
// 'loki_message:::_openRetrieveConnection - retrieve error:',
// e.code,
// e.message,
// `on ${nodeData.ip}:${nodeData.port}`
// );
// if (e instanceof textsecure.WrongSwarmError) {
// const { newSwarm } = e;
// // Is this a security concern that we replace the list of snodes
// // based on a response from a single snode?
// await lokiSnodeAPI.updateSwarmNodes(this.ourKey, newSwarm);
// // FIXME: restart all openRetrieves when this happens...
// // FIXME: lokiSnode should handle this
// for (let i = 0; i < newSwarm.length; i += 1) {
// const lastHash = await window.Signal.Data.getLastHashBySnode(
// this.ourKey,
// newSwarm[i]
// );
// swarmPool[newSwarm[i]] = {
// lastHash,
// };
// }
// // Try another snode
// break;
// } else if (e instanceof textsecure.NotFoundError) {
// // DNS/Lokinet error, needs to bubble up
// throw new window.textsecure.DNSResolutionError(
// 'Retrieving messages'
// );
// }
// successiveFailures += 1;
// }
// // Always wait a bit as we are no longer long-polling
// await sleepFor(Math.max(successiveFailures, 2) * 1000);
// }
// if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) {
// const remainingSwarmSnodes = await markUnreachableForPubkey(
// this.ourKey,
// nodeData
// );
// log.warn(
// `loki_message:::_openRetrieveConnection - too many successive failures, removing ${
// nodeData.ip
// }:${nodeData.port} from our swarm pool. We have ${
// Object.keys(swarmPool).length
// } usable swarm nodes left for our connection (${
// remainingSwarmSnodes.length
// } in local db)`
// );
// }
// }
// // if not stopPollingResult
// if (_.isEmpty(swarmPool)) {
// log.error(
// 'loki_message:::_openRetrieveConnection - We no longer have any swarm nodes available to try in pool, closing retrieve connection'
// );
// return false;
// }
// return true;
// }
// mark private (_ prefix) since no error handling is done here...
export async function retrieveNextMessages(
nodeData: Snode,
lastHash: string,

View file

@ -142,12 +142,8 @@ export function markNodeUnreachable(snode: Snode): void {
export async function getRandomSnodeAddress(): Promise<Snode> {
// resolve random snode
if (randomSnodePool.length === 0) {
// allow exceptions to pass through upwards without the unhandled promise rejection
try {
await refreshRandomPool([]);
} catch (e) {
throw e;
}
await refreshRandomPool([]);
if (randomSnodePool.length === 0) {
throw new window.textsecure.SeedNodeError('Invalid seed node response');
}

View file

@ -65,6 +65,82 @@ export class SwarmPolling {
this.groupPubkeys = this.groupPubkeys.filter(key => !pubkey.isEqual(key));
}
protected async pollOnceForKey(pubkey: PubKey, isGroup: boolean) {
// NOTE: sometimes pubkey is string, sometimes it is object, so
// accept both until this is fixed:
const pk = (pubkey.key ? pubkey.key : pubkey) as string;
const snodes = await getSnodesFor(pk);
// Select nodes for which we already have lastHashes
const alreadyPolled = snodes.filter(
(n: Snode) => this.lastHashes[n.pubkey_ed25519]
);
// If we need more nodes, select randomly from the remaining nodes:
// Use 1 node for now:
const COUNT = 1;
let nodesToPoll = _.sampleSize(alreadyPolled, COUNT);
if (nodesToPoll.length < COUNT) {
const notPolled = _.difference(snodes, alreadyPolled);
const newNeeded = COUNT - alreadyPolled.length;
const newNodes = _.sampleSize(notPolled, newNeeded);
nodesToPoll = _.concat(nodesToPoll, newNodes);
}
const results = await Promise.all(
nodesToPoll.map(async (n: Snode) => {
return this.pollNodeForKey(n, pubkey);
})
);
// Merge results into one list of unique messages
const messages = _.uniqBy(_.flatten(results), (x: any) => x.hash);
const newMessages = await this.handleSeenMessages(messages);
newMessages.forEach((m: Message) => {
const options = isGroup ? { conversationId: pk } : {};
processMessage(m.data, options);
});
}
// Fetches messages for `pubkey` from `node` potentially updating
// the lash hash record
protected async pollNodeForKey(
node: Snode,
pubkey: PubKey
): Promise<Array<any>> {
const edkey = node.pubkey_ed25519;
const pkStr = pubkey.key ? pubkey.key : pubkey;
const prevHash = await this.getLastHash(edkey, pkStr as string);
const messages = await retrieveNextMessages(node, prevHash, pubkey);
if (!messages.length) {
return [];
}
const lastMessage = _.last(messages);
this.updateLastHash(
edkey,
pubkey,
lastMessage.hash,
lastMessage.expiration
);
return messages;
}
private loadGroupIds() {
// Start polling for medium size groups as well (they might be in different swarms)
const convos = window
@ -103,11 +179,11 @@ export class SwarmPolling {
private async pollForAllKeys() {
const directPromises = this.pubkeys.map(async pk => {
return this.pollOnceForKey(pk);
return this.pollOnceForKey(pk, false);
});
const groupPromises = this.groupPubkeys.map(async pk => {
return this.pollOnceForKey(pk);
return this.pollOnceForKey(pk, true);
});
await Promise.all(_.concat(directPromises, groupPromises));
@ -154,81 +230,4 @@ export class SwarmPolling {
return nodeRecords[pubkey];
}
}
// Fetches messages for `pubkey` from `node` potentially updating
// the lash hash record
private async pollNodeForKey(
node: Snode,
pubkey: PubKey
): Promise<Array<any>> {
const edkey = node.pubkey_ed25519;
const pkStr = pubkey.key ? pubkey.key : pubkey;
const prevHash = await this.getLastHash(edkey, pkStr as string);
const messages = await retrieveNextMessages(node, prevHash, pubkey);
if (!messages.length) {
return [];
}
const lastMessage = _.last(messages);
this.updateLastHash(
edkey,
pubkey,
lastMessage.hash,
lastMessage.expiration
);
return messages;
}
private async pollOnceForKey(pubkey: PubKey) {
// NOTE: sometimes pubkey is string, sometimes it is object, so
// accept both until this is fixed:
const pk = pubkey.key ? pubkey.key : pubkey;
const snodes = await getSnodesFor(pk as string);
// Select nodes for which we already have lastHashes
const alreadyPolled = snodes.filter(
(n: Snode) => this.lastHashes[n.pubkey_ed25519]
);
// If we need more nodes, select randomly from the remaining nodes:
// Use 1 node for now:
const COUNT = 1;
let nodesToPoll = _.sampleSize(alreadyPolled, COUNT);
if (nodesToPoll.length < COUNT) {
const notPolled = _.difference(snodes, alreadyPolled);
const newNeeded = COUNT - alreadyPolled.length;
const newNodes = _.sampleSize(notPolled, newNeeded);
nodesToPoll = _.concat(nodesToPoll, newNodes);
}
const results = await Promise.all(
nodesToPoll.map(async (n: Snode) => {
return this.pollNodeForKey(n, pubkey);
})
);
// Merge results into one list of unique messages
const messages = _.uniqBy(_.flatten(results), (x: any) => x.hash);
const newMessages = await this.handleSeenMessages(messages);
newMessages.forEach((m: Message) => {
processMessage(m.data, { conversationId: pubkey.key });
});
// TODO: `onMessages`
}
}

View file

@ -0,0 +1,32 @@
import { processMessage, SwarmPolling } from './swarmPolling';
import fetch from 'node-fetch';
import { PubKey } from '../types';
export class SwarmPollingStub extends SwarmPolling {
private readonly baseUrl = 'http://localhost:3000';
protected async pollOnceForKey(pubkey: PubKey, isGroup: boolean) {
const pubkeyStr = pubkey.key ? pubkey.key : pubkey;
const get = {
method: 'GET',
};
const res = await fetch(
`${this.baseUrl}/messages?pubkey=${pubkeyStr}`,
get
);
try {
const json = await res.json();
const options = isGroup ? { conversationId: pubkeyStr } : {};
json.messages.forEach((m: any) => {
processMessage(m.data, options);
});
} catch (e) {
window.log.error('invalid json: ', e);
}
}
}