From 48e3ded9d263c20c5f92f08469747ba57b2142fd Mon Sep 17 00:00:00 2001 From: Vincent Date: Fri, 12 Jun 2020 14:03:50 +1000 Subject: [PATCH] review changes --- .../content/data/group/ClosedGroupMessage.ts | 2 +- .../content/sync/ContactSyncMessage.ts | 31 ------- .../messages/outgoing/content/sync/index.ts | 3 +- ts/session/sending/MessageQueue.ts | 72 +++++++++-------- ts/session/sending/MessageQueueInterface.ts | 5 +- ts/session/types/PubKey.ts | 5 +- ts/session/utils/SyncMessageUtils.ts | 80 +++++++++---------- .../sending/PendingMessageCache_test.ts | 32 ++++---- ts/test/test-utils/testUtils.ts | 2 +- 9 files changed, 102 insertions(+), 130 deletions(-) delete mode 100644 ts/session/messages/outgoing/content/sync/ContactSyncMessage.ts diff --git a/ts/session/messages/outgoing/content/data/group/ClosedGroupMessage.ts b/ts/session/messages/outgoing/content/data/group/ClosedGroupMessage.ts index a1e21dd2e..ccdb8f121 100644 --- a/ts/session/messages/outgoing/content/data/group/ClosedGroupMessage.ts +++ b/ts/session/messages/outgoing/content/data/group/ClosedGroupMessage.ts @@ -8,7 +8,7 @@ interface ClosedGroupMessageParams extends MessageParams { } export abstract class ClosedGroupMessage extends DataMessage { - protected readonly groupId: string; + public readonly groupId: string; constructor(params: ClosedGroupMessageParams) { super({ diff --git a/ts/session/messages/outgoing/content/sync/ContactSyncMessage.ts b/ts/session/messages/outgoing/content/sync/ContactSyncMessage.ts deleted file mode 100644 index 2c9cdd3a0..000000000 --- a/ts/session/messages/outgoing/content/sync/ContactSyncMessage.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { SignalService } from '../../../../../protobuf'; -import { MessageParams } from '../../Message'; -import { SyncMessage } from '../'; -import { PubKey } from '../../../../types'; -import { DataMessage } from '../data'; - - -interface ContactSyncMessageParams extends MessageParams { - // Send to our devices - contacts: Array; - dataMessage?: DataMessage; -} - -export class ContactSyncMessage extends SyncMessage { - private readonly contacts: Array; - private readonly dataMessage?: DataMessage; - - constructor(params: ContactSyncMessageParams) { - super(params); - - // Stubbed for now - this.contacts = params.contacts; - this.dataMessage = params.dataMessage; - - this.syncProto(); - } - - protected syncProto(): SignalService.SyncMessage { - return new SignalService.SyncMessage(); - } -} diff --git a/ts/session/messages/outgoing/content/sync/index.ts b/ts/session/messages/outgoing/content/sync/index.ts index d17d62a83..804f4f561 100644 --- a/ts/session/messages/outgoing/content/sync/index.ts +++ b/ts/session/messages/outgoing/content/sync/index.ts @@ -1,4 +1,3 @@ import { SyncMessage } from './SyncMessage'; -import { ContactSyncMessage } from './ContactSyncMessage'; -export { ContactSyncMessage, SyncMessage }; +export { SyncMessage }; diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index 7b029a933..c53345fa9 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -1,5 +1,5 @@ import * as _ from 'lodash'; -import * as Data from '../../../js/modules/data'; +import { getPairedDevicesFor } from '../../../js/modules/data'; import { ConversationController } from '../../window'; import { EventEmitter } from 'events'; @@ -14,14 +14,11 @@ import { SessionRequestMessage, } from '../messages/outgoing'; import { PendingMessageCache } from './PendingMessageCache'; -import { - JobQueue, - SyncMessageUtils, - TypedEventEmitter, -} from '../utils'; +import { JobQueue, SyncMessageUtils, TypedEventEmitter } from '../utils'; import { PubKey } from '../types'; import { MessageSender } from '.'; import { SessionProtocol } from '../protocols'; +import * as UserUtils from '../../util/user'; export class MessageQueue implements MessageQueueInterface { public readonly events: TypedEventEmitter; @@ -35,7 +32,7 @@ export class MessageQueue implements MessageQueueInterface { } public async sendUsingMultiDevice(user: PubKey, message: ContentMessage) { - const userLinked = await Data.getPairedDevicesFor(user.key); + const userLinked = await getPairedDevicesFor(user.key); const userDevices = userLinked.map(d => new PubKey(d)); await this.sendMessageToDevices(userDevices, message); @@ -53,23 +50,34 @@ export class MessageQueue implements MessageQueueInterface { // Sync to our devices if syncable if (SyncMessageUtils.canSync(message)) { + const currentDevice = await UserUtils.getCurrentDevicePubKey(); - const ourDevices = await SyncMessageUtils.getOurPairedDevices(); - await this.sendSyncMessage(message, ourDevices); + if (currentDevice) { + const otherDevices = await getPairedDevicesFor(currentDevice); - // Remove our devices from currentDevices - const ourDeviceContacts = ourDevices.map(device => ConversationController.get(device.key)); - currentDevices = _.xor(currentDevices, ourDeviceContacts); + const ourDevices = [currentDevice, ...otherDevices].map( + device => new PubKey(device) + ); + await this.sendSyncMessage(message, ourDevices); + + // Remove our devices from currentDevices + const ourDeviceContacts = ourDevices.map(device => + ConversationController.get(device.key) + ); + currentDevices = _.xor(currentDevices, ourDeviceContacts); + } } const promises = currentDevices.map(async device => { - await this.queue(device, message); + await this.process(device, message); }); return Promise.all(promises); } - public async sendToGroup(message: OpenGroupMessage | ContentMessage): Promise { + public async sendToGroup( + message: OpenGroupMessage | ContentMessage + ): Promise { if ( !(message instanceof OpenGroupMessage) && !(message instanceof ClosedGroupMessage) @@ -102,15 +110,12 @@ export class MessageQueue implements MessageQueueInterface { return false; } - public async sendSyncMessage( - message: ContentMessage, - sendTo: Array - ) { + public async sendSyncMessage(message: ContentMessage, sendTo: Array) { // Sync with our devices const promises = sendTo.map(async device => { - const syncMessage = await SyncMessageUtils.from(message, device); + const syncMessage = await SyncMessageUtils.from(message); - return this.queue(device, syncMessage); + return this.process(device, syncMessage); }); return Promise.all(promises); @@ -130,17 +135,19 @@ export class MessageQueue implements MessageQueueInterface { } const jobQueue = this.getJobQueue(device); - messages.forEach(message => { - if (!jobQueue.has(message.identifier)) { - const promise = jobQueue.add(async () => MessageSender.send(message)); + messages.forEach(async message => { + const messageId = String(message.timestamp); - promise - .then(() => { - // Message sent; remove from cache - void this.pendingMessageCache.remove(message); - }) - // Message failed to send - .catch(() => null); + if (!jobQueue.has(messageId)) { + try { + await jobQueue.addWithId(messageId, async () => + MessageSender.send(message) + ); + void this.pendingMessageCache.remove(message); + this.events.emit('success', message); + } catch (e) { + this.events.emit('fail', message, e); + } } }); } @@ -152,8 +159,8 @@ export class MessageQueue implements MessageQueueInterface { return Promise.all(promises); } - private async queue(device: PubKey, message: ContentMessage) { - if (message instanceof SessionRequestMessage) { + private async process(device: PubKey, message?: ContentMessage) { + if (!message || message instanceof SessionRequestMessage) { return; } @@ -170,5 +177,4 @@ export class MessageQueue implements MessageQueueInterface { return queue; } - } diff --git a/ts/session/sending/MessageQueueInterface.ts b/ts/session/sending/MessageQueueInterface.ts index b4b374569..9f58357a9 100644 --- a/ts/session/sending/MessageQueueInterface.ts +++ b/ts/session/sending/MessageQueueInterface.ts @@ -19,5 +19,8 @@ export interface MessageQueueInterface { sendUsingMultiDevice(user: PubKey, message: ContentMessage): void; send(device: PubKey, message: ContentMessage): void; sendToGroup(message: GroupMessageType): void; - sendSyncMessage(message: ContentMessage, sendTo: Array): Promise>; + sendSyncMessage( + message: ContentMessage, + sendTo: Array + ): Promise>; } diff --git a/ts/session/types/PubKey.ts b/ts/session/types/PubKey.ts index fc49b5472..a9c84773b 100644 --- a/ts/session/types/PubKey.ts +++ b/ts/session/types/PubKey.ts @@ -1,7 +1,8 @@ export class PubKey { public static readonly PUBKEY_LEN = 66; - private static readonly regex: RegExp = new RegExp(`^05[0-9a-fA-F]{${PubKey.PUBKEY_LEN - - 2}}$`); + private static readonly regex: RegExp = new RegExp( + `^05[0-9a-fA-F]{${PubKey.PUBKEY_LEN - 2}}$` + ); public readonly key: string; constructor(pubkeyString: string) { diff --git a/ts/session/utils/SyncMessageUtils.ts b/ts/session/utils/SyncMessageUtils.ts index 6052a69cc..e14c79c24 100644 --- a/ts/session/utils/SyncMessageUtils.ts +++ b/ts/session/utils/SyncMessageUtils.ts @@ -1,53 +1,54 @@ -import { - ContactSyncMessage, - ContentMessage, - SyncMessageEnum, -} from '../messages/outgoing'; -import { PubKey } from '../types'; - import * as _ from 'lodash'; -import * as Data from '../../../js/modules/data'; -import { ConversationController, textsecure, Whisper } from '../../window'; -import { OpenGroup } from '../types/OpenGroup'; +import * as UserUtils from '../../util/user'; +import { + getAllConversations, + getPrimaryDeviceFor, +} from '../../../js/modules/data'; +import { ConversationController, Whisper } from '../../window'; +import { ContentMessage, SyncMessage } from '../messages/outgoing'; export async function from( - message: ContentMessage, - sendTo: PubKey | OpenGroup, - syncType: SyncMessageEnum.CONTACTS | SyncMessageEnum.GROUPS = SyncMessageEnum.CONTACTS -): Promise { - const { timestamp, identifier } = message; + message: ContentMessage +): Promise { + // const { timestamp, identifier } = message; // Stubbed for now - return new ContactSyncMessage({ - timestamp, - identifier, - contacts: [], - }); + return undefined; } export async function canSync(message: ContentMessage): Promise { // This function should be agnostic to the device; it shouldn't need // to know about the recipient - // return Boolean(from(message, device)); + // return Boolean(from(message)); // Stubbed for now return true; } -export async function getSyncContacts(): Promise> { - const thisDevice = textsecure.storage.user.getNumber(); - const primaryDevice = await Data.getPrimaryDeviceFor(thisDevice); - const conversations = await Data.getAllConversations({ ConversationCollection: Whisper.ConversationCollection }); +export async function getSyncContacts(): Promise | undefined> { + const thisDevice = await UserUtils.getCurrentDevicePubKey(); + + if (!thisDevice) { + return []; + } + + const primaryDevice = await getPrimaryDeviceFor(thisDevice); + const conversations = await getAllConversations({ + ConversationCollection: Whisper.ConversationCollection, + }); // We are building a set of all contacts - const primaryContacts = conversations.filter(c => - c.isPrivate() && - !c.isOurLocalDevice() && - c.isFriend() && - !c.attributes.secondaryStatus - ) || []; + const primaryContacts = + conversations.filter( + c => + c.isPrivate() && + !c.isOurLocalDevice() && + c.isFriend() && + !c.attributes.secondaryStatus + ) || []; - const secondaryContactsPartial = conversations.filter(c => + const secondaryContactsPartial = conversations.filter( + c => c.isPrivate() && !c.isOurLocalDevice() && c.isFriend() && @@ -66,15 +67,8 @@ export async function getSyncContacts(): Promise> { .filter(c => c.id !== primaryDevice); // Return unique contacts - return _.uniqBy([ - ...primaryContacts, - ...secondaryContacts, - ], device => !!device); -} - -export async function getOurPairedDevices(): Promise> { - const ourPubKey = textsecure.storage.user.getNumber(); - const ourDevices = await Data.getPairedDevicesFor(ourPubKey); - - return ourDevices.map(device => new PubKey(device)); + return _.uniqBy( + [...primaryContacts, ...secondaryContacts], + device => !!device + ); } diff --git a/ts/test/session/sending/PendingMessageCache_test.ts b/ts/test/session/sending/PendingMessageCache_test.ts index d732d859e..05b2d7b87 100644 --- a/ts/test/session/sending/PendingMessageCache_test.ts +++ b/ts/test/session/sending/PendingMessageCache_test.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import * as _ from 'lodash'; -import * as MessageUtils from '../../../session/utils'; +import { MessageUtils } from '../../../session/utils'; import { TestUtils } from '../../../test/test-utils'; import { PendingMessageCache } from '../../../session/sending/PendingMessageCache'; @@ -53,7 +53,7 @@ describe('PendingMessageCache', () => { it('can add to cache', async () => { const device = TestUtils.generateFakePubkey(); - const message = TestUtils.generateUniqueChatMessage(); + const message = TestUtils.generateChatMessage(); const rawMessage = MessageUtils.toRawMessage(device, message); await pendingMessageCacheStub.add(device, message); @@ -70,7 +70,7 @@ describe('PendingMessageCache', () => { it('can remove from cache', async () => { const device = TestUtils.generateFakePubkey(); - const message = TestUtils.generateUniqueChatMessage(); + const message = TestUtils.generateChatMessage(); const rawMessage = MessageUtils.toRawMessage(device, message); await pendingMessageCacheStub.add(device, message); @@ -91,15 +91,15 @@ describe('PendingMessageCache', () => { const cacheItems = [ { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, ]; @@ -123,11 +123,11 @@ describe('PendingMessageCache', () => { const cacheItems = [ { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, ]; @@ -150,7 +150,7 @@ describe('PendingMessageCache', () => { it('can find nothing when empty', async () => { const device = TestUtils.generateFakePubkey(); - const message = TestUtils.generateUniqueChatMessage(); + const message = TestUtils.generateChatMessage(); const rawMessage = MessageUtils.toRawMessage(device, message); const foundMessage = pendingMessageCacheStub.find(rawMessage); @@ -159,7 +159,7 @@ describe('PendingMessageCache', () => { it('can find message in cache', async () => { const device = TestUtils.generateFakePubkey(); - const message = TestUtils.generateUniqueChatMessage(); + const message = TestUtils.generateChatMessage(); const rawMessage = MessageUtils.toRawMessage(device, message); await pendingMessageCacheStub.add(device, message); @@ -176,15 +176,15 @@ describe('PendingMessageCache', () => { const cacheItems = [ { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, ]; @@ -206,15 +206,15 @@ describe('PendingMessageCache', () => { const cacheItems = [ { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, { device: TestUtils.generateFakePubkey(), - message: TestUtils.generateUniqueChatMessage(), + message: TestUtils.generateChatMessage(), }, ]; diff --git a/ts/test/test-utils/testUtils.ts b/ts/test/test-utils/testUtils.ts index a8ced37d5..4cfbc50ac 100644 --- a/ts/test/test-utils/testUtils.ts +++ b/ts/test/test-utils/testUtils.ts @@ -55,7 +55,7 @@ export function generateFakePubkey(): PubKey { return new PubKey(pubkeyString); } -export function generateUniqueChatMessage(): ChatMessage { +export function generateChatMessage(): ChatMessage { return new ChatMessage({ body: 'Lorem ipsum dolor sit amet, consectetur adipiscing elit', identifier: uuid(),