reduce number of commits during opengroup handling of message

This commit is contained in:
Audric Ackermann 2022-04-13 16:51:53 +10:00
parent 062db5caab
commit 01bb200b24
No known key found for this signature in database
GPG Key ID: 999F434D76324AD4
8 changed files with 114 additions and 132 deletions

View File

@ -104,12 +104,11 @@ export const ReadableMessage = (props: ReadableMessageProps) => {
// make sure the app is focused, because we mark message as read here
if (inView === true && isAppFocused) {
dispatch(showScrollToBottomButton(false));
void getConversationController()
getConversationController()
.get(selectedConversationKey)
?.markRead(receivedAt || 0)
.then(() => {
dispatch(markConversationFullyRead(selectedConversationKey));
});
?.markRead(receivedAt || 0);
dispatch(markConversationFullyRead(selectedConversationKey));
} else if (inView === false) {
dispatch(showScrollToBottomButton(true));
}

View File

@ -196,7 +196,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
public updateLastMessage: () => any;
public throttledBumpTyping: () => void;
public throttledNotify: (message: MessageModel) => void;
public markRead: (newestUnreadDate: number, providedOptions?: any) => Promise<void>;
public markRead: (newestUnreadDate: number, providedOptions?: any) => void;
public initialPromise: any;
private typingRefreshTimer?: NodeJS.Timeout | null;
@ -226,8 +226,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
leading: true,
trailing: true,
});
// tslint:disable-next-line: no-async-without-await
this.markRead = async (newestUnreadDate: number) => {
this.markRead = (newestUnreadDate: number) => {
const lastReadTimestamp = this.lastReadTimestamp;
if (newestUnreadDate > lastReadTimestamp) {
this.lastReadTimestamp = newestUnreadDate;
@ -901,8 +900,9 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
receivedAt?: number, // is set if it comes from outside
options: {
fromSync?: boolean;
} = {}
) {
} = {},
shouldCommit = true
): Promise<void> {
let expireTimer = providedExpireTimer;
let source = providedSource;
@ -912,7 +912,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
expireTimer = 0;
}
if (this.get('expireTimer') === expireTimer || (!expireTimer && !this.get('expireTimer'))) {
return null;
return;
}
window?.log?.info("Update conversation 'expireTimer'", {
@ -964,12 +964,13 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
this.set('active_at', timestamp);
}
// tell the UI this conversation was updated
await this.commit();
if (shouldCommit) {
// tell the UI this conversation was updated
await this.commit();
}
// if change was made remotely, don't send it to the number/group
if (receivedAt) {
return message;
return;
}
const expireUpdate = {
@ -998,7 +999,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
await getMessageQueue().sendToGroup(expirationTimerMessage);
}
return message;
return;
}
public triggerUIRefresh() {
@ -1577,7 +1578,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
});
}
public async notifyTyping({ isTyping, sender }: any) {
public async notifyTypingNoCommit({ isTyping, sender }: { isTyping: boolean; sender: string }) {
// We don't do anything with typing messages from our other devices
if (UserUtils.isUsFromCache(sender)) {
return;
@ -1588,32 +1589,15 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
return;
}
const wasTyping = !!this.typingTimer;
if (this.typingTimer) {
global.clearTimeout(this.typingTimer);
this.typingTimer = null;
}
// Note: We trigger two events because:
// 'change' causes a re-render of this conversation's list item in the left pane
if (isTyping) {
this.typingTimer = global.setTimeout(
this.clearContactTypingTimer.bind(this, sender),
15 * 1000
);
if (!wasTyping) {
// User was not previously typing before. State change!
await this.commit();
}
} else {
this.typingTimer = null;
if (wasTyping) {
// User was previously typing, and is no longer. State change!
await this.commit();
}
}
// we do not trigger a state change here, instead we rely on the caller to do the commit once it is done with the queue of messages
this.typingTimer = isTyping
? global.setTimeout(this.clearContactTypingTimer.bind(this, sender), 15 * 1000)
: null;
}
private async addSingleMessage(messageAttributes: MessageAttributesOptionals) {

View File

@ -1471,6 +1471,7 @@ async function initializeSql({
vacuumDatabase(db);
const msgCount = getMessageCount();
const convoCount = getConversationCount();
console.info('total message count: ', msgCount);
console.info('total conversation count: ', convoCount);
} catch (error) {
@ -2234,7 +2235,7 @@ function getMessageBySenderAndTimestamp({
function filterAlreadyFetchedOpengroupMessage(
msgDetails: Array<{ sender: string; serverTimestamp: number }> // MsgDuplicateSearchOpenGroup
) {
): Array<{ sender: string; serverTimestamp: number }> {
return msgDetails.filter(msg => {
const rows = assertGlobalInstance()
.prepare(
@ -2248,7 +2249,7 @@ function filterAlreadyFetchedOpengroupMessage(
});
if (rows.length) {
console.info(
`filtering out already received message from ${msg.sender} at ${msg.serverTimestamp} `
`filtering out already received sogs message from ${msg.sender} at ${msg.serverTimestamp} `
);
return false;
}

View File

@ -27,15 +27,13 @@ export async function handleSwarmContentMessage(envelope: EnvelopePlus, messageH
const plaintext = await decrypt(envelope, envelope.content);
if (!plaintext) {
// window?.log?.warn('handleSwarmContentMessage: plaintext was falsey');
return;
} else if (plaintext instanceof ArrayBuffer && plaintext.byteLength === 0) {
return;
}
perfStart(`innerHandleSwarmContentMessage-${envelope.id}`);
const sentAtTimestamp = _.toNumber(envelope.timestamp);
await innerHandleSwarmContentMessage(envelope, plaintext, messageHash);
perfEnd(`innerHandleSwarmContentMessage-${envelope.id}`, 'innerHandleSwarmContentMessage');
await innerHandleSwarmContentMessage(envelope, sentAtTimestamp, plaintext, messageHash);
} catch (e) {
window?.log?.warn(e);
}
@ -326,6 +324,7 @@ function shouldDropBlockedUserMessage(content: SignalService.Content): boolean {
// tslint:disable-next-line: cyclomatic-complexity
export async function innerHandleSwarmContentMessage(
envelope: EnvelopePlus,
sentAtTimestamp: number,
plaintext: ArrayBuffer,
messageHash: string
): Promise<void> {
@ -381,6 +380,7 @@ export async function innerHandleSwarmContentMessage(
await handleSwarmDataMessage(
envelope,
sentAtTimestamp,
content.dataMessage as SignalService.DataMessage,
messageHash,
senderConversationModel
@ -514,7 +514,8 @@ async function handleTypingMessage(
const started = action === SignalService.TypingMessage.Action.STARTED;
if (conversation) {
await conversation.notifyTyping({
// this does not commit, instead the caller should commit to trigger UI updates
await conversation.notifyTypingNoCommit({
isTyping: started,
sender: source,
});

View File

@ -11,7 +11,6 @@ import { getConversationController } from '../session/conversations';
import { handleClosedGroupControlMessage } from './closedGroups';
import { getMessageBySenderAndSentAt } from '../../ts/data/data';
import { ConversationModel, ConversationTypeEnum } from '../models/conversation';
import { toLogFormat } from '../types/attachments/Errors';
import {
createSwarmMessageSentFromNotUs,
@ -20,6 +19,7 @@ import {
import { MessageModel } from '../models/message';
import { isUsFromCache } from '../session/utils/User';
import { appendFetchAvatarAndProfileJob } from './userProfileImageUpdates';
import { toLogFormat } from '../types/attachments/Errors';
function cleanAttachment(attachment: any) {
return {
@ -154,6 +154,7 @@ async function cleanIncomingDataMessage(
// tslint:disable-next-line: cyclomatic-complexity
export async function handleSwarmDataMessage(
envelope: EnvelopePlus,
sentAtTimestamp: number,
rawDataMessage: SignalService.DataMessage,
messageHash: string,
senderConversationModel: ConversationModel
@ -221,8 +222,6 @@ export async function handleSwarmDataMessage(
return removeFromCache(envelope);
}
const sentAtTimestamp = _.toNumber(envelope.timestamp);
if (!convoIdToAddTheMessageTo) {
window?.log?.error('We cannot handle a message without a conversationId');
confirm();

View File

@ -66,7 +66,7 @@ export async function handleOpenGroupV2Message(
const commonAttributes = { serverTimestamp: sentTimestamp, serverId, conversationId };
const attributesForNotUs = { ...commonAttributes, sender };
// those lines just create an empty message only in memory with some basic stuff set.
// those lines just create an empty message only in-memory with some basic stuff set.
// the whole decoding of data is happening in handleMessageJob()
const msgModel = isMe
? createPublicMessageSentFromUs(commonAttributes)

View File

@ -6,7 +6,7 @@ import _ from 'lodash';
import { getConversationController } from '../session/conversations';
import { ConversationModel, ConversationTypeEnum } from '../models/conversation';
import { MessageModel } from '../models/message';
import { getMessageById, getMessageCountByType, getMessagesBySentAt } from '../../ts/data/data';
import { getMessageCountByType, getMessagesBySentAt } from '../../ts/data/data';
import { SignalService } from '../protobuf';
import { UserUtils } from '../session/utils';
@ -21,8 +21,12 @@ function contentTypeSupported(type: string): boolean {
return Chrome.isImageTypeSupported(type) || Chrome.isVideoTypeSupported(type);
}
// tslint:disable-next-line: cyclomatic-complexity
/**
* Note: this function does not trigger a write to the db nor trigger redux update.
* You have to call msg.commit() once you are done with the handling of this message
*/
async function copyFromQuotedMessage(
// tslint:disable-next-line: cyclomatic-complexity
msg: MessageModel,
quote?: SignalService.DataMessage.IQuote | null
): Promise<void> {
@ -41,7 +45,7 @@ async function copyFromQuotedMessage(
const firstAttachment = attachments?.[0] || undefined;
const id: number = _.toNumber(quoteId);
const id = _.toNumber(quoteId);
// We always look for the quote by sentAt timestamp, for opengroups, closed groups and session chats
// this will return an array of sent message by id we have locally.
@ -56,7 +60,6 @@ async function copyFromQuotedMessage(
window?.log?.warn(`We did not found quoted message ${id}.`);
quoteLocal.referencedMessageNotFound = true;
msg.set({ quote: quoteLocal });
await msg.commit();
return;
}
@ -72,7 +75,6 @@ async function copyFromQuotedMessage(
!contentTypeSupported(firstAttachment.contentType)
) {
msg.set({ quote: quoteLocal });
await msg.commit();
return;
}
@ -107,10 +109,11 @@ async function copyFromQuotedMessage(
quoteLocal.attachments = [firstAttachment];
msg.set({ quote: quoteLocal });
await msg.commit();
return;
}
/**
* Note: This does not trigger a redux update, nor write to the DB
*/
function handleLinkPreviews(messageBody: string, messagePreview: any, message: MessageModel) {
const urls = LinkPreviews.findLinks(messageBody);
const incomingPreview = messagePreview || [];
@ -127,15 +130,15 @@ function handleLinkPreviews(messageBody: string, messagePreview: any, message: M
message.set({ preview });
}
async function processProfileKey(
async function processProfileKeyNoCommit(
conversation: ConversationModel,
sendingDeviceConversation: ConversationModel,
profileKeyBuffer?: Uint8Array
) {
if (conversation.isPrivate()) {
await conversation.setProfileKey(profileKeyBuffer);
await conversation.setProfileKey(profileKeyBuffer, false);
} else {
await sendingDeviceConversation.setProfileKey(profileKeyBuffer);
await sendingDeviceConversation.setProfileKey(profileKeyBuffer, false);
}
}
@ -150,22 +153,17 @@ function handleMentions(
}
}
function updateReadStatus(message: MessageModel, conversation: ConversationModel) {
function updateReadStatus(message: MessageModel) {
if (message.isExpirationTimerUpdate()) {
message.set({ unread: 0 });
// This is primarily to allow the conversation to mark all older
// messages as read, as is done when we receive a read sync for
// a message we already know about.
void conversation.onReadMessage(message, Date.now());
}
}
async function handleSyncedReceipts(message: MessageModel, conversation: ConversationModel) {
function handleSyncedReceiptsNoCommit(message: MessageModel, conversation: ConversationModel) {
// If the newly received message is from us, we assume that we've seen the messages up until that point
const sentTimestamp = message.get('sent_at');
if (sentTimestamp) {
await conversation.markRead(sentTimestamp);
conversation.markRead(sentTimestamp);
}
}
@ -204,12 +202,14 @@ export function toRegularMessage(rawDataMessage: SignalService.DataMessage): Reg
async function handleRegularMessage(
conversation: ConversationModel,
sendingDeviceConversation: ConversationModel,
message: MessageModel,
rawDataMessage: RegularMessageType,
source: string,
messageHash: string
) {
): Promise<void> {
const type = message.get('type');
// this does not trigger a UI update nor write to the db
await copyFromQuotedMessage(message, rawDataMessage.quote);
if (rawDataMessage.openGroupInvitation) {
@ -241,8 +241,8 @@ async function handleRegularMessage(
handleMentions(message, conversation, ourPubKey);
if (type === 'incoming') {
updateReadStatus(message, conversation);
if (conversation.isPrivate()) {
updateReadStatus(message);
const incomingMessageCount = await getMessageCountByType(
conversation.id,
MessageDirection.incoming
@ -266,12 +266,10 @@ async function handleRegularMessage(
}
// should only occur after isOutgoing request as it relies on didApproveMe being false.
await conversation.setDidApproveMe(true);
// edge case end
}
}
if (type === 'outgoing') {
await handleSyncedReceipts(message, conversation);
} else if (type === 'outgoing') {
// we want to do this for all types of conversations, not just private chats
handleSyncedReceiptsNoCommit(message, conversation);
if (conversation.isPrivate()) {
await conversation.setIsApproved(true);
@ -286,34 +284,22 @@ async function handleRegularMessage(
});
}
const sendingDeviceConversation = await getConversationController().getOrCreateAndWait(
source,
ConversationTypeEnum.PRIVATE
);
// Check if we need to update any profile names
// the only profile we don't update with what is coming here is ours,
// as our profile is shared accross our devices with a ConfigurationMessage
if (type === 'incoming' && rawDataMessage.profile) {
void appendFetchAvatarAndProfileJob(
if (rawDataMessage.profileKey) {
await processProfileKeyNoCommit(
conversation,
sendingDeviceConversation,
rawDataMessage.profile,
rawDataMessage.profileKey
);
}
if (rawDataMessage.profileKey) {
await processProfileKey(conversation, sendingDeviceConversation, rawDataMessage.profileKey);
}
// we just received a message from that user so we reset the typing indicator for this convo
await conversation.notifyTyping({
await conversation.notifyTypingNoCommit({
isTyping: false,
sender: source,
});
}
async function handleExpirationTimerUpdate(
async function handleExpirationTimerUpdateNoCommit(
conversation: ConversationModel,
message: MessageModel,
source: string,
@ -328,7 +314,7 @@ async function handleExpirationTimerUpdate(
});
conversation.set({ expireTimer });
await conversation.updateExpireTimer(expireTimer, source, message.get('received_at'));
await conversation.updateExpireTimer(expireTimer, source, message.get('received_at'), {}, false);
}
export async function handleMessageJob(
@ -340,11 +326,15 @@ export async function handleMessageJob(
messageHash: string
) {
window?.log?.info(
`Starting handleSwarmDataMessage for message ${messageModel.idForLogging()}, ${messageModel.get(
`Starting handleMessageJob for message ${messageModel.idForLogging()}, ${messageModel.get(
'serverTimestamp'
) || messageModel.get('timestamp')} in conversation ${conversation.idForLogging()}`
);
const sendingDeviceConversation = await getConversationController().getOrCreateAndWait(
source,
ConversationTypeEnum.PRIVATE
);
try {
messageModel.set({ flags: regularDataMessage.flags });
if (messageModel.isExpirationTimerUpdate()) {
@ -357,10 +347,12 @@ export async function handleMessageJob(
);
return;
}
await handleExpirationTimerUpdate(conversation, messageModel, source, expireTimer);
await handleExpirationTimerUpdateNoCommit(conversation, messageModel, source, expireTimer);
} else {
// this does not commit to db nor UI unless we need to approve a convo
await handleRegularMessage(
conversation,
sendingDeviceConversation,
messageModel,
regularDataMessage,
source,
@ -368,7 +360,7 @@ export async function handleMessageJob(
);
}
// save the message model to the db and it save the messageId generated to our copy
// save the message model to the db and it save the messageId generated to our in-memory copy
const id = await messageModel.commit();
messageModel.set({ id });
@ -376,42 +368,53 @@ export async function handleMessageJob(
// call it after we have an id for this message, because the jobs refer back
// to their source message.
void queueAttachmentDownloads(messageModel, conversation);
const unreadCount = await conversation.getUnreadCount();
conversation.set({ unreadCount });
// this is a throttled call and will only run once every 1 sec
// this is a throttled call and will only run once every 1 sec at most
conversation.updateLastMessage();
await conversation.commit();
try {
// We go to the database here because, between the message save above and
// the previous line's trigger() call, we might have marked all messages
// unread in the database. This message might already be read!
const fetched = await getMessageById(messageModel.get('id'));
const previousUnread = messageModel.get('unread');
// Important to update message with latest read state from database
messageModel.merge(fetched);
if (previousUnread !== messageModel.get('unread')) {
window?.log?.warn(
'Caught race condition on new message read state! ' + 'Manually starting timers.'
);
// We call markRead() even though the message is already
// marked read because we need to start expiration
// timers, etc.
await messageModel.markRead(Date.now());
}
} catch (error) {
window?.log?.warn(
'handleSwarmDataMessage: Message',
messageModel.idForLogging(),
'was deleted'
void queueAttachmentDownloads(messageModel, conversation);
// Check if we need to update any profile names
// the only profile we don't update with what is coming here is ours,
// as our profile is shared accross our devices with a ConfigurationMessage
if (messageModel.isIncoming() && regularDataMessage.profile) {
void appendFetchAvatarAndProfileJob(
sendingDeviceConversation,
regularDataMessage.profile,
regularDataMessage.profileKey
);
}
// even with all the warnings, I am very sus about if this is usefull or not
// try {
// // We go to the database here because, between the message save above and
// // the previous line's trigger() call, we might have marked all messages
// // unread in the database. This message might already be read!
// const fetched = await getMessageById(messageModel.get('id'));
// const previousUnread = messageModel.get('unread');
// // Important to update message with latest read state from database
// messageModel.merge(fetched);
// if (previousUnread !== messageModel.get('unread')) {
// window?.log?.warn(
// 'Caught race condition on new message read state! ' + 'Manually starting timers.'
// );
// // We call markRead() even though the message is already
// // marked read because we need to start expiration
// // timers, etc.
// await messageModel.markRead(Date.now());
// }
// } catch (error) {
// window?.log?.warn(
// 'handleMessageJob: Message',
// messageModel.idForLogging(),
// 'was deleted'
// );
// }
if (messageModel.get('unread')) {
conversation.throttledNotify(messageModel);
}
@ -419,13 +422,6 @@ export async function handleMessageJob(
confirm?.();
} catch (error) {
const errorForLog = error && error.stack ? error.stack : error;
window?.log?.error(
'handleSwarmDataMessage',
messageModel.idForLogging(),
'error:',
errorForLog
);
throw error;
window?.log?.error('handleMessageJob', messageModel.idForLogging(), 'error:', errorForLog);
}
}

View File

@ -231,7 +231,9 @@ async function handleDecryptedEnvelope(
messageHash: string
) {
if (envelope.content) {
await innerHandleSwarmContentMessage(envelope, plaintext, messageHash);
const sentAtTimestamp = _.toNumber(envelope.timestamp);
await innerHandleSwarmContentMessage(envelope, sentAtTimestamp, plaintext, messageHash);
} else {
await removeFromCache(envelope);
}