setup webrtc between two pubkey

This commit is contained in:
Audric Ackermann 2021-09-21 17:51:23 +10:00
parent a538cac491
commit d55c96cb67
No known key found for this signature in database
GPG Key ID: 999F434D76324AD4
9 changed files with 405 additions and 14 deletions

View File

@ -53,6 +53,7 @@ window.lokiFeatureFlags = {
padOutgoingAttachments: true,
enablePinConversations: true,
useUnsendRequests: false,
useCallMessage: true,
};
window.isBeforeVersion = (toCheck, baseVersion) => {

View File

@ -0,0 +1,80 @@
import _ from 'lodash';
import { SignalService } from '../protobuf';
import { TTL_DEFAULT } from '../session/constants';
import { SNodeAPI } from '../session/snode_api';
import { CallManager } from '../session/utils';
import { removeFromCache } from './cache';
import { EnvelopePlus } from './types';
// audric FIXME: refactor this out to persistence, just to help debug the flow and send/receive in synchronous testing
export async function handleCallMessage(
envelope: EnvelopePlus,
callMessage: SignalService.CallMessage
) {
const sender = envelope.senderIdentity || envelope.source;
const currentOffset = SNodeAPI.getLatestTimestampOffset();
const sentTimestamp = _.toNumber(envelope.timestamp);
const { type } = callMessage;
switch (type) {
case SignalService.CallMessage.Type.END_CALL:
window.log.info('handling callMessage END_CALL');
break;
case SignalService.CallMessage.Type.ANSWER:
window.log.info('handling callMessage ANSWER');
break;
case SignalService.CallMessage.Type.ICE_CANDIDATES:
window.log.info('handling callMessage ICE_CANDIDATES');
break;
case SignalService.CallMessage.Type.OFFER:
window.log.info('handling callMessage OFFER');
break;
case SignalService.CallMessage.Type.PROVISIONAL_ANSWER:
window.log.info('handling callMessage PROVISIONAL_ANSWER');
break;
default:
window.log.info('handling callMessage unknown');
}
if (type === SignalService.CallMessage.Type.OFFER) {
if (Math.max(sentTimestamp - (Date.now() - currentOffset)) > TTL_DEFAULT.CALL_MESSAGE) {
window?.log?.info('Dropping incoming OFFER callMessage sent a while ago: ', sentTimestamp);
await removeFromCache(envelope);
return;
}
await removeFromCache(envelope);
await CallManager.handleOfferCallMessage(sender, callMessage);
return;
}
if (type === SignalService.CallMessage.Type.END_CALL) {
await removeFromCache(envelope);
CallManager.handleEndCallMessage(sender);
return;
}
if (type === SignalService.CallMessage.Type.ANSWER) {
await removeFromCache(envelope);
await CallManager.handleCallAnsweredMessage(sender, callMessage);
return;
}
if (type === SignalService.CallMessage.Type.ICE_CANDIDATES) {
await removeFromCache(envelope);
await CallManager.handleIceCandidatesMessage(sender, callMessage);
return;
}
await removeFromCache(envelope);
// if this another type of call message, just add it to the manager
await CallManager.handleOtherCallMessage(sender, callMessage);
}

View File

@ -18,6 +18,7 @@ import { removeMessagePadding } from '../session/crypto/BufferPadding';
import { perfEnd, perfStart } from '../session/utils/Performance';
import { getAllCachedECKeyPair } from './closedGroups';
import { getMessageBySenderAndTimestamp } from '../data/data';
import { handleCallMessage } from './callMessage';
export async function handleContentMessage(envelope: EnvelopePlus, messageHash?: string) {
try {
@ -399,6 +400,9 @@ export async function innerHandleContentMessage(
if (content.unsendMessage && window.lokiFeatureFlags?.useUnsendRequests) {
await handleUnsendMessage(envelope, content.unsendMessage as SignalService.Unsend);
}
if (content.callMessage && window.lokiFeatureFlags?.useCallMessage) {
await handleCallMessage(envelope, content.callMessage as SignalService.CallMessage);
}
} catch (e) {
window?.log?.warn(e);
}

View File

@ -3,28 +3,30 @@ import { MessageParams } from '../Message';
import { ContentMessage } from '..';
import { signalservice } from '../../../../protobuf/compiled';
import { TTL_DEFAULT } from '../../../constants';
interface CallMessageMessageParams extends MessageParams {
interface CallMessageParams extends MessageParams {
type: SignalService.CallMessage.Type;
sdpMLineIndexes: Array<number>;
sdpMids: Array<string>;
sdps: Array<string>;
referencedAttachmentTimestamp: number;
sdpMLineIndexes?: Array<number>;
sdpMids?: Array<string>;
sdps?: Array<string>;
}
export class CallMessageMessage extends ContentMessage {
export class CallMessage extends ContentMessage {
public readonly type: signalservice.CallMessage.Type;
public readonly sdpMLineIndexes: Array<number>;
public readonly sdpMids: Array<string>;
public readonly sdps: Array<string>;
public readonly sdpMLineIndexes?: Array<number>;
public readonly sdpMids?: Array<string>;
public readonly sdps?: Array<string>;
constructor(params: CallMessageMessageParams) {
constructor(params: CallMessageParams) {
super({ timestamp: params.timestamp, identifier: params.identifier });
this.type = params.type;
this.sdpMLineIndexes = params.sdpMLineIndexes;
this.sdpMids = params.sdpMids;
this.sdps = params.sdps;
// this does not make any sense
if (this.type !== signalservice.CallMessage.Type.END_CALL && this.sdps.length === 0) {
if (
this.type !== signalservice.CallMessage.Type.END_CALL &&
(!this.sdps || this.sdps.length === 0)
) {
throw new Error('sdps must be set unless this is a END_CALL type message');
}
}

View File

@ -21,6 +21,7 @@ import { SyncMessageType } from '../utils/syncUtils';
import { OpenGroupRequestCommonType } from '../../opengroup/opengroupV2/ApiUtil';
import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/OpenGroupVisibleMessage';
import { UnsendMessage } from '../messages/outgoing/controlMessage/UnsendMessage';
import { CallMessage } from '../messages/outgoing/controlMessage/CallMessage';
type ClosedGroupMessageType =
| ClosedGroupVisibleMessage
@ -129,7 +130,7 @@ export class MessageQueue {
*/
public async sendToPubKeyNonDurably(
user: PubKey,
message: ClosedGroupNewMessage
message: ClosedGroupNewMessage | CallMessage
): Promise<boolean> {
let rawMessage;
try {

View File

@ -30,11 +30,11 @@ export const ERROR_CODE_NO_CONNECT = 'ENETUNREACH: No network connection.';
let latestTimestampOffset = Number.MAX_SAFE_INTEGER;
function handleTimestampOffset(request: string, snodeTimestamp: number) {
function handleTimestampOffset(_request: string, snodeTimestamp: number) {
if (snodeTimestamp && _.isNumber(snodeTimestamp) && snodeTimestamp > 1609419600 * 1000) {
// first january 2021. Arbitrary, just want to make sure the return timestamp is somehow valid and not some crazy low value
const now = Date.now();
window?.log?.info(`timestamp offset from request ${request}: ${now - snodeTimestamp}ms`);
// window?.log?.info(`timestamp offset from request ${request}: ${now - snodeTimestamp}ms`);
latestTimestampOffset = now - snodeTimestamp;
}
}

View File

@ -0,0 +1,300 @@
import _ from 'lodash';
import { SignalService } from '../../protobuf';
import { CallMessage } from '../messages/outgoing/controlMessage/CallMessage';
import { ed25519Str } from '../onions/onionPath';
import { getMessageQueue } from '../sending';
import { PubKey } from '../types';
const incomingCall = ({ sender }: { sender: string }) => {
return { type: 'incomingCall', payload: sender };
};
const endCall = ({ sender }: { sender: string }) => {
return { type: 'endCall', payload: sender };
};
const answerCall = ({ sender, sdps }: { sender: string; sdps: Array<string> }) => {
return {
type: 'answerCall',
payload: {
sender,
sdps,
},
};
};
/**
* This field stores all the details received by a sender about a call in separate messages.
*/
const callCache = new Map<string, Array<SignalService.CallMessage>>();
let peerConnection: RTCPeerConnection | null;
const ENABLE_VIDEO = false;
const configuration = {
configuration: {
offerToReceiveAudio: true,
offerToReceiveVideo: ENABLE_VIDEO,
},
iceServers: [
{ urls: 'stun:stun.l.google.com:19302' },
{ urls: 'stun:stun1.l.google.com:19302' },
{ urls: 'stun:stun2.l.google.com:19302' },
{ urls: 'stun:stun3.l.google.com:19302' },
{ urls: 'stun:stun4.l.google.com:19302' },
],
};
export async function USER_callRecipient(recipient: string) {
window?.log?.info(`starting call with ${ed25519Str(recipient)}..`);
if (peerConnection) {
window.log.info('closing existing peerconnection');
peerConnection.close();
peerConnection = null;
}
peerConnection = new RTCPeerConnection(configuration);
const mediadevices = await openMediaDevices();
mediadevices.getTracks().map(track => {
window.log.info('USER_callRecipient adding track: ', track);
peerConnection?.addTrack(track);
});
peerConnection.addEventListener('connectionstatechange', _event => {
window.log.info('peerConnection?.connectionState:', peerConnection?.connectionState);
if (peerConnection?.connectionState === 'connected') {
// Peers connected!
}
});
peerConnection.addEventListener('icecandidate', event => {
// window.log.warn('event.candidate', event.candidate);
if (event.candidate) {
iceCandidates.push(event.candidate);
void iceSenderDebouncer(recipient);
}
});
const offerDescription = await peerConnection.createOffer({
offerToReceiveAudio: true,
offerToReceiveVideo: ENABLE_VIDEO,
});
if (!offerDescription || !offerDescription.sdp || !offerDescription.sdp.length) {
window.log.warn(`failed to createOffer for recipient ${ed25519Str(recipient)}`);
return;
}
await peerConnection.setLocalDescription(offerDescription);
const callOfferMessage = new CallMessage({
timestamp: Date.now(),
type: SignalService.CallMessage.Type.OFFER,
sdps: [offerDescription.sdp],
});
window.log.info('sending OFFER MESSAGE');
await getMessageQueue().sendToPubKeyNonDurably(PubKey.cast(recipient), callOfferMessage);
// FIXME audric dispatch UI update to show the calling UI
}
const iceCandidates: Array<RTCIceCandidate> = new Array();
const iceSenderDebouncer = _.debounce(async (recipient: string) => {
if (!iceCandidates) {
return;
}
const validCandidates = _.compact(
iceCandidates.map(c => {
if (
c.sdpMLineIndex !== null &&
c.sdpMLineIndex !== undefined &&
c.sdpMid !== null &&
c.candidate
) {
return {
sdpMLineIndex: c.sdpMLineIndex,
sdpMid: c.sdpMid,
candidate: c.candidate,
};
}
return null;
})
);
const callIceCandicates = new CallMessage({
timestamp: Date.now(),
type: SignalService.CallMessage.Type.ICE_CANDIDATES,
sdpMLineIndexes: validCandidates.map(c => c.sdpMLineIndex),
sdpMids: validCandidates.map(c => c.sdpMid),
sdps: validCandidates.map(c => c.candidate),
});
window.log.info('sending ICE CANDIDATES MESSAGE to ', recipient);
await getMessageQueue().sendToPubKeyNonDurably(PubKey.cast(recipient), callIceCandicates);
}, 2000);
const openMediaDevices = async () => {
return navigator.mediaDevices.getUserMedia({
// video: {
// width: 320,
// height: 240,
// },
video: ENABLE_VIDEO,
audio: true,
});
};
export async function USER_acceptIncomingCallRequest(fromSender: string) {
const msgCacheFromSender = callCache.get(fromSender);
if (!msgCacheFromSender) {
window?.log?.info(
'incoming call request cannot be accepted as the corresponding message is not found'
);
return;
}
const lastOfferMessage = _.findLast(
msgCacheFromSender,
m => m.type === SignalService.CallMessage.Type.OFFER
);
if (!lastOfferMessage) {
window?.log?.info(
'incoming call request cannot be accepted as the corresponding message is not found'
);
return;
}
if (peerConnection) {
window.log.info('closing existing peerconnection');
peerConnection.close();
peerConnection = null;
}
peerConnection = new RTCPeerConnection(configuration);
const mediadevices = await openMediaDevices();
mediadevices.getTracks().map(track => {
window.log.info('USER_acceptIncomingCallRequest adding track ', track);
peerConnection?.addTrack(track);
});
peerConnection.addEventListener('connectionstatechange', _event => {
window.log.info('peerConnection?.connectionState:', peerConnection?.connectionState);
if (peerConnection?.connectionState === 'connected') {
// Peers connected!
}
});
const { sdps } = lastOfferMessage;
if (!sdps || sdps.length === 0) {
window?.log?.info(
'incoming call request cannot be accepted as the corresponding sdps is empty'
);
return;
}
await peerConnection.setRemoteDescription(
new RTCSessionDescription({ sdp: sdps[0], type: 'offer' })
);
const answer = await peerConnection.createAnswer({
offerToReceiveAudio: true,
offerToReceiveVideo: ENABLE_VIDEO,
});
if (!answer?.sdp || answer.sdp.length === 0) {
window.log.warn('failed to create answer');
return;
}
await peerConnection.setLocalDescription(answer);
const answerSdp = answer.sdp;
const callAnswerMessage = new CallMessage({
timestamp: Date.now(),
type: SignalService.CallMessage.Type.ANSWER,
sdps: [answerSdp],
});
window.log.info('sending ANSWER MESSAGE');
await getMessageQueue().sendToPubKeyNonDurably(PubKey.cast(fromSender), callAnswerMessage);
window.inboxStore?.dispatch(answerCall({ sender: fromSender, sdps }));
}
export async function USER_rejectIncomingCallRequest(fromSender: string) {
const endCallMessage = new CallMessage({
type: SignalService.CallMessage.Type.END_CALL,
timestamp: Date.now(),
});
callCache.delete(fromSender);
window.inboxStore?.dispatch(endCall({ sender: fromSender }));
window.log.info('sending END_CALL MESSAGE');
await getMessageQueue().sendToPubKeyNonDurably(PubKey.cast(fromSender), endCallMessage);
}
export function handleEndCallMessage(sender: string) {
callCache.delete(sender);
//
// FIXME audric trigger UI cleanup
window.inboxStore?.dispatch(endCall({ sender }));
}
export async function handleOfferCallMessage(
sender: string,
callMessage: SignalService.CallMessage
) {
if (!callCache.has(sender)) {
callCache.set(sender, new Array());
}
callCache.get(sender)?.push(callMessage);
window.inboxStore?.dispatch(incomingCall({ sender }));
//FIXME audric. thiis should not be auto accepted here
await USER_acceptIncomingCallRequest(sender);
}
export async function handleCallAnsweredMessage(
sender: string,
callMessage: SignalService.CallMessage
) {
if (!callMessage.sdps || callMessage.sdps.length === 0) {
window.log.warn('cannot handle answered message without sdps');
return;
}
if (!callCache.has(sender)) {
callCache.set(sender, new Array());
}
callCache.get(sender)?.push(callMessage);
window.inboxStore?.dispatch(incomingCall({ sender }));
const remoteDesc = new RTCSessionDescription({ type: 'answer', sdp: callMessage.sdps[0] });
if (peerConnection) {
await peerConnection.setRemoteDescription(remoteDesc);
} else {
window.log.info('call answered by recipient but we do not have a peerconnection set');
}
}
export async function handleIceCandidatesMessage(
sender: string,
callMessage: SignalService.CallMessage
) {
if (!callMessage.sdps || callMessage.sdps.length === 0) {
window.log.warn('cannot handle iceCandicates message without candidates');
return;
}
if (!callCache.has(sender)) {
callCache.set(sender, new Array());
}
callCache.get(sender)?.push(callMessage);
window.inboxStore?.dispatch(incomingCall({ sender }));
if (peerConnection) {
// tslint:disable-next-line: prefer-for-of
for (let index = 0; index < callMessage.sdps.length; index++) {
const sdp = callMessage.sdps[index];
const sdpMLineIndex = callMessage.sdpMLineIndexes[index];
const sdpMid = callMessage.sdpMids[index];
const candicate = new RTCIceCandidate({ sdpMid, sdpMLineIndex, candidate: sdp });
await peerConnection.addIceCandidate(candicate);
}
} else {
window.log.info('handleIceCandidatesMessage but we do not have a peerconnection set');
}
}
// tslint:disable-next-line: no-async-without-await
export async function handleOtherCallMessage(
sender: string,
callMessage: SignalService.CallMessage
) {
callCache.get(sender)?.push(callMessage);
}

View File

@ -9,6 +9,7 @@ import * as UserUtils from './User';
import * as SyncUtils from './syncUtils';
import * as AttachmentsV2Utils from './AttachmentsV2';
import * as AttachmentDownloads from './AttachmentsDownload';
import * as CallManager from './CallManager';
export * from './Attachments';
export * from './TypedEmitter';
@ -26,4 +27,5 @@ export {
SyncUtils,
AttachmentsV2Utils,
AttachmentDownloads,
CallManager,
};

1
ts/window.d.ts vendored
View File

@ -48,6 +48,7 @@ declare global {
padOutgoingAttachments: boolean;
enablePinConversations: boolean;
useUnsendRequests: boolean;
useCallMessage: boolean;
};
lokiSnodeAPI: LokiSnodeAPI;
onLogin: any;