variable swarm polling based on activeAt timestamp

This commit is contained in:
Audric Ackermann 2021-06-25 10:47:25 +10:00
parent cc6951e4c2
commit 7ea30b70ca
No known key found for this signature in database
GPG key ID: 999F434D76324AD4
7 changed files with 99 additions and 34 deletions

View file

@ -450,11 +450,7 @@
}, window.CONSTANTS.NOTIFICATION_ENABLE_TIMEOUT_SECONDS * 1000);
window.NewReceiver.queueAllCached();
window
.getSwarmPollingInstance()
.addPubkey(window.libsession.Utils.UserUtils.getOurPubKeyStrFromCache());
window.getSwarmPollingInstance().start();
window.libsession.Utils.AttachmentDownloads.start({
logger: window.log,
});

View file

@ -245,8 +245,6 @@ const doAppStartUp = () => {
debounce(triggerAvatarReUploadIfNeeded, 200);
// TODO: Investigate the case where we reconnect
const ourKey = UserUtils.getOurPubKeyStrFromCache();
getSwarmPollingInstance().addPubkey(ourKey);
getSwarmPollingInstance().start();
};

View file

@ -44,6 +44,7 @@ import { NotificationForConvoOption } from '../components/conversation/Conversat
import { useDispatch } from 'react-redux';
import { updateConfirmModal } from '../state/ducks/modalDialog';
import { createTaskWithTimeout } from '../session/utils/TaskWithTimeout';
import { DURATION, SWARM_POLLING_TIMEOUT } from '../session/constants';
export enum ConversationTypeEnum {
GROUP = 'group',

View file

@ -17,6 +17,12 @@ export const TTL_DEFAULT = {
TTL_MAX: 14 * DURATION.DAYS,
};
export const SWARM_POLLING_TIMEOUT = {
ACTIVE: DURATION.SECONDS * 5,
MEDIUM_ACTIVE: DURATION.SECONDS * 60,
INACTIVE: DURATION.MINUTES * 60,
};
export const PROTOCOLS = {
// tslint:disable-next-line: no-http-string
HTTP: 'http:',

View file

@ -12,9 +12,11 @@ import {
updateLastHash,
} from '../../../ts/data/data';
import { StringUtils } from '../../session/utils';
import { StringUtils, UserUtils } from '../../session/utils';
import { getConversationController } from '../conversations';
import { ConversationModel } from '../../models/conversation';
import { DURATION, SWARM_POLLING_TIMEOUT } from '../constants';
import { ConversationController } from '../conversations/ConversationController';
type PubkeyToHash = { [key: string]: string };
@ -49,33 +51,26 @@ export const getSwarmPollingInstance = () => {
return instance;
};
export class SwarmPolling {
private pubkeys: Array<PubKey>;
private groupPubkeys: Array<PubKey>;
class SwarmPolling {
private ourPubkey: PubKey | undefined;
private groupPolling: Array<{ pubkey: PubKey; lastPolledTimestamp: number }>;
private readonly lastHashes: { [key: string]: PubkeyToHash };
constructor() {
this.pubkeys = [];
this.groupPubkeys = [];
this.groupPolling = [];
this.lastHashes = {};
}
public start(): void {
this.ourPubkey = UserUtils.getOurPubKeyFromCache();
this.loadGroupIds();
void this.pollForAllKeys();
}
public addGroupId(pubkey: PubKey) {
if (this.groupPubkeys.findIndex(m => m.key === pubkey.key) === -1) {
if (this.groupPolling.findIndex(m => m.pubkey.key === pubkey.key) === -1) {
window?.log?.info('Swarm addGroupId: adding pubkey to polling', pubkey.key);
this.groupPubkeys.push(pubkey);
}
}
public addPubkey(pk: PubKey | string) {
const pubkey = PubKey.cast(pk);
if (this.pubkeys.findIndex(m => m.key === pubkey.key) === -1) {
this.pubkeys.push(pubkey);
this.groupPolling.push({ pubkey, lastPolledTimestamp: 0 });
}
}
@ -83,11 +78,13 @@ export class SwarmPolling {
const pubkey = PubKey.cast(pk);
window?.log?.info('Swarm removePubkey: removing pubkey from polling', pubkey.key);
this.pubkeys = this.pubkeys.filter(key => !pubkey.isEqual(key));
this.groupPubkeys = this.groupPubkeys.filter(key => !pubkey.isEqual(key));
if (this.ourPubkey && PubKey.cast(pk).isEqual(this.ourPubkey)) {
this.ourPubkey = undefined;
}
this.groupPolling = this.groupPolling.filter(group => !pubkey.isEqual(group.pubkey));
}
protected async pollOnceForKey(pubkey: PubKey, isGroup: boolean) {
private async pollOnceForKey(pubkey: PubKey, isGroup: boolean) {
// NOTE: sometimes pubkey is string, sometimes it is object, so
// accept both until this is fixed:
const pkStr = pubkey.key;
@ -123,6 +120,19 @@ export class SwarmPolling {
// Merge results into one list of unique messages
const messages = _.uniqBy(_.flatten(results), (x: any) => x.hash);
if (isGroup) {
// update the last fetched timestamp
this.groupPolling = this.groupPolling.map(group => {
if (PubKey.isEqual(pubkey, group.pubkey)) {
return {
...group,
lastPolledTimestamp: Date.now(),
};
}
return group;
});
}
const newMessages = await this.handleSeenMessages(messages);
newMessages.forEach((m: Message) => {
@ -133,7 +143,7 @@ export class SwarmPolling {
// Fetches messages for `pubkey` from `node` potentially updating
// the lash hash record
protected async pollNodeForKey(node: Snode, pubkey: PubKey): Promise<Array<any>> {
private async pollNodeForKey(node: Snode, pubkey: PubKey): Promise<Array<any>> {
const edkey = node.pubkey_ed25519;
const pkStr = pubkey.key;
@ -188,21 +198,72 @@ export class SwarmPolling {
return newMessages;
}
private async pollForAllKeys() {
const directPromises = this.pubkeys.map(async pk => {
return this.pollOnceForKey(pk, false);
});
/**
* As of today, we pull closed group pubkeys as follow:
* if activeAt is not set, poll only once per hour
* if activeAt is less than an hour old, poll every 5 seconds or so
* if activeAt is less than a day old, poll every minutes only.
* If activeAt is more than a day old, poll only once per hour
*/
private getPollingTimeout(convoId: PubKey) {
const convo = getConversationController().get(convoId.key);
if (!convo) {
return this.pollOnceForKey(convoId, true);
}
const activeAt = convo.get('active_at');
if (!activeAt) {
return SWARM_POLLING_TIMEOUT.INACTIVE;
}
const groupPromises = this.groupPubkeys.map(async pk => {
return this.pollOnceForKey(pk, true);
const currentTimestamp = Date.now();
// consider that this is an active group if activeAt is less than an hour old
if (currentTimestamp - activeAt <= DURATION.HOURS * 1) {
return SWARM_POLLING_TIMEOUT.ACTIVE;
}
if (currentTimestamp - activeAt <= DURATION.DAYS * 1) {
return SWARM_POLLING_TIMEOUT.MEDIUM_ACTIVE;
}
return SWARM_POLLING_TIMEOUT.INACTIVE;
}
private async pollForAllKeys() {
// we always poll as often as possible for our pubkey
const directPromise = this.ourPubkey
? this.pollOnceForKey(this.ourPubkey, false)
: Promise.resolve();
const now = Date.now();
const groupPromises = this.groupPolling.map(async group => {
const convoPollingTimeout = this.getPollingTimeout(group.pubkey);
const diff = now - group.lastPolledTimestamp;
const loggingId = getConversationController()
.get(group.pubkey.key)
.idForLogging();
if (diff >= convoPollingTimeout) {
window?.log?.info(
`Polling for ${loggingId}; timeout: ${convoPollingTimeout} ; diff: ${diff}`
);
return this.pollOnceForKey(group.pubkey, true);
}
window?.log?.info(
`Not polling for ${loggingId}; timeout: ${convoPollingTimeout} ; diff: ${diff}`
);
return Promise.resolve();
});
try {
await Promise.all(_.concat(directPromises, groupPromises));
await Promise.all(_.concat(directPromise, groupPromises));
} catch (e) {
window?.log?.warn('pollForAllKeys swallowing exception: ', e);
throw e;
} finally {
setTimeout(this.pollForAllKeys.bind(this), 2000);
setTimeout(this.pollForAllKeys.bind(this), SWARM_POLLING_TIMEOUT.ACTIVE);
}
}

View file

@ -152,6 +152,10 @@ export class PubKey {
return key.replace(PubKey.PREFIX_GROUP_TEXTSECURE, '');
}
public static isEqual(comparator1: PubKey | string, comparator2: PubKey | string) {
return PubKey.cast(comparator1).isEqual(comparator2);
}
public isEqual(comparator: PubKey | string) {
return comparator instanceof PubKey
? this.key === comparator.key

View file

@ -33,7 +33,6 @@ const defaultRoomsSlice = createSlice({
},
updateDefaultRoomsInProgress(state, action) {
const inProgress = action.payload as boolean;
window?.log?.info('fetching default rooms inProgress?', action.payload);
return { ...state, inProgress };
},
updateDefaultBase64RoomData(state, action: PayloadAction<Base64Update>) {