filter duplicates on opengroup poll in a single sql call

This commit is contained in:
Audric Ackermann 2022-04-13 13:52:05 +10:00
parent 00d70db0be
commit 0158fd5ebb
No known key found for this signature in database
GPG Key ID: 999F434D76324AD4
10 changed files with 158 additions and 124 deletions

View File

@ -1,6 +1,8 @@
import React from 'react';
import _ from 'lodash';
import classNames from 'classnames';
import autoBind from 'auto-bind';
import {
CompositionBox,
@ -8,13 +10,14 @@ import {
StagedAttachmentType,
} from './composition/CompositionBox';
import _ from 'lodash';
import { perfEnd, perfStart } from '../../session/utils/Performance';
const DEFAULT_JPEG_QUALITY = 0.85;
import { SessionMessagesListContainer } from './SessionMessagesListContainer';
import { SessionFileDropzone } from './SessionFileDropzone';
import autoBind from 'auto-bind';
import { InConversationCallContainer } from '../calling/InConversationCallContainer';
import { SplitViewContainer } from '../SplitViewContainer';
import { LightboxGallery, MediaItemType } from '../lightbox/LightboxGallery';
@ -40,7 +43,6 @@ import { MessageView } from '../MainViewController';
import { ConversationHeaderWithDetails } from './ConversationHeader';
import { MessageDetail } from './message/message-item/MessageDetail';
import { SessionRightPanelWithDetails } from './SessionRightPanel';
import { autoOrientJpegImage } from '../../types/attachments/migrations';
import {
makeImageThumbnailBuffer,
makeVideoScreenshot,
@ -51,6 +53,7 @@ import { MAX_ATTACHMENT_FILESIZE_BYTES } from '../../session/constants';
import { ConversationMessageRequestButtons } from './ConversationRequestButtons';
import { ConversationRequestinfo } from './ConversationRequestInfo';
import { getCurrentRecoveryPhrase } from '../../util/storage';
import loadImage from 'blueimp-load-image';
// tslint:disable: jsx-curly-spacing
interface State {
@ -177,7 +180,7 @@ export class SessionConversation extends React.Component<Props, State> {
await this.scrollToNow();
};
const recoveryPhrase = getCurrentRecoveryPhrase() as string;
const recoveryPhrase = getCurrentRecoveryPhrase();
// string replace to fix case where pasted text contains invis characters causing false negatives
if (msg.body.replace(/\s/g, '').includes(recoveryPhrase.replace(/\s/g, ''))) {
@ -526,6 +529,18 @@ const renderVideoPreview = async (contentType: string, file: File, fileName: str
}
};
/**
 * Loads the given file with `loadImage` (options `orientation: true` and `canvas: true`)
 * and re-encodes the resulting canvas as a JPEG data URL, using DEFAULT_JPEG_QUALITY.
 *
 * Only the `loadImage` call is covered by the perfStart/perfEnd measurement.
 *
 * @param fileOrBlobOrURL the image file to load
 * @returns the data URL produced by the canvas `toDataURL` call
 */
const autoOrientJpegImage = async (fileOrBlobOrURL: File): Promise<string> => {
  const perfLabel = 'autoOrientJpegImage';

  perfStart(perfLabel);
  const { image } = await loadImage(fileOrBlobOrURL, { orientation: true, canvas: true });
  perfEnd(perfLabel, perfLabel);

  // `canvas: true` was requested above, so the loaded image is handled as a canvas
  const orientedCanvas = image as HTMLCanvasElement;
  return orientedCanvas.toDataURL(MIME.IMAGE_JPEG, DEFAULT_JPEG_QUALITY);
};
const renderImagePreview = async (contentType: string, file: File, fileName: string) => {
if (!MIME.isJPEG(contentType)) {
const urlImage = URL.createObjectURL(file);

View File

@ -10,6 +10,7 @@ import { HexKeyPair } from '../receiver/keypairs';
import { getSodiumRenderer } from '../session/crypto';
import { PubKey } from '../session/types';
import { ReduxConversationType } from '../state/ducks/conversations';
import { MsgDuplicateSearchOpenGroup } from '../types/sqlSharedTypes';
import { ExpirationTimerOptions } from '../util/expiringMessages';
import { Storage } from '../util/storage';
import { channels } from './channels';
@ -373,22 +374,11 @@ export async function getMessageBySenderAndSentAt({
return new MessageModel(messages[0]);
}
export async function getMessageBySenderAndServerTimestamp({
source,
serverTimestamp,
}: {
source: string;
serverTimestamp: number;
}): Promise<MessageModel | null> {
const messages = await channels.getMessageBySenderAndServerTimestamp({
source,
serverTimestamp,
});
if (!messages || !messages.length) {
return null;
}
return new MessageModel(messages[0]);
/**
 * Forwards the given sender/serverTimestamp pairs to the
 * `filterAlreadyFetchedOpengroupMessage` channel and returns what it answers.
 *
 * @param msgDetails the sender/serverTimestamp pairs to check
 * @returns the value resolved by the channel call, or an empty array when that value is falsy
 */
export async function filterAlreadyFetchedOpengroupMessage(
  msgDetails: MsgDuplicateSearchOpenGroup
): Promise<MsgDuplicateSearchOpenGroup> {
  const notYetFetched = await channels.filterAlreadyFetchedOpengroupMessage(msgDetails);
  if (!notYetFetched) {
    return [];
  }
  return notYetFetched;
}
/**

View File

@ -48,7 +48,7 @@ const channelsToMake = new Set([
'removeAllMessagesInConversation',
'getMessageCount',
'getMessageBySenderAndSentAt',
'getMessageBySenderAndServerTimestamp',
'filterAlreadyFetchedOpengroupMessage',
'getMessageBySenderAndTimestamp',
'getMessageIdsFromServerIds',
'getMessageById',

View File

@ -2232,25 +2232,28 @@ function getMessageBySenderAndTimestamp({
return map(rows, row => jsonToObject(row.json));
}
function getMessageBySenderAndServerTimestamp({
source,
serverTimestamp,
}: {
source: string;
serverTimestamp: number;
}) {
const rows = assertGlobalInstance()
.prepare(
`SELECT json FROM ${MESSAGES_TABLE} WHERE
source = $source AND
function filterAlreadyFetchedOpengroupMessage(
msgDetails: Array<{ sender: string; serverTimestamp: number }> // MsgDuplicateSearchOpenGroup
) {
return msgDetails.filter(msg => {
const rows = assertGlobalInstance()
.prepare(
`SELECT source, serverTimestamp FROM ${MESSAGES_TABLE} WHERE
source = $sender AND
serverTimestamp = $serverTimestamp;`
)
.all({
source,
serverTimestamp,
});
return map(rows, row => jsonToObject(row.json));
)
.all({
sender: msg.sender,
serverTimestamp: msg.serverTimestamp,
});
if (rows.length) {
console.info(
`filtering out already received message from ${msg.sender} at ${msg.serverTimestamp} `
);
return false;
}
return true;
});
}
function getUnreadByConversation(conversationId: string) {
@ -3521,7 +3524,7 @@ export const sqlNode = {
getUnreadCountByConversation,
getMessageCountByType,
getMessageBySenderAndSentAt,
getMessageBySenderAndServerTimestamp,
filterAlreadyFetchedOpengroupMessage,
getMessageBySenderAndTimestamp,
getMessageIdsFromServerIds,
getMessageById,

View File

@ -9,10 +9,7 @@ import _ from 'lodash';
import { StringUtils, UserUtils } from '../session/utils';
import { getConversationController } from '../session/conversations';
import { handleClosedGroupControlMessage } from './closedGroups';
import {
getMessageBySenderAndSentAt,
getMessageBySenderAndServerTimestamp,
} from '../../ts/data/data';
import { getMessageBySenderAndSentAt } from '../../ts/data/data';
import { ConversationModel, ConversationTypeEnum } from '../models/conversation';
import { toLogFormat } from '../types/attachments/Errors';
@ -276,31 +273,6 @@ export async function isSwarmMessageDuplicate({
}
}
/**
 * Tells whether a message from `sender` with this `serverTimestamp` can already be found
 * through getMessageBySenderAndServerTimestamp().
 *
 * The lookup does not take the conversation into account: a user sending two different
 * messages with the same serverTimestamp is considered unlikely, so such messages are
 * treated as being the same one.
 *
 * @returns true when a matching message was found, false when none was found or when
 * the lookup threw (the error is logged).
 */
export async function isOpengroupMessageDuplicate({
  sender,
  serverTimestamp,
}: {
  sender: string;
  serverTimestamp: number;
}) {
  // serverTimestamp is only used for opengroupv2
  const lookup = { source: sender, serverTimestamp };
  try {
    const alreadyStored = await getMessageBySenderAndServerTimestamp(lookup);
    // any result means this user already sent a message with this exact serverTimestamp
    return alreadyStored ? true : false;
  } catch (error) {
    window?.log?.error('isOpengroupMessageDuplicate error:', toLogFormat(error));
    return false;
  }
}
// tslint:disable:cyclomatic-complexity max-func-body-length */
async function handleSwarmMessage(
msgModel: MessageModel,

View File

@ -1,5 +1,4 @@
import { noop } from 'lodash';
import { ConversationTypeEnum } from '../models/conversation';
import _, { noop } from 'lodash';
import {
createPublicMessageSentFromNotUs,
createPublicMessageSentFromUs,
@ -11,8 +10,8 @@ import { getOpenGroupV2ConversationId } from '../session/apis/open_group_api/uti
import { getConversationController } from '../session/conversations';
import { removeMessagePadding } from '../session/crypto/BufferPadding';
import { UserUtils } from '../session/utils';
import { perfEnd, perfStart } from '../session/utils/Performance';
import { fromBase64ToArray } from '../session/utils/String';
import { isOpengroupMessageDuplicate } from './dataMessage';
import { handleMessageJob, toRegularMessage } from './queuedJob';
export async function handleOpenGroupV2Message(
@ -26,8 +25,12 @@ export async function handleOpenGroupV2Message(
return;
}
// Note: opengroup messages are not padded
const dataUint = new Uint8Array(removeMessagePadding(fromBase64ToArray(base64EncodedData)));
// Note: opengroup messages should not be padded
perfStart(`fromBase64ToArray-${base64EncodedData.length}`);
const arr = fromBase64ToArray(base64EncodedData);
perfEnd(`fromBase64ToArray-${base64EncodedData.length}`, 'fromBase64ToArray');
const dataUint = new Uint8Array(removeMessagePadding(arr));
const decoded = SignalService.Content.decode(dataUint);
@ -42,43 +45,38 @@ export async function handleOpenGroupV2Message(
return;
}
if (!getConversationController().get(conversationId)) {
window?.log?.error('Received a message for an unknown convo. Skipping');
if (
!getConversationController()
.get(conversationId)
?.isOpenGroupV2()
) {
window?.log?.error('Received a message for an unknown convo or not an v2. Skipping');
return;
}
const conversation = await getConversationController().getOrCreateAndWait(
conversationId,
ConversationTypeEnum.GROUP
);
const groupConvo = getConversationController().get(conversationId);
if (!conversation) {
if (!groupConvo) {
window?.log?.warn('Skipping handleJob for unknown convo: ', conversationId);
return;
}
void conversation.queueJob(async () => {
void groupConvo.queueJob(async () => {
const isMe = UserUtils.isUsFromCache(sender);
const commonAttributes = { serverTimestamp: sentTimestamp, serverId, conversationId };
const attributesForNotUs = { ...commonAttributes, sender };
// those lines just create an empty message with some basic stuff set.
// those lines just create an empty message only in memory with some basic stuff set.
// the whole decoding of data is happening in handleMessageJob()
const msgModel = isMe
? createPublicMessageSentFromUs(commonAttributes)
: createPublicMessageSentFromNotUs(attributesForNotUs);
// WARNING this is important that the isOpengroupMessageDuplicate is made INSIDE the conversation.queueJob call
const isDuplicate = await isOpengroupMessageDuplicate(attributesForNotUs);
if (isDuplicate) {
window?.log?.info('Received duplicate opengroup message. Dropping it.');
return;
}
// Note, deduplication is made in filterDuplicatesFromDbAndIncoming now
await handleMessageJob(
msgModel,
conversation,
groupConvo,
toRegularMessage(decoded?.dataMessage as SignalService.DataMessage),
noop,
sender,

View File

@ -13,7 +13,11 @@ import {
} from './OpenGroupAPIV2CompactPoll';
import _ from 'lodash';
import { ConversationModel } from '../../../../models/conversation';
import { getMessageIdsFromServerIds, removeMessage } from '../../../../data/data';
import {
filterAlreadyFetchedOpengroupMessage,
getMessageIdsFromServerIds,
removeMessage,
} from '../../../../data/data';
import { getV2OpenGroupRoom, saveV2OpenGroupRoom } from '../../../../data/opengroups';
import { OpenGroupMessageV2 } from './OpenGroupMessageV2';
import autoBind from 'auto-bind';
@ -23,7 +27,6 @@ import { processNewAttachment } from '../../../../types/MessageAttachment';
import { MIME } from '../../../../types';
import { handleOpenGroupV2Message } from '../../../../receiver/opengroup';
import { callUtilsWorker } from '../../../../webworker/workers/util_worker_interface';
const pollForEverythingInterval = DURATION.SECONDS * 10;
const pollForRoomAvatarInterval = DURATION.DAYS * 1;
const pollForMemberCountInterval = DURATION.MINUTES * 10;
@ -394,6 +397,39 @@ const handleDeletions = async (
}
};
/**
 * Removes from a batch of incoming open group messages those which are duplicates,
 * either of another message of the same batch, or of a message which
 * filterAlreadyFetchedOpengroupMessage() reports as already fetched.
 *
 * Open group messages are deduplicated by sender and serverTimestamp only.
 * Messages without a sender are dropped.
 *
 * @param newMessages the incoming batch of messages
 * @returns the messages of `newMessages` which still have to be handled, in their incoming order
 */
const filterDuplicatesFromDbAndIncoming = async (
  newMessages: Array<OpenGroupMessageV2>
): Promise<Array<OpenGroupMessageV2>> => {
  const start = Date.now();
  // the timestamp is a number, so it never contains ':' and the key is not ambiguous
  const toKey = (sender: string, timestamp: number) => `${timestamp}:${sender}`;

  // first make sure that the incoming messages have no duplicates between themselves
  const seenKeys = new Set<string>();
  const candidates: Array<{ key: string; sender: string; message: OpenGroupMessageV2 }> = [];
  for (const message of newMessages) {
    const sender = message.sender;
    if (!sender) {
      // a sender is needed for the lookup just below
      continue;
    }
    const key = toKey(sender, message.sentTimestamp);
    // messages without a timestamp are never considered duplicates of each other
    if (message.sentTimestamp && seenKeys.has(key)) {
      continue;
    }
    seenKeys.add(key);
    candidates.push({ key, sender, message });
  }

  // now, check database to make sure those messages are not already fetched
  const filteredInDb = await filterAlreadyFetchedOpengroupMessage(
    candidates.map(candidate => {
      return { sender: candidate.sender, serverTimestamp: candidate.message.sentTimestamp };
    })
  );

  // Map the answer back with a set lookup over the candidates. Searching `newMessages` again
  // for each entry was quadratic, and always returned the first message of a sender/timestamp
  // pair even when several messages sharing that pair had been kept.
  const keysNotInDb = new Set<string>(
    (filteredInDb || []).map(found => toKey(found.sender, found.serverTimestamp))
  );
  const opengroupMessagesFiltered = candidates
    .filter(candidate => keysNotInDb.has(candidate.key))
    .map(candidate => candidate.message);

  window.log.debug(
    `filterDuplicatesFromDbAndIncoming of ${newMessages.length} messages took ${Date.now() -
      start}ms.`
  );
  return opengroupMessagesFiltered;
};
const handleNewMessages = async (
newMessages: Array<OpenGroupMessageV2>,
conversationId: string,
@ -419,10 +455,11 @@ const handleNewMessages = async (
// duplicates are filtered out below by filterDuplicatesFromDbAndIncoming()
const roomDetails: OpenGroupRequestCommonType = _.pick(roomInfos, 'serverUrl', 'roomId');
const filteredDuplicates = await filterDuplicatesFromDbAndIncoming(newMessages);
// tslint:disable-next-line: prefer-for-of
for (let index = 0; index < newMessages.length; index++) {
const newMessage = newMessages[index];
for (let index = 0; index < filteredDuplicates.length; index++) {
const newMessage = filteredDuplicates[index];
try {
await handleOpenGroupV2Message(newMessage, roomDetails);
} catch (e) {

View File

@ -1,9 +1,8 @@
import * as GoogleChrome from '../../../ts/util/GoogleChrome';
import * as MIME from '../../../ts/types/MIME';
import { toLogFormat } from './Errors';
import { arrayBufferToBlob, blobToArrayBuffer, dataURLToBlob } from 'blob-util';
import { arrayBufferToBlob, blobToArrayBuffer } from 'blob-util';
import loadImage from 'blueimp-load-image';
import { isString } from 'lodash';
import {
getImageDimensions,
@ -20,27 +19,6 @@ import {
readAttachmentData,
writeNewAttachmentData,
} from '../MessageAttachment';
import { perfEnd, perfStart } from '../../session/utils/Performance';
const DEFAULT_JPEG_QUALITY = 0.85;
/**
 * File | Blob | URLString -> Promise<DataURLString>
 *
 * Loads the image with `loadImage` (options `orientation: true` and `canvas: true`) and
 * re-encodes the resulting canvas as a JPEG data URL, using DEFAULT_JPEG_QUALITY.
 * Only the `loadImage` call is covered by the perfStart/perfEnd measurement.
 *
 * Documentation for the `loadImage` options (`LoadImageOptions`):
 * https://github.com/blueimp/JavaScript-Load-Image/tree/v2.18.0#options
 */
export const autoOrientJpegImage = async (
  fileOrBlobOrURL: string | File | Blob
): Promise<string> => {
  const perfLabel = `autoOrientJpegImage`;

  perfStart(perfLabel);
  const { image } = await loadImage(fileOrBlobOrURL, { orientation: true, canvas: true });
  perfEnd(perfLabel, perfLabel);

  // `canvas: true` was requested above, so the loaded image is handled as a canvas
  const orientedCanvas = image as HTMLCanvasElement;
  return orientedCanvas.toDataURL(MIME.IMAGE_JPEG, DEFAULT_JPEG_QUALITY);
};
// Returns true if `rawAttachment` is a valid attachment based on our current schema.
// Over time, we can expand this definition to become more narrow, e.g. require certain
@ -80,8 +58,7 @@ export const autoOrientJPEGAttachment = async (attachment: {
}
const dataBlob = arrayBufferToBlob(attachment.data, attachment.contentType);
const newDataBlob = dataURLToBlob(await autoOrientJpegImage(dataBlob));
const newDataArrayBuffer = await blobToArrayBuffer(newDataBlob);
const newDataArrayBuffer = await blobToArrayBuffer(dataBlob);
// IMPORTANT: We overwrite the existing `data` `ArrayBuffer` losing the original
// image data. Ideally, we'd preserve the original image data for users who want to

View File

@ -0,0 +1,4 @@
/**
 * List of sender/serverTimestamp pairs, one entry per open group message,
 * used when searching for already fetched (duplicate) open group messages.
 */
export type MsgDuplicateSearchOpenGroup = Array<{
// the sender of the message
sender: string;
// the server timestamp of the message
serverTimestamp: number;
}>;

View File

@ -5,7 +5,7 @@ import { sendDataExtractionNotification } from '../session/messages/outgoing/con
import { AttachmentType, save } from '../types/Attachment';
import { StagedAttachmentType } from '../components/conversation/composition/CompositionBox';
import { getAbsoluteAttachmentPath, processNewAttachment } from '../types/MessageAttachment';
import { arrayBufferToBlob, dataURLToBlob } from 'blob-util';
import { arrayBufferToBlob } from 'blob-util';
import { IMAGE_GIF, IMAGE_JPEG, IMAGE_PNG, IMAGE_TIFF, IMAGE_UNKNOWN } from '../types/MIME';
import { THUMBNAIL_SIDE } from '../types/attachments/VisualAttachment';
@ -118,6 +118,22 @@ export async function autoScaleForThumbnail<T extends { contentType: string; blo
return autoScale(attachment, maxMeasurements);
}
/**
 * Promise based wrapper around the callback based `canvas.toBlob()`.
 *
 * @param canvas the canvas to encode
 * @param type the type forwarded to `toBlob`
 * @param quality the quality forwarded to `toBlob`
 * @returns whatever `toBlob` hands to its callback: a Blob, or null
 */
async function canvasToBlob(
  canvas: HTMLCanvasElement,
  type: string,
  quality: number
): Promise<Blob | null> {
  // `resolve` is used directly as the toBlob callback
  return new Promise<Blob | null>(resolve => canvas.toBlob(resolve, type, quality));
}
/**
* Scale down an image to fit in the required dimension.
* Note: This method won't crop if needed,
@ -171,6 +187,8 @@ export async function autoScale<T extends { contentType: string; blob: Blob }>(
maxWidth: makeSquare ? maxMeasurements?.maxSide : maxWidth,
maxHeight: makeSquare ? maxMeasurements?.maxSide : maxHeight,
...crop,
orientation: 1,
aspectRatio: makeSquare ? 1 : undefined,
canvas: true,
};
@ -198,16 +216,34 @@ export async function autoScale<T extends { contentType: string; blob: Blob }>(
blob,
};
}
window.log.debug('canvas.originalWidth', {
canvasOriginalWidth: canvas.originalWidth,
canvasOriginalHeight: canvas.originalHeight,
maxWidth,
maxHeight,
blobsize: blob.size,
maxSize,
makeSquare,
});
let quality = 0.95;
let i = 4;
const start = Date.now();
do {
i -= 1;
window.log.info('autoscale of ', attachment, i);
readAndResizedBlob = dataURLToBlob(
(canvas.image as HTMLCanvasElement).toDataURL('image/jpeg', quality)
window.log.info(`autoscale iteration: [${i}] for:`, attachment);
perfStart(`autoscale-canvasToBlob-${attachment.blob.size}`);
const tempBlob = await canvasToBlob(canvas.image as HTMLCanvasElement, 'image/jpeg', quality);
perfEnd(
`autoscale-canvasToBlob-${attachment.blob.size}`,
`autoscale-canvasToBlob-${attachment.blob.size}`
);
if (!tempBlob) {
throw new Error('Failed to get blob during canvasToBlob.');
}
readAndResizedBlob = tempBlob;
quality = (quality * maxSize) / readAndResizedBlob.size;
if (quality > 1) {
@ -218,6 +254,8 @@ export async function autoScale<T extends { contentType: string; blob: Blob }>(
if (readAndResizedBlob.size > maxSize) {
throw new Error('Cannot add this attachment even after trying to scale it down.');
}
window.log.debug(`autoscale took ${Date.now() - start}ms `);
return {
contentType: attachment.contentType,
blob: readAndResizedBlob,