Skip to content

Commit

Permalink
Improve /api/status
Browse files Browse the repository at this point in the history
Include stats around packets sent and received
  • Loading branch information
Sean-Der committed Feb 15, 2024
1 parent 0672999 commit 92dc2d8
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 40 deletions.
91 changes: 82 additions & 9 deletions internal/webrtc/webrtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,24 @@ type (
stream struct {
// Does this stream have a publisher?
// If stream was created by a WHEP request hasWHIPClient == false
hasWHIPClient atomic.Bool
videoTrackLabels []string
audioTrack *webrtc.TrackLocalStaticRTP
hasWHIPClient atomic.Bool

videoTracks []*videoTrack

audioTrack *webrtc.TrackLocalStaticRTP
audioPacketsReceived atomic.Uint64

pliChan chan any

whepSessionsLock sync.RWMutex
whepSessions map[string]*whepSession
}

videoTrack struct {
rid string
packetsReceived atomic.Uint64
}

videoTrackCodec int
)

Expand Down Expand Up @@ -99,18 +107,19 @@ func deleteStream(streamKey string) {
delete(streamMap, streamKey)
}

func addTrack(stream *stream, rid string) error {
func addTrack(stream *stream, rid string) (*videoTrack, error) {
streamMapLock.Lock()
defer streamMapLock.Unlock()

for i := range stream.videoTrackLabels {
if rid == stream.videoTrackLabels[i] {
return nil
for i := range stream.videoTracks {
if rid == stream.videoTracks[i].rid {
return stream.videoTracks[i], nil
}
}

stream.videoTrackLabels = append(stream.videoTrackLabels, rid)
return nil
t := &videoTrack{rid: rid}
stream.videoTracks = append(stream.videoTracks, t)
return t, nil
}

func getPublicIP() string {
Expand Down Expand Up @@ -310,3 +319,67 @@ func Configure() {
webrtc.WithSettingEngine(createSettingEngine(false, udpMuxCache)),
)
}

type StreamStatusVideo struct {
RID string `json:"rid"`
PacketsReceived uint64 `json:"packetsReceived"`
}

type StreamStatus struct {
StreamKey string `json:"streamKey"`
AudioPacketsReceived uint64 `json:"audioPacketsReceived"`
VideoStreams []StreamStatusVideo `json:"videoStreams"`
WHEPSessions []whepSessionStatus `json:"whepSessions"`
}

type whepSessionStatus struct {
ID string `json:"id"`
CurrentLayer string `json:"currentLayer"`
SequenceNumber uint16 `json:"sequenceNumber"`
Timestamp uint32 `json:"timestamp"`
PacketsWritten uint64 `json:"packetsWritten"`
}

func GetStreamStatuses() []StreamStatus {
streamMapLock.Lock()
defer streamMapLock.Unlock()

out := []StreamStatus{}

for streamKey, stream := range streamMap {
whepSessions := []whepSessionStatus{}
stream.whepSessionsLock.Lock()
for id, whepSession := range stream.whepSessions {
currentLayer, ok := whepSession.currentLayer.Load().(string)
if !ok {
continue
}

whepSessions = append(whepSessions, whepSessionStatus{
ID: id,
CurrentLayer: currentLayer,
SequenceNumber: whepSession.sequenceNumber,
Timestamp: whepSession.timestamp,
PacketsWritten: whepSession.packetsWritten,
})
}
stream.whepSessionsLock.Unlock()

streamStatusVideo := []StreamStatusVideo{}
for _, videoTrack := range stream.videoTracks {
streamStatusVideo = append(streamStatusVideo, StreamStatusVideo{
RID: videoTrack.rid,
PacketsReceived: videoTrack.packetsReceived.Load(),
})
}

out = append(out, StreamStatus{
StreamKey: streamKey,
AudioPacketsReceived: stream.audioPacketsReceived.Load(),
VideoStreams: streamStatusVideo,
WHEPSessions: whepSessions,
})
}

return out
}
6 changes: 4 additions & 2 deletions internal/webrtc/whep.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type (
currentLayer atomic.Value
sequenceNumber uint16
timestamp uint32
packetsWritten uint64
}

simulcastLayerResponse struct {
Expand All @@ -36,8 +37,8 @@ func WHEPLayers(whepSessionId string) ([]byte, error) {
defer streamMap[streamKey].whepSessionsLock.Unlock()

if _, ok := streamMap[streamKey].whepSessions[whepSessionId]; ok {
for i := range streamMap[streamKey].videoTrackLabels {
layers = append(layers, simulcastLayerResponse{EncodingId: streamMap[streamKey].videoTrackLabels[i]})
for i := range streamMap[streamKey].videoTracks {
layers = append(layers, simulcastLayerResponse{EncodingId: streamMap[streamKey].videoTracks[i].rid})
}

break
Expand Down Expand Up @@ -171,6 +172,7 @@ func (w *whepSession) sendVideoPacket(rtpPkt *rtp.Packet, layer string, timeDiff
return
}

w.packetsWritten += 1
w.sequenceNumber += 1
w.timestamp += timeDiff

Expand Down
37 changes: 8 additions & 29 deletions internal/webrtc/whip.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/pion/webrtc/v4"
)

func audioWriter(remoteTrack *webrtc.TrackRemote, audioTrack *webrtc.TrackLocalStaticRTP) {
func audioWriter(remoteTrack *webrtc.TrackRemote, stream *stream) {
rtpBuf := make([]byte, 1500)
for {
rtpRead, _, err := remoteTrack.Read(rtpBuf)
Expand All @@ -23,7 +23,8 @@ func audioWriter(remoteTrack *webrtc.TrackRemote, audioTrack *webrtc.TrackLocalS
return
}

if _, writeErr := audioTrack.Write(rtpBuf[:rtpRead]); writeErr != nil && !errors.Is(writeErr, io.ErrClosedPipe) {
stream.audioPacketsReceived.Add(1)
if _, writeErr := stream.audioTrack.Write(rtpBuf[:rtpRead]); writeErr != nil && !errors.Is(writeErr, io.ErrClosedPipe) {
log.Println(writeErr)
return
}
Expand All @@ -36,7 +37,8 @@ func videoWriter(remoteTrack *webrtc.TrackRemote, stream *stream, peerConnection
id = videoTrackLabelDefault
}

if err := addTrack(s, id); err != nil {
videoTrack, err := addTrack(s, id)
if err != nil {
log.Println(err)
return
}
Expand Down Expand Up @@ -73,6 +75,8 @@ func videoWriter(remoteTrack *webrtc.TrackRemote, stream *stream, peerConnection
return
}

videoTrack.packetsReceived.Add(1)

rtpPkt.Extension = false
rtpPkt.Extensions = nil

Expand Down Expand Up @@ -105,7 +109,7 @@ func WHIP(offer, streamKey string) (string, error) {

peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
if strings.HasPrefix(remoteTrack.Codec().RTPCodecCapability.MimeType, "audio") {
audioWriter(remoteTrack, stream.audioTrack)
audioWriter(remoteTrack, stream)
} else {
videoWriter(remoteTrack, stream, peerConnection, stream)

Expand Down Expand Up @@ -140,28 +144,3 @@ func WHIP(offer, streamKey string) (string, error) {
<-gatherComplete
return peerConnection.LocalDescription().SDP, nil
}

type StreamStatus struct {
StreamKey string `json:"streamKey"`
WHEPSessionsCount int `json:"whepSessionsCount"`
}

func GetStreamStatuses() []StreamStatus {
streamMapLock.Lock()
defer streamMapLock.Unlock()

out := []StreamStatus{}

for streamKey, stream := range streamMap {
stream.whepSessionsLock.Lock()
whepSessionsCount := len(stream.whepSessions)
stream.whepSessionsLock.Unlock()

out = append(out, StreamStatus{
StreamKey: streamKey,
WHEPSessionsCount: whepSessionsCount,
})
}

return out
}

0 comments on commit 92dc2d8

Please sign in to comment.