Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/auto reconnect ws #166

Merged
merged 2 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ClientApplicationBootstrapFacade(
// } else {
setProgress(0.5f)
setState("Connecting to Trusted Node..")
if (!trustedNodeService.isConnected()) {
if (!trustedNodeService.isConnected) {
try {
trustedNodeService.connect()
setState("bootstrap.connectedToTrustedNode".i18n())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import io.ktor.websocket.close
import io.ktor.websocket.readText
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
Expand All @@ -36,40 +39,77 @@ class WebSocketClient(
val port: Int
) : Logging {

companion object {
const val DELAY_TO_RECONNECT = 3000L
}

private val webSocketUrl: String = "ws://$host:$port/websocket"
private var session: DefaultClientWebSocketSession? = null
var isConnected = false
private val webSocketEventObservers = ConcurrentMap<String, WebSocketEventObserver>()
private val requestResponseHandlers = mutableMapOf<String, RequestResponseHandler>()
private var connectionReady = CompletableDeferred<Boolean>()
private val requestResponseHandlersMutex = Mutex()

suspend fun connect() {
private val backgroundScope = CoroutineScope(BackgroundDispatcher)

enum class WebSockectClientStatus {
DISCONNECTED,
CONNECTING,
CONNECTED
}

val _connected = MutableStateFlow(WebSockectClientStatus.DISCONNECTED)
val connected: StateFlow<WebSockectClientStatus> = _connected

fun isConnected(): Boolean = connected.value == WebSockectClientStatus.CONNECTED

suspend fun connect(isTest: Boolean = false) {
log.i("Connecting to websocket at: $webSocketUrl")
if (!isConnected) {
if (connected.value != WebSockectClientStatus.CONNECTED) {
try {
_connected.value = WebSockectClientStatus.CONNECTING
session = httpClient.webSocketSession { url(webSocketUrl) }
if (session?.isActive == true) {
isConnected = true
_connected.value = WebSockectClientStatus.CONNECTED
CoroutineScope(BackgroundDispatcher).launch { startListening() }
connectionReady.complete(true)
if (!isTest) {
log.d { "Websocket connected" }
}
}
} catch (e: Exception) {
log.e("Connecting websocket failed", e)
throw e
_connected.value = WebSockectClientStatus.DISCONNECTED
if (isTest) {
throw e
} else {
reconnect()
}
}
}
}

suspend fun disconnect() {
suspend fun disconnect(isTest: Boolean = false) {
requestResponseHandlersMutex.withLock {
requestResponseHandlers.values.forEach { it.dispose() }
requestResponseHandlers.clear()
}

session?.close()
session = null
isConnected = false
_connected.value = WebSockectClientStatus.DISCONNECTED
if (!isTest) {
log.d { "WS disconnected" }
}
}

private fun reconnect() {
backgroundScope.launch {
log.d { "Launching reconnect" }
disconnect()
delay(DELAY_TO_RECONNECT) // Delay before reconnecting
connect() // Try reconnecting recursively
}
}

// Blocking request until we get the associated response
Expand Down Expand Up @@ -133,21 +173,29 @@ class WebSocketClient(

private suspend fun startListening() {
session?.let { session ->
for (frame in session.incoming) {
if (frame is Frame.Text) {
val message = frame.readText()
//todo add input validation
log.d { "Received raw text $message" }
val webSocketMessage: WebSocketMessage =
json.decodeFromString(WebSocketMessage.serializer(), message)
log.i { "Received webSocketMessage $webSocketMessage" }
if (webSocketMessage is WebSocketResponse) {
onWebSocketResponse(webSocketMessage)
} else if (webSocketMessage is WebSocketEvent) {
onWebSocketEvent(webSocketMessage)
try {
for (frame in session.incoming) {
if (frame is Frame.Text) {
val message = frame.readText()
//todo add input validation
log.d { "Received raw text $message" }
val webSocketMessage: WebSocketMessage =
json.decodeFromString(WebSocketMessage.serializer(), message)
log.i { "Received webSocketMessage $webSocketMessage" }
if (webSocketMessage is WebSocketResponse) {
onWebSocketResponse(webSocketMessage)
} else if (webSocketMessage is WebSocketEvent) {
onWebSocketEvent(webSocketMessage)
}
}
}
} catch (e: Exception) {
log.e(e) { "Exception ocurred whilst listening for WS messages - triggering reconnect" }
} finally {
log.d { "Not listining for WS messages anymore" }
reconnect()
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class WebSocketClientProvider(
}
// only update if there was actually a change
if (currentClient == null || currentClient!!.host != host || currentClient!!.port != port) {
if (currentClient?.isConnected == true) {
if (currentClient?.isConnected() == true) {
currentClient?.disconnect()
}
log.d { "Websocket client updated with url $host:$port" }
Expand All @@ -70,13 +70,13 @@ class WebSocketClientProvider(
val url = "ws://$host:$port"
return try {
// if connection is refused, catch will execute returning false
client.connect()
return client.isConnected
client.connect(true)
return client.isConnected()
} catch (e: Exception) {
log.e("Error testing connection to $url: ${e.message}")
false
} finally {
client.disconnect() // Ensure the client is closed to free resources
client.disconnect(true) // Ensure the client is closed to free resources
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package network.bisq.mobile.domain.service

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import network.bisq.mobile.client.websocket.WebSocketClientProvider
import network.bisq.mobile.domain.data.BackgroundDispatcher
import network.bisq.mobile.domain.utils.Logging
Expand All @@ -12,14 +13,16 @@ import network.bisq.mobile.domain.utils.Logging
class TrustedNodeService(private val webSocketClientProvider: WebSocketClientProvider) : Logging {
private val backgroundScope = CoroutineScope(BackgroundDispatcher)

// TODO websocketClient.isConnected should be observable so that we emit
// events when disconnected and UI can react
fun isConnected() = webSocketClientProvider.get().isConnected
var isConnected: Boolean = false
var observingConnectivity = false

/**
* Connects to the trusted node, throws an exception if connection fails
*/
suspend fun connect() {
if (!observingConnectivity) {
observeConnectivity()
}
runCatching {
webSocketClientProvider.get().connect()
}.onSuccess {
Expand All @@ -33,4 +36,14 @@ class TrustedNodeService(private val webSocketClientProvider: WebSocketClientPro
suspend fun disconnect() {
// TODO
}

private fun observeConnectivity() {
backgroundScope.launch {
webSocketClientProvider.get().connected.collect {
log.d { "connectivity status changed - connected = $it" }
isConnected = webSocketClientProvider.get().isConnected()
}
}
observingConnectivity = true
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package network.bisq.mobile.client

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import network.bisq.mobile.client.websocket.WebSocketClientProvider
import network.bisq.mobile.domain.UrlLauncher
import network.bisq.mobile.domain.service.bootstrap.ApplicationBootstrapFacade
import network.bisq.mobile.domain.service.controller.NotificationServiceController
Expand All @@ -11,6 +14,7 @@ import network.bisq.mobile.presentation.MainPresenter

class ClientMainPresenter(
notificationServiceController: NotificationServiceController,
private val webSocketClientProvider: WebSocketClientProvider,
private val applicationBootstrapFacade: ApplicationBootstrapFacade,
private val offersServiceFacade: OffersServiceFacade,
private val marketPriceServiceFacade: MarketPriceServiceFacade,
Expand All @@ -21,7 +25,35 @@ class ClientMainPresenter(

override fun onViewAttached() {
super.onViewAttached()
activateServices()
listenForConnectivity()
}

override fun onViewUnattaching() {
// For Tor we might want to leave it running while in background to avoid delay of re-connect
// when going into foreground again.
// coroutineScope.launch { webSocketClient.disconnect() }
deactivateServices()
super.onViewUnattaching()
}

private fun listenForConnectivity() {
backgroundScope.launch {
webSocketClientProvider.get().connected.collect {
if (webSocketClientProvider.get().isConnected()) {
log.d { "connectivity status changed to $it - reconnecting services" }
reactiveServices()
}
}
}
}

private fun reactiveServices() {
deactivateServices()
activateServices()
}

private fun activateServices() {
runCatching {
applicationBootstrapFacade.activate()
offersServiceFacade.activate()
Expand All @@ -35,16 +67,11 @@ class ClientMainPresenter(
}
}

override fun onViewUnattaching() {
// For Tor we might want to leave it running while in background to avoid delay of re-connect
// when going into foreground again.
// coroutineScope.launch { webSocketClient.disconnect() }

private fun deactivateServices() {
applicationBootstrapFacade.deactivate()
offersServiceFacade.deactivate()
marketPriceServiceFacade.deactivate()
tradesServiceFacade.deactivate()
settingsServiceFacade.deactivate()
super.onViewUnattaching()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.koin.dsl.bind
import org.koin.dsl.module

val presentationModule = module {
single<MainPresenter> { ClientMainPresenter(get(), get(), get(), get(), get(), get(), get()) } bind AppPresenter::class
single<MainPresenter> { ClientMainPresenter(get(), get(), get(), get(), get(), get(), get(), get()) } bind AppPresenter::class

single<TopBarPresenter> { TopBarPresenter(get(), get()) } bind ITopBarPresenter::class

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class InterruptedTradePresenter(
var reportToMediatorButtonVisible: Boolean = false

override fun onViewAttached() {
super.onViewAttached()
require(tradesServiceFacade.selectedTrade.value != null)
val openTradeItemModel = tradesServiceFacade.selectedTrade.value!!
presenterScope.launch {
Expand All @@ -46,6 +47,7 @@ class InterruptedTradePresenter(

override fun onViewUnattaching() {
reset()
super.onViewUnattaching()
}

private fun tradeStateChanged(state: BisqEasyTradeStateEnum?) {
Expand Down Expand Up @@ -138,9 +140,10 @@ class InterruptedTradePresenter(

fun onCloseTrade() {
backgroundScope.launch {
require(selectedTrade.value != null)
tradesServiceFacade.closeTrade()
navigateToTab(Routes.TabOpenTradeList)
if (selectedTrade.value != null) {
tradesServiceFacade.closeTrade()
}
navigateBack()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ class TradeDetailsHeaderPresenter(
}
}

fun closeWorkflow() {
// doing a shark navigateBack causes white broken UI screen
navigateToTab(Routes.TabOpenTradeList)
private fun closeWorkflow() {
// Do not navigate, close button on the same screen does it
// navigateBack()
}

private fun reset() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ class TradeFlowPresenter(
_tradePhaseState.value = TradePhaseState.INIT
isSeller = false
isMainChain = false
super.onViewUnattaching()
}

private fun tradeStateChanged(state: BisqEasyTradeStateEnum?) {
Expand Down
Loading