More clean up in message_receiver.js

This commit is contained in:
Maxim Shishmarev 2020-07-02 12:32:13 +10:00
parent 6fcc1f7ae4
commit 255c7ada68
8 changed files with 48 additions and 137 deletions

View file

@ -1580,7 +1580,6 @@
// messageReceiver.addEventListener('typing', onTyping);
window.Signal.AttachmentDownloads.start({
getMessageReceiver: () => messageReceiver,
logger: window.log,
});

View file

@ -34,17 +34,11 @@ const RETRY_BACKOFF = {
let enabled = false;
let timeout;
let getMessageReceiver;
let logger;
const _activeAttachmentDownloadJobs = {};
async function start(options = {}) {
({ getMessageReceiver, logger } = options);
if (!isFunction(getMessageReceiver)) {
throw new Error(
'attachment_downloads/start: getMessageReceiver must be a function'
);
}
({ logger } = options);
if (!logger) {
throw new Error('attachment_downloads/start: logger must be provided!');
}
@ -162,10 +156,6 @@ async function _runJob(job) {
await setAttachmentDownloadJobPending(id, pending);
let downloaded;
const messageReceiver = getMessageReceiver();
if (!messageReceiver) {
throw new Error('_runJob: messageReceiver not found');
}
try {
downloaded = await NewReceiver.downloadAttachment(attachment);

View file

@ -38,7 +38,7 @@ function MessageReceiver(username, password, signalingKey) {
// bind events
lokiPublicChatAPI.on(
'publicMessage',
this.handleUnencryptedMessage.bind(this)
window.NewReceiver.handleUnencryptedMessage
);
openGroupBound = true;
}
@ -80,48 +80,13 @@ MessageReceiver.prototype.extend({
}
// set up pollers for any RSS feeds
feeds.forEach(feed => {
feed.on('rssMessage', this.handleUnencryptedMessage.bind(this));
feed.on('rssMessage', window.NewReceiver.handleUnencryptedMessage);
});
// Ensures that an immediate 'empty' event from the websocket will fire only after
// all cached envelopes are processed.
this.incoming = [this.pending];
},
async handleUnencryptedMessage({ message }) {
const isMe = message.source === textsecure.storage.user.getNumber();
if (!isMe && message.message.profile) {
const conversation = await window.ConversationController.getOrCreateAndWait(
message.source,
'private'
);
await window.NewReceiver.updateProfile(
conversation,
message.message.profile,
message.message.profileKey
);
}
const ourNumber = textsecure.storage.user.getNumber();
const primaryDevice = window.storage.get('primaryDevicePubKey');
const isOurDevice =
message.source &&
(message.source === ourNumber || message.source === primaryDevice);
const isPublicChatMessage =
message.message.group &&
message.message.group.id &&
!!message.message.group.id.match(/^publicChat:/);
let ev;
if (isPublicChatMessage && isOurDevice) {
// Public chat messages from ourselves should be outgoing
ev = new Event('sent');
} else {
ev = new Event('message');
}
ev.confirm = function confirmTerm() {};
ev.data = message;
this.dispatchAndWait(ev);
},
stopProcessing() {
window.log.info('MessageReceiver: stopProcessing requested');
this.stoppingProcessing = true;
@ -145,15 +110,6 @@ MessageReceiver.prototype.extend({
onerror() {
window.log.error('websocket error');
},
dispatchAndWait(event) {
const promise = this.appPromise || Promise.resolve();
const appJobPromise = Promise.all(this.dispatchEvent(event));
const job = () => appJobPromise;
this.appPromise = promise.then(job, job);
return Promise.resolve();
},
onclose(ev) {
window.log.info(
'websocket closed',
@ -163,40 +119,6 @@ MessageReceiver.prototype.extend({
this.calledClose
);
},
onEmpty() {
const { incoming } = this;
this.incoming = [];
const emitEmpty = () => {
window.log.info("MessageReceiver: emitting 'empty' event");
const ev = new Event('empty');
this.dispatchAndWait(ev);
};
const waitForApplication = async () => {
window.log.info(
"MessageReceiver: finished processing messages after 'empty', now waiting for application"
);
const promise = this.appPromise || Promise.resolve();
this.appPromise = Promise.resolve();
// We don't await here because we don't this to gate future message processing
promise.then(emitEmpty, emitEmpty);
};
const waitForEmptyQueue = () => {
// resetting count to zero so everything queued after this starts over again
this.count = 0;
this.addToQueue(waitForApplication);
};
// We first wait for all recently-received messages (this.incoming) to be queued,
// then we queue a task to wait for the application to finish its processing, then
// finally we emit the 'empty' event to the queue.
Promise.all(incoming).then(waitForEmptyQueue, waitForEmptyQueue);
},
drain() {
const { incoming } = this;
this.incoming = [];
@ -219,42 +141,6 @@ MessageReceiver.prototype.extend({
}
return -1;
},
unpad(paddedData) {
const paddedPlaintext = new Uint8Array(paddedData);
let plaintext;
for (let i = paddedPlaintext.length - 1; i >= 0; i -= 1) {
if (paddedPlaintext[i] === 0x80) {
plaintext = new Uint8Array(i);
plaintext.set(paddedPlaintext.subarray(0, i));
plaintext = plaintext.buffer;
break;
} else if (paddedPlaintext[i] !== 0x00) {
throw new Error('Invalid padding');
}
}
return plaintext;
},
async decryptPreKeyWhisperMessage(ciphertext, sessionCipher, address) {
const padded = await sessionCipher.decryptPreKeyWhisperMessage(ciphertext);
try {
return this.unpad(padded);
} catch (e) {
if (e.message === 'Unknown identity key') {
// create an error that the UI will pick up and ask the
// user if they want to re-negotiate
const buffer = dcodeIO.ByteBuffer.wrap(ciphertext);
throw new textsecure.IncomingIdentityKeyError(
address.toString(),
buffer.toArrayBuffer(),
e.identityKey
);
}
throw e;
}
},
});
window.textsecure = window.textsecure || {};

View file

@ -34,7 +34,7 @@ async function decryptForMediumGroup(
envelope: EnvelopePlus,
ciphertextObj: ArrayBuffer
) {
const { textsecure, dcodeIO, libloki } = window;
const { dcodeIO, libloki } = window;
const groupId = envelope.source;

View file

@ -185,7 +185,7 @@ export async function processDecrypted(envelope: EnvelopePlus, decrypted: any) {
}
if (decrypted.group) {
decrypted.group.id = decrypted.group.id?.toBinary();
decrypted.group.id = new TextDecoder('utf-8').decode(decrypted.group.id);
switch (decrypted.group.type) {
case SignalService.GroupContext.Type.UPDATE:

View file

@ -584,7 +584,9 @@ export async function handleMessageJob(
conversation.notify(message);
}
confirm();
if (confirm) {
confirm();
}
} catch (error) {
const errorForLog = error && error.stack ? error.stack : error;
window.log.error(

View file

@ -241,12 +241,6 @@ async function queueCached(item: any) {
if (decrypted) {
const payloadPlaintext = StringUtils.encode(decrypted, 'base64');
// Convert preKeys to array buffer
if (typeof envelope.preKeyBundleMessage === 'string') {
// envelope.preKeyBundleMessage = await MessageReceiver.stringToArrayBuffer(
// envelope.preKeyBundleMessage
// );
}
await queueDecryptedEnvelope(envelope, payloadPlaintext);
} else {
queueEnvelope(envelope);
@ -306,3 +300,42 @@ async function handleDecryptedEnvelope(
await removeFromCache(envelope);
}
}
export async function handleUnencryptedMessage({message : outerMessage} : any) {
const { source } = outerMessage;
const { group, profile, profileKey } = outerMessage.message;
const ourNumber = window.textsecure.storage.user.getNumber();
const isMe = source === ourNumber;
if (!isMe && profile) {
const conversation = await window.ConversationController.getOrCreateAndWait(
source,
'private'
);
await updateProfile(
conversation,
profile,
profileKey
);
}
const primaryDevice = window.storage.get('primaryDevicePubKey');
const isOurDevice = source &&
(source === ourNumber || source === primaryDevice);
const isPublicChatMessage =
group &&
group.id &&
!!group.id.match(/^publicChat:/);
const ev = {
// Public chat messages from ourselves should be outgoing
type: (isPublicChatMessage && isOurDevice) ? 'sent' : 'message',
data: outerMessage,
};
await handleMessageEvent(ev);
}

View file

@ -21,4 +21,5 @@ export interface MessageQueueInterface {
send(device: PubKey, message: ContentMessage): Promise<void>;
sendToGroup(message: GroupMessageType): Promise<void>;
sendSyncMessage(message: SyncMessage | undefined): Promise<void>;
processPending(device: PubKey): Promise<void>;
}