diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 3629c2d..0132f12 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -430,6 +430,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI if ok, err := c.waitMedia(ctx); !ok { return false, err } + c.setStatus(CallActive) return true, nil } @@ -464,7 +465,11 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI } ctx, cancel := context.WithTimeout(ctx, disp.MaxCallDuration) defer cancel() - if err := c.joinRoom(ctx, disp.Room); err != nil { + status := CallRinging + if pinPrompt { + status = CallActive + } + if err := c.joinRoom(ctx, disp.Room, status); err != nil { return errors.Wrap(err, "failed joining room") } // Publish our own track. @@ -759,7 +764,7 @@ func (c *inboundCall) setStatus(v CallStatus) { }) } -func (c *inboundCall) createLiveKitParticipant(ctx context.Context, rconf RoomConfig) error { +func (c *inboundCall) createLiveKitParticipant(ctx context.Context, rconf RoomConfig, status CallStatus) error { ctx, span := tracer.Start(ctx, "inboundCall.createLiveKitParticipant") defer span.End() partConf := &rconf.Participant @@ -769,7 +774,7 @@ func (c *inboundCall) createLiveKitParticipant(ctx context.Context, rconf RoomCo for k, v := range c.extraAttrs { partConf.Attributes[k] = v } - partConf.Attributes[livekit.AttrSIPCallStatus] = CallActive.Attribute() + partConf.Attributes[livekit.AttrSIPCallStatus] = status.Attribute() c.forwardDTMF.Store(true) select { case <-ctx.Done(): @@ -799,7 +804,7 @@ func (c *inboundCall) publishTrack() error { return nil } -func (c *inboundCall) joinRoom(ctx context.Context, rconf RoomConfig) error { +func (c *inboundCall) joinRoom(ctx context.Context, rconf RoomConfig, status CallStatus) error { if c.joinDur != nil { c.joinDur() } @@ -810,7 +815,7 @@ func (c *inboundCall) joinRoom(ctx context.Context, rconf RoomConfig) error { "participantName", rconf.Participant.Name, ) c.log.Infow("Joining room") - if err := c.createLiveKitParticipant(ctx, rconf); err != nil { + if err := c.createLiveKitParticipant(ctx, rconf, status); err != nil { c.log.Errorw("Cannot create LiveKit participant", err) c.close(true, callDropped, "participant-failed") return errors.Wrap(err, "cannot create LiveKit participant") diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index ca61c84..6368c1c 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -324,7 +324,7 @@ func (c *outboundCall) connectMedia() { c.media.HandleDTMF(c.handleDTMF) } -func sipResponse(ctx context.Context, tx sip.ClientTransaction, stop <-chan struct{}) (*sip.Response, error) { +func sipResponse(ctx context.Context, tx sip.ClientTransaction, stop <-chan struct{}, setState func(code sip.StatusCode)) (*sip.Response, error) { cnt := 0 for { select { @@ -337,13 +337,15 @@ func sipResponse(ctx context.Context, tx sip.ClientTransaction, stop <-chan stru case <-tx.Done(): return nil, psrpc.NewErrorf(psrpc.Canceled, "transaction failed to complete (%d intermediate responses)", cnt) case res := <-tx.Responses(): - switch res.StatusCode { - default: + status := res.StatusCode + if status/100 != 1 { // != 1xx return res, nil - case 100, 180, 183: - // continue - cnt++ } + if setState != nil { + setState(res.StatusCode) + } + // continue + cnt++ } } } @@ -402,7 +404,13 @@ func (c *outboundCall) sipSignal(ctx context.Context) error { toUri := CreateURIFromUserAndAddress(c.sipConf.to, c.sipConf.address, TransportFrom(c.sipConf.transport)) - sdpResp, err := c.cc.Invite(ctx, toUri, c.sipConf.user, c.sipConf.pass, c.sipConf.headers, sdpOffer) + ringing := false + sdpResp, err := c.cc.Invite(ctx, toUri, c.sipConf.user, c.sipConf.pass, c.sipConf.headers, sdpOffer, func(code sip.StatusCode) { + if !ringing && code >= sip.StatusRinging { + ringing = true + c.setStatus(CallRinging) + } + }) if err != nil { // TODO: should we retry? maybe new offer will work var e *ErrorStatus @@ -434,7 +442,7 @@ func (c *outboundCall) sipSignal(ctx context.Context) error { c.c.cmu.Unlock() c.mon.InviteAccept() - err = c.cc.AckInvite(ctx) + err = c.cc.AckInviteOK(ctx) if err != nil { c.log.Infow("SIP accept failed", "error", err) return err @@ -577,7 +585,7 @@ func (c *sipOutbound) RemoteHeaders() Headers { return c.inviteOk.Headers() } -func (c *sipOutbound) Invite(ctx context.Context, to URI, user, pass string, headers map[string]string, sdpOffer []byte) ([]byte, error) { +func (c *sipOutbound) Invite(ctx context.Context, to URI, user, pass string, headers map[string]string, sdpOffer []byte, setState func(code sip.StatusCode)) ([]byte, error) { ctx, span := tracer.Start(ctx, "sipOutbound.Invite") defer span.End() c.mu.Lock() @@ -602,7 +610,7 @@ func (c *sipOutbound) Invite(ctx context.Context, to URI, user, pass string, hea } authLoop: for { - req, resp, err = c.attemptInvite(ctx, dest, toHeader, sdpOffer, authHeaderRespName, authHeader, sipHeaders) + req, resp, err = c.attemptInvite(ctx, dest, toHeader, sdpOffer, authHeaderRespName, authHeader, sipHeaders, setState) if err != nil { return nil, err } @@ -692,15 +700,15 @@ authLoop: return c.inviteOk.Body(), nil } -func (c *sipOutbound) AckInvite(ctx context.Context) error { - ctx, span := tracer.Start(ctx, "sipOutbound.AckInvite") +func (c *sipOutbound) AckInviteOK(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "sipOutbound.AckInviteOK") defer span.End() c.mu.Lock() defer c.mu.Unlock() return c.c.sipCli.WriteRequest(sip.NewAckRequest(c.invite, c.inviteOk, nil)) } -func (c *sipOutbound) attemptInvite(ctx context.Context, dest string, to *sip.ToHeader, offer []byte, authHeaderName, authHeader string, headers Headers) (*sip.Request, *sip.Response, error) { +func (c *sipOutbound) attemptInvite(ctx context.Context, dest string, to *sip.ToHeader, offer []byte, authHeaderName, authHeader string, headers Headers, setState func(code sip.StatusCode)) (*sip.Request, *sip.Response, error) { ctx, span := tracer.Start(ctx, "sipOutbound.attemptInvite") defer span.End() req := sip.NewRequest(sip.INVITE, &to.Address) @@ -726,7 +734,7 @@ func (c *sipOutbound) attemptInvite(ctx context.Context, dest string, to *sip.To } defer tx.Terminate() - resp, err := sipResponse(ctx, tx, c.c.closing.Watch()) + resp, err := sipResponse(ctx, tx, c.c.closing.Watch(), setState) return req, resp, err } diff --git a/pkg/sip/participant.go b/pkg/sip/participant.go index 1597e28..6477ff6 100644 --- a/pkg/sip/participant.go +++ b/pkg/sip/participant.go @@ -55,6 +55,8 @@ func (v CallStatus) Attribute() string { return "" // no attribute for these statuses case CallDialing: return "dialing" + case CallRinging: + return "ringing" case CallAutomation: return "automation" case CallActive: @@ -82,6 +84,7 @@ const ( callDropped = CallStatus(iota) callFlood CallDialing + CallRinging CallAutomation CallActive CallHangup diff --git a/pkg/sip/protocol.go b/pkg/sip/protocol.go index 20aabca..68e12ad 100644 --- a/pkg/sip/protocol.go +++ b/pkg/sip/protocol.go @@ -100,7 +100,7 @@ func sendAndACK(ctx context.Context, c Signaling, req *sip.Request) { return } defer tx.Terminate() - r, err := sipResponse(ctx, tx, nil) + r, err := sipResponse(ctx, tx, nil, nil) if err != nil { return } @@ -180,7 +180,7 @@ func sendRefer(ctx context.Context, c Signaling, req *sip.Request, stop <-chan s defer tx.Terminate() ctx = context.WithoutCancel(ctx) - resp, err := sipResponse(ctx, tx, stop) + resp, err := sipResponse(ctx, tx, stop, nil) if err != nil { return nil, err }