-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathliveness.go
130 lines (112 loc) · 3.28 KB
/
liveness.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package ctrl
import (
"context"
"sync"
"time"
"github.com/metal-toolbox/rivets/v2/events"
"github.com/metal-toolbox/rivets/v2/events/pkg/kv"
"github.com/metal-toolbox/rivets/v2/events/registry"
"github.com/sirupsen/logrus"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
)
var (
// once guards the body of StartLivenessCheckin. It is package-level, so it is
// shared by every NatsLiveness value in the process.
once sync.Once
// checkinLivenessTTL is the TTL handed to kv.WithTTL when building the
// check-in KV options.
checkinLivenessTTL = 3 * time.Minute
)
// LivenessCheckin is the contract for a component that identifies itself with
// a controller ID and can be asked to start liveness check-ins.
type LivenessCheckin interface {
	// ControllerID returns the registry.ControllerID held by the implementation.
	ControllerID() registry.ControllerID
	// StartLivenessCheckin starts the liveness check-in using the given context.
	StartLivenessCheckin(ctx context.Context)
}
// NatsLiveness provides methods to register and periodically check into the controller registry.
//
// It implements the LivenessCheckin interface.
type NatsLiveness struct {
// logger receives the warnings and errors emitted by the check-in code.
logger *logrus.Logger
// stream is type-asserted to *events.NatsJetstream when the registry is initialized.
stream events.Stream
// natsConfig supplies KVReplicationFactor for the check-in KV options.
natsConfig events.NatsOptions
// controllerID is assigned from registry.GetID(hostname) in StartLivenessCheckin.
controllerID registry.ControllerID
// interval is the period of the check-in ticker.
interval time.Duration
// hostname is the input to registry.GetID when deriving controllerID.
hostname string
}
// NewNatsLiveness builds a NatsLiveness from the given NATS options, stream,
// logger, hostname and check-in interval, and returns it as a LivenessCheckin.
//
// The controller ID is not set here; it is assigned when StartLivenessCheckin
// runs.
func NewNatsLiveness(
	cfg events.NatsOptions, // nolint:gocritic // heavy param is heavy
	stream events.Stream,
	l *logrus.Logger,
	hostname string,
	interval time.Duration,
) LivenessCheckin {
	liveness := new(NatsLiveness)

	liveness.natsConfig = cfg
	liveness.stream = stream
	liveness.logger = l
	liveness.hostname = hostname
	liveness.interval = interval

	return liveness
}
// checkinKVOpts returns the KV options used when initializing the registry:
// the check-in TTL and the replica count taken from the NATS configuration.
func (n *NatsLiveness) checkinKVOpts() []kv.Option {
	return []kv.Option{
		kv.WithTTL(checkinLivenessTTL),
		kv.WithReplicas(n.natsConfig.KVReplicationFactor),
	}
}
// ControllerID returns the controller ID stored on this instance. The field is
// assigned in StartLivenessCheckin, so it holds the zero value until that has
// run.
func (n *NatsLiveness) ControllerID() registry.ControllerID {
	return n.controllerID
}
// StartLivenessCheckin initializes the controller registry and starts a
// goroutine that periodically checks in with the NATS KV.
//
// The work is guarded by the package-level once, so only the first call in the
// process does anything; later calls (on any NatsLiveness) return immediately,
// even if the first attempt failed.
//
// If the configured stream is not a *events.NatsJetstream, or the registry
// cannot be initialized, the failure is logged and no check-in goroutine is
// started.
func (n *NatsLiveness) StartLivenessCheckin(ctx context.Context) {
	once.Do(func() {
		n.controllerID = registry.GetID(n.hostname)

		// Use the checked form of the assertion: the unchecked form panics when
		// the stream is nil or some other events.Stream implementation.
		js, ok := n.stream.(*events.NatsJetstream)
		if !ok {
			n.logger.Errorf(
				"unable to initialize active controller registry: unsupported stream type %T",
				n.stream,
			)

			return
		}

		if err := registry.InitializeRegistryWithOptions(js, n.checkinKVOpts()...); err != nil {
			metricsNATSError("initialize liveness registry")
			n.logger.WithError(err).Error("unable to initialize active controller registry")

			return
		}

		go n.checkinRoutine(ctx)
	})
}
// checkinRoutine registers the controller once, then checks in on every tick
// of n.interval until ctx is done.
//
// A failed check-in is logged and counted, after which the controller is
// de-registered and re-registered via refreshControllerToken. If that refresh
// also fails the error is logged at Fatal level.
func (n *NatsLiveness) checkinRoutine(ctx context.Context) {
	if regErr := registry.RegisterController(n.controllerID); regErr != nil {
		// not fatal here: the periodic check-in below attempts a refresh on failure
		n.logger.WithError(regErr).Warn("unable to do initial controller liveness registration")
	}

	ticker := time.NewTicker(n.interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			n.logger.Info("liveness check-in stopping on done context")
			return

		case <-ticker.C:
			checkinErr := registry.ControllerCheckin(n.controllerID)
			if checkinErr == nil {
				continue
			}

			n.logger.WithError(checkinErr).
				WithField("id", n.controllerID.String()).
				Warn("controller check-in failed")
			metricsNATSError("liveness check-in")

			refreshErr := refreshControllerToken(n.controllerID)
			if refreshErr != nil {
				n.logger.WithError(refreshErr).
					WithField("id", n.controllerID.String()).
					Fatal("unable to refresh controller liveness token")
			}
		}
	}
}
// refreshControllerToken de-registers and then re-registers the given
// controller ID.
//
// A nats.ErrKeyNotFound from the de-register step is tolerated; any other
// de-register error, or any register error, is counted via metricsNATSError
// and returned unchanged.
func refreshControllerToken(id registry.ControllerID) error {
	if deregErr := registry.DeregisterController(id); deregErr != nil && !errors.Is(deregErr, nats.ErrKeyNotFound) {
		metricsNATSError("liveness refresh: de-register")
		return deregErr
	}

	if regErr := registry.RegisterController(id); regErr != nil {
		metricsNATSError("liveness refresh: register")
		return regErr
	}

	return nil
}