-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathconsumerOption.go
76 lines (67 loc) · 2.46 KB
/
consumerOption.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package bunnify
import (
"encoding/json"
)
type consumerOption struct {
deadLetterQueue string
exchange string
defaultHandler wrappedHandler
handlers map[string]wrappedHandler
prefetchCount int
prefetchSize int
quorumQueue bool
notificationCh chan<- Notification
retries int
}
// WithBindingToExchange specifies the exchange on which the queue
// will bind for the handlers provided.
func WithBindingToExchange(exchange string) func(*consumerOption) {
return func(opt *consumerOption) {
opt.exchange = exchange
}
}
// WithQoS specifies the prefetch count and size for the consumer.
func WithQoS(prefetchCount, prefetchSize int) func(*consumerOption) {
return func(opt *consumerOption) {
opt.prefetchCount = prefetchCount
opt.prefetchSize = prefetchSize
}
}
// WithQuorumQueue specifies that the queue to consume will be created as quorum queue.
// Quorum queues are used when data safety is the priority.
func WithQuorumQueue() func(*consumerOption) {
return func(opt *consumerOption) {
opt.quorumQueue = true
}
}
// WithRetries specifies the retries count before the event is discarded or sent to dead letter.
// Quorum queues are required to use this feature.
// The event will be processed at max as retries + 1.
// If specified amount is 3, the event can be processed up to 4 times.
func WithRetries(retries int) func(*consumerOption) {
return func(opt *consumerOption) {
opt.retries = retries
}
}
// WithDeadLetterQueue indicates which queue will receive the events
// that were NACKed for this consumer.
func WithDeadLetterQueue(queueName string) func(*consumerOption) {
return func(opt *consumerOption) {
opt.deadLetterQueue = queueName
}
}
// WithDefaultHandler specifies a handler that can be use for any type
// of routing key without a defined handler. This is mostly convenient if you
// don't care about the specific payload of the event, which will be received as a byte array.
func WithDefaultHandler(handler EventHandler[json.RawMessage]) func(*consumerOption) {
return func(opt *consumerOption) {
opt.defaultHandler = newWrappedHandler(handler)
}
}
// WithHandler specifies under which routing key the provided handler will be invoked.
// The routing key indicated here will be bound to the queue if the WithBindingToExchange is supplied.
func WithHandler[T any](routingKey string, handler EventHandler[T]) func(*consumerOption) {
return func(opt *consumerOption) {
opt.handlers[routingKey] = newWrappedHandler(handler)
}
}