From 92dc2d8ae91fc89e69c319d9f7f1649f837502bc Mon Sep 17 00:00:00 2001 From: Sean DuBois Date: Thu, 15 Feb 2024 12:39:18 -0500 Subject: [PATCH] Improve /api/status Include stats around packets sent and received --- internal/webrtc/webrtc.go | 91 +++++++++++++++++++++++++++++++++++---- internal/webrtc/whep.go | 6 ++- internal/webrtc/whip.go | 37 ++++------------ 3 files changed, 94 insertions(+), 40 deletions(-) diff --git a/internal/webrtc/webrtc.go b/internal/webrtc/webrtc.go index 400a375..393dea6 100644 --- a/internal/webrtc/webrtc.go +++ b/internal/webrtc/webrtc.go @@ -31,9 +31,12 @@ type ( stream struct { // Does this stream have a publisher? // If stream was created by a WHEP request hasWHIPClient == false - hasWHIPClient atomic.Bool - videoTrackLabels []string - audioTrack *webrtc.TrackLocalStaticRTP + hasWHIPClient atomic.Bool + + videoTracks []*videoTrack + + audioTrack *webrtc.TrackLocalStaticRTP + audioPacketsReceived atomic.Uint64 pliChan chan any @@ -41,6 +44,11 @@ type ( whepSessions map[string]*whepSession } + videoTrack struct { + rid string + packetsReceived atomic.Uint64 + } + videoTrackCodec int ) @@ -99,18 +107,19 @@ func deleteStream(streamKey string) { delete(streamMap, streamKey) } -func addTrack(stream *stream, rid string) error { +func addTrack(stream *stream, rid string) (*videoTrack, error) { streamMapLock.Lock() defer streamMapLock.Unlock() - for i := range stream.videoTrackLabels { - if rid == stream.videoTrackLabels[i] { - return nil + for i := range stream.videoTracks { + if rid == stream.videoTracks[i].rid { + return stream.videoTracks[i], nil } } - stream.videoTrackLabels = append(stream.videoTrackLabels, rid) - return nil + t := &videoTrack{rid: rid} + stream.videoTracks = append(stream.videoTracks, t) + return t, nil } func getPublicIP() string { @@ -310,3 +319,67 @@ func Configure() { webrtc.WithSettingEngine(createSettingEngine(false, udpMuxCache)), ) } + +type StreamStatusVideo struct { + RID string `json:"rid"` + PacketsReceived uint64 `json:"packetsReceived"` +} + +type StreamStatus struct { + StreamKey string `json:"streamKey"` + AudioPacketsReceived uint64 `json:"audioPacketsReceived"` + VideoStreams []StreamStatusVideo `json:"videoStreams"` + WHEPSessions []whepSessionStatus `json:"whepSessions"` +} + +type whepSessionStatus struct { + ID string `json:"id"` + CurrentLayer string `json:"currentLayer"` + SequenceNumber uint16 `json:"sequenceNumber"` + Timestamp uint32 `json:"timestamp"` + PacketsWritten uint64 `json:"packetsWritten"` +} + +func GetStreamStatuses() []StreamStatus { + streamMapLock.Lock() + defer streamMapLock.Unlock() + + out := []StreamStatus{} + + for streamKey, stream := range streamMap { + whepSessions := []whepSessionStatus{} + stream.whepSessionsLock.Lock() + for id, whepSession := range stream.whepSessions { + currentLayer, ok := whepSession.currentLayer.Load().(string) + if !ok { + continue + } + + whepSessions = append(whepSessions, whepSessionStatus{ + ID: id, + CurrentLayer: currentLayer, + SequenceNumber: whepSession.sequenceNumber, + Timestamp: whepSession.timestamp, + PacketsWritten: whepSession.packetsWritten, + }) + } + stream.whepSessionsLock.Unlock() + + streamStatusVideo := []StreamStatusVideo{} + for _, videoTrack := range stream.videoTracks { + streamStatusVideo = append(streamStatusVideo, StreamStatusVideo{ + RID: videoTrack.rid, + PacketsReceived: videoTrack.packetsReceived.Load(), + }) + } + + out = append(out, StreamStatus{ + StreamKey: streamKey, + AudioPacketsReceived: stream.audioPacketsReceived.Load(), + VideoStreams: streamStatusVideo, + WHEPSessions: whepSessions, + }) + } + + return out +} diff --git a/internal/webrtc/whep.go b/internal/webrtc/whep.go index 0e496a4..d86983a 100644 --- a/internal/webrtc/whep.go +++ b/internal/webrtc/whep.go @@ -19,6 +19,7 @@ type ( currentLayer atomic.Value sequenceNumber uint16 timestamp uint32 + packetsWritten uint64 } simulcastLayerResponse struct { @@ -36,8 +37,8 @@ func WHEPLayers(whepSessionId string) ([]byte, error) { defer streamMap[streamKey].whepSessionsLock.Unlock() if _, ok := streamMap[streamKey].whepSessions[whepSessionId]; ok { - for i := range streamMap[streamKey].videoTrackLabels { - layers = append(layers, simulcastLayerResponse{EncodingId: streamMap[streamKey].videoTrackLabels[i]}) + for i := range streamMap[streamKey].videoTracks { + layers = append(layers, simulcastLayerResponse{EncodingId: streamMap[streamKey].videoTracks[i].rid}) } break @@ -171,6 +172,7 @@ func (w *whepSession) sendVideoPacket(rtpPkt *rtp.Packet, layer string, timeDiff return } + w.packetsWritten += 1 w.sequenceNumber += 1 w.timestamp += timeDiff diff --git a/internal/webrtc/whip.go b/internal/webrtc/whip.go index 36dbcb3..7b350db 100644 --- a/internal/webrtc/whip.go +++ b/internal/webrtc/whip.go @@ -11,7 +11,7 @@ import ( "github.com/pion/webrtc/v4" ) -func audioWriter(remoteTrack *webrtc.TrackRemote, audioTrack *webrtc.TrackLocalStaticRTP) { +func audioWriter(remoteTrack *webrtc.TrackRemote, stream *stream) { rtpBuf := make([]byte, 1500) for { rtpRead, _, err := remoteTrack.Read(rtpBuf) @@ -23,7 +23,8 @@ func audioWriter(remoteTrack *webrtc.TrackRemote, audioTrack *webrtc.TrackLocalS return } - if _, writeErr := audioTrack.Write(rtpBuf[:rtpRead]); writeErr != nil && !errors.Is(writeErr, io.ErrClosedPipe) { + stream.audioPacketsReceived.Add(1) + if _, writeErr := stream.audioTrack.Write(rtpBuf[:rtpRead]); writeErr != nil && !errors.Is(writeErr, io.ErrClosedPipe) { log.Println(writeErr) return } @@ -36,7 +37,8 @@ func videoWriter(remoteTrack *webrtc.TrackRemote, stream *stream, peerConnection id = videoTrackLabelDefault } - if err := addTrack(s, id); err != nil { + videoTrack, err := addTrack(s, id) + if err != nil { log.Println(err) return } @@ -73,6 +75,8 @@ func videoWriter(remoteTrack *webrtc.TrackRemote, stream *stream, peerConnection return } + videoTrack.packetsReceived.Add(1) + rtpPkt.Extension = false rtpPkt.Extensions = nil @@ -105,7 +109,7 @@ func WHIP(offer, streamKey string) (string, error) { peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) { if strings.HasPrefix(remoteTrack.Codec().RTPCodecCapability.MimeType, "audio") { - audioWriter(remoteTrack, stream.audioTrack) + audioWriter(remoteTrack, stream) } else { videoWriter(remoteTrack, stream, peerConnection, stream) @@ -140,28 +144,3 @@ func WHIP(offer, streamKey string) (string, error) { <-gatherComplete return peerConnection.LocalDescription().SDP, nil } - -type StreamStatus struct { - StreamKey string `json:"streamKey"` - WHEPSessionsCount int `json:"whepSessionsCount"` -} - -func GetStreamStatuses() []StreamStatus { - streamMapLock.Lock() - defer streamMapLock.Unlock() - - out := []StreamStatus{} - - for streamKey, stream := range streamMap { - stream.whepSessionsLock.Lock() - whepSessionsCount := len(stream.whepSessions) - stream.whepSessionsLock.Unlock() - - out = append(out, StreamStatus{ - StreamKey: streamKey, - WHEPSessionsCount: whepSessionsCount, - }) - } - - return out -}