merge w cache

This commit is contained in:
Vincent 2020-06-09 07:19:33 +10:00
commit 5f38e216a5
3 changed files with 105 additions and 39 deletions

View file

@ -1,5 +1,9 @@
import { createOrUpdateItem, getItemById } from '../../../js/modules/data';
import { RawMessage } from '../types/RawMessage';
import {
createOrUpdateItem,
getItemById,
bulkAddItems,
} from '../../../js/modules/data';
import { RawMessage, BareRawMessage } from '../types/RawMessage';
import { ContentMessage } from '../messages/outgoing';
import { PubKey } from '../types';
import * as MessageUtils from '../utils';
@ -13,13 +17,13 @@ import * as MessageUtils from '../utils';
export class PendingMessageCache {
public readonly isReady: Promise<boolean>;
private cache: Array<any>;
private cache: Array<RawMessage>;
constructor() {
// Load pending messages from the database
// You should await isReady on making a new PendingMessageCache
// if you'd like to have instant access to the cache
this.cache = ['bleep'];
this.cache = [];
this.isReady = new Promise(async resolve => {
await this.loadFromDB();
@ -33,23 +37,14 @@ export class PendingMessageCache {
}
public getForDevice(device: PubKey): Array<RawMessage> {
const pending = this.cache.filter(m => m.device === device.key);
return pending.sort((a, b) => a.timestamp - b.timestamp);
return this.getAllPending().filter(m => m.device === device.key);
}
public getDevices(): Array<PubKey> {
// Gets all unique devices with pending messages
const pubkeyStrings = [...new Set(this.cache.map(m => m.device))];
const pubkeys: Array<PubKey> = [];
pubkeyStrings.forEach(pubkey => {
if (PubKey.validate(pubkey)) {
pubkeys.push(new PubKey(pubkey));
}
});
return pubkeys;
return pubkeyStrings.map(PubKey.from).filter((k): k is PubKey => !!k);
}
public async add(
@ -113,34 +108,40 @@ export class PendingMessageCache {
return [];
}
const barePending = JSON.parse(String(data.value));
const barePending = JSON.parse(String(data.value)) as Array<BareRawMessage>;
const pending = barePending.map((message: any) => {
const {
identifier,
plainTextBuffer,
timestamp,
device,
ttl,
encryption,
} = message;
// Rebuild plainTextBuffer
// tslint:disable-next-line: no-unnecessary-local-variable
const pending = barePending.map((message: BareRawMessage) => {
const rebuiltMessage = { ...message };
return {
identifier,
plainTextBuffer,
timestamp,
device,
ttl,
encryption,
} as RawMessage;
// From Array<number> to ArrayBuffer
const bufferArray = Uint8Array.from(message.plainTextBuffer);
// From ArrayBuffer into Buffer
const buffer = Buffer.alloc(bufferArray.byteLength);
for (let i = 0; i < buffer.length; i++) {
buffer[i] = bufferArray[i];
}
rebuiltMessage.plainTextBuffer = buffer;
return rebuiltMessage as RawMessage;
});
return pending as Array<RawMessage>;
return pending;
}
private async saveToDB() {
// Only call when adding / removing from cache.
const encodedPendingMessages = JSON.stringify(this.cache) || '[]';
// For each plainTextBuffer in cache, save in as a simple Array<number> to avoid
// Node issues with JSON stringifying Buffer without strict typing
const encodedCache = [...this.cache].map(item => {
const plainTextBuffer = Array.from(item.plainTextBuffer);
return { ...item, plainTextBuffer };
});
const encodedPendingMessages = JSON.stringify(encodedCache) || '[]';
await createOrUpdateItem({
id: 'pendingMessages',
value: encodedPendingMessages,

View file

@ -10,3 +10,12 @@ export interface RawMessage {
ttl: number;
encryption: EncryptionType;
}
export interface BareRawMessage {
identifier: string;
plainTextBuffer: any;
timestamp: number;
device: string;
ttl: number;
encryption: number;
}

View file

@ -1,5 +1,5 @@
import { expect } from 'chai';
import * as _ from 'lodash';
import * as MessageUtils from '../../../session/utils';
import { TestUtils } from '../../../test/test-utils';
import { PendingMessageCache } from '../../../session/sending/PendingMessageCache';
@ -12,14 +12,15 @@ interface StorageItem {
describe('PendingMessageCache', () => {
// Initialize new stubbed cache
let data: StorageItem;
let pendingMessageCacheStub: PendingMessageCache;
beforeEach(async () => {
// Stub out methods which touch the database
const storageID = 'pendingMessages';
let data: StorageItem = {
data = {
id: storageID,
value: '',
value: '[]',
};
TestUtils.stubData('getItemById')
@ -200,4 +201,59 @@ describe('PendingMessageCache', () => {
const finalCache = pendingMessageCacheStub.getAllPending();
expect(finalCache).to.have.length(0);
});
it('can restore from db', async () => {
const cacheItems = [
{
device: TestUtils.generateFakePubkey(),
message: TestUtils.generateUniqueChatMessage(),
},
{
device: TestUtils.generateFakePubkey(),
message: TestUtils.generateUniqueChatMessage(),
},
{
device: TestUtils.generateFakePubkey(),
message: TestUtils.generateUniqueChatMessage(),
},
];
cacheItems.forEach(async item => {
await pendingMessageCacheStub.add(item.device, item.message);
});
const addedMessages = pendingMessageCacheStub.getAllPending();
expect(addedMessages).to.have.length(cacheItems.length);
// Rebuild from DB
const freshCache = new PendingMessageCache();
await freshCache.isReady;
// Verify messages
const rebuiltMessages = freshCache.getAllPending();
rebuiltMessages.forEach((message, index) => {
const addedMessage = addedMessages[index];
// Pull out plainTextBuffer for a separate check
const buffersCompare =
Buffer.compare(
message.plainTextBuffer,
addedMessage.plainTextBuffer
) === 0;
expect(buffersCompare).to.equal(
true,
'buffers were not loaded properly from database'
);
// Compare all other valures
const trimmedAdded = _.omit(addedMessage, ['plainTextBuffer']);
const trimmedRebuilt = _.omit(message, ['plainTextBuffer']);
expect(_.isEqual(trimmedAdded, trimmedRebuilt)).to.equal(
true,
'cached messages were not rebuilt properly'
);
});
});
});