-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy pathpipeline.go
127 lines (110 loc) · 1.85 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package concurrency
import (
"math"
"sync"
)
// echo is the source stage of a pipeline. It starts a goroutine that sends
// every element of numbs, in slice order, on a new unbuffered channel and
// closes that channel once the last element has been sent.
//
// The returned channel is receive-only. Because it is unbuffered, each send
// blocks until a receiver takes the value.
func echo(numbs []int) <-chan int {
	stream := make(chan int)
	go func() {
		// Closing tells downstream range loops that no more values follow.
		defer close(stream)
		for idx := 0; idx < len(numbs); idx++ {
			stream <- numbs[idx]
		}
	}()
	return stream
}
// sq is the squaring stage. It starts a goroutine that reads each value from
// in, sends its square on a new unbuffered channel, and closes that channel
// once in has been closed and drained.
func sq(in <-chan int) <-chan int {
	squares := make(chan int)
	go func() {
		defer close(squares)
		for {
			v, ok := <-in
			if !ok {
				// Upstream is closed and empty: nothing more to square.
				return
			}
			squares <- v * v
		}
	}()
	return squares
}
// odd is the odd-number filter stage. It starts a goroutine that forwards
// only the odd values read from in to a new unbuffered channel, preserving
// their order, and closes that channel once in has been closed and drained.
func odd(in <-chan int) <-chan int {
	odds := make(chan int)
	go func() {
		defer close(odds)
		for v := range in {
			if v%2 == 0 {
				// Even values are dropped.
				continue
			}
			odds <- v
		}
	}()
	return odds
}
// sum is the summing stage. It starts a goroutine that adds up every value
// read from in and, once in has been closed and drained, sends the single
// total on a new unbuffered channel and then closes it.
//
// An input that closes without delivering any value produces a total of 0.
func sum(in <-chan int) <-chan int {
	result := make(chan int)
	go func() {
		defer close(result)
		var total int
		for {
			v, ok := <-in
			if !ok {
				break
			}
			total += v
		}
		result <- total
	}()
	return result
}
// Nesting stage calls inside one another gets hard to read; proxy chains the
// stages instead.
type (
	// EchoFunc is the signature of a source stage: it takes a slice of ints
	// and returns a receive-only channel of ints.
	EchoFunc func([]int) <-chan int

	// PipeFunc is the signature of an intermediate stage: it takes a
	// receive-only channel of ints and returns another one.
	PipeFunc func(<-chan int) <-chan int
)

// proxy builds a pipeline: it calls echo on nums to obtain the initial
// channel, then passes that channel through each function in pipeFns in the
// order given, feeding every stage's output to the next stage.
//
// It returns the channel produced by the last stage, or the channel returned
// by echo when pipeFns is empty.
func proxy(nums []int, echo EchoFunc, pipeFns ...PipeFunc) <-chan int {
	stream := echo(nums)
	for _, stage := range pipeFns {
		stream = stage(stream)
	}
	return stream
}
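// Fan-out / fan-in: one-to-many OR many-to-one.
// Sum the prime numbers in an array concurrently:
// first sum each segment of the array separately, then combine the partial sums.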
// makeRange returns a slice holding the consecutive integers from min to max,
// both inclusive, in ascending order.
//
// When max is smaller than min the range is empty and an empty, non-nil slice
// is returned. Without this guard, max < min-1 would ask make for a negative
// length and panic.
//
// The parameter names shadow the min and max builtins inside this function.
func makeRange(min, max int) []int {
	if max < min {
		return []int{}
	}
	a := make([]int, max-min+1)
	for i := range a {
		a[i] = min + i
	}
	return a
}
// isPrime reports whether value is a prime number. Values below 2, including
// all negative numbers, are not prime.
//
// It uses trial division, but only up to the square root of value: any
// composite number has a divisor no larger than its square root, so testing
// all the way to value/2 is unnecessary work.
func isPrime(value int) bool {
	if value < 4 {
		// 2 and 3 are prime; 0, 1 and negatives are not.
		return value > 1
	}
	// The bound is computed once, outside the loop. The +1 absorbs any
	// downward rounding from the float64 conversion or the square root for
	// very large values; for value >= 4 the bound stays below value, so
	// value is never tested against itself.
	limit := int(math.Sqrt(float64(value))) + 1
	for i := 2; i <= limit; i++ {
		if value%i == 0 {
			return false
		}
	}
	return true
}
// prime is the prime-number filter stage. It starts a goroutine that forwards
// only the values from in for which isPrime returns true to a new unbuffered
// channel, preserving their order, and closes that channel once in has been
// closed and drained.
func prime(in <-chan int) <-chan int {
	primes := make(chan int)
	go func() {
		defer close(primes)
		for candidate := range in {
			if !isPrime(candidate) {
				continue
			}
			primes <- candidate
		}
	}()
	return primes
}
// merge fans several input channels into one. It starts one forwarding
// goroutine per channel in cs, each copying every value it receives onto a
// single shared unbuffered output channel. A further goroutine waits for all
// forwarders to finish and then closes the output.
//
// Values from different inputs are interleaved in no guaranteed order. When
// cs is empty the returned channel is closed without delivering any value.
func merge(cs []<-chan int) <-chan int {
	merged := make(chan int)

	var pending sync.WaitGroup
	pending.Add(len(cs))

	// forward drains one input into merged and signals completion when that
	// input has been closed.
	forward := func(src <-chan int) {
		defer pending.Done()
		for v := range src {
			merged <- v
		}
	}
	for i := range cs {
		go forward(cs[i])
	}

	// The output may only be closed after every forwarder has stopped
	// sending; closing earlier would make a late send panic.
	go func() {
		pending.Wait()
		close(merged)
	}()
	return merged
}