diff --git a/js/models/conversations.js b/js/models/conversations.js index 7f510916e..f5e70fc22 100644 --- a/js/models/conversations.js +++ b/js/models/conversations.js @@ -642,6 +642,7 @@ await this.respondToAllPendingFriendRequests({ response: 'accepted', }); + window.libloki.api.sendOnlineBroadcastMessage(this.id); return true; } return false; diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index ce4bc1b83..8c05e7c39 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -1,6 +1,6 @@ /* eslint-disable no-await-in-loop */ /* eslint-disable no-loop-func */ -/* global log, dcodeIO, window, callWorker, Whisper */ +/* global log, dcodeIO, window, callWorker, Whisper, lokiP2pAPI */ const nodeFetch = require('node-fetch'); const _ = require('lodash'); @@ -27,9 +27,9 @@ const fetch = async (url, options = {}) => { try { const response = await nodeFetch(url, { + ...options, timeout, method, - ...options, }); if (!response.ok) { @@ -63,9 +63,28 @@ class LokiMessageAPI { this.messageServerPort = messageServerPort ? `:${messageServerPort}` : ''; } - async sendMessage(pubKey, data, messageTimeStamp, ttl) { + async sendMessage(pubKey, data, messageTimeStamp, ttl, forceP2p = false) { const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64'); const timestamp = Math.floor(Date.now() / 1000); + const p2pDetails = lokiP2pAPI.getContactP2pDetails(pubKey); + if (p2pDetails && (forceP2p || p2pDetails.isOnline)) { + try { + const port = p2pDetails.port ? `:${p2pDetails.port}` : ''; + const url = `${p2pDetails.address}${port}/store`; + const fetchOptions = { + method: 'POST', + body: data64, + }; + + await fetch(url, fetchOptions); + lokiP2pAPI.setContactOnline(pubKey); + return; + } catch (e) { + log.warn('Failed to send P2P message, falling back to storage', e); + lokiP2pAPI.setContactOffline(pubKey); + } + } + // Nonce is returned as a base64 string to include in header let nonce; try { diff --git a/js/modules/loki_p2p_api.js b/js/modules/loki_p2p_api.js index 09978299b..77da5c696 100644 --- a/js/modules/loki_p2p_api.js +++ b/js/modules/loki_p2p_api.js @@ -1,21 +1,94 @@ -class LokiP2pAPI { +/* global setTimeout, clearTimeout, window */ + +const EventEmitter = require('events'); + +class LokiP2pAPI extends EventEmitter { constructor() { + super(); this.contactP2pDetails = {}; } - addContactP2pDetails(pubKey, address, port) { - this.contactP2pDetails[pubKey] = { - address, - port, - }; + addContactP2pDetails(pubKey, address, port, resetTimer = false) { + // Stagger the timers so the friends don't ping each other at the same time + this.ourKey = this.ourKey || window.textsecure.storage.user.getNumber(); + const timerDuration = + pubKey < this.ourKey + ? 60 * 1000 // 1 minute + : 2 * 60 * 1000; // 2 minutes + + if (!this.contactP2pDetails[pubKey]) { + // If this is the first time we are getting this contacts details + // then we try to ping them straight away + this.contactP2pDetails[pubKey] = { + address, + port, + timerDuration, + isOnline: false, + pingTimer: null, + }; + this.pingContact(pubKey); + return; + } + + clearTimeout(this.contactP2pDetails[pubKey].pingTimer); + if ( + this.contactP2pDetails[pubKey].address !== address || + this.contactP2pDetails[pubKey].port !== port + ) { + // If this contact has changed their details + // then we try to ping them straight away + this.contactP2pDetails[pubKey].address = address; + this.contactP2pDetails[pubKey].port = port; + this.contactP2pDetails[pubKey].isOnline = false; + this.pingContact(pubKey); + return; + } + + if (resetTimer) { + // If this contact is simply sharing the same details with us + // then we just reset our timer + this.contactP2pDetails[pubKey].pingTimer = setTimeout( + this.pingContact.bind(this), + this.contactP2pDetails[pubKey].timerDuration, + pubKey + ); + return; + } + this.pingContact(pubKey); } getContactP2pDetails(pubKey) { return this.contactP2pDetails[pubKey] || null; } - removeContactP2pDetails(pubKey) { - delete this.contactP2pDetails[pubKey]; + setContactOffline(pubKey) { + this.emit('offline', pubKey); + if (!this.contactP2pDetails[pubKey]) { + return; + } + clearTimeout(this.contactP2pDetails[pubKey].pingTimer); + this.contactP2pDetails[pubKey].isOnline = false; + } + + setContactOnline(pubKey) { + if (!this.contactP2pDetails[pubKey]) { + return; + } + this.emit('online', pubKey); + clearTimeout(this.contactP2pDetails[pubKey].pingTimer); + this.contactP2pDetails[pubKey].isOnline = true; + this.contactP2pDetails[pubKey].pingTimer = setTimeout( + this.pingContact.bind(this), + this.contactP2pDetails[pubKey].timerDuration, + pubKey + ); + } + + pingContact(pubKey) { + if (!this.contactP2pDetails[pubKey]) { + return; + } + window.libloki.api.sendOnlineBroadcastMessage(pubKey, true); } } diff --git a/libloki/api.js b/libloki/api.js index 876cd15e2..83fec66c9 100644 --- a/libloki/api.js +++ b/libloki/api.js @@ -23,10 +23,9 @@ ); } - async function sendOnlineBroadcastMessage(pubKey) { - // TODO: Make this actually get a loki address rather than junk string + async function sendOnlineBroadcastMessage(pubKey, forceP2p = false) { const lokiAddressMessage = new textsecure.protobuf.LokiAddressMessage({ - p2pAddress: 'testAddress', + p2pAddress: 'http://localhost', p2pPort: parseInt(window.localServerPort, 10), }); const content = new textsecure.protobuf.Content({ @@ -41,7 +40,7 @@ log.info('Online broadcast message sent successfully'); } }; - const options = { messageType: 'onlineBroadcast' }; + const options = { messageType: 'onlineBroadcast', forceP2p }; // Send a empty message with information about how to contact us directly const outgoingMessage = new textsecure.OutgoingMessage( null, // server diff --git a/libtextsecure/http-resources.js b/libtextsecure/http-resources.js index 6f6a8f815..bded6ef25 100644 --- a/libtextsecure/http-resources.js +++ b/libtextsecure/http-resources.js @@ -74,7 +74,7 @@ }); }; - this.handleMessage = message => { + this.handleMessage = (message, isP2p = false) => { try { const dataPlaintext = stringToArrayBufferBase64(message); const messageBuf = textsecure.protobuf.WebSocketMessage.decode( @@ -89,7 +89,8 @@ path: messageBuf.request.path, body: messageBuf.request.body, id: messageBuf.request.id, - }) + }), + isP2p ); } } catch (error) { diff --git a/libtextsecure/message_receiver.js b/libtextsecure/message_receiver.js index d0e42d7d2..72667842c 100644 --- a/libtextsecure/message_receiver.js +++ b/libtextsecure/message_receiver.js @@ -87,7 +87,7 @@ MessageReceiver.prototype.extend({ localLokiServer.start(localServerPort).then(port => { window.log.info(`Local Server started at localhost:${port}`); libloki.api.broadcastOnlineStatus(); - localLokiServer.on('message', this.httpPollingResource.handleMessage); + localLokiServer.on('message', this.handleP2pMessage.bind(this)); }); // TODO: Rework this socket stuff to work with online messaging @@ -119,6 +119,9 @@ MessageReceiver.prototype.extend({ // all cached envelopes are processed. this.incoming = [this.pending]; }, + handleP2pMessage(message) { + this.httpPollingResource.handleMessage(message, true); + }, shutdown() { if (this.socket) { this.socket.onclose = null; @@ -135,7 +138,7 @@ MessageReceiver.prototype.extend({ if (localLokiServer) { localLokiServer.removeListener( 'message', - this.httpPollingResource.handleMessage + this.handleP2pMessage.bind(this) ); } }, @@ -194,7 +197,7 @@ MessageReceiver.prototype.extend({ // return this.dispatchAndWait(event); // }); }, - handleRequest(request) { + handleRequest(request, isP2p = false) { this.incoming = this.incoming || []; const lastPromise = _.last(this.incoming); @@ -214,6 +217,9 @@ MessageReceiver.prototype.extend({ const promise = Promise.resolve(request.body.toArrayBuffer()) // textsecure.crypto .then(plaintext => { const envelope = textsecure.protobuf.Envelope.decode(plaintext); + if (isP2p) { + lokiP2pAPI.setContactOnline(envelope.source); + } // After this point, decoding errors are not the server's // fault, and we should handle them gracefully and tell the // user they received an invalid message @@ -223,6 +229,7 @@ MessageReceiver.prototype.extend({ } envelope.id = envelope.serverGuid || window.getGuid(); + envelope.isP2p = isP2p; envelope.serverTimestamp = envelope.serverTimestamp ? envelope.serverTimestamp.toNumber() : null; @@ -901,7 +908,12 @@ MessageReceiver.prototype.extend({ }, async handleLokiAddressMessage(envelope, lokiAddressMessage) { const { p2pAddress, p2pPort } = lokiAddressMessage; - lokiP2pAPI.addContactP2pDetails(envelope.source, p2pAddress, p2pPort); + lokiP2pAPI.addContactP2pDetails( + envelope.source, + p2pAddress, + p2pPort, + envelope.isP2p + ); return this.removeFromCache(envelope); }, handleDataMessage(envelope, msg) { diff --git a/libtextsecure/outgoing_message.js b/libtextsecure/outgoing_message.js index 3bee70dbd..edf1c5fc9 100644 --- a/libtextsecure/outgoing_message.js +++ b/libtextsecure/outgoing_message.js @@ -42,11 +42,13 @@ function OutgoingMessage( this.failoverNumbers = []; this.unidentifiedDeliveries = []; - const { numberInfo, senderCertificate, online, messageType } = options || {}; + const { numberInfo, senderCertificate, online, messageType, forceP2p } = + options || {}; this.numberInfo = numberInfo; this.senderCertificate = senderCertificate; this.online = online; this.messageType = messageType || 'outgoing'; + this.forceP2p = forceP2p || false; } OutgoingMessage.prototype = { @@ -185,7 +187,13 @@ OutgoingMessage.prototype = { async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60) { const pubKey = number; try { - await lokiMessageAPI.sendMessage(pubKey, data, timestamp, ttl); + await lokiMessageAPI.sendMessage( + pubKey, + data, + timestamp, + ttl, + this.forceP2p + ); } catch (e) { if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) { // 409 and 410 should bubble and be handled by doSendMessage