feat: reconnection logic works correctly now

This commit is contained in:
jubb 2022-03-08 14:29:46 +11:00
parent 18884ea06b
commit e9578de779
6 changed files with 137 additions and 49 deletions

View File

@ -270,6 +270,7 @@ class WebRtcCallService: Service(), CallManager.WebRtcListener {
action == ACTION_FLIP_CAMERA -> handleSetCameraFlip(intent)
action == ACTION_WIRED_HEADSET_CHANGE -> handleWiredHeadsetChanged(intent)
action == ACTION_SCREEN_OFF -> handleScreenOffChange(intent)
action == ACTION_RESPONSE_MESSAGE && isSameCall(intent) && callManager.currentConnectionState == CallState.Reconnecting -> handleResponseMessage(intent)
action == ACTION_RESPONSE_MESSAGE -> handleResponseMessage(intent)
action == ACTION_ICE_MESSAGE -> handleRemoteIceCandidate(intent)
action == ACTION_ICE_CONNECTED -> handleIceConnected(intent)
@ -550,10 +551,9 @@ class WebRtcCallService: Service(), CallManager.WebRtcListener {
handleLocalHangup(intent)
return
}
val isNewSession = callManager.currentConnectionState == CallState.Reconnecting
val callId = getCallId(intent)
val description = intent.getStringExtra(EXTRA_REMOTE_DESCRIPTION)
callManager.handleResponseMessage(recipient, callId, SessionDescription(SessionDescription.Type.ANSWER, description), isNewSession)
callManager.handleResponseMessage(recipient, callId, SessionDescription(SessionDescription.Type.ANSWER, description))
} catch (e: PeerConnectionException) {
terminate()
}
@ -800,35 +800,43 @@ class WebRtcCallService: Service(), CallManager.WebRtcListener {
}
override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
if (newState == CONNECTED) {
scheduledTimeout?.cancel(false)
scheduledReconnect?.cancel(false)
scheduledTimeout = null
scheduledReconnect = null
activeNetwork = getCurrentNetwork()
newState?.let { state -> processIceConnectionChange(state) }
}
val intent = Intent(this, WebRtcCallService::class.java)
private fun processIceConnectionChange(newState: PeerConnection.IceConnectionState) {
serviceExecutor.execute {
if (newState == CONNECTED) {
scheduledTimeout?.cancel(false)
scheduledReconnect?.cancel(false)
scheduledTimeout = null
scheduledReconnect = null
activeNetwork = getCurrentNetwork()
val intent = Intent(this, WebRtcCallService::class.java)
.setAction(ACTION_ICE_CONNECTED)
startService(intent)
} else if (newState in arrayOf(FAILED, DISCONNECTED) && scheduledReconnect == null) {
callManager.callId?.let { callId ->
callManager.postConnectionEvent(Event.IceDisconnect) {
val currentNetwork = getCurrentNetwork()
callManager.postViewModelState(CallViewModel.State.CALL_RECONNECTING)
if (activeNetwork != currentNetwork || currentNetwork == null) {
Log.i("Loki", "Starting reconnect timer")
scheduledReconnect = timeoutExecutor.schedule(CheckReconnectedRunnable(callId, this), RECONNECT_SECONDS, TimeUnit.SECONDS)
} else {
Log.i("Loki", "Starting timeout, awaiting new reconnect")
scheduledTimeout = timeoutExecutor.schedule(TimeoutRunnable(callId, this), TIMEOUT_SECONDS, TimeUnit.SECONDS)
}
}
} ?: run {
val intent = hangupIntent(this)
startService(intent)
} else if (newState in arrayOf(FAILED, DISCONNECTED) && scheduledReconnect == null) {
callManager.callId?.let { callId ->
callManager.postConnectionEvent(Event.IceDisconnect) {
val currentNetwork = getCurrentNetwork()
callManager.postViewModelState(CallViewModel.State.CALL_RECONNECTING)
if (activeNetwork != currentNetwork || currentNetwork == null) {
Log.i("Loki", "Starting reconnect timer")
scheduledReconnect = timeoutExecutor.schedule(CheckReconnectedRunnable(callId, this), RECONNECT_SECONDS, TimeUnit.SECONDS)
} else {
Log.i("Loki", "Starting timeout, awaiting new reconnect")
callManager.postConnectionEvent(Event.PrepareForNewOffer) {
scheduledTimeout = timeoutExecutor.schedule(TimeoutRunnable(callId, this), TIMEOUT_SECONDS, TimeUnit.SECONDS)
}
}
}
} ?: run {
val intent = hangupIntent(this)
startService(intent)
}
}
Log.i("Loki", "onIceConnectionChange: $newState")
}
Log.i("Loki", "onIceConnectionChange: $newState")
}
override fun onIceConnectionReceivingChange(p0: Boolean) {}

View File

@ -401,10 +401,10 @@ class CallManager(context: Context, audioManager: AudioManagerCompat, private va
val connection = peerConnection ?: return Promise.ofFail(NullPointerException("No peer connection wrapper"))
val reconnected = stateProcessor.processEvent(Event.NetworkReconnect)
val reconnected = stateProcessor.processEvent(Event.ReceiveOffer) && stateProcessor.processEvent(Event.SendAnswer)
return if (reconnected) {
Log.i("Loki", "Handling new offer, restarting ice session")
connection.setNewOffer(SessionDescription(SessionDescription.Type.OFFER, offer))
connection.setNewRemoteDescription(SessionDescription(SessionDescription.Type.OFFER, offer))
// re-established an ice
val answer = connection.createAnswer(MediaConstraints().apply {
mandatory.add(MediaConstraints.KeyValuePair("IceRestart", "true"))
@ -631,15 +631,13 @@ class CallManager(context: Context, audioManager: AudioManagerCompat, private va
}
}
fun handleResponseMessage(recipient: Recipient, callId: UUID, answer: SessionDescription, isNewSession: Boolean) {
fun handleResponseMessage(recipient: Recipient, callId: UUID, answer: SessionDescription) {
if (recipient != this.recipient || callId != this.callId) {
Log.w(TAG,"Got answer for recipient and call ID we're not currently dialing")
return
}
val event = if (isNewSession) Event.Connect else Event.ReceiveAnswer
stateProcessor.processEvent(event) {
stateProcessor.processEvent(Event.ReceiveAnswer) {
val connection = peerConnection ?: throw AssertionError("assert")
connection.setRemoteDescription(answer)
@ -698,7 +696,7 @@ class CallManager(context: Context, audioManager: AudioManagerCompat, private va
postConnectionEvent(Event.NetworkReconnect) {
Log.d("Loki", "start re-establish")
val offer = connection.createOffer(MediaConstraints().apply {
val offer = connection.createNewOffer(MediaConstraints().apply {
mandatory.add(MediaConstraints.KeyValuePair("IceRestart", "true"))
})
connection.setLocalDescription(offer)

View File

@ -36,6 +36,7 @@ class CallMessageProcessor(private val context: Context, private val textSecureP
Log.d("Loki", nextMessage.type?.name ?: "CALL MESSAGE RECEIVED")
val sender = nextMessage.sender ?: continue
val approvedContact = Recipient.from(context, Address.fromSerialized(sender), false).isApproved
Log.i("Loki", "Contact is approved?: $approvedContact")
if (!approvedContact && storage.getUserPublicKey() != sender) continue
if (!textSecurePreferences.isCallNotificationsEnabled()) {

View File

@ -140,7 +140,7 @@ class PeerConnectionWrapper(private val context: Context,
peerConnection?.dispose()
}
fun setNewOffer(description: SessionDescription) {
fun setNewRemoteDescription(description: SessionDescription) {
val future = SettableFuture<Boolean>()
peerConnection?.close()
@ -240,6 +240,39 @@ class PeerConnectionWrapper(private val context: Context,
return SessionDescription(sessionDescription.type, updatedSdp)
}
fun createNewOffer(mediaConstraints: MediaConstraints): SessionDescription {
val future = SettableFuture<SessionDescription>()
peerConnection?.close()
initPeerConnection()
peerConnection!!.createOffer(object:SdpObserver {
override fun onCreateSuccess(sdp: SessionDescription?) {
future.set(sdp)
}
override fun onSetSuccess() {
throw AssertionError()
}
override fun onCreateFailure(p0: String?) {
future.setException(PeerConnectionException(p0))
}
override fun onSetFailure(p0: String?) {
throw AssertionError()
}
}, mediaConstraints)
try {
return correctSessionDescription(future.get())
} catch (e: InterruptedException) {
throw AssertionError()
} catch (e: ExecutionException) {
throw PeerConnectionException(e)
}
}
fun createOffer(mediaConstraints: MediaConstraints): SessionDescription {
val future = SettableFuture<SessionDescription>()

View File

@ -13,11 +13,14 @@ sealed class State {
object Connecting : State()
object Connected : State()
object Reconnecting : State()
object PendingReconnect : State()
object Disconnected : State()
companion object {
val ALL_STATES = arrayOf(Idle, RemotePreOffer, RemoteRing, LocalPreOffer, LocalRing,
Connecting, Connected, Reconnecting, Disconnected)
val ALL_STATES = arrayOf(
Idle, RemotePreOffer, RemoteRing, LocalPreOffer, LocalRing,
Connecting, Connected, Reconnecting, Disconnected
)
val CAN_DECLINE_STATES = arrayOf(RemotePreOffer, RemoteRing)
val PENDING_CONNECTION_STATES = arrayOf(
@ -32,12 +35,20 @@ sealed class State {
LocalRing,
)
val CAN_HANGUP_STATES =
arrayOf(RemotePreOffer, RemoteRing, LocalPreOffer, LocalRing, Connecting, Connected, Reconnecting)
arrayOf(
RemotePreOffer,
RemoteRing,
LocalPreOffer,
LocalRing,
Connecting,
Connected,
Reconnecting
)
val CAN_RECEIVE_ICE_STATES =
arrayOf(RemoteRing, LocalRing, Connecting, Connected, Reconnecting)
}
fun withState(vararg expectedState: State, body: ()->Unit) {
fun withState(vararg expectedState: State, body: () -> Unit) {
if (this in expectedState) {
body()
}
@ -47,19 +58,31 @@ sealed class State {
sealed class Event(vararg val expectedStates: State, val outputState: State) {
object ReceivePreOffer :
Event(State.Idle, State.RemotePreOffer, outputState = State.RemotePreOffer)
Event(State.Idle, outputState = State.RemotePreOffer)
object ReceiveOffer :
Event(State.RemotePreOffer, State.Reconnecting, outputState = State.RemoteRing)
object ReceiveOffer : Event(State.RemotePreOffer, State.Reconnecting, outputState = State.RemoteRing)
object SendPreOffer : Event(State.Idle, outputState = State.LocalPreOffer)
object SendOffer : Event(State.LocalPreOffer, outputState = State.LocalRing)
object SendAnswer : Event(State.RemoteRing, outputState = State.Connecting)
object ReceiveAnswer : Event(State.LocalRing, outputState = State.Connecting)
object ReceiveAnswer :
Event(State.LocalRing, State.Reconnecting, outputState = State.Connecting)
object Connect : Event(State.Connecting, outputState = State.Connected)
object IceFailed : Event(State.Connecting, outputState = State.Disconnected)
object IceDisconnect : Event(State.Connected, outputState = State.Reconnecting)
object NetworkReconnect : Event(State.Reconnecting, outputState = State.LocalRing)
object IceDisconnect : Event(State.Connected, outputState = State.PendingReconnect)
object NetworkReconnect : Event(State.PendingReconnect, outputState = State.Reconnecting)
object PrepareForNewOffer : Event(State.PendingReconnect, outputState = State.Reconnecting)
object TimeOut :
Event(State.Connecting, State.LocalRing, State.RemoteRing, outputState = State.Disconnected)
Event(
State.Connecting,
State.LocalRing,
State.RemoteRing,
State.Reconnecting,
outputState = State.Disconnected
)
object Error : Event(*State.ALL_STATES, outputState = State.Disconnected)
object DeclineCall : Event(*CAN_DECLINE_STATES, outputState = State.Disconnected)
object Hangup : Event(*CAN_HANGUP_STATES, outputState = State.Disconnected)
@ -72,11 +95,18 @@ open class StateProcessor(initialState: State) {
open fun processEvent(event: Event, sideEffect: () -> Unit = {}): Boolean {
if (currentState in event.expectedStates) {
Log.i(
"Loki-Call",
"succeeded transitioning from ${currentState::class.simpleName} to ${event.outputState::class.simpleName} with ${event::class.simpleName}"
)
_currentState = event.outputState
sideEffect()
return true
}
Log.e("Loki-Call", "error transitioning from $currentState with ${event::class.simpleName}")
Log.e(
"Loki-Call",
"error transitioning from ${currentState::class.simpleName} to ${event.outputState::class.simpleName} with ${event::class.simpleName}"
)
return false
}
}

View File

@ -21,7 +21,14 @@ class CallStateMachineTests {
fun setup() {
stateProcessor = TestStateProcessor(State.Idle)
mock = mockStatic(Log::class.java).apply {
`when`<Unit> { Log.e(any(), any(), any()) }.then { /* do nothing */ }
`when`<Unit> { Log.e(any(), any(), any()) }.then { invocation ->
val msg = invocation.getArgument<Any>(1)
println(msg)
}
`when`<Unit> { Log.i(any(), any(), any()) }.then { invocation ->
val msg = invocation.getArgument<Any>(1)
println(msg)
}
}
}
@ -127,6 +134,18 @@ class CallStateMachineTests {
Event.Connect,
Event.IceDisconnect,
Event.NetworkReconnect,
Event.ReceiveAnswer,
Event.Connect,
Event.Hangup,
Event.Cleanup,
Event.ReceivePreOffer,
Event.ReceiveOffer,
Event.SendAnswer,
Event.Connect,
Event.IceDisconnect,
Event.PrepareForNewOffer,
Event.ReceiveOffer,
Event.SendAnswer,
Event.Connect,
Event.Hangup,
Event.Cleanup,
@ -136,15 +155,14 @@ class CallStateMachineTests {
Event.IceFailed,
Event.Cleanup,
Event.ReceivePreOffer,
Event.ReceiveOffer,
Event.DeclineCall,
Event.Cleanup
)
executions.forEach { event -> stateProcessor.processEvent(event) }
assertEquals(stateProcessor.transitions, executions.size)
assertEquals(stateProcessor.currentState, State.Idle)
assertEquals(State.Idle, stateProcessor.currentState)
assertEquals(executions.size, stateProcessor.transitions)
}
}