Skip to content

Commit

Permalink
Add DISABLE_STATUS
Browse files Browse the repository at this point in the history
Disables the /api/status endpoint

Resolves #47
  • Loading branch information
Sean-Der committed Dec 16, 2023
1 parent 386df70 commit 4e1bea2
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 23 deletions.
16 changes: 13 additions & 3 deletions internal/webrtc/webrtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"

"github.com/pion/ice/v2"
"github.com/pion/interceptor"
Expand All @@ -28,9 +29,14 @@ const (

type (
stream struct {
audioTrack *webrtc.TrackLocalStaticRTP
// Does this stream have a publisher?
// If stream was created by a WHEP request hasWHIPClient == false
hasWHIPClient atomic.Bool
videoTrackLabels []string
pliChan chan any
audioTrack *webrtc.TrackLocalStaticRTP

pliChan chan any

whepSessionsLock sync.RWMutex
whepSessions map[string]*whepSession
}
Expand Down Expand Up @@ -63,7 +69,7 @@ func getVideoTrackCodec(in string) videoTrackCodec {
return 0
}

func getStream(streamKey string) (*stream, error) {
func getStream(streamKey string, forWHIP bool) (*stream, error) {
foundStream, ok := streamMap[streamKey]
if !ok {
audioTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion")
Expand All @@ -79,6 +85,10 @@ func getStream(streamKey string) (*stream, error) {
streamMap[streamKey] = foundStream
}

if forWHIP {
foundStream.hasWHIPClient.Store(true)
}

return foundStream, nil
}

Expand Down
17 changes: 13 additions & 4 deletions internal/webrtc/whep.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,21 @@ func WHEPChangeLayer(whepSessionId, layer string) error {
return nil
}

func deleteWHEPSession(stream *stream, streamKey string, whepSessionId string) {
stream.whepSessionsLock.Lock()
defer stream.whepSessionsLock.Unlock()
delete(stream.whepSessions, whepSessionId)

// If WHEP is gone and we have no publisher delete the stream
if len(stream.whepSessions) == 0 && !stream.hasWHIPClient.Load() {
deleteStream(streamKey)
}
}

func WHEP(offer, streamKey string) (string, string, error) {
streamMapLock.Lock()
defer streamMapLock.Unlock()
stream, err := getStream(streamKey)
stream, err := getStream(streamKey, false)
if err != nil {
return "", "", err
}
Expand All @@ -93,9 +104,7 @@ func WHEP(offer, streamKey string) (string, string, error) {
log.Println(err)
}

stream.whepSessionsLock.Lock()
defer stream.whepSessionsLock.Unlock()
delete(stream.whepSessions, whepSessionId)
deleteWHEPSession(stream, streamKey, whepSessionId)
}
})

Expand Down
24 changes: 19 additions & 5 deletions internal/webrtc/whip.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func WHIP(offer, streamKey string) (string, error) {

streamMapLock.Lock()
defer streamMapLock.Unlock()
stream, err := getStream(streamKey)
stream, err := getStream(streamKey, true)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -138,13 +138,27 @@ func WHIP(offer, streamKey string) (string, error) {
return peerConnection.LocalDescription().SDP, nil
}

func GetAllStreams() (out []string) {
type StreamStatus struct {
StreamKey string `json:"streamKey"`
WHEPSessionsCount int `json:"whepSessionsCount"`
}

func GetStreamStatuses() []StreamStatus {
streamMapLock.Lock()
defer streamMapLock.Unlock()

for s := range streamMap {
out = append(out, s)
out := []StreamStatus{}

for streamKey, stream := range streamMap {
stream.whepSessionsLock.Lock()
whepSessionsCount := len(stream.whepSessions)
stream.whepSessionsLock.Unlock()

out = append(out, StreamStatus{
StreamKey: streamKey,
WHEPSessionsCount: whepSessionsCount,
})
}

return
return out
}
16 changes: 5 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,8 @@ func whepLayerHandler(res http.ResponseWriter, req *http.Request) {
}
}

type StreamStatus struct {
StreamKey string `json:"streamKey"`
}

func statusHandler(res http.ResponseWriter, req *http.Request) {
statuses := []StreamStatus{}
for _, s := range webrtc.GetAllStreams() {
statuses = append(statuses, StreamStatus{StreamKey: s})
}

if err := json.NewEncoder(res).Encode(statuses); err != nil {
if err := json.NewEncoder(res).Encode(webrtc.GetStreamStatuses()); err != nil {
logHTTPError(res, err.Error(), http.StatusBadRequest)
}
}
Expand Down Expand Up @@ -198,10 +189,13 @@ func main() {
mux.Handle("/", indexHTMLWhenNotFound(http.Dir("./web/build")))
mux.HandleFunc("/api/whip", corsHandler(whipHandler))
mux.HandleFunc("/api/whep", corsHandler(whepHandler))
mux.HandleFunc("/api/status", corsHandler(statusHandler))
mux.HandleFunc("/api/sse/", corsHandler(whepServerSentEventsHandler))
mux.HandleFunc("/api/layer/", corsHandler(whepLayerHandler))

if os.Getenv("DISABLE_STATUS") == "" {
mux.HandleFunc("/api/status", corsHandler(statusHandler))
}

server := &http.Server{
Handler: mux,
Addr: os.Getenv("HTTP_ADDRESS"),
Expand Down

0 comments on commit 4e1bea2

Please sign in to comment.