Skip to content

Commit

Permalink
[Extend Governor APIs] Correlation ID (#93)
Browse files Browse the repository at this point in the history
* add correlation ID

* add correlation ID and trace

* fix test

* leave trace config to user with http Transport

* undo request options

* Add correlation ID context injection and extraction functions
  • Loading branch information
bailinhe authored Dec 12, 2023
1 parent 91c3a81 commit 9a2ee37
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 3 deletions.
22 changes: 19 additions & 3 deletions internal/eventbus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.uber.org/zap"

events "github.com/metal-toolbox/governor-api/pkg/events/v1alpha1"
"github.com/nats-io/nats.go"
)

const (
Expand All @@ -21,6 +22,7 @@ const (

type conn interface {
Publish(subject string, data []byte) error
PublishMsg(m *nats.Msg) error
Drain() error
}

Expand Down Expand Up @@ -86,7 +88,7 @@ func (c *Client) Publish(ctx context.Context, sub string, event *events.Event) e

c.logger.Info("publishing event to the event bus", zap.String("subject", subject), zap.Any("action", event.Action))

ctx, span := c.tracer.Start(ctx, "events.nats.PublishEvent", trace.WithAttributes(
_, span := c.tracer.Start(ctx, "events.nats.PublishEvent", trace.WithAttributes(
attribute.String("events.action", event.Action),
attribute.String("event.subject", subject),
attribute.String("event.actor_id", event.ActorID),
Expand All @@ -101,13 +103,27 @@ func (c *Client) Publish(ctx context.Context, sub string, event *events.Event) e

event.TraceContext = mapCarrier

j, err := json.Marshal(event)
payload, err := json.Marshal(event)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

return c.conn.Publish(subject, j)
headers := nats.Header{}

if cid := events.ExtractCorrelationID(ctx); cid != "" {
c.logger.Debug("publishing event with correlation ID", zap.String("correlationID", cid))
span.SetAttributes(attribute.String("event.correlation_id", cid))
headers.Add(events.GovernorEventCorrelationIDHeader, cid)
}

msg := &nats.Msg{
Subject: subject,
Data: payload,
Header: headers,
}

return c.conn.PublishMsg(msg)
}
16 changes: 16 additions & 0 deletions internal/eventbus/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"reflect"
"testing"

"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
Expand Down Expand Up @@ -44,6 +45,21 @@ func (m *mockConn) Drain() error {
return nil
}

// PublishMsg is a mock publish message function
func (m *mockConn) PublishMsg(msg *nats.Msg) error {
if m.err != nil {
return m.err
}

m.t.Logf("got data payload %s", string(msg.Data))

if !bytes.Equal(m.data, msg.Data) {
return errors.New("unexpected data payload") //nolint:goerr113
}

return nil
}

func Test_NewClient(t *testing.T) {
client := NewClient()

Expand Down
25 changes: 25 additions & 0 deletions pkg/api/v1alpha1/concurrency_control.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package v1alpha1

import (
"github.com/gin-gonic/gin"
"github.com/google/uuid"
events "github.com/metal-toolbox/governor-api/pkg/events/v1alpha1"
"go.uber.org/zap"
)

func (r *Router) mwContextInjectCorrelationID(c *gin.Context) {
var correlationID string

if cid := c.Request.Header.Get(events.GovernorEventCorrelationIDHeader); cid != "" {
correlationID = cid
} else {
correlationID = uuid.New().String()
}

r.Logger.Debug("mwCorrelationID", zap.String("correlationID", correlationID))

c.Request = c.Request.WithContext(events.InjectCorrelationID(
c.Request.Context(),
correlationID,
))
}
2 changes: 2 additions & 0 deletions pkg/api/v1alpha1/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Router struct {

// Routes sets up protected routes and sets the scopes for said routes
func (r *Router) Routes(rg *gin.RouterGroup) {
rg.Use(r.mwContextInjectCorrelationID)

rg.GET(
"/user",
r.AuditMW.AuditWithType("GetUser"),
Expand Down
11 changes: 11 additions & 0 deletions pkg/api/v1alpha1/testing.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package v1alpha1

import (
"github.com/nats-io/nats.go"
)

type mockNATSConn struct {
Subject string
Payload []byte
Expand All @@ -12,3 +16,10 @@ func (m *mockNATSConn) Publish(s string, p []byte) error {

return nil
}

func (m *mockNATSConn) PublishMsg(msg *nats.Msg) error {
m.Subject = msg.Subject
m.Payload = msg.Data

return nil
}
22 changes: 22 additions & 0 deletions pkg/events/v1alpha1/contexts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package v1alpha1

import "context"

type contextKey struct{}

// GovernorEventCorrelationIDContextKey is the context key for the correlation ID
var governorEventCorrelationIDContextKey = &contextKey{}

// InjectCorrelationID injects the correlation ID into the context
func InjectCorrelationID(ctx context.Context, correlationID string) context.Context {
return context.WithValue(ctx, governorEventCorrelationIDContextKey, correlationID)
}

// ExtractCorrelationID extracts the correlation ID from the context
func ExtractCorrelationID(ctx context.Context) string {
if cid, ok := ctx.Value(governorEventCorrelationIDContextKey).(string); ok {
return cid
}

return ""
}
6 changes: 6 additions & 0 deletions pkg/events/v1alpha1/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ const (
GovernorExtensionsEventSubject = "extensions"
// GovernorExtensionResourceDefinitionsEventSubject is the subject name for extensions resource definition events (minus the subject prefix)
GovernorExtensionResourceDefinitionsEventSubject = "extension.erds"

// GovernorEventCorrelationIDHeader is the header name for the correlation ID
GovernorEventCorrelationIDHeader = "Correlation-ID"
)

// Event is an event notification from Governor.
Expand All @@ -64,4 +67,7 @@ type Event struct {

// TraceContext is a map of values used for OpenTelemetry context propagation.
TraceContext map[string]string `json:"traceContext"`

// Headers is a map of headers to be passed along with the event.
Headers map[string][]string `json:"-"`
}

0 comments on commit 9a2ee37

Please sign in to comment.