说说fanIn和fanOut

文章目录

  1. 1. fanIn
    1. 1.1. 协程版
    2. 1.2. 递归版
    3. 1.3. 反射版
  2. 2. fanOut
    1. 2.1. 同步版
    2. 2.2. 协程异步版
    3. 2.3. 反射版

今天回顾下常用的两种channel应用模式: fanInfanOut,

分别对应了,对一组相同类型chan的合并和广播。

fanIn

将全部输入chan都聚合到一个out chan中,在全部聚合完成后,关闭out chan.

协程版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func fanIn(chans ...<-chan interface{}) <-chan interface{} {
out := make(chan interface{})

go func() {
var wg sync.WaitGroup
wg.Add(len(chans))
for _, ch := range chans {
go func(ch <-chan interface{}) {
for v := range ch {
out <- v
}
wg.Done()
}(ch)
}
// 等待协程全部结束
wg.Wait()
close(out)
}()
return out
}

这里用waitGroup是防止关闭out时还有写入(out <- v),避免panic

递归版

2 分递归并合并。

其中合并mergeTwo主要用了nil chan对读写均阻塞。

chan关闭时,设置为nil,阻塞读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func fanInRecur(chans ...<-chan interface{}) <-chan interface{} {
switch len(chans) {
case 0:
c := make(chan interface{})
close(c)
// 无可聚合chan,返回一个已关闭chan,可读不可写
return c
case 1:
return chans[0]
case 2:
return mergeTwo(chans[0], chans[1])
default:
// 一分为二,递归
m := len(chans) / 2
return mergeTwo(
fanInRecur(chans[:m]...),
fanInRecur(chans[m:]...))
}
}

func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
for a != nil || b != nil { // 只要还有可读的chan
select {
case v, ok := <-a:
if !ok { // a 已关闭,设置为nil
a = nil
continue
}
c <- v
case v, ok := <-b:
if !ok { // b 已关闭,设置为nil
b = nil
continue
}
c <- v
}
}
}()
return c
}

反射版

利用reflect.SelectCase构造批量可Select的发送chan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
defer close(out)
// 构造SelectCase slice
var cases []reflect.SelectCase
for _, c := range chans {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(c),
})
}

// 循环,从cases中选择一个可用的
for len(cases) > 0 {
i, v, ok := reflect.Select(cases)
if !ok {
// 此channel已经close, 从切片移除
cases = append(cases[:i], cases[i+1:]...)
continue
}
out <- v.Interface()
}
}()
return out
}

附上压测数据

性能对比

fanOut

同步版

最直观的方式,直接向每一个chan都同步发送一遍
返回前关闭这组chan, 即不再写入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func fanOut(ch <-chan interface{}, out []chan interface{}) {
go func() {
defer func() { // 退出时关闭所有的输出chan
for i := range out {
close(out[i])
}
}()

for v := range ch { // 从输入chan中读取数据
v := v
for i := range out {
i := i
out[i] <- v // 放入到输出chan中,同步方式
}
}
}()
}

协程异步版

发送这里用起协程的方式,实现异步,发送操作耗时情况下无需阻塞等待

可是有个问题,不知道你看出来没。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func fanOut(ch <-chan interface{}, out []chan interface{}) {
go func() {
defer func() { // 退出时关闭所有的输出chan
for i := range out {
close(out[i])
}
}()

for v := range ch { // 从输入chan中读取数据
v := v
for i := range out {
i := i
// 协程异步
go func(){}
out[i] <- v
}()
}
}
}()
}

乍一看好像没什么问题, 但退出时关闭时,很可能发送的协程写入还没完成,

毕竟这里out之前写入的要有人读才能继续写。

这里加waitGroup可以等待全部发送完毕在关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func fanOutAsync(ch <-chan interface{}, out []chan interface{}) {
go func() {
var wg sync.WaitGroup
defer func() { // 退出时关闭所有的输出chan
wg.Wait()
for i := range out {
close(out[i])
}
}()

for v := range ch { // 从输入chan中读取数据
v := v
for i := range out {
i := i
wg.Add(1)
go func() { // 异步,避免一个out阻塞的时候影响其他out
out[i] <- v
wg.Done()
}()
}
}
}()
}

反射版

构造一票chan send case, 遍历select,发送完成的将其置为nil阻塞,避免再次发送

不得不说,nil chan出镜率很高啊

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
go func() {
defer func() { // 退出时关闭所有的输出chan
for i := range out {
close(out[i])
}
}()
cases := make([]reflect.SelectCase, len(out))
// 构造SelectCase slice
for i := range cases {
cases[i].Dir = reflect.SelectSend
}
for v := range ch {
v := v
// 先完成send case构造
for i := range cases {
cases[i].Chan = reflect.ValueOf(out[i])
cases[i].Send = reflect.ValueOf(v)
}
// 遍历select
for range cases {
chosen, _, _ := reflect.Select(cases)
// 已发送过,用nil阻塞,避免再次发送
cases[chosen].Chan = reflect.ValueOf(nil)
}
}
}()
}

附上压测数据

性能对比

具体测试代码详见:concurrency


欢迎关注公众号:newbmiao, 获取及时更新

–>

如有疑问,请文末留言交流或邮件:newbvirgil@gmail.com 本文链接 : https://newbmiao.github.io/2021/08/25/fanIn-and-fanOut.html