This commit is contained in:
Vincent 2020-06-09 02:53:23 +10:00
parent d9bb7451a3
commit d0005205a4
3 changed files with 56 additions and 9 deletions

View file

@ -3,13 +3,16 @@ import {
MessageQueueInterface,
MessageQueueInterfaceEvents,
} from './MessageQueueInterface';
import { ContentMessage, OpenGroupMessage } from '../messages/outgoing';
import { ContentMessage, OpenGroupMessage, SyncMessage, SessionResetMessage } from '../messages/outgoing';
import { PendingMessageCache } from './PendingMessageCache';
import { JobQueue, TypedEventEmitter } from '../utils';
import { PubKey } from '../types';
import { ConversationController } from '../../window';
import { MessageSender } from '.';
export class MessageQueue implements MessageQueueInterface {
public readonly events: TypedEventEmitter<MessageQueueInterfaceEvents>;
private readonly jobQueues: Map<string, JobQueue> = new Map();
private readonly jobQueues: Map<PubKey, JobQueue> = new Map();
private readonly cache: PendingMessageCache;
constructor() {
@ -19,9 +22,11 @@ export class MessageQueue implements MessageQueueInterface {
}
public sendUsingMultiDevice(user: string, message: ContentMessage) {
throw new Error('Method not implemented.');
// this.cache
// throw new Error('Method not implemented.');
}
public send(device: string, message: ContentMessage) {
public send(device: PubKey, message: ContentMessage) {
throw new Error('Method not implemented.');
}
public sendToGroup(message: ContentMessage | OpenGroupMessage) {
@ -31,16 +36,57 @@ export class MessageQueue implements MessageQueueInterface {
throw new Error('Method not implemented.');
}
public processPending(device: string) {
public processPending(device: PubKey) {
// TODO: implement
const SessionManager: any = {}; // TEMP FIX
const messages = this.cache.getForDevice(device);
const conversation = ConversationController.get(device.key);
const isMediumGroup = conversation.isMediumGroup();
const hasSession = false; // TODO ; = SessionManager.hasSession(device);
if (!isMediumGroup && !hasSession) {
SessionManager.sendSessionRequestIfNeeded();
return;
}
const jobQueue = this.getJobQueue(device);
messages.forEach(message => {
if (!jobQueue.has(message.identifier)) {
const promise = jobQueue.add(message.identifier, MessageSender.send(message));
promise.then(() => {
// Message sent; remove from cache
void this.cache.remove(message);
}).catch(() => {
// Message failed to send
});
}
});
}
private processAllPending() {
// TODO: Get all devices which are pending here
}
private queue(device: string, message: ContentMessage) {
private queue(device: PubKey, message: ContentMessage) {
// TODO: implement
if (message instanceof SessionResetMessage) {
return;
}
const added = this.cache.add(device, message);
// if not added?
this.processPending(device);
}
private queueOpenGroupMessage(message: OpenGroupMessage) {
@ -48,7 +94,7 @@ export class MessageQueue implements MessageQueueInterface {
// If so we can get open group job queue and add the send job here
}
private getJobQueue(device: string): JobQueue {
private getJobQueue(device: PubKey): JobQueue {
let queue = this.jobQueues.get(device);
if (!queue) {
queue = new JobQueue();

View file

@ -5,6 +5,7 @@ import {
} from '../messages/outgoing';
import { RawMessage } from '../types/RawMessage';
import { TypedEventEmitter } from '../utils';
import { PubKey } from '../types';
type GroupMessageType = OpenGroupMessage | ClosedGroupMessage;
@ -16,7 +17,7 @@ export interface MessageQueueInterfaceEvents {
export interface MessageQueueInterface {
events: TypedEventEmitter<MessageQueueInterfaceEvents>;
sendUsingMultiDevice(user: string, message: ContentMessage): void;
send(device: string, message: ContentMessage): void;
send(device: PubKey, message: ContentMessage): void;
sendToGroup(message: GroupMessageType): void;
sendSyncMessage(message: ContentMessage): void;
}

View file

@ -102,7 +102,7 @@ export class PendingMessageCache {
await this.saveToDB();
}
public async loadFromDB() {
private async loadFromDB() {
const messages = await this.getFromStorage();
this.cache = messages;
}