feat: state machine reconnect logic wip

This commit is contained in:
jubb 2022-03-04 17:10:32 +11:00
parent a90bd89c9a
commit 3fc7654b81
4 changed files with 152 additions and 105 deletions

View File

@ -6,6 +6,8 @@ import android.content.Context
import android.content.Intent
import android.content.IntentFilter
import android.media.AudioManager
import android.net.ConnectivityManager
import android.net.Network
import android.os.IBinder
import android.os.ResultReceiver
import android.telephony.PhoneStateListener
@ -190,8 +192,8 @@ class WebRtcCallService: Service(), CallManager.WebRtcListener {
private var wantsToAnswer = false
private var currentTimeouts = 0
private var isNetworkAvailable = true
private var activeNetwork: Network? = null
private var scheduledTimeout: ScheduledFuture<*>? = null
private var scheduledReconnectTimeout: ScheduledFuture<*>? = null
private var scheduledReconnect: ScheduledFuture<*>? = null
private val lockManager by lazy { LockManager(this) }
@ -216,6 +218,7 @@ class WebRtcCallService: Service(), CallManager.WebRtcListener {
wantsToAnswer = false
currentTimeouts = 0
isNetworkAvailable = true
activeNetwork = null
stopForeground(true)
}
@ -241,6 +244,7 @@ class WebRtcCallService: Service(), CallManager.WebRtcListener {
callManager.recipient?.let { recipient ->
insertMissedCall(recipient, true)
}
} else {
}
terminate()
}
@ -250,9 +254,9 @@ class WebRtcCallService: Service(), CallManager.WebRtcListener {
if (intent == null || intent.action == null) return START_NOT_STICKY
serviceExecutor.execute {
val action = intent.action
Log.d("Loki", "Handling ${intent.action}")
Log.i("Loki", "Handling ${intent.action}")
when {
action == ACTION_INCOMING_RING && isSameCall(intent) && !isPreOffer() -> handleNewOffer(intent)
action == ACTION_INCOMING_RING && isSameCall(intent) && callManager.currentConnectionState == CallState.Reconnecting -> handleNewOffer(intent)
action == ACTION_PRE_OFFER && isIdle() -> handlePreOffer(intent)
action == ACTION_INCOMING_RING && isBusy(intent) -> handleBusyCall(intent)
action == ACTION_INCOMING_RING && isPreOffer() -> handleIncomingRing(intent)
@ -271,7 +275,6 @@ class WebRtcCallService: Service(), CallManager.WebRtcListener {
action == ACTION_ICE_CONNECTED -> handleIceConnected(intent)
action == ACTION_CHECK_TIMEOUT -> handleCheckTimeout(intent)
action == ACTION_CHECK_RECONNECT -> handleCheckReconnect(intent)
action == ACTION_CHECK_RECONNECT_TIMEOUT -> handleCheckReconnectTimeout(intent)
action == ACTION_IS_IN_CALL_QUERY -> handleIsInCallQuery(intent)
action == ACTION_UPDATE_AUDIO -> handleUpdateAudio(intent)
}
@ -283,16 +286,14 @@ class WebRtcCallService: Service(), CallManager.WebRtcListener {
super.onCreate()
callManager.registerListener(this)
wantsToAnswer = false
isNetworkAvailable = false
isNetworkAvailable = true
registerIncomingPstnCallReceiver()
registerWiredHeadsetStateReceiver()
registerWantsToAnswerReceiver()
getSystemService(TelephonyManager::class.java)
.listen(hangupOnCallAnswered, PhoneStateListener.LISTEN_CALL_STATE)
registerUncaughtExceptionHandler()
networkChangedReceiver = NetworkChangeReceiver { available ->
networkChange(available)
}
networkChangedReceiver = NetworkChangeReceiver(::networkChange)
networkChangedReceiver!!.register(this)
}
@ -346,12 +347,16 @@ class WebRtcCallService: Service(), CallManager.WebRtcListener {
val offer = intent.getStringExtra(EXTRA_REMOTE_DESCRIPTION) ?: return
val callId = getCallId(intent)
val recipient = getRemoteRecipient(intent)
callManager.onNewOffer(offer, callId, recipient)
callManager.onNewOffer(offer, callId, recipient).fail {
Log.e("Loki", "Error handling new offer", it)
callManager.postConnectionError()
terminate()
}
}
private fun handlePreOffer(intent: Intent) {
if (!callManager.isIdle()) {
Log.d(TAG, "Handling pre-offer from non-idle state")
Log.w(TAG, "Handling pre-offer from non-idle state")
return
}
val callId = getCallId(intent)
@ -580,6 +585,8 @@ class WebRtcCallService: Service(), CallManager.WebRtcListener {
callManager.startCommunication(lockManager)
}
if (!connected) {
Log.e("Loki", "Error handling ice connected state transition")
callManager.postConnectionError()
terminate()
}
}
@ -604,21 +611,14 @@ class WebRtcCallService: Service(), CallManager.WebRtcListener {
val numTimeouts = ++currentTimeouts
if (callId == getCallId(intent) && isNetworkAvailable && numTimeouts <= MAX_RECONNECTS) {
Log.d("Loki", "Trying to re-connect")
Log.i("Loki", "Trying to re-connect")
callManager.networkReestablished()
scheduledTimeout = timeoutExecutor.schedule(TimeoutRunnable(callId, this), TIMEOUT_SECONDS, TimeUnit.SECONDS)
} else {
Log.d("Loki", "Network isn't available, timeouts == $numTimeouts out of $MAX_RECONNECTS")
} else if (numTimeouts < MAX_RECONNECTS) {
Log.i("Loki", "Network isn't available, timeouts == $numTimeouts out of $MAX_RECONNECTS")
scheduledReconnect = timeoutExecutor.schedule(CheckReconnectedRunnable(callId, this), RECONNECT_SECONDS, TimeUnit.SECONDS)
}
}
private fun handleCheckReconnectTimeout(intent: Intent) {
val callId = callManager.callId ?: return
val callState = callManager.currentConnectionState
if (callId == getCallId(intent) && (callState !in arrayOf(CallState.Connected, CallState.Connecting))) {
Log.w(TAG, "Timing out reconnect: $callId")
} else {
Log.i("Loki", "Network isn't available, timing out")
handleLocalHangup(intent)
}
}
@ -687,11 +687,13 @@ class WebRtcCallService: Service(), CallManager.WebRtcListener {
uncaughtExceptionHandlerManager?.unregister()
wantsToAnswer = false
currentTimeouts = 0
isNetworkAvailable = true
isNetworkAvailable = false
activeNetwork = null
super.onDestroy()
}
fun networkChange(networkAvailable: Boolean) {
private fun networkChange(networkAvailable: Boolean) {
Log.d("Loki", "flipping network available to $networkAvailable")
isNetworkAvailable = networkAvailable
if (networkAvailable && !callManager.isReestablishing && callManager.currentConnectionState == CallState.Connected) {
Log.d("Loki", "Should reconnected")
@ -791,30 +793,41 @@ class WebRtcCallService: Service(), CallManager.WebRtcListener {
override fun onSignalingChange(p0: PeerConnection.SignalingState?) {}
fun Context.getCurrentNetwork(): Network? {
val cm = this.getSystemService(CONNECTIVITY_SERVICE) as ConnectivityManager
return cm.activeNetwork
}
override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
if (newState == CONNECTED) {
scheduledTimeout?.cancel(false)
scheduledReconnect?.cancel(false)
scheduledReconnectTimeout?.cancel(false)
scheduledTimeout = null
scheduledReconnect = null
scheduledReconnectTimeout = null
activeNetwork = getCurrentNetwork()
val intent = Intent(this, WebRtcCallService::class.java)
.setAction(ACTION_ICE_CONNECTED)
startService(intent)
} else if (newState in arrayOf(FAILED, DISCONNECTED) && scheduledReconnectTimeout == null) {
} else if (newState in arrayOf(FAILED, DISCONNECTED) && scheduledReconnect == null) {
callManager.callId?.let { callId ->
callManager.postConnectionEvent(Event.IceDisconnect) {
val currentNetwork = getCurrentNetwork()
callManager.postViewModelState(CallViewModel.State.CALL_RECONNECTING)
scheduledReconnect = timeoutExecutor.schedule(CheckReconnectedRunnable(callId, this), RECONNECT_SECONDS, TimeUnit.SECONDS)
scheduledReconnectTimeout = timeoutExecutor.schedule(ReconnectTimeoutRunnable(callId, this), TIMEOUT_SECONDS, TimeUnit.SECONDS)
if (activeNetwork != currentNetwork || currentNetwork == null) {
Log.i("Loki", "Starting reconnect timer")
scheduledReconnect = timeoutExecutor.schedule(CheckReconnectedRunnable(callId, this), RECONNECT_SECONDS, TimeUnit.SECONDS)
} else {
Log.i("Loki", "Starting timeout, awaiting new reconnect")
scheduledTimeout = timeoutExecutor.schedule(TimeoutRunnable(callId, this), TIMEOUT_SECONDS, TimeUnit.SECONDS)
}
}
} ?: run {
val intent = hangupIntent(this)
startService(intent)
}
}
Log.d(TAG, "onIceConnectionChange: $newState")
Log.i("Loki", "onIceConnectionChange: $newState")
}
override fun onIceConnectionReceivingChange(p0: Boolean) {}

View File

@ -396,18 +396,22 @@ class CallManager(context: Context, audioManager: AudioManagerCompat, private va
}
fun onNewOffer(offer: String, callId: UUID, recipient: Recipient): Promise<Unit, Exception> {
// TODO transition to ice reestablished
if (callId != this.callId) return Promise.ofFail(NullPointerException("No callId"))
if (recipient != this.recipient) return Promise.ofFail(NullPointerException("No recipient"))
val connection = peerConnection ?: return Promise.ofFail(NullPointerException("No peer connection"))
connection.setNewOffer(SessionDescription(SessionDescription.Type.OFFER, offer))
val answer = connection.createAnswer(MediaConstraints().apply {
mandatory.add(MediaConstraints.KeyValuePair("IceRestart", "true"))
})
val answerMessage = CallMessage.answer(answer.description, callId)
return MessageSender.sendNonDurably(answerMessage, recipient.address)
val reconnected = stateProcessor.processEvent(Event.NetworkReconnect)
return if (reconnected) {
connection.setNewOffer(SessionDescription(SessionDescription.Type.OFFER, offer))
val answer = connection.createAnswer(MediaConstraints().apply {
mandatory.add(MediaConstraints.KeyValuePair("IceRestart", "true"))
})
val answerMessage = CallMessage.answer(answer.description, callId)
MessageSender.sendNonDurably(answerMessage, recipient.address)
} else {
Promise.ofFail(Exception("Couldn't reconnect from current state"))
}
}
fun onIncomingRing(offer: String, callId: UUID, recipient: Recipient, callTime: Long, onSuccess: () -> Unit) {
@ -552,6 +556,10 @@ class CallManager(context: Context, audioManager: AudioManagerCompat, private va
CallState.RemoteRing -> postViewModelState(CallViewModel.State.RECIPIENT_UNAVAILABLE)
else -> postViewModelState(CallViewModel.State.CALL_DISCONNECTED)
}
if (!stateProcessor.processEvent(Event.Hangup)) {
Log.e("Loki", "")
stateProcessor.processEvent(Event.Error)
}
}
fun handleSetMuteAudio(muted: Boolean) {

View File

@ -6,8 +6,6 @@ import android.content.Intent
import android.content.IntentFilter
import android.net.ConnectivityManager
import android.net.Network
import android.os.Build
import android.util.SparseArray
import org.session.libsignal.utilities.Log
class NetworkChangeReceiver(private val onNetworkChangedCallback: (Boolean)->Unit) {
@ -20,58 +18,58 @@ class NetworkChangeReceiver(private val onNetworkChangedCallback: (Boolean)->Uni
}
}
private fun checkNetworks() {
}
val defaultObserver = object: ConnectivityManager.NetworkCallback() {
override fun onAvailable(network: Network) {
Log.d("Loki", "onAvailable: $network")
Log.i("Loki", "onAvailable: $network")
networkList += network
onNetworkChangedCallback(networkList.isNotEmpty())
}
override fun onLosing(network: Network, maxMsToLive: Int) {
Log.d("Loki", "onLosing: $network, maxMsToLive: $maxMsToLive")
Log.i("Loki", "onLosing: $network, maxMsToLive: $maxMsToLive")
}
override fun onLost(network: Network) {
Log.d("Loki", "onLost: $network")
Log.i("Loki", "onLost: $network")
networkList -= network
onNetworkChangedCallback(networkList.isNotEmpty())
}
override fun onUnavailable() {
Log.d("Loki", "onUnavailable")
Log.i("Loki", "onUnavailable")
}
}
fun receiveBroadcast(context: Context, intent: Intent) {
Log.d("Loki", intent.toString())
onNetworkChangedCallback(context.isConnected())
val connected = context.isConnected()
Log.i("Loki", "received broadcast, network connected: $connected")
onNetworkChangedCallback(connected)
}
fun Context.isConnected() : Boolean {
val cm = getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
return cm.activeNetworkInfo?.isConnected ?: false
return cm.activeNetwork != null
}
fun register(context: Context) {
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
val cm = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
cm.registerDefaultNetworkCallback(defaultObserver)
} else {
val intentFilter = IntentFilter("android.net.conn.CONNECTIVITY_CHANGE")
context.registerReceiver(broadcastDelegate, intentFilter)
}
val intentFilter = IntentFilter("android.net.conn.CONNECTIVITY_CHANGE")
context.registerReceiver(broadcastDelegate, intentFilter)
// if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
// val cm = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
// cm.registerDefaultNetworkCallback(defaultObserver)
// } else {
//
// }
}
fun unregister(context: Context) {
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
val cm = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
cm.unregisterNetworkCallback(defaultObserver)
} else {
context.unregisterReceiver(broadcastDelegate)
}
context.unregisterReceiver(broadcastDelegate)
// if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
// val cm = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
// cm.unregisterNetworkCallback(defaultObserver)
// } else {
//
// }
}
}

View File

@ -7,31 +7,46 @@ import org.thoughtcrime.securesms.webrtc.video.Camera
import org.thoughtcrime.securesms.webrtc.video.CameraEventListener
import org.thoughtcrime.securesms.webrtc.video.CameraState
import org.thoughtcrime.securesms.webrtc.video.RotationVideoSink
import org.webrtc.*
import org.webrtc.AudioSource
import org.webrtc.AudioTrack
import org.webrtc.DataChannel
import org.webrtc.EglBase
import org.webrtc.IceCandidate
import org.webrtc.MediaConstraints
import org.webrtc.MediaStream
import org.webrtc.PeerConnection
import org.webrtc.PeerConnectionFactory
import org.webrtc.SdpObserver
import org.webrtc.SessionDescription
import org.webrtc.SurfaceTextureHelper
import org.webrtc.VideoSink
import org.webrtc.VideoSource
import org.webrtc.VideoTrack
import java.security.SecureRandom
import java.util.concurrent.ExecutionException
import kotlin.random.asKotlinRandom
class PeerConnectionWrapper(context: Context,
factory: PeerConnectionFactory,
observer: PeerConnection.Observer,
localRenderer: VideoSink,
class PeerConnectionWrapper(private val context: Context,
private val factory: PeerConnectionFactory,
private val observer: PeerConnection.Observer,
private val localRenderer: VideoSink,
private val cameraEventListener: CameraEventListener,
eglBase: EglBase,
relay: Boolean = false): CameraEventListener {
private val eglBase: EglBase,
private val relay: Boolean = false): CameraEventListener {
private val peerConnection: PeerConnection
private var peerConnection: PeerConnection? = null
private val audioTrack: AudioTrack
private val audioSource: AudioSource
private val camera: Camera
private val mediaStream: MediaStream
private val videoSource: VideoSource?
private val videoTrack: VideoTrack?
private val rotationVideoSink = RotationVideoSink()
val readyForIce
get() = peerConnection.localDescription != null && peerConnection.remoteDescription != null
get() = peerConnection?.localDescription != null && peerConnection?.remoteDescription != null
init {
private fun initPeerConnection() {
val random = SecureRandom().asKotlinRandom()
val iceServers = listOf("freyr","fenrir","frigg","angus","hereford","holstein", "brahman").shuffled(random).take(2).map { sub ->
PeerConnection.IceServer.builder("turn:$sub.getsession.org")
@ -43,9 +58,6 @@ class PeerConnectionWrapper(context: Context,
val constraints = MediaConstraints().apply {
optional.add(MediaConstraints.KeyValuePair("DtlsSrtpKeyAgreement", "true"))
}
val audioConstraints = MediaConstraints().apply {
optional.add(MediaConstraints.KeyValuePair("DtlsSrtpKeyAgreement", "true"))
}
val configuration = PeerConnection.RTCConfiguration(iceServers).apply {
bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
@ -55,36 +67,49 @@ class PeerConnectionWrapper(context: Context,
}
}
peerConnection = factory.createPeerConnection(configuration, constraints, observer)!!
peerConnection.setAudioPlayout(true)
peerConnection.setAudioRecording(true)
val newPeerConnection = factory.createPeerConnection(configuration, constraints, observer)!!
peerConnection = newPeerConnection
newPeerConnection.setAudioPlayout(true)
newPeerConnection.setAudioRecording(true)
val mediaStream = factory.createLocalMediaStream("ARDAMS")
newPeerConnection.addStream(mediaStream)
}
init {
val audioConstraints = MediaConstraints().apply {
optional.add(MediaConstraints.KeyValuePair("DtlsSrtpKeyAgreement", "true"))
}
mediaStream = factory.createLocalMediaStream("ARDAMS")
audioSource = factory.createAudioSource(audioConstraints)
audioTrack = factory.createAudioTrack("ARDAMSa0", audioSource)
audioTrack.setEnabled(true)
mediaStream.addTrack(audioTrack)
camera = Camera(context, this)
if (camera.capturer != null) {
videoSource = factory.createVideoSource(false)
videoTrack = factory.createVideoTrack("ARDAMSv0", videoSource)
rotationVideoSink.setObserver(videoSource.capturerObserver)
camera.capturer.initialize(
SurfaceTextureHelper.create("WebRTC-SurfaceTextureHelper", eglBase.eglBaseContext),
context,
rotationVideoSink
val newCamera = Camera(context, this)
camera = newCamera
if (newCamera.capturer != null) {
val newVideoSource = factory.createVideoSource(false)
videoSource = newVideoSource
val newVideoTrack = factory.createVideoTrack("ARDAMSv0", newVideoSource)
videoTrack = newVideoTrack
rotationVideoSink.setObserver(newVideoSource.capturerObserver)
newCamera.capturer.initialize(
SurfaceTextureHelper.create("WebRTC-SurfaceTextureHelper", eglBase.eglBaseContext),
context,
rotationVideoSink
)
rotationVideoSink.mirrored = camera.activeDirection == CameraState.Direction.FRONT
rotationVideoSink.mirrored = newCamera.activeDirection == CameraState.Direction.FRONT
rotationVideoSink.setSink(localRenderer)
videoTrack.setEnabled(false)
mediaStream.addTrack(videoTrack)
newVideoTrack.setEnabled(false)
mediaStream.addTrack(newVideoTrack)
} else {
videoSource = null
videoTrack = null
}
peerConnection.addStream(mediaStream)
initPeerConnection()
}
fun getCameraState(): CameraState {
@ -97,12 +122,12 @@ class PeerConnectionWrapper(context: Context,
negotiated = true
id = 548
}
return peerConnection.createDataChannel(channelName, dataChannelConfiguration)
return peerConnection!!.createDataChannel(channelName, dataChannelConfiguration)
}
fun addIceCandidate(candidate: IceCandidate) {
// TODO: filter logic based on known servers
peerConnection.addIceCandidate(candidate)
peerConnection!!.addIceCandidate(candidate)
}
fun dispose() {
@ -111,14 +136,17 @@ class PeerConnectionWrapper(context: Context,
videoSource?.dispose()
audioSource.dispose()
peerConnection.close()
peerConnection.dispose()
peerConnection?.close()
peerConnection?.dispose()
}
fun setNewOffer(description: SessionDescription) {
val future = SettableFuture<Boolean>()
peerConnection.setRemoteDescription(object: SdpObserver {
peerConnection?.close()
initPeerConnection()
peerConnection!!.setRemoteDescription(object: SdpObserver {
override fun onCreateSuccess(p0: SessionDescription?) {
throw AssertionError()
}
@ -148,7 +176,7 @@ class PeerConnectionWrapper(context: Context,
fun setRemoteDescription(description: SessionDescription) {
val future = SettableFuture<Boolean>()
peerConnection.setRemoteDescription(object: SdpObserver {
peerConnection!!.setRemoteDescription(object: SdpObserver {
override fun onCreateSuccess(p0: SessionDescription?) {
throw AssertionError()
}
@ -178,7 +206,7 @@ class PeerConnectionWrapper(context: Context,
fun createAnswer(mediaConstraints: MediaConstraints) : SessionDescription {
val future = SettableFuture<SessionDescription>()
peerConnection.createAnswer(object:SdpObserver {
peerConnection!!.createAnswer(object:SdpObserver {
override fun onCreateSuccess(sdp: SessionDescription?) {
future.set(sdp)
}
@ -215,7 +243,7 @@ class PeerConnectionWrapper(context: Context,
fun createOffer(mediaConstraints: MediaConstraints): SessionDescription {
val future = SettableFuture<SessionDescription>()
peerConnection.createOffer(object:SdpObserver {
peerConnection!!.createOffer(object:SdpObserver {
override fun onCreateSuccess(sdp: SessionDescription?) {
future.set(sdp)
}
@ -245,7 +273,7 @@ class PeerConnectionWrapper(context: Context,
fun setLocalDescription(sdp: SessionDescription) {
val future = SettableFuture<Boolean>()
peerConnection.setLocalDescription(object: SdpObserver {
peerConnection!!.setLocalDescription(object: SdpObserver {
override fun onCreateSuccess(p0: SessionDescription?) {
}
@ -271,8 +299,8 @@ class PeerConnectionWrapper(context: Context,
}
fun setCommunicationMode() {
peerConnection.setAudioPlayout(true)
peerConnection.setAudioRecording(true)
peerConnection?.setAudioPlayout(true)
peerConnection?.setAudioRecording(true)
}
fun setAudioEnabled(isEnabled: Boolean) {