Skip to content

Commit

Permalink
Implement pending state for queue healthcheck (#339)
Browse files Browse the repository at this point in the history
Co-authored-by: Arnold Iakab <[email protected]>
  • Loading branch information
arnold-iakab and Arnold Iakab authored Jan 16, 2023
1 parent c62a8a5 commit e43b2e9
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 11 deletions.
9 changes: 7 additions & 2 deletions backend/queue/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ import (
)

// config groups the settings of this package. Each field carries an `env` tag
// naming an environment variable and an `envDefault` tag giving its fallback value.
// NOTE(review): the code that parses these tags is not visible here — confirm against the loader.
type config struct {
HealthCheckResultTTL time.Duration `env:"RMQ_HEALTH_CHECK_RESULT_TTL" envDefault:"10s"`
MetricsRefreshInterval time.Duration `env:"RMQ_METRICS_REFRESH_INTERVAL" envDefault:"10s"`
HealthCheckResultTTL time.Duration `env:"RMQ_HEALTH_CHECK_RESULT_TTL" envDefault:"10s"`
// HealthCheckPendingStateInterval is the grace period between a queue first exceeding its health limit
// and the health check reporting that queue as unhealthy.
// It prevents a queue from being reported unhealthy the moment a surge of deliveries arrives,
// giving the service that uses this package time to work through the sudden backlog.
// Defaults to one minute.
HealthCheckPendingStateInterval time.Duration `env:"RMQ_HEALTH_CHECK_PENDING_STATE_INTERVAL" envDefault:"1m"`
MetricsRefreshInterval time.Duration `env:"RMQ_METRICS_REFRESH_INTERVAL" envDefault:"10s"`
}

var cfg config
Expand Down
28 changes: 23 additions & 5 deletions backend/queue/rmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"sync"
"time"

"github.com/adjust/rmq/v3"
"github.com/pace/bricks/backend/redis"
pberrors "github.com/pace/bricks/maintenance/errors"
"github.com/pace/bricks/maintenance/health/servicehealthcheck"
"github.com/pace/bricks/maintenance/log"
"github.com/pace/bricks/pkg/routine"

"github.com/adjust/rmq/v3"
)

var (
Expand All @@ -21,6 +22,11 @@ var (
initMutex sync.Mutex
)

// queueHealth holds the health limit of a single queue together with the
// bookkeeping the health check needs to implement the pending state.
type queueHealth struct {
// limit is the ready-delivery count above which the queue is considered over its health limit.
limit int
// markedUnhealthyAt is the time at which the health check first saw the queue above limit.
// The zero value means the queue is not currently tracked as being above limit.
markedUnhealthyAt time.Time
}

func initDefault() error {
var err error
initMutex.Lock()
Expand Down Expand Up @@ -69,7 +75,7 @@ func NewQueue(name string, healthyLimit int) (rmq.Queue, error) {
if _, ok := queueHealthLimits.Load(name); ok {
return queue, nil
}
queueHealthLimits.Store(name, healthyLimit)
queueHealthLimits.Store(name, &queueHealth{limit: healthyLimit})
return queue, nil
}

Expand Down Expand Up @@ -101,13 +107,25 @@ func (h *HealthCheck) HealthCheck(ctx context.Context) servicehealthcheck.Health
}
queueHealthLimits.Range(func(k, v interface{}) bool {
name := k.(string)
healthLimit := v.(int)
hl := v.(*queueHealth)
now := time.Now()
stat := stats.QueueStats[name]
if stat.ReadyCount > int64(healthLimit) {
h.state.SetErrorState(fmt.Errorf("Queue '%s' exceeded safe health limit of '%d'", name, healthLimit))
if stat.ReadyCount > int64(hl.limit) {
if hl.markedUnhealthyAt.IsZero() {
hl.markedUnhealthyAt = now
h.state.SetHealthy()
return true
}
// queue health is still pending
if !now.After(hl.markedUnhealthyAt.Add(cfg.HealthCheckPendingStateInterval)) {
return true
}

h.state.SetErrorState(fmt.Errorf("Queue '%s' exceeded safe health limit of '%d'", name, hl.limit))
return false
}
h.state.SetHealthy()
hl.markedUnhealthyAt = time.Time{}
return true
})
return h.state.GetState()
Expand Down
40 changes: 36 additions & 4 deletions backend/queue/rmq_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package queue_test
package queue

import (
"context"
"testing"
"time"

"github.com/pace/bricks/backend/queue"
"github.com/pace/bricks/maintenance/log"
"github.com/stretchr/testify/assert"
)
Expand All @@ -15,13 +14,14 @@ func TestIntegrationHealthCheck(t *testing.T) {
t.SkipNow()
}
ctx := log.WithContext(context.Background())
q1, err := queue.NewQueue("integrationTestTasks", 1)
cfg.HealthCheckPendingStateInterval = time.Second * 2
q1, err := NewQueue("integrationTestTasks", 1)
assert.NoError(t, err)
err = q1.Publish("nothing here")
assert.NoError(t, err)

time.Sleep(time.Second)
check := &queue.HealthCheck{IgnoreInterval: true}
check := &HealthCheck{IgnoreInterval: true}
res := check.HealthCheck(ctx)
if res.State != "OK" {
t.Errorf("Expected health check to be OK for a non-full queue: state %s, message: %s", res.State, res.Msg)
Expand All @@ -30,8 +30,40 @@ func TestIntegrationHealthCheck(t *testing.T) {
err = q1.Publish("nothing here either")
assert.NoError(t, err)

// queue health started pending
res = check.HealthCheck(ctx)
if res.State != "OK" {
t.Errorf("Expected health check to be OK")
}
// queue health pending
time.Sleep(time.Second)
res = check.HealthCheck(ctx)
if res.State != "OK" {
t.Errorf("Expected health check to be OK")
}
// queue health no longer pending
time.Sleep(time.Second * 2)
res = check.HealthCheck(ctx)
if res.State == "OK" {
t.Errorf("Expected health check to be ERR for a full queue")
}

_, _ = q1.PurgeReady()
// queue health back to OK
res = check.HealthCheck(ctx)
if res.State != "OK" {
t.Errorf("Expected health check to be OK")
}

err = q1.Publish("nothing here")
assert.NoError(t, err)
err = q1.Publish("nothing here either")
assert.NoError(t, err)
// queue health pending again
res = check.HealthCheck(ctx)
if res.State != "OK" {
t.Errorf("Expected health check to be OK")
}

_, _ = q1.PurgeReady()
}

0 comments on commit e43b2e9

Please sign in to comment.