From 005950e8bf55482d2941c18d220b2137820b490f Mon Sep 17 00:00:00 2001 From: cloudwebrtc Date: Fri, 24 Nov 2023 21:06:10 +0800 Subject: [PATCH] Improve PCs reconnection speed. --- lib/src/core/engine.dart | 100 ++++++++++--------------- lib/src/core/signal_client.dart | 8 +- lib/src/extensions.dart | 3 +- lib/src/internal/events.dart | 5 -- lib/src/support/websocket_utility.dart | 4 + 5 files changed, 45 insertions(+), 75 deletions(-) diff --git a/lib/src/core/engine.dart b/lib/src/core/engine.dart index a7302579a..8d1ddb96f 100644 --- a/lib/src/core/engine.dart +++ b/lib/src/core/engine.dart @@ -102,14 +102,6 @@ class Engine extends Disposable with EventsEmittable { late EventsListener _signalListener = signalClient.createListener(synchronized: true); - Function? _cancelDebounce; - - late final resumeConnection = Utils.createDebounceFunc( - (_) => _resumeConnection(), - cancelFunc: (f) => _cancelDebounce = f, - wait: connectOptions.timeouts.debounce, - ); - Engine({ required this.connectOptions, required this.roomOptions, @@ -130,7 +122,6 @@ class Engine extends Disposable with EventsEmittable { await cleanUp(); await events.dispose(); await _signalListener.dispose(); - _cancelDebounce?.call(); }); } @@ -397,37 +388,28 @@ class Engine extends Disposable with EventsEmittable { subscriber?.pc.onDataChannel = _onDataChannel; } - subscriber?.pc.onConnectionState = - (state) => events.emit(EngineSubscriberPeerStateUpdatedEvent( - state: state, - isPrimary: _subscriberPrimary, - )); - - publisher?.pc.onConnectionState = - (state) => events.emit(EnginePublisherPeerStateUpdatedEvent( - state: state, - isPrimary: !_subscriberPrimary, - )); + subscriber?.pc.onConnectionState = (state) async { + events.emit(EngineSubscriberPeerStateUpdatedEvent( + state: state, + isPrimary: _subscriberPrimary, + )); + logger.fine('subscriber connectionState: $state'); + if (state.isDisconnected()) { + await resumeConnection(ClientDisconnectReason.peerConnectionFailed); + } + }; - events.on((event) async { - if (event.state.isDisconnected() || event.state.isFailed()) { - try { - if (signalClient.connectionState != ConnectionState.connected) { - logger.fine( - 'PeerConnectionState isDisconnected: ${event.state} isPrimary: ${event.isPrimary}'); - - await signalClient.events.waitFor( - duration: connectOptions.timeouts.connection * 2, - onTimeout: () => throw ConnectException( - 'Timed out waiting for SignalResumedEvent'), - ); - } - await resumeConnection.call(null); - } catch (e) { - logger.warning('Failed to resume connection: $e'); - } + publisher?.pc.onConnectionState = (state) async { + events.emit(EnginePublisherPeerStateUpdatedEvent( + state: state, + isPrimary: !_subscriberPrimary, + )); + logger.fine('publisher connectionState: $state'); + if (state.isDisconnected()) { + await resumeConnection + .call(ClientDisconnectReason.peerConnectionFailed); } - }); + }; subscriber?.pc.onTrack = (rtc.RTCTrackEvent event) { logger.fine('[WebRTC] pc.onTrack'); @@ -593,6 +575,11 @@ class Engine extends Disposable with EventsEmittable { logger .info('onDisconnected state:${_connectionState} reason:${reason.name}'); + if (_restarting) { + logger.fine('in restarting, skip handleDisconnect'); + return; + } + if (!fullReconnect) { fullReconnect = _clientConfiguration?.resumeConnection == lk_models.ClientConfigSetting.DISABLED || @@ -603,13 +590,10 @@ class Engine extends Disposable with EventsEmittable { ].contains(reason); } - if (reason == ClientDisconnectReason.signal) { - logger.fine('[$objectId] Signal disconnected'); - return; - } - - if (_restarting) { - logger.fine('[$objectId] Already reconnecting...'); + if (reason == ClientDisconnectReason.leaveReconnect && + [ConnectionState.disconnected, ConnectionState.connected] + .contains(_connectionState)) { + logger.fine('skip handleDisconnect for leaveReconnect'); return; } @@ -618,14 +602,13 @@ class Engine extends Disposable with EventsEmittable { } } - Future _resumeConnection() async { - logger.fine('resumeConnection'); - - if (publisher == null || subscriber == null) { - throw UnexpectedStateException('publisher or subscribers is null'); + Future resumeConnection(ClientDisconnectReason reason) async { + if (_restarting) { + logger.fine('in restarting or resuming, skip resumeConnection..'); + return; } - subscriber!.restartingIce = true; + logger.fine('resumeConnection: reason: ${reason.name}'); if (_hasPublished) { logger.fine('resumeConnection: negotiating publisher...'); @@ -640,16 +623,13 @@ class Engine extends Disposable with EventsEmittable { logger.fine('resumeConnection: primary is connected: $isConnected'); if (!isConnected) { + subscriber!.restartingIce = true; logger.fine('resumeConnection: Waiting for primary to connect...'); await events.waitFor( filter: (event) => event.isPrimary && event.state.isConnected(), - duration: connectOptions.timeouts.peerConnection + - const Duration( - seconds: 5, - ), - onTimeout: () => throw ConnectException( - 'Timed out waiting for peerconnection to connect'), + duration: connectOptions.timeouts.peerConnection, ); + logger.fine('resumeConnection: primary connected'); } } @@ -780,6 +760,8 @@ class Engine extends Disposable with EventsEmittable { if (!_subscriberPrimary) { await negotiate(); } + + await resumeConnection(ClientDisconnectReason.signal); }) ..on((event) async { switch (event.newState) { @@ -865,10 +847,6 @@ class Engine extends Disposable with EventsEmittable { reason: event.reason.toSDKType()); await cleanUp(); } - }) - ..on((event) async { - logger.fine('[$objectId] Signal resumed'); - await resumeConnection.call(null); }); } diff --git a/lib/src/core/signal_client.dart b/lib/src/core/signal_client.dart index 51b6f9a10..f21fe23b9 100644 --- a/lib/src/core/signal_client.dart +++ b/lib/src/core/signal_client.dart @@ -96,17 +96,11 @@ class SignalClient extends Disposable with EventsEmittable { logger.fine('Socket status changed to $status'); switch (status) { case SocketStatus.kSocketStatusConnected: - if (_connectionState == ConnectionState.reconnecting) { - _updateConnectionState(ConnectionState.connected); - events.emit(const SignalResumedEvent()); - } else { - _updateConnectionState(ConnectionState.connected); - } + _updateConnectionState(ConnectionState.connected); break; case SocketStatus.kSocketStatusReconnecting: _updateConnectionState(ConnectionState.reconnecting); break; - case SocketStatus.kSocketStatusClosed: _updateConnectionState(ConnectionState.disconnected); break; diff --git a/lib/src/extensions.dart b/lib/src/extensions.dart index 099258b2e..4b9e568a5 100644 --- a/lib/src/extensions.dart +++ b/lib/src/extensions.dart @@ -97,9 +97,8 @@ extension RTCPeerConnectionStateExt on rtc.RTCPeerConnectionState { bool isDisconnected() => [ rtc.RTCPeerConnectionState.RTCPeerConnectionStateClosed, rtc.RTCPeerConnectionState.RTCPeerConnectionStateDisconnected, + rtc.RTCPeerConnectionState.RTCPeerConnectionStateFailed ].contains(this); - bool isFailed() => - this == rtc.RTCPeerConnectionState.RTCPeerConnectionStateFailed; } extension RTCIceTransportPolicyExt on RTCIceTransportPolicy { diff --git a/lib/src/internal/events.dart b/lib/src/internal/events.dart index 5b279a76e..f4fe608f9 100644 --- a/lib/src/internal/events.dart +++ b/lib/src/internal/events.dart @@ -343,11 +343,6 @@ class SignalTokenUpdatedEvent with SignalEvent, InternalEvent { }); } -@internal -class SignalResumedEvent with SignalEvent, EngineEvent, InternalEvent { - const SignalResumedEvent(); -} - // ---------------------------------------------------------------------- // Engine events // ---------------------------------------------------------------------- diff --git a/lib/src/support/websocket_utility.dart b/lib/src/support/websocket_utility.dart index 182e4b8a4..a3e60f3ec 100644 --- a/lib/src/support/websocket_utility.dart +++ b/lib/src/support/websocket_utility.dart @@ -202,6 +202,10 @@ class WebSocketUtility { } Future reconnect() async { + if (_reconnectUrl == null) { + logger.warning('WebSocket reconnect failed, no reconnect url'); + return false; + } if (_reconnectTimes < _reconnectCount) { if (_socketStatus != SocketStatus.kSocketStatusReconnecting) { _changeSocketStatus(SocketStatus.kSocketStatusReconnecting);