-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwait.go
137 lines (117 loc) · 3.27 KB
/
wait.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
Waiting for Jobs to Complete
The amboy package proves a number of generic methods that, using the
Queue.Stats() method, block until all jobs are complete. They provide
different semantics, which may be useful in different
circumstances. All of these functions wait until the total number of
jobs submitted to the queue is equal to the number of completed jobs,
and as a result these methods don't prevent other threads from adding
jobs to the queue after beginning to wait.
Additionally, there are a set of methods that allow callers to wait for
a specific job to complete.
*/
package amboy
import (
"context"
"time"
)
// Wait takes a queue and blocks until all tasks are completed or the
// context is canceled. This
// operation runs in a tight-loop, which means that the Wait will
// return *as soon* as possible all tasks or complete. Conversely,
// it's also possible that frequent repeated calls to Stats() may
// contend with resources needed for dispatching jobs or marking them
// complete.
func Wait(ctx context.Context, q Queue) bool {
for {
if ctx.Err() != nil {
return false
}
stat := q.Stats(ctx)
if stat.IsComplete() {
return true
}
}
}
// WaitInterval provides the Wait operation and accepts a context
// for cancellation while also waiting for an interval between stats
// calls. The return value reports if the operation was canceled or if
// all tasks are complete.
func WaitInterval(ctx context.Context, q Queue, interval time.Duration) bool {
timer := time.NewTimer(0)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return false
case <-timer.C:
if q.Stats(ctx).IsComplete() {
return true
}
timer.Reset(interval)
}
}
}
// WaitIntervalNum waits for a certain number of jobs to complete,
// with the same semantics as WaitCtxInterval.
func WaitIntervalNum(ctx context.Context, q Queue, interval time.Duration, num int) bool {
timer := time.NewTimer(0)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return false
case <-timer.C:
if q.Stats(ctx).Completed >= num {
return true
}
}
}
}
// WaitJob blocks until the job, based on its ID, is marked complete
// in the queue, or the context is canceled. The return value is false
// if the job does not exist (or is removed) and true when the job
// completes.
func WaitJob(ctx context.Context, j Job, q Queue) bool {
var err error
for {
if ctx.Err() != nil {
return false
}
j, err = q.Get(ctx, j.ID())
if err != nil {
return false
}
if ctx.Err() != nil {
return false
}
if j.Status().Completed {
return true
}
}
}
// WaitJobInterval takes a job and queue object and waits for the job
// to be marked complete. The interval parameter controls how long the
// operation waits between checks, and can be used to limit the impact
// of waiting on a busy queue. The operation returns false if the job
// is not registered in the queue, and true when the job completes.
func WaitJobInterval(ctx context.Context, j Job, q Queue, interval time.Duration) bool {
var err error
timer := time.NewTimer(0)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return false
case <-timer.C:
j, err = q.Get(ctx, j.ID())
if err != nil {
return false
}
if j.Status().Completed {
return true
}
timer.Reset(interval)
}
}
}