Merge pull request #2532 from Bilb/fix-deleted-messages-all-at-once

To merge once theming is done: handle deleted messages & deleted reacts all at once
This commit is contained in:
Audric Ackermann 2022-11-09 10:16:12 +11:00 committed by GitHub
commit 32e00227a4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 605 additions and 116 deletions

View file

@ -8,7 +8,7 @@ import _ from 'lodash';
import { Data } from '../../../../data/data';
import { MessageRenderingProps } from '../../../../models/messageType';
import { getConversationController } from '../../../../session/conversations';
import { messageExpired } from '../../../../state/ducks/conversations';
import { messagesExpired } from '../../../../state/ducks/conversations';
import {
getGenericReadableMessageSelectorProps,
getIsMessageSelected,
@ -68,10 +68,12 @@ function useIsExpired(props: ExpiringProps) {
await Data.removeMessage(messageId);
if (convoId) {
dispatch(
messageExpired({
conversationKey: convoId,
messageId,
})
messagesExpired([
{
conversationKey: convoId,
messageId,
},
])
);
const convo = getConversationController().get(convoId);
convo?.updateLastMessage();

View file

@ -139,7 +139,7 @@ export const Data = {
saveMessage,
saveMessages,
removeMessage,
_removeMessages,
removeMessagesByIds,
getMessageIdsFromServerIds,
getMessageById,
getMessageBySenderAndSentAt,
@ -396,9 +396,13 @@ async function removeMessage(id: string): Promise<void> {
}
}
// Note: this method will not clean up external files, just delete from SQL
async function _removeMessages(ids: Array<string>): Promise<void> {
await channels.removeMessage(ids);
/**
* Note: this method will not clean up external files, just delete from SQL.
* Files are cleaned up on app start if they are not linked to any messages
*
*/
async function removeMessagesByIds(ids: Array<string>): Promise<void> {
await channels.removeMessagesByIds(ids);
}
async function getMessageIdsFromServerIds(
@ -644,7 +648,7 @@ async function removeAllMessagesInConversation(conversationId: string): Promise<
await Promise.all(messages.map(message => message.cleanup()));
// eslint-disable-next-line no-await-in-loop
await channels.removeMessage(ids);
await channels.removeMessagesByIds(ids);
} while (messages.length > 0);
}

View file

@ -38,7 +38,7 @@ const channelsToMake = new Set([
'saveSeenMessageHashes',
'saveMessages',
'removeMessage',
'_removeMessages',
'removeMessagesByIds',
'getUnreadByConversation',
'markAllAsReadByConversationNoExpiration',
'getUnreadCountByConversation',

View file

@ -217,8 +217,10 @@ async function start() {
);
window.log.info(`Cleanup: Found ${messagesForCleanup.length} messages for cleanup`);
const idsToCleanUp: Array<string> = [];
await Promise.all(
messagesForCleanup.map(async (message: MessageModel) => {
messagesForCleanup.map((message: MessageModel) => {
const sentAt = message.get('sent_at');
if (message.hasErrors()) {
@ -226,9 +228,12 @@ async function start() {
}
window.log.info(`Cleanup: Deleting unsent message ${sentAt}`);
await Data.removeMessage(message.id);
idsToCleanUp.push(message.id);
})
);
if (idsToCleanUp.length) {
await Data.removeMessagesByIds(idsToCleanUp);
}
window.log.info('Cleanup: complete');
window.log.info('listening for registration events');

View file

@ -249,12 +249,6 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
await deleteExternalFilesOfConversation(this.attributes);
}
public async onExpired(_message: MessageModel) {
await this.updateLastMessage();
// removeMessage();
}
public getGroupAdmins(): Array<string> {
const groupAdmins = this.get('groupAdmins');
@ -478,26 +472,6 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
return true;
}
public async onReadMessage(message: MessageModel, readAt: number) {
// We mark as read everything older than this message - to clean up old stuff
// still marked unread in the database. If the user generally doesn't read in
// the desktop app, so the desktop app only gets read syncs, we can very
// easily end up with messages never marked as read (our previous early read
// sync handling, read syncs never sent because app was offline)
// We queue it because we often get a whole lot of read syncs at once, and
// their markRead calls could very easily overlap given the async pull from DB.
// Lastly, we don't send read syncs for any message marked read due to a read
// sync. That's a notification explosion we don't need.
return this.queueJob(() =>
this.markReadBouncy(message.get('received_at') as any, {
sendReadReceipts: false,
readAt,
})
);
}
public async getUnreadCount() {
const unreadCount = await Data.getUnreadCountByConversation(this.id);
@ -1693,15 +1667,17 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
return this.get('type') === ConversationTypeEnum.GROUP;
}
public async removeMessage(messageId: any) {
public async removeMessage(messageId: string) {
await Data.removeMessage(messageId);
this.updateLastMessage();
window.inboxStore?.dispatch(
conversationActions.messageDeleted({
conversationKey: this.id,
messageId,
})
conversationActions.messagesDeleted([
{
conversationKey: this.id,
messageId,
},
])
);
}

View file

@ -125,7 +125,6 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
throw new Error('A message always needs to have an conversationId.');
}
// this.on('expired', this.onExpired);
if (!attributes.skipTimerInit) {
void this.setToExpire();
}

View file

@ -945,21 +945,44 @@ function saveMessages(arrayOfMessages: Array<any>) {
}
function removeMessage(id: string, instance?: BetterSqlite3.Database) {
if (!Array.isArray(id)) {
assertGlobalInstanceOrInstance(instance)
.prepare(`DELETE FROM ${MESSAGES_TABLE} WHERE id = $id;`)
.run({ id });
if (!isString(id)) {
throw new Error('removeMessage: only takes single message to delete!');
return;
}
if (!id.length) {
throw new Error('removeMessages: No ids to delete!');
assertGlobalInstanceOrInstance(instance)
.prepare(`DELETE FROM ${MESSAGES_TABLE} WHERE id = $id;`)
.run({ id });
}
function removeMessagesByIds(ids: Array<string>, instance?: BetterSqlite3.Database) {
if (!Array.isArray(ids)) {
throw new Error('removeMessagesByIds only allowed an array of strings');
}
if (!ids.length) {
throw new Error('removeMessagesByIds: No ids to delete!');
}
// Our node interface doesn't seem to allow you to replace one single ? with an array
assertGlobalInstanceOrInstance(instance)
.prepare(`DELETE FROM ${MESSAGES_TABLE} WHERE id IN ( ${id.map(() => '?').join(', ')} );`)
.run(id);
.prepare(`DELETE FROM ${MESSAGES_TABLE} WHERE id IN ( ${ids.map(() => '?').join(', ')} );`)
.run(ids);
}
function removeAllMessagesInConversation(
conversationId: string,
instance?: BetterSqlite3.Database
) {
if (!conversationId) {
return;
}
// Our node interface doesn't seem to allow you to replace one single ? with an array
assertGlobalInstanceOrInstance(instance)
.prepare(`DELETE FROM ${MESSAGES_TABLE} WHERE conversationId = $conversationId`)
.run({ conversationId });
}
function getMessageIdsFromServerIds(serverIds: Array<string | number>, conversationId: string) {
@ -2440,6 +2463,8 @@ export const sqlNode = {
updateLastHash,
saveMessages,
removeMessage,
removeMessagesByIds,
removeAllMessagesInConversation,
getUnreadByConversation,
markAllAsReadByConversationNoExpiration,
getUnreadCountByConversation,

View file

@ -1,4 +1,4 @@
import _, { compact, isArray, isNumber, isObject, pick } from 'lodash';
import _, { compact, isArray, isEmpty, isNumber, isObject, pick } from 'lodash';
import { OpenGroupData } from '../../../../data/opengroups';
import { handleOpenGroupV4Message } from '../../../../receiver/opengroup';
import { OpenGroupRequestCommonType } from '../opengroupV2/ApiUtil';
@ -35,6 +35,8 @@ import { ConversationTypeEnum } from '../../../../models/conversationAttributes'
import { createSwarmMessageSentFromUs } from '../../../../models/messageFactory';
import { Data } from '../../../../data/data';
import { processMessagesUsingCache } from './sogsV3MutationCache';
import { destroyMessagesAndUpdateRedux } from '../../../../util/expiringMessages';
import { sogsRollingDeletions } from './sogsRollingDeletions';
/**
* Get the convo matching those criteria and make sure it is an opengroup convo, or return null.
@ -159,34 +161,38 @@ const handleSogsV3DeletedMessages = async (
serverUrl: string,
roomId: string
) => {
const deletions = messages.filter(m => Boolean(m.deleted));
const exceptDeletion = messages.filter(m => !m.deleted);
if (!deletions.length) {
return messages;
const messagesDeleted = messages.filter(m => Boolean(m.deleted));
const messagesWithoutDeleted = messages.filter(m => !m.deleted);
if (!messagesDeleted.length) {
return messagesWithoutDeleted;
}
const allIdsRemoved = deletions.map(m => m.id);
const allIdsRemoved = messagesDeleted.map(m => m.id);
try {
const convoId = getOpenGroupV2ConversationId(serverUrl, roomId);
const convo = getConversationController().get(convoId);
const messageIds = await Data.getMessageIdsFromServerIds(allIdsRemoved, convo.id);
// we shouldn't get too many messages to delete at a time, so no need to add a function to remove multiple messages for now
allIdsRemoved.forEach(removedId => {
sogsRollingDeletions.addMessageDeletedId(convoId, removedId);
});
await Promise.all(
(messageIds || []).map(async id => {
if (convo) {
await convo.removeMessage(id);
}
await Data.removeMessage(id);
})
);
if (messageIds && messageIds.length) {
await destroyMessagesAndUpdateRedux(
messageIds.map(messageId => ({
conversationKey: convoId,
messageId,
}))
);
}
} catch (e) {
window?.log?.warn('handleDeletions failed:', e);
}
return exceptDeletion;
return messagesWithoutDeleted;
};
// tslint:disable-next-line: cyclomatic-complexity
// tslint:disable-next-line: max-func-body-length cyclomatic-complexity
const handleMessagesResponseV4 = async (
messages: Array<OpenGroupMessageV4>,
serverUrl: string,
@ -284,6 +290,7 @@ const handleMessagesResponseV4 = async (
const incomingMessageSeqNo = compact(messages.map(n => n.seqno));
const maxNewMessageSeqNo = Math.max(...incomingMessageSeqNo);
for (let index = 0; index < messagesWithResolvedBlindedIdsIfFound.length; index++) {
const msgToHandle = messagesWithResolvedBlindedIdsIfFound[index];
try {
@ -309,13 +316,24 @@ const handleMessagesResponseV4 = async (
await OpenGroupData.saveV2OpenGroupRoom(roomInfosRefreshed);
const messagesWithReactions = messages.filter(m => m.reactions !== undefined);
if (messagesWithReactions.length > 0) {
const conversationId = getOpenGroupV2ConversationId(serverUrl, roomId);
const groupConvo = getConversationController().get(conversationId);
if (groupConvo && groupConvo.isOpenGroupV2()) {
for (const message of messagesWithReactions) {
for (const messageWithReaction of messagesWithReactions) {
if (isEmpty(messageWithReaction.reactions)) {
/*
* When a message is deleted from the server, we get the deleted event as a data: null on the message itself
* and an update on its reactions.
* But, because we just deleted that message, we can skip trying to update its reactions: it's not in the DB anymore.
*/
if (sogsRollingDeletions.hasMessageDeletedId(conversationId, messageWithReaction.id)) {
continue;
}
}
void groupConvo.queueJob(async () => {
await processMessagesUsingCache(serverUrl, roomId, message);
await processMessagesUsingCache(serverUrl, roomId, messageWithReaction);
});
}
}
@ -526,6 +544,7 @@ export const handleBatchPollResults = async (
break;
case 'pollInfo':
await handlePollInfoResponse(subResponse.code, subResponse.body, serverUrl);
break;
case 'inbox':
await handleInboxOutboxMessages(subResponse.body, serverUrl, false);

View file

@ -0,0 +1,48 @@
import { RingBuffer } from '../../../utils/RingBuffer';
const rollingDeletedMessageIds: Map<string, RingBuffer<number>> = new Map();
const addMessageDeletedId = (conversationId: string, messageDeletedId: number) => {
if (!rollingDeletedMessageIds.has(conversationId)) {
rollingDeletedMessageIds.set(
conversationId,
new RingBuffer<number>(sogsRollingDeletions.getPerRoomCount())
);
}
const ringBuffer = rollingDeletedMessageIds.get(conversationId);
if (!ringBuffer) {
return;
}
ringBuffer.insert(messageDeletedId);
};
const hasMessageDeletedId = (conversationId: string, messageDeletedId: number) => {
if (!rollingDeletedMessageIds.has(conversationId)) {
return false;
}
const messageIdWasDeletedRecently = rollingDeletedMessageIds
?.get(conversationId)
?.has(messageDeletedId);
return messageIdWasDeletedRecently;
};
/**
* emptyMessageDeleteIds should only be used for testing purposes.
*/
const emptyMessageDeleteIds = () => {
rollingDeletedMessageIds.clear();
};
export const sogsRollingDeletions = {
addMessageDeletedId,
hasMessageDeletedId,
emptyMessageDeleteIds,
getPerRoomCount,
};
// keep 2000 deleted message ids in memory
function getPerRoomCount() {
return 2000;
}

View file

@ -0,0 +1,76 @@
/**
* This ringbuffer class can be used to keep a list of at most a size and removing old items first when the size is exceeded.
* Internally, it uses an array to keep track of the order, so two times the same item can exist in it.
*
*/
export class RingBuffer<T> {
private newest = -1;
private oldest = 0;
private buffer: Array<T> = [];
private readonly capacity: number;
constructor(capacity: number) {
this.capacity = capacity;
}
public getCapacity(): number {
return this.capacity;
}
public getLength(): number {
if (this.isEmpty()) {
return 0;
}
// When only one item was added, newest = 0 and oldest = 0.
// When more than one item was added, but less than capacity, newest = nbItemsAdded & oldest = 0.
// As soon as we overflow, oldest is incremented to oldest+1 and newest rolls back to 0,
// so this test fails here and we have to extract the length based on the two parts instead.
if (this.newest >= this.oldest) {
return this.newest + 1;
}
const firstPart = this.capacity - this.oldest;
const secondPart = this.newest + 1;
return firstPart + secondPart;
}
public insert(item: T) {
// see comments in `getLength()`
this.newest = (this.newest + 1) % this.capacity;
if (this.buffer.length >= this.capacity) {
this.oldest = (this.oldest + 1) % this.capacity;
}
this.buffer[this.newest] = item;
}
public has(item: T) {
// no items at all
if (this.isEmpty()) {
return false;
}
return this.toArray().includes(item);
}
public isEmpty() {
return this.newest === -1;
}
public clear() {
this.buffer = [];
this.newest = -1;
this.oldest = 0;
}
public toArray(): Array<T> {
if (this.isEmpty()) {
return [];
}
if (this.newest >= this.oldest) {
return this.buffer.slice(0, this.newest + 1);
}
const firstPart = this.buffer.slice(this.oldest, this.capacity);
const secondPart = this.buffer.slice(0, this.newest + 1);
return [...firstPart, ...secondPart];
}
}

View file

@ -504,12 +504,12 @@ function handleMessagesChangedOrAdded(
function handleMessageExpiredOrDeleted(
state: ConversationsStateType,
action: PayloadAction<{
payload: {
messageId: string;
conversationKey: string;
}>
): ConversationsStateType {
const { conversationKey, messageId } = action.payload;
}
) {
const { conversationKey, messageId } = payload;
if (conversationKey === state.selectedConversation) {
// search if we find this message id.
// we might have not loaded yet, so this case might not happen
@ -539,6 +539,23 @@ function handleMessageExpiredOrDeleted(
return state;
}
function handleMessagesExpiredOrDeleted(
state: ConversationsStateType,
action: PayloadAction<
Array<{
messageId: string;
conversationKey: string;
}>
>
): ConversationsStateType {
action.payload.forEach(element => {
// tslint:disable-next-line: no-parameter-reassignment
state = handleMessageExpiredOrDeleted(state, element);
});
return state;
}
function handleConversationReset(state: ConversationsStateType, action: PayloadAction<string>) {
const conversationKey = action.payload;
if (conversationKey === state.selectedConversation) {
@ -670,24 +687,28 @@ const conversationsSlice = createSlice({
return handleMessagesChangedOrAdded(state, action.payload);
},
messageExpired(
messagesExpired(
state: ConversationsStateType,
action: PayloadAction<{
messageId: string;
conversationKey: string;
}>
action: PayloadAction<
Array<{
messageId: string;
conversationKey: string;
}>
>
) {
return handleMessageExpiredOrDeleted(state, action);
return handleMessagesExpiredOrDeleted(state, action);
},
messageDeleted(
messagesDeleted(
state: ConversationsStateType,
action: PayloadAction<{
messageId: string;
conversationKey: string;
}>
action: PayloadAction<
Array<{
messageId: string;
conversationKey: string;
}>
>
) {
return handleMessageExpiredOrDeleted(state, action);
return handleMessagesExpiredOrDeleted(state, action);
},
conversationReset(state: ConversationsStateType, action: PayloadAction<string>) {
@ -985,8 +1006,8 @@ export const {
conversationsChanged,
conversationRemoved,
removeAllConversations,
messageExpired,
messageDeleted,
messagesExpired,
messagesDeleted,
conversationReset,
messagesChanged,
resetOldTopMessageId,

View file

@ -0,0 +1,71 @@
import { expect } from 'chai';
import Sinon from 'sinon';
import { sogsRollingDeletions } from '../../../../session/apis/open_group_api/sogsv3/sogsRollingDeletions';
describe('sogsRollingDeletions', () => {
beforeEach(() => {
sogsRollingDeletions.emptyMessageDeleteIds();
Sinon.stub(sogsRollingDeletions, 'getPerRoomCount').returns(5);
});
afterEach(() => {
Sinon.restore();
});
it('no items at all returns false', () => {
expect(sogsRollingDeletions.hasMessageDeletedId('convo1', 1)).to.be.equal(
false,
'1 should not be there'
);
});
it('no items in that convo returns false', () => {
sogsRollingDeletions.addMessageDeletedId('convo1', 1);
expect(sogsRollingDeletions.hasMessageDeletedId('convo2', 1)).to.be.equal(
false,
'1 should not be there'
);
});
it('can add 1 item', () => {
sogsRollingDeletions.addMessageDeletedId('convo1', 1);
expect(sogsRollingDeletions.hasMessageDeletedId('convo1', 1)).to.be.equal(
true,
'1 should be there'
);
});
it('can add more than capacity items', () => {
sogsRollingDeletions.addMessageDeletedId('convo1', 1);
sogsRollingDeletions.addMessageDeletedId('convo1', 2);
sogsRollingDeletions.addMessageDeletedId('convo1', 3);
sogsRollingDeletions.addMessageDeletedId('convo1', 4);
sogsRollingDeletions.addMessageDeletedId('convo1', 5);
sogsRollingDeletions.addMessageDeletedId('convo1', 6);
expect(sogsRollingDeletions.hasMessageDeletedId('convo1', 1)).to.be.equal(
false,
'1 should not be there'
);
expect(sogsRollingDeletions.hasMessageDeletedId('convo1', 2)).to.be.equal(
true,
'2 should be there'
);
expect(sogsRollingDeletions.hasMessageDeletedId('convo1', 3)).to.be.equal(
true,
'3 should be there'
);
expect(sogsRollingDeletions.hasMessageDeletedId('convo1', 4)).to.be.equal(
true,
'4 should be there'
);
expect(sogsRollingDeletions.hasMessageDeletedId('convo1', 5)).to.be.equal(
true,
'5 should be there'
);
expect(sogsRollingDeletions.hasMessageDeletedId('convo1', 6)).to.be.equal(
true,
'6 should be there'
);
});
});

View file

@ -0,0 +1,224 @@
// tslint:disable: no-implicit-dependencies max-func-body-length no-unused-expression no-require-imports no-var-requires
import chai from 'chai';
import { RingBuffer } from '../../../../session/utils/RingBuffer';
const { expect } = chai;
describe('RingBuffer Utils', () => {
it('gets created with right capacity', () => {
const ring = new RingBuffer<number>(5000);
expect(ring.getCapacity()).to.equal(5000);
expect(ring.getLength()).to.equal(0);
expect(ring.has(0)).to.equal(false, '0 should not be there');
});
describe('length & capacity are right', () => {
it('length is right 0', () => {
const ring = new RingBuffer<number>(4);
expect(ring.getLength()).to.equal(0);
});
it('length is right 1', () => {
const ring = new RingBuffer<number>(4);
ring.insert(0);
expect(ring.getLength()).to.equal(1);
});
it('length is right 4', () => {
const ring = new RingBuffer<number>(4);
ring.insert(0);
ring.insert(1);
ring.insert(2);
ring.insert(3);
expect(ring.getLength()).to.equal(4);
});
it('capacity does not get exceeded', () => {
const ring = new RingBuffer<number>(4);
ring.insert(0);
ring.insert(1);
ring.insert(2);
ring.insert(3);
ring.insert(4);
expect(ring.getLength()).to.equal(4);
});
});
describe('isEmpty is correct', () => {
it('no items', () => {
const ring = new RingBuffer<number>(4);
expect(ring.isEmpty()).to.equal(true, 'no items isEmpty should be true');
});
it('length is right 1', () => {
const ring = new RingBuffer<number>(4);
ring.insert(0);
expect(ring.isEmpty()).to.equal(false, '1 item isEmpty should be false');
});
it('length is right 4', () => {
const ring = new RingBuffer<number>(4);
ring.insert(0);
ring.insert(1);
ring.insert(2);
ring.insert(3);
expect(ring.isEmpty()).to.equal(false, '4 items isEmpty should be false');
});
it('more than capacity', () => {
const ring = new RingBuffer<number>(4);
ring.insert(0);
ring.insert(1);
ring.insert(2);
ring.insert(3);
ring.insert(4);
expect(ring.isEmpty()).to.equal(false, '5 item isEmpty should be false');
});
});
it('items are removed in order 1', () => {
const ring = new RingBuffer<number>(4);
ring.insert(0);
ring.insert(1);
ring.insert(2);
ring.insert(3);
ring.insert(4);
expect(ring.has(0)).to.equal(false, '0 should not be there anymore');
expect(ring.has(1)).to.equal(true, '1 should still be there');
expect(ring.has(2)).to.equal(true, '2 should still be there');
expect(ring.has(3)).to.equal(true, '3 should still be there');
expect(ring.has(4)).to.equal(true, '4 should still be there');
});
it('two times the same items can exist', () => {
const ring = new RingBuffer<number>(4);
ring.insert(0);
ring.insert(1);
ring.insert(2);
ring.insert(1);
ring.insert(4);
expect(ring.has(0)).to.equal(false, '0 should not be there anymore');
expect(ring.has(1)).to.equal(true, '1 should still be there');
expect(ring.has(2)).to.equal(true, '2 should still be there');
expect(ring.has(3)).to.equal(false, '3 should not be there');
expect(ring.has(4)).to.equal(true, '4 should still be there');
});
it('items are removed in order completely', () => {
const ring = new RingBuffer<number>(4);
ring.insert(0);
ring.insert(1);
ring.insert(2);
ring.insert(3);
ring.insert(10);
ring.insert(20);
ring.insert(30);
ring.insert(40);
expect(ring.has(0)).to.equal(false, '0 should not be there anymore');
expect(ring.has(1)).to.equal(false, '1 should not be there');
expect(ring.has(2)).to.equal(false, '2 should not be there');
expect(ring.has(3)).to.equal(false, '3 should not be there');
expect(ring.has(4)).to.equal(false, '4 should not be there');
expect(ring.has(10)).to.equal(true, '10 should still be there');
expect(ring.has(20)).to.equal(true, '20 should still be there');
expect(ring.has(30)).to.equal(true, '30 should still be there');
expect(ring.has(40)).to.equal(true, '40 should still be there');
});
it('clear empties the list but keeps the capacity', () => {
const ring = new RingBuffer<number>(4);
ring.insert(0);
ring.insert(1);
ring.insert(2);
ring.insert(1);
expect(ring.getLength()).to.equal(4);
expect(ring.getCapacity()).to.equal(4);
ring.clear();
expect(ring.getCapacity()).to.equal(4);
expect(ring.getLength()).to.equal(0);
});
describe('toArray', () => {
it('empty buffer', () => {
const ring = new RingBuffer<number>(4);
expect(ring.toArray()).to.deep.eq([]);
});
it('with 1', () => {
const ring = new RingBuffer<number>(4);
ring.insert(0);
expect(ring.toArray()).to.deep.eq([0]);
});
it('with 4', () => {
const ring = new RingBuffer<number>(4);
ring.insert(0);
ring.insert(1);
ring.insert(2);
ring.insert(3);
expect(ring.toArray()).to.deep.eq([0, 1, 2, 3]);
});
it('with 5', () => {
const ring = new RingBuffer<number>(4);
ring.insert(0);
ring.insert(1);
ring.insert(2);
ring.insert(3);
ring.insert(4);
expect(ring.toArray()).to.deep.eq([1, 2, 3, 4]);
});
it('more than 2 full laps erasing data', () => {
const ring = new RingBuffer<number>(4);
ring.insert(0);
ring.insert(1);
ring.insert(2);
ring.insert(3);
ring.insert(4); // first lap first item
ring.insert(5);
ring.insert(6); // first item in toArray should be this one
ring.insert(7);
ring.insert(8); // second lap first item
ring.insert(9);
expect(ring.toArray()).to.deep.eq([6, 7, 8, 9]);
});
});
describe('clear', () => {
it('empty buffer', () => {
const ring = new RingBuffer<number>(4);
ring.clear();
expect(ring.getCapacity()).to.deep.eq(4);
expect(ring.getLength()).to.deep.eq(0);
});
it('with 1', () => {
const ring = new RingBuffer<number>(4);
ring.insert(0);
ring.clear();
expect(ring.getCapacity()).to.deep.eq(4);
expect(ring.getLength()).to.deep.eq(0);
});
it('with 5', () => {
const ring = new RingBuffer<number>(4);
ring.insert(0);
ring.insert(1);
ring.insert(2);
ring.insert(3);
ring.insert(4);
ring.clear();
expect(ring.getCapacity()).to.deep.eq(4);
expect(ring.getLength()).to.deep.eq(0);
});
});
});

View file

@ -1,42 +1,61 @@
import _ from 'lodash';
import { throttle, uniq } from 'lodash';
import moment from 'moment';
import { MessageModel } from '../models/message';
import { messageExpired } from '../state/ducks/conversations';
import { messagesExpired } from '../state/ducks/conversations';
import { TimerOptionsArray } from '../state/ducks/timerOptions';
import { LocalizerKeys } from '../types/LocalizerKeys';
import { initWallClockListener } from './wallClockListener';
import { Data } from '../data/data';
import { getConversationController } from '../session/conversations';
export async function destroyMessagesAndUpdateRedux(
messages: Array<{
conversationKey: string;
messageId: string;
}>
) {
if (!messages.length) {
return;
}
const conversationWithChanges = uniq(messages.map(m => m.conversationKey));
try {
// Delete all thoses messages in a single sql call
await Data.removeMessagesByIds(messages.map(m => m.messageId));
} catch (e) {
window.log.error('destroyMessages: removeMessagesByIds failed', e && e.message ? e.message : e);
}
// trigger a redux update if needed for all those messages
window.inboxStore?.dispatch(messagesExpired(messages));
// trigger a refresh the last message for all those uniq conversation
conversationWithChanges.map(convoIdToUpdate => {
getConversationController()
.get(convoIdToUpdate)
?.updateLastMessage();
});
}
async function destroyExpiredMessages() {
try {
window.log.info('destroyExpiredMessages: Loading messages...');
const messages = await Data.getExpiredMessages();
await Promise.all(
messages.map(async (message: MessageModel) => {
window.log.info('Message expired', {
sentAt: message.get('sent_at'),
});
const messagesExpiredDetails: Array<{
conversationKey: string;
messageId: string;
}> = messages.map(m => ({
conversationKey: m.attributes.conversationId,
messageId: m.id,
}));
// We delete after the trigger to allow the conversation time to process
// the expiration before the message is removed from the database.
await Data.removeMessage(message.id);
messages.map(expired => {
window.log.info('Message expired', {
sentAt: expired.get('sent_at'),
});
});
// trigger the expiration of the message on the redux itself.
window.inboxStore?.dispatch(
messageExpired({
conversationKey: message.attributes.conversationId,
messageId: message.id,
})
);
const conversation = message.getConversation();
if (conversation) {
await conversation.onExpired(message);
}
})
);
await destroyMessagesAndUpdateRedux(messagesExpiredDetails);
} catch (error) {
window.log.error(
'destroyExpiredMessages: Error deleting expired messages',
@ -81,7 +100,7 @@ async function checkExpiringMessages() {
}
timeout = global.setTimeout(destroyExpiredMessages, wait);
}
const throttledCheckExpiringMessages = _.throttle(checkExpiringMessages, 1000);
const throttledCheckExpiringMessages = throttle(checkExpiringMessages, 1000);
let isInit = false;