From 0fcc34145059a131a84d77bd329e518d83b87a61 Mon Sep 17 00:00:00 2001 From: CloudWebRTC Date: Thu, 21 Dec 2023 21:31:29 +0800 Subject: [PATCH] Improve reconnect logic for websocket (#406) * Capture possible exceptions when closing the room at any time and the internal process is busy. * fix. * imporve reconnect logic. * fix canReconnect logic on SignalLeaveEvent. * Monitor Connectivity changes and automatically reconnect. * update. * Improve PCs reconnection speed. * support Manual subscription. * update. * simple grid view for test. * update pubspec. * Fixed handleParticipantDisconnect not emitting for existing partcicpant. * Trigger full Reconnect when peerConnection Failed. * update grid view. * add checkIfDesposed. * update. * remove checkIfDisposed. * fixed rendering exception. * update. * update. * fix flutter analyze. * fix reconnect test. * update. * update. * revert changes for example app. * update. * Refactored reconnect logic to support connectivity/network change checking. * update. * update. * cleanup. * cleanup. * sendLeave for room.disconnect. * fix for web. * fix for web. * dart run import_sorter:main. * update. * fix. --- example/lib/exts.dart | 4 +- example/lib/pages/room.dart | 2 + example/lib/widgets/controls.dart | 4 + example/pubspec.yaml | 4 +- lib/src/core/engine.dart | 408 +++++++++++++++++++----------- lib/src/core/room.dart | 125 ++++----- lib/src/core/signal_client.dart | 133 ++++++---- lib/src/core/transport.dart | 1 - lib/src/events.dart | 12 + lib/src/extensions.dart | 10 +- lib/src/internal/events.dart | 113 +++++---- lib/src/track/local/video.dart | 10 +- lib/src/types/internal.dart | 3 + lib/src/types/other.dart | 3 + test/core/room_e2e_test.dart | 4 +- test/core/signal_client_test.dart | 14 +- 16 files changed, 520 insertions(+), 330 deletions(-) diff --git a/example/lib/exts.dart b/example/lib/exts.dart index 04316565f..aa6c5e682 100644 --- a/example/lib/exts.dart +++ b/example/lib/exts.dart @@ -208,12 +208,14 @@ extension LKExampleExt on BuildContext { enum SimulateScenarioResult { signalReconnect, + fullReconnect, + speakerUpdate, nodeFailure, migration, serverLeave, switchCandidate, - clear, e2eeKeyRatchet, participantName, participantMetadata, + clear, } diff --git a/example/lib/pages/room.dart b/example/lib/pages/room.dart index ee04b11ef..90f9abf7a 100644 --- a/example/lib/pages/room.dart +++ b/example/lib/pages/room.dart @@ -96,6 +96,8 @@ class _RoomPageState extends State { }) ..on((_) => _sortParticipants()) ..on((_) => _sortParticipants()) + ..on((_) => _sortParticipants()) + ..on((_) => _sortParticipants()) ..on(_onE2EEStateEvent) ..on((event) { print( diff --git a/example/lib/widgets/controls.dart b/example/lib/widgets/controls.dart index 356269d7f..e512fdb5b 100644 --- a/example/lib/widgets/controls.dart +++ b/example/lib/widgets/controls.dart @@ -250,8 +250,12 @@ class _ControlsWidgetState extends State { } await widget.room.sendSimulateScenario( + speakerUpdate: + result == SimulateScenarioResult.speakerUpdate ? 3 : null, signalReconnect: result == SimulateScenarioResult.signalReconnect ? true : null, + fullReconnect: + result == SimulateScenarioResult.fullReconnect ? true : null, nodeFailure: result == SimulateScenarioResult.nodeFailure ? true : null, migration: result == SimulateScenarioResult.migration ? true : null, serverLeave: result == SimulateScenarioResult.serverLeave ? true : null, diff --git a/example/pubspec.yaml b/example/pubspec.yaml index 92065730a..c2e54f653 100644 --- a/example/pubspec.yaml +++ b/example/pubspec.yaml @@ -21,12 +21,10 @@ dependencies: flutter_svg: ^2.0.5 dropdown_button2: ^2.3.6 flutter_window_close: ^0.2.2 + collection: '>=1.16.0' livekit_client: path: ../ - # git: - # url: https://github.com/livekit/client-sdk-flutter - # ref: main dev_dependencies: flutter_test: diff --git a/lib/src/core/engine.dart b/lib/src/core/engine.dart index c2e5ce606..5b881b6a8 100644 --- a/lib/src/core/engine.dart +++ b/lib/src/core/engine.dart @@ -17,6 +17,7 @@ import 'dart:async'; import 'package:flutter/foundation.dart'; import 'package:collection/collection.dart'; +import 'package:connectivity_plus/connectivity_plus.dart'; import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; import 'package:meta/meta.dart'; @@ -43,6 +44,21 @@ import 'room.dart'; import 'signal_client.dart'; import 'transport.dart'; +const maxRetryDelay = 7000; + +const defaultRetryDelaysInMs = [ + 0, + 300, + 2 * 2 * 300, + 3 * 3 * 300, + 4 * 4 * 300, + maxRetryDelay, + maxRetryDelay, + maxRetryDelay, + maxRetryDelay, + maxRetryDelay, +]; + class Engine extends Disposable with EventsEmittable { static const _lossyDCLabel = '_lossy'; static const _reliableDCLabel = '_reliable'; @@ -69,17 +85,13 @@ class Engine extends Disposable with EventsEmittable { rtc.RTCDataChannel? _reliableDCSub; rtc.RTCDataChannel? _lossyDCSub; - ConnectionState _connectionState = ConnectionState.disconnected; - /// Connection state of the [Room]. - ConnectionState get connectionState => _connectionState; + ConnectionState get connectionState => signalClient.connectionState; // true if publisher connection has already been established. // this is helpful to know if we need to restart ICE on the publisher connection bool _hasPublished = false; - bool _restarting = false; - lk_models.ClientConfiguration? _clientConfiguration; // remember url and token for reconnect @@ -91,12 +103,11 @@ class Engine extends Disposable with EventsEmittable { FastConnectOptions? fastConnectOptions; bool _subscriberPrimary = false; - String? _participantSid; String? _connectedServerAddress; String? get connectedServerAddress => _connectedServerAddress; - bool fullReconnect = false; + bool fullReconnectOnNext = false; // server-provided ice servers List _serverProvidedIceServers = []; @@ -104,6 +115,31 @@ class Engine extends Disposable with EventsEmittable { late EventsListener _signalListener = signalClient.createListener(synchronized: true); + int? reconnectAttempts; + + Timer? reconnectTimeout; + DateTime? reconnectStart; + + bool _isClosed = false; + + bool get isClosed => _isClosed; + + final int _reconnectCount = defaultRetryDelaysInMs.length; + + bool attemptingReconnect = false; + + void clearReconnectTimeout() { + if (reconnectTimeout != null) { + reconnectTimeout?.cancel(); + reconnectTimeout = null; + } + } + + void clearPendingReconnect() { + clearReconnectTimeout(); + reconnectAttempts = 0; + } + Engine({ required this.connectOptions, required this.roomOptions, @@ -121,6 +157,7 @@ class Engine extends Disposable with EventsEmittable { _setUpSignalListeners(); onDispose(() async { + _isClosed = true; await cleanUp(); await events.dispose(); await _signalListener.dispose(); @@ -141,8 +178,6 @@ class Engine extends Disposable with EventsEmittable { this.roomOptions = roomOptions ?? this.roomOptions; this.fastConnectOptions = fastConnectOptions; - _updateConnectionState(ConnectionState.connecting); - try { // wait for socket to connect rtc server await signalClient.connect( @@ -168,11 +203,13 @@ class Engine extends Disposable with EventsEmittable { onTimeout: () => throw MediaConnectException( 'Timed out waiting for PeerConnection to connect, please check your network for ice connectivity'), ); - - _updateConnectionState(ConnectionState.connected); + events.emit(const EngineConnectedEvent()); } catch (error) { logger.fine('Connect Error $error'); - _updateConnectionState(ConnectionState.disconnected); + + events.emit(EngineDisconnectedEvent( + reason: DisconnectReason.joinFailure, + )); rethrow; } } @@ -190,7 +227,9 @@ class Engine extends Disposable with EventsEmittable { await signalClient.cleanUp(); - _updateConnectionState(ConnectionState.disconnected); + fullReconnectOnNext = false; + + clearPendingReconnect(); } @internal @@ -257,7 +296,7 @@ class Engine extends Disposable with EventsEmittable { publisher!.negotiate(null); } catch (error) { if (error is NegotiationError) { - fullReconnect = true; + fullReconnectOnNext = true; } await handleDisconnect(ClientDisconnectReason.negotiationFailed); } @@ -387,25 +426,31 @@ class Engine extends Disposable with EventsEmittable { subscriber?.pc.onDataChannel = _onDataChannel; } - subscriber?.pc.onConnectionState = - (state) => events.emit(EngineSubscriberPeerStateUpdatedEvent( - state: state, - isPrimary: _subscriberPrimary, - )); - - publisher?.pc.onConnectionState = - (state) => events.emit(EnginePublisherPeerStateUpdatedEvent( - state: state, - isPrimary: !_subscriberPrimary, - )); + subscriber?.pc.onConnectionState = (state) async { + events.emit(EngineSubscriberPeerStateUpdatedEvent( + state: state, + isPrimary: _subscriberPrimary, + )); + logger.fine('subscriber connectionState: $state'); + if (state.isDisconnected() || state.isFailed()) { + await handleDisconnect(state.isFailed() + ? ClientDisconnectReason.peerConnectionFailed + : ClientDisconnectReason.peerConnectionClosed); + } + }; - events.on((event) { - if (event.state.isDisconnectedOrFailed()) { - handleDisconnect(ClientDisconnectReason.reconnect); - } else if (event.state.isClosed()) { - handleDisconnect(ClientDisconnectReason.peerConnectionClosed); + publisher?.pc.onConnectionState = (state) async { + events.emit(EnginePublisherPeerStateUpdatedEvent( + state: state, + isPrimary: !_subscriberPrimary, + )); + logger.fine('publisher connectionState: $state'); + if (state.isDisconnected() || state.isFailed()) { + await handleDisconnect(state.isFailed() + ? ClientDisconnectReason.peerConnectionFailed + : ClientDisconnectReason.peerConnectionClosed); } - }); + }; subscriber?.pc.onTrack = (rtc.RTCTrackEvent event) { logger.fine('[WebRTC] pc.onTrack'); @@ -427,11 +472,11 @@ class Engine extends Disposable with EventsEmittable { logger.fine('[WebRTC] stream.onRemoveTrack'); }; - if (connectionState == ConnectionState.reconnecting || - connectionState == ConnectionState.connecting) { + if (signalClient.connectionState == ConnectionState.reconnecting || + signalClient.connectionState == ConnectionState.connecting) { final track = event.track; final receiver = event.receiver; - events.on((event) async { + events.on((event) async { Timer(const Duration(milliseconds: 10), () { events.emit(EngineTrackAddedEvent( track: track, @@ -568,114 +613,171 @@ class Engine extends Disposable with EventsEmittable { } Future handleDisconnect(ClientDisconnectReason reason) async { - logger - .info('onDisconnected state:${_connectionState} reason:${reason.name}'); - - if (!fullReconnect) { - fullReconnect = _clientConfiguration?.resumeConnection == - lk_models.ClientConfigSetting.DISABLED || - [ - ClientDisconnectReason.leaveReconnect, - ClientDisconnectReason.negotiationFailed, - ClientDisconnectReason.peerConnectionClosed - ].contains(reason); - } - - if (_restarting || - (_connectionState == ConnectionState.reconnecting && !fullReconnect)) { - logger.fine('[$objectId] Already reconnecting...'); + if (_isClosed) { + logger.fine('handleDisconnect: engine is closed, skip'); return; } - if (_connectionState == ConnectionState.disconnected) { - logger.fine('[$objectId] Already disconnected... $reason'); - return; - } + logger + .info('onDisconnected state:${connectionState} reason:${reason.name}'); - logger.fine('[$runtimeType] Should attempt reconnect sequence...'); - if (fullReconnect) { - await restartConnection(); - } else { - await resumeConnection(); + if (reconnectAttempts == 0) { + reconnectStart = DateTime.now(); } + + var delay = defaultRetryDelaysInMs[reconnectAttempts!]; + + clearReconnectTimeout(); + logger.fine( + 'WebSocket reconnecting in $delay ms, retry times $reconnectAttempts'); + reconnectTimeout = Timer(Duration(milliseconds: delay), () async { + await attemptReconnect(reason); + }); } @internal - Future resumeConnection() async { - if (_connectionState == ConnectionState.disconnected) { - logger.fine('resumeConnection: Already closed.'); + Future attemptReconnect(ClientDisconnectReason reason) async { + if (_isClosed) { + return; + } + + // guard for attempting reconnection multiple times while one attempt is still not finished + if (attemptingReconnect) { return; } - if (url == null || token == null) { - throw ConnectException( - 'could not resume connection without url and token'); + if (_clientConfiguration?.resumeConnection == + lk_models.ClientConfigSetting.DISABLED || + [ + ClientDisconnectReason.leaveReconnect, + ClientDisconnectReason.negotiationFailed, + ClientDisconnectReason.peerConnectionFailed, + ].contains(reason)) { + fullReconnectOnNext = true; } - Future sequence() async { - await signalClient.connect( - url!, - token!, - connectOptions: connectOptions, - roomOptions: roomOptions, - reconnect: true, - sid: _participantSid, - ); + if (reconnectAttempts! >= _reconnectCount) { + logger.fine('reconnectAttempts exceeded, disconnecting...'); + events.emit(EngineDisconnectedEvent( + reason: DisconnectReason.connectionClosed, + )); + await cleanUp(); + return; + } - if (publisher == null || subscriber == null) { - throw UnexpectedStateException('publisher or subscribers is null'); + try { + attemptingReconnect = true; + + if (await signalClient.checkInternetConnection() == false) { + logger.fine('no internet connection, waiting...'); + await signalClient.events.waitFor( + duration: connectOptions.timeouts.connection * 10, + filter: (event) => event.state != ConnectivityResult.none, + onTimeout: () => throw ConnectException( + 'attemptReconnect: Timed out waiting for SignalConnectivityChangedEvent'), + ); } - subscriber!.restartingIce = true; + if (fullReconnectOnNext) { + await restartConnection(); + } else { + await resumeConnection(reason); + } + clearPendingReconnect(); + attemptingReconnect = false; + } catch (e) { + reconnectAttempts = reconnectAttempts! + 1; + bool recoverable = true; + if (e is WebSocketException || + e is ConnectException || + e is MediaConnectException) { + // cannot resume connection, need to do full reconnect + fullReconnectOnNext = true; + } else if (e is TimeoutException) { + fullReconnectOnNext = false; + } else { + recoverable = false; + } - if (_hasPublished) { - logger.fine('resumeConnection: negotiating publisher...'); - await publisher!.createAndSendOffer(const RTCOfferOptions( - iceRestart: true, + if (recoverable) { + unawaited(handleDisconnect(ClientDisconnectReason.reconnectRetry)); + } else { + logger.fine('attemptReconnect: disconnecting...'); + events.emit(EngineDisconnectedEvent( + reason: DisconnectReason.connectionClosed, )); + await cleanUp(); } + } finally { + attemptingReconnect = false; + } + } + + Future resumeConnection(ClientDisconnectReason reason) async { + if (_isClosed) { + return; + } + + events.emit(const EngineResumingEvent()); + + // wait for socket to connect rtc server + await signalClient.connect( + url!, + token!, + connectOptions: connectOptions, + roomOptions: roomOptions, + reconnect: true, + ); - final iceConnected = - (await primary?.pc.getConnectionState())?.isConnected() ?? false; + await events.waitFor( + duration: connectOptions.timeouts.connection, + onTimeout: () => throw ConnectException( + 'resumeConnection: Timed out waiting for SignalReconnectedEvent'), + ); - logger.fine('resumeConnection: iceConnected: $iceConnected'); + events.emit(const EngineSignalResumedEvent()); - if (!iceConnected) { - logger.fine('resumeConnection: Waiting for primary to connect...'); + logger.fine('resumeConnection: reason: ${reason.name}'); - await events.waitFor( - filter: (event) => event.isPrimary && event.state.isConnected(), - duration: connectOptions.timeouts.iceRestart, - onTimeout: () => throw MediaConnectException('ice restart failed'), - ); - } + if (_hasPublished) { + logger.fine('resumeConnection: negotiating publisher...'); + await publisher!.createAndSendOffer(const RTCOfferOptions( + iceRestart: true, + )); } - try { - _updateConnectionState(ConnectionState.reconnecting); - await Utils.retry( - (tries, errors) { - logger.fine('Retrying connect sequence remaining ${tries} tries...'); - return sequence(); - }, - retryCondition: (_, __) => - _connectionState == ConnectionState.reconnecting, - tries: 3, - delay: const Duration(seconds: 3), + final isConnected = + (await primary?.pc.getConnectionState())?.isConnected() ?? false; + + logger.fine('resumeConnection: primary is connected: $isConnected'); + + if (!isConnected) { + subscriber!.restartingIce = true; + logger.fine('resumeConnection: Waiting for primary to connect...'); + await events.waitFor( + filter: (event) => event.isPrimary && event.state.isConnected(), + duration: connectOptions.timeouts.peerConnection, + onTimeout: () => throw MediaConnectException( + 'resumeConnection: Timed out waiting for EnginePeerStateUpdatedEvent'), ); - _updateConnectionState(ConnectionState.connected); - } catch (error) { - _updateConnectionState(ConnectionState.disconnected); + logger.fine('resumeConnection: primary connected'); } + + events.emit(const EngineReconnectedEvent()); } @internal Future restartConnection([bool signalEvents = false]) async { - if (_restarting) { - logger.fine('restartConnection: Already restarting...'); + if (_isClosed) { return; } - _restarting = true; + + events.emit(const EngineFullRestartingEvent()); + + if (signalClient.connectionState == ConnectionState.connected) { + await signalClient.sendLeave(); + } + await publisher?.dispose(); publisher = null; @@ -709,8 +811,9 @@ class Engine extends Disposable with EventsEmittable { ); } - fullReconnect = false; - _restarting = false; + fullReconnectOnNext = false; + + events.emit(const EngineRestartedEvent()); } @internal @@ -718,9 +821,13 @@ class Engine extends Disposable with EventsEmittable { required lk_rtc.UpdateSubscription subscription, required Iterable? publishTracks, }) async { - final answer = (await subscriber?.pc.getLocalDescription())?.toPBType(); + final previousAnswer = + (await subscriber?.pc.getLocalDescription())?.toPBType(); + final previousOffer = + (await publisher?.pc.getLocalDescription())?.toPBType(); signalClient.sendSyncState( - answer: answer, + answer: previousAnswer, + offer: previousOffer, subscription: subscription, publishTracks: publishTracks, dataChannelInfo: dataChannelInfo(), @@ -728,18 +835,15 @@ class Engine extends Disposable with EventsEmittable { } void _setUpEngineListeners() => - events.on((event) async { - if (event.didReconnect) { - // send queued requests if engine re-connected - signalClient.sendQueuedRequests(); - } + events.on((event) async { + // send queued requests if engine re-connected + signalClient.sendQueuedRequests(); }); void _setUpSignalListeners() => _signalListener ..on((event) async { // create peer connections _subscriberPrimary = event.response.subscriberPrimary; - _participantSid = event.response.participant.sid; var iceServersFromServer = event.response.iceServers.map((e) => e.toSDKType()).toList(); @@ -793,10 +897,29 @@ class Engine extends Disposable with EventsEmittable { if (!_subscriberPrimary) { await negotiate(); } + + events.emit(const SignalReconnectedEvent()); + }) + ..on((event) async { + logger.fine('Signal connected'); + reconnectAttempts = 0; + events.emit(const EngineConnectedEvent()); + }) + ..on((event) async { + logger.fine('Signal connecting'); + }) + ..on((event) async { + logger.fine('Signal reconnecting'); + events.emit(const EngineReconnectingEvent()); }) - ..on((event) async { - if (event.newState == ConnectionState.disconnected) { + ..on((event) async { + logger.fine('Signal disconnected ${event.reason}'); + if (event.reason == DisconnectReason.connectionClosed && !_isClosed) { await handleDisconnect(ClientDisconnectReason.signal); + } else { + events.emit(EngineDisconnectedEvent( + reason: event.reason, + )); } }) ..on((event) async { @@ -835,7 +958,7 @@ class Engine extends Disposable with EventsEmittable { 'Received ${SignalTrickleEvent} but publisher or subscriber was null.'); return; } - logger.fine('got ICE candidate from peer'); + logger.fine('got ICE candidate from peer (target: ${event.target})'); if (event.target == lk_rtc.SignalTarget.SUBSCRIBER) { await subscriber!.addIceCandidate(event.candidate); } else if (event.target == lk_rtc.SignalTarget.PUBLISHER) { @@ -848,22 +971,29 @@ class Engine extends Disposable with EventsEmittable { }) ..on((event) async { if (event.canReconnect) { - fullReconnect = true; + fullReconnectOnNext = true; // reconnect immediately instead of waiting for next attempt - _connectionState = ConnectionState.reconnecting; - _updateConnectionState(ConnectionState.reconnecting); await handleDisconnect(ClientDisconnectReason.leaveReconnect); } else { - if (_connectionState == ConnectionState.reconnecting) { + if (connectionState == ConnectionState.reconnecting) { logger.warning( '[Signal] Received Leave while engine is reconnecting, ignoring...'); return; } - _updateConnectionState(ConnectionState.disconnected, - reason: event.reason.toSDKType()); + await signalClient.cleanUp(); await cleanUp(); + events.emit(EngineDisconnectedEvent(reason: event.reason.toSDKType())); } }); + + Future disconnect() async { + _isClosed = true; + if (connectionState == ConnectionState.connected) { + await signalClient.sendLeave(); + } else { + await cleanUp(); + } + } } extension EnginePrivateMethods on Engine { @@ -875,28 +1005,6 @@ extension EnginePrivateMethods on Engine { rtc.RTCDataChannelState _publisherDataChannelState(Reliability reliability) => _publisherDataChannel(reliability)?.state ?? rtc.RTCDataChannelState.RTCDataChannelClosed; - - void _updateConnectionState(ConnectionState newValue, - {DisconnectReason? reason}) { - if (_connectionState == newValue) return; - - logger.fine('Engine ConnectionState ' - '${_connectionState.name} -> ${newValue.name}'); - - bool didReconnect = _connectionState == ConnectionState.reconnecting && - newValue == ConnectionState.connected; - // update internal value - final oldState = _connectionState; - _connectionState = newValue; - - events.emit(EngineConnectionStateUpdatedEvent( - newState: _connectionState, - oldState: oldState, - didReconnect: didReconnect, - fullReconnect: fullReconnect, - disconnectReason: reason, - )); - } } extension EngineInternalMethods on Engine { diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index aee5262a8..3d635a970 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -201,13 +201,13 @@ class Room extends DisposableChangeNotifier with EventsEmittable { info: event.response.participant, ); - if (engine.fullReconnect) { + if (engine.fullReconnectOnNext) { _localParticipant!.updateFromInfo(event.response.participant); } if (connectOptions.protocolVersion.index >= ProtocolVersion.v8.index && engine.fastConnectOptions != null && - !engine.fullReconnect) { + !engine.fullReconnectOnNext) { var options = engine.fastConnectOptions!; var audio = options.microphone; @@ -261,6 +261,8 @@ class Room extends DisposableChangeNotifier with EventsEmittable { } logger.fine('Room Connect completed'); + + events.emit(RoomConnectedEvent(room: this, metadata: _metadata)); }) ..on( (event) => _onParticipantUpdateEvent(event.participants)) @@ -339,13 +341,6 @@ class Room extends DisposableChangeNotifier with EventsEmittable { RoomRecordingStatusChanged(activeRecording: _isRecording)); } }) - ..on((event) { - // during reconnection, need to send sync state upon signal connection. - if (event.newState == ConnectionState.reconnecting) { - logger.fine('Sending syncState'); - _sendSyncState(); - } - }) ..on((event) async { final publication = localParticipant?.trackPublications[event.sid]; if (event.muted) { @@ -360,44 +355,53 @@ class Room extends DisposableChangeNotifier with EventsEmittable { }); void _setUpEngineListeners() => _engineListener - ..on((event) async { - if (event.didReconnect) { - events.emit(const RoomReconnectedEvent()); - // re-send tracks permissions - localParticipant?.sendTrackSubscriptionPermissions(); - } else if (event.fullReconnect && - event.newState == ConnectionState.connecting) { - events.emit(const RoomRestartingEvent()); - // clean up RemoteParticipants - for (final participant in _participants.values) { - events.emit(ParticipantDisconnectedEvent(participant: participant)); - await participant.dispose(); - } - _participants.clear(); - _activeSpeakers.clear(); - // reset params - _name = null; - _sid = null; - _metadata = null; - _serverVersion = null; - _serverRegion = null; - } else if (event.fullReconnect && - event.newState == ConnectionState.connected) { - events.emit(const RoomRestartedEvent()); - await _handlePostReconnect(event.fullReconnect); - } else if (event.newState == ConnectionState.reconnecting) { - events.emit(const RoomReconnectingEvent()); - } else if (event.newState == ConnectionState.disconnected) { - if (!event.fullReconnect) { - await _cleanUp(); - events.emit(RoomDisconnectedEvent(reason: event.disconnectReason)); + ..on((event) async { + events.emit(const RoomReconnectedEvent()); + // re-send tracks permissions + localParticipant?.sendTrackSubscriptionPermissions(); + }) + ..on((event) async { + events.emit(const RoomRestartingEvent()); + + // clean up RemoteParticipants + var copy = _participants.values.toList(); + + _participants.clear(); + _activeSpeakers.clear(); + // reset params + _name = null; + _sid = null; + _metadata = null; + _serverVersion = null; + _serverRegion = null; + + for (final participant in copy) { + events.emit(ParticipantDisconnectedEvent(participant: participant)); + await participant.dispose(); + } + }) + ..on((event) async { + events.emit(const RoomRestartedEvent()); + + // re-publish all tracks + await localParticipant?.rePublishAllTracks(); + + for (var participant in participants.values) { + for (var pub in participant.trackPublications.values) { + if (pub.subscribed) { + pub.sendUpdateTrackSettings(); + } } } - // always notify ChangeNotifier - notifyListeners(); }) - ..on((event) {}) - ..on((event) {}) + ..on((event) async { + events.emit(const RoomReconnectingEvent()); + await _sendSyncState(); + }) + ..on((event) async { + await _cleanUp(); + events.emit(RoomDisconnectedEvent(reason: event.reason)); + }) ..on( (event) => _onEngineActiveSpeakersUpdateEvent(event.speakers)) ..on(_onDataMessageEvent) @@ -440,9 +444,11 @@ class Room extends DisposableChangeNotifier with EventsEmittable { /// Disconnects from the room, notifying server of disconnection. Future disconnect() async { - if (connectionState != ConnectionState.disconnected) { - engine.signalClient.sendLeave(); + if (engine.isClosed) { + events.emit(RoomDisconnectedEvent(reason: DisconnectReason.unknown)); + return; } + await engine.disconnect(); await _cleanUp(); } @@ -494,13 +500,15 @@ class Room extends DisposableChangeNotifier with EventsEmittable { continue; } - if (info.state == lk_models.ParticipantInfo_State.DISCONNECTED) { + final isNew = !_participants.containsKey(info.sid); + + if (info.state == lk_models.ParticipantInfo_State.DISCONNECTED && + !isNew) { hasChanged = true; await _handleParticipantDisconnect(info.sid); continue; } - final isNew = !_participants.containsKey(info.sid); final participant = _getOrCreateRemoteParticipant(info.sid, info); if (isNew) { @@ -658,20 +666,6 @@ class Room extends DisposableChangeNotifier with EventsEmittable { publishTracks: localParticipant?.publishedTracksInfo(), ); } - - Future _handlePostReconnect(bool isFullReconnect) async { - if (isFullReconnect) { - // re-publish all tracks - await localParticipant?.rePublishAllTracks(); - } - for (var participant in participants.values) { - for (var pub in participant.trackPublications.values) { - if (pub.subscribed) { - pub.sendUpdateTrackSettings(); - } - } - } - } } extension RoomPrivateMethods on Room { @@ -720,11 +714,18 @@ extension RoomDebugMethods on Room { bool? serverLeave, bool? switchCandidate, bool? signalReconnect, + bool? fullReconnect, + int? subscriberBandwidth, }) async { if (signalReconnect != null && signalReconnect) { await engine.signalClient.cleanUp(); return; } + if (fullReconnect != null && fullReconnect) { + engine.fullReconnectOnNext = true; + await engine.signalClient.cleanUp(); + return; + } engine.signalClient.sendSimulateScenario( speakerUpdate: speakerUpdate, nodeFailure: nodeFailure, diff --git a/lib/src/core/signal_client.dart b/lib/src/core/signal_client.dart index c2490b6e1..a492141a7 100644 --- a/lib/src/core/signal_client.dart +++ b/lib/src/core/signal_client.dart @@ -15,6 +15,9 @@ import 'dart:async'; import 'dart:collection'; +import 'package:flutter/foundation.dart'; + +import 'package:connectivity_plus/connectivity_plus.dart'; import 'package:fixnum/fixnum.dart'; import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; import 'package:http/http.dart' as http; @@ -30,14 +33,15 @@ import '../options.dart'; import '../proto/livekit_models.pb.dart' as lk_models; import '../proto/livekit_rtc.pb.dart' as lk_rtc; import '../support/disposable.dart'; +import '../support/platform.dart'; import '../support/websocket.dart'; import '../types/other.dart'; import '../types/video_dimensions.dart'; import '../utils.dart'; class SignalClient extends Disposable with EventsEmittable { - // Connection state of the socket conection. ConnectionState _connectionState = ConnectionState.disconnected; + ConnectionState get connectionState => _connectionState; final WebSocketConnector _wsConnector; LiveKitWebSocket? _ws; @@ -52,6 +56,18 @@ class SignalClient extends Disposable with EventsEmittable { int get pingCount => _pingCount; int _pingCount = 0; + String? participantSid; + + ConnectivityResult? _connectivityResult; + StreamSubscription? connectivitySubscription; + + Future checkInternetConnection() async { + if (!kIsWeb && !lkPlatformIsTest()) { + return true; + } + _connectivityResult = await Connectivity().checkConnectivity(); + return _connectivityResult != ConnectivityResult.none; + } @internal SignalClient(WebSocketConnector wsConnector) : _wsConnector = wsConnector { @@ -62,6 +78,9 @@ class SignalClient extends Disposable with EventsEmittable { onDispose(() async { await cleanUp(); await events.dispose(); + if (!kIsWeb && !lkPlatformIsTest()) { + await connectivitySubscription?.cancel(); + } }); } @@ -72,23 +91,52 @@ class SignalClient extends Disposable with EventsEmittable { required ConnectOptions connectOptions, required RoomOptions roomOptions, bool reconnect = false, - String? sid, }) async { + if (!kIsWeb && !lkPlatformIsTest()) { + _connectivityResult = await Connectivity().checkConnectivity(); + connectivitySubscription ??= Connectivity() + .onConnectivityChanged + .listen((ConnectivityResult result) { + if (_connectivityResult != result) { + _connectivityResult = result; + if (result == ConnectivityResult.none) { + logger.warning('lost internet connection'); + } else { + logger.info('internet connection restored'); + events.emit(SignalConnectivityChangedEvent( + state: result, + )); + } + } + }); + + if (_connectivityResult == ConnectivityResult.none) { + logger.warning('no internet connection'); + events.emit(SignalDisconnectedEvent( + reason: DisconnectReason.noInternetConnection)); + throw ConnectException('no internet connection'); + } + } + final rtcUri = await Utils.buildUri( uriString, token: token, connectOptions: connectOptions, roomOptions: roomOptions, reconnect: reconnect, - sid: sid, + sid: reconnect ? participantSid : null, ); logger.fine('SignalClient connecting with url: $rtcUri'); try { - _updateConnectionState(reconnect - ? ConnectionState.reconnecting - : ConnectionState.connecting); + if (reconnect == true) { + _connectionState = ConnectionState.reconnecting; + events.emit(const SignalReconnectingEvent()); + } else { + _connectionState = ConnectionState.connecting; + events.emit(const SignalConnectingEvent()); + } // Clean up existing socket await cleanUp(); // Attempt to connect @@ -101,14 +149,15 @@ class SignalClient extends Disposable with EventsEmittable { ), ); // Successful connection - _updateConnectionState(ConnectionState.connected); + _connectionState = ConnectionState.connected; + events.emit(const SignalConnectedEvent()); } catch (socketError) { + // Skip validation if reconnect mode + if (reconnect) rethrow; + // Attempt Validation var finalError = socketError; try { - // Skip validation if reconnect mode - if (reconnect) rethrow; - // Re-build same uri for validate mode final validateUri = await Utils.buildUri( uriString, @@ -128,17 +177,25 @@ class SignalClient extends Disposable with EventsEmittable { finalError = error; } } finally { - _updateConnectionState(ConnectionState.disconnected); + events.emit(SignalDisconnectedEvent( + reason: DisconnectReason.signalingConnectionFailure)); throw finalError; } } } + Future sendLeave() async { + _sendRequest(lk_rtc.SignalRequest( + leave: lk_rtc.LeaveRequest( + canReconnect: false, + reason: lk_models.DisconnectReason.CLIENT_INITIATED))); + } + // resets internal state to a re-usable state @internal Future cleanUp() async { logger.fine('[${objectId}] cleanUp()'); - + _connectionState = ConnectionState.disconnected; await _ws?.dispose(); _ws = null; _queue.clear(); @@ -154,15 +211,16 @@ class SignalClient extends Disposable with EventsEmittable { return; } - if (_connectionState == ConnectionState.reconnecting && + if (connectionState == ConnectionState.reconnecting && req._canQueue() && enqueueIfReconnecting) { _queue.add(req); return; } - if (_ws == null) { - logger.warning('[$objectId] Could not send message, socket is null'); + if (connectionState != ConnectionState.connected) { + logger + .warning('[$objectId] Could not send message, socket not connected'); return; } @@ -182,6 +240,7 @@ class SignalClient extends Disposable with EventsEmittable { 'ping config timeout: ${msg.join.pingTimeout}, interval: ${msg.join.pingInterval} '); _startPingInterval(); } + participantSid = msg.join.participant.sid; events.emit(SignalJoinResponseEvent(response: msg.join)); break; case lk_rtc.SignalResponse_Message.answer: @@ -275,12 +334,14 @@ class SignalClient extends Disposable with EventsEmittable { } void _onSocketDispose() { - logger.fine('SignalClient onSocketDispose $_connectionState'); // don't emit event's when reconnecting state - if (_connectionState != ConnectionState.reconnecting) { - logger.fine('SignalClient did disconnect ${_connectionState}'); - _updateConnectionState(ConnectionState.disconnected); + logger.fine('SignalClient did disconnect ${_connectionState}'); + if (_connectionState == ConnectionState.reconnecting) { + return; } + _connectionState = ConnectionState.disconnected; + events.emit( + SignalDisconnectedEvent(reason: DisconnectReason.connectionClosed)); } void _sendPing() { @@ -456,6 +517,7 @@ extension SignalClientRequests on SignalClient { @internal void sendSyncState({ required lk_rtc.SessionDescription? answer, + required lk_rtc.SessionDescription? offer, required lk_rtc.UpdateSubscription subscription, required Iterable? publishTracks, required Iterable? dataChannelInfo, @@ -463,6 +525,7 @@ extension SignalClientRequests on SignalClient { _sendRequest(lk_rtc.SignalRequest( syncState: lk_rtc.SyncState( answer: answer, + offer: offer, subscription: subscription, publishTracks: publishTracks, dataChannels: dataChannelInfo, @@ -497,43 +560,11 @@ extension on lk_rtc.SignalRequest { // list of types that cannot be queued lk_rtc.SignalRequest_Message.syncState, lk_rtc.SignalRequest_Message.trickle, - lk_rtc.SignalRequest_Message.offer, lk_rtc.SignalRequest_Message.answer, lk_rtc.SignalRequest_Message.simulate ].contains(whichMessage()); } -extension SignalClientPrivateMethods on SignalClient { - void _updateConnectionState(ConnectionState newValue) { - if (_connectionState == newValue) return; - - logger.fine('SignalClient ConnectionState ' - '${_connectionState.name} -> ${newValue.name}'); - - bool didReconnect = _connectionState == ConnectionState.reconnecting && - newValue == ConnectionState.connected; - - final oldState = _connectionState; - - if (newValue == ConnectionState.connected && - oldState == ConnectionState.reconnecting) { - // restart ping interval as it's cleared for reconnection - _startPingInterval(); - } else if (newValue == ConnectionState.reconnecting) { - // clear ping interval and restart it once reconnected - _clearPingInterval(); - } - - _connectionState = newValue; - - events.emit(SignalConnectionStateUpdatedEvent( - newState: _connectionState, - oldState: oldState, - didReconnect: didReconnect, - )); - } -} - // internal methods extension SignalClientInternalMethods on SignalClient { @internal diff --git a/lib/src/core/transport.dart b/lib/src/core/transport.dart index ed03d0146..057618e4d 100644 --- a/lib/src/core/transport.dart +++ b/lib/src/core/transport.dart @@ -275,7 +275,6 @@ class Transport extends Disposable { try { final result = await pc.getRemoteDescription(); - logger.fine('pc.getRemoteDescription $result'); return result; } catch (_) { logger.warning('pc.getRemoteDescription failed with error: $_'); diff --git a/lib/src/events.dart b/lib/src/events.dart index 34dae53eb..e2b25d0b0 100644 --- a/lib/src/events.dart +++ b/lib/src/events.dart @@ -44,6 +44,18 @@ mixin EngineEvent implements LiveKitEvent {} /// Base type for all [SignalClient] events. mixin SignalEvent implements LiveKitEvent {} +class RoomConnectedEvent with RoomEvent { + final Room room; + final String? metadata; + const RoomConnectedEvent({ + required this.room, + required this.metadata, + }); + + @override + String toString() => '${runtimeType}(room: ${room})'; +} + /// When the connection to the server has been interrupted and it's attempting /// to reconnect. /// Emitted by [Room]. diff --git a/lib/src/extensions.dart b/lib/src/extensions.dart index 56477fbeb..a30007fce 100644 --- a/lib/src/extensions.dart +++ b/lib/src/extensions.dart @@ -94,13 +94,13 @@ extension RTCPeerConnectionStateExt on rtc.RTCPeerConnectionState { bool isConnected() => this == rtc.RTCPeerConnectionState.RTCPeerConnectionStateConnected; - bool isClosed() => - this == rtc.RTCPeerConnectionState.RTCPeerConnectionStateClosed; - - bool isDisconnectedOrFailed() => [ + bool isDisconnected() => [ + rtc.RTCPeerConnectionState.RTCPeerConnectionStateClosed, rtc.RTCPeerConnectionState.RTCPeerConnectionStateDisconnected, - rtc.RTCPeerConnectionState.RTCPeerConnectionStateFailed, ].contains(this); + + bool isFailed() => + this == rtc.RTCPeerConnectionState.RTCPeerConnectionStateFailed; } extension RTCIceTransportPolicyExt on RTCIceTransportPolicy { diff --git a/lib/src/internal/events.dart b/lib/src/internal/events.dart index f4fe608f9..1bdbd62df 100644 --- a/lib/src/internal/events.dart +++ b/lib/src/internal/events.dart @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +import 'package:connectivity_plus/connectivity_plus.dart'; import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; import 'package:meta/meta.dart'; @@ -140,57 +141,83 @@ class SignalReconnectResponseEvent with SignalEvent, InternalEvent { }); } -/// Base class for a ConnectionStateUpdated event @internal -abstract class ConnectionStateUpdatedEvent with InternalEvent { - final ConnectionState newState; - final ConnectionState oldState; - final bool didReconnect; - final DisconnectReason? disconnectReason; - const ConnectionStateUpdatedEvent({ - required this.newState, - required this.oldState, - required this.didReconnect, - this.disconnectReason, +class SignalConnectivityChangedEvent with SignalEvent, InternalEvent { + final ConnectivityResult state; + const SignalConnectivityChangedEvent({ + required this.state, }); - @override - String toString() => '$runtimeType(newState: ${newState.name}, ' - 'didReconnect: ${didReconnect}, ' - 'disconnectReason: ${disconnectReason})'; } @internal -class SignalConnectionStateUpdatedEvent extends ConnectionStateUpdatedEvent - with SignalEvent { - const SignalConnectionStateUpdatedEvent({ - required ConnectionState newState, - required ConnectionState oldState, - required bool didReconnect, - DisconnectReason? disconnectReason, - }) : super( - newState: newState, - oldState: oldState, - didReconnect: didReconnect, - disconnectReason: disconnectReason, - ); +class EngineConnectedEvent with InternalEvent, SignalEvent, EngineEvent { + const EngineConnectedEvent(); } @internal -class EngineConnectionStateUpdatedEvent extends ConnectionStateUpdatedEvent - with EngineEvent { - final bool fullReconnect; - const EngineConnectionStateUpdatedEvent({ - required ConnectionState newState, - required ConnectionState oldState, - required bool didReconnect, - required this.fullReconnect, - DisconnectReason? disconnectReason, - }) : super( - newState: newState, - oldState: oldState, - didReconnect: didReconnect, - disconnectReason: disconnectReason, - ); +class EngineDisconnectedEvent with InternalEvent, EngineEvent { + DisconnectReason? reason; + EngineDisconnectedEvent({ + this.reason, + }); +} + +@internal +class EngineFullRestartingEvent with InternalEvent, EngineEvent { + const EngineFullRestartingEvent(); +} + +@internal +class EngineRestartedEvent with InternalEvent, EngineEvent { + const EngineRestartedEvent(); +} + +@internal +class EngineReconnectingEvent with InternalEvent, EngineEvent { + const EngineReconnectingEvent(); +} + +@internal +class EngineReconnectedEvent with InternalEvent, EngineEvent { + const EngineReconnectedEvent(); +} + +@internal +class EngineResumingEvent with InternalEvent, EngineEvent { + const EngineResumingEvent(); +} + +@internal +class EngineSignalResumedEvent with EngineEvent, InternalEvent { + const EngineSignalResumedEvent(); +} + +@internal +class SignalConnectedEvent with SignalEvent, InternalEvent { + const SignalConnectedEvent(); +} + +@internal +class SignalConnectingEvent with SignalEvent, InternalEvent { + const SignalConnectingEvent(); +} + +@internal +class SignalReconnectingEvent with SignalEvent, InternalEvent { + const SignalReconnectingEvent(); +} + +@internal +class SignalReconnectedEvent with SignalEvent, InternalEvent, EngineEvent { + const SignalReconnectedEvent(); +} + +@internal +class SignalDisconnectedEvent with SignalEvent, InternalEvent { + DisconnectReason? reason; + SignalDisconnectedEvent({ + this.reason, + }); } @internal diff --git a/lib/src/track/local/video.dart b/lib/src/track/local/video.dart index 2bcb8d350..613b01bd0 100644 --- a/lib/src/track/local/video.dart +++ b/lib/src/track/local/video.dart @@ -446,9 +446,13 @@ extension LocalVideoTrackExt on LocalVideoTrack { if (hasChanged) { params.encodings = encodings; - final result = await sender.setParameters(params); - if (result == false) { - logger.warning('Failed to update sender parameters'); + try { + final result = await sender.setParameters(params); + if (result == false) { + logger.warning('Failed to update sender parameters'); + } + } catch (e) { + logger.warning('Failed to update sender parameters $e'); } } } diff --git a/lib/src/types/internal.dart b/lib/src/types/internal.dart index 112352318..3570b54ee 100644 --- a/lib/src/types/internal.dart +++ b/lib/src/types/internal.dart @@ -18,8 +18,11 @@ import 'package:meta/meta.dart'; enum ClientDisconnectReason { user, peerConnectionClosed, + peerConnectionFailed, negotiationFailed, signal, reconnect, + reconnectRetry, leaveReconnect, + reconnectAttemptsExceeded, } diff --git a/lib/src/types/other.dart b/lib/src/types/other.dart index fdb8a6a30..615d331b5 100644 --- a/lib/src/types/other.dart +++ b/lib/src/types/other.dart @@ -84,6 +84,9 @@ enum DisconnectReason { roomDeleted, stateMismatch, joinFailure, + connectionClosed, + signalingConnectionFailure, + noInternetConnection, } /// The reason why a track failed to publish. diff --git a/test/core/room_e2e_test.dart b/test/core/room_e2e_test.dart index b6cee118a..b064585bd 100644 --- a/test/core/room_e2e_test.dart +++ b/test/core/room_e2e_test.dart @@ -143,8 +143,8 @@ void main() { test('leave', () async { expect( room.events.streamCtrl.stream, - emits(predicate((event) => - room.connectionState == ConnectionState.disconnected))); + emits(predicate( + (event) => event.reason == DisconnectReason.unknown))); ws.onData(leaveResponse.writeToBuffer()); }); }); diff --git a/test/core/signal_client_test.dart b/test/core/signal_client_test.dart index 492807765..012f5853e 100644 --- a/test/core/signal_client_test.dart +++ b/test/core/signal_client_test.dart @@ -41,10 +41,8 @@ void main() { expect( client.events.streamCtrl.stream, emitsInOrder([ - predicate( - (event) => event.newState == ConnectionState.connecting), - predicate( - (event) => event.newState == ConnectionState.connected), + predicate((event) => true), + predicate((event) => true), ])); await client.connect( exampleUri, @@ -53,15 +51,13 @@ void main() { roomOptions: roomOptions, ); }); + test('reconnect', () async { expect( client.events.streamCtrl.stream, emitsInOrder([ - predicate( - (event) => event.newState == ConnectionState.reconnecting), - predicate((event) => - event.newState == ConnectionState.connected && - event.didReconnect == true), + predicate((event) => true), + predicate((event) => true), ])); await client.connect( exampleUri,