funcfanIn(chans ...<-chaninterface{}) <-chaninterface{} { out := make(chaninterface{})
gofunc() { var wg sync.WaitGroup wg.Add(len(chans)) for _, ch := range chans { gofunc(ch <-chaninterface{}) { for v := range ch { out <- v } wg.Done() }(ch) } // 等待协程全部结束 wg.Wait() close(out) }() return out }
funcmergeTwo(a, b <-chaninterface{}) <-chaninterface{} { c := make(chaninterface{}) gofunc() { deferclose(c) for a != nil || b != nil { // 只要还有可读的chan select { case v, ok := <-a: if !ok { // a 已关闭,设置为nil a = nil continue } c <- v case v, ok := <-b: if !ok { // b 已关闭,设置为nil b = nil continue } c <- v } } }() return c }
funcfanOutAsync(ch <-chaninterface{}, out []chaninterface{}) { gofunc() { var wg sync.WaitGroup deferfunc() { // 退出时关闭所有的输出chan wg.Wait() for i := range out { close(out[i]) } }()
for v := range ch { // 从输入chan中读取数据 v := v for i := range out { i := i wg.Add(1) gofunc() { // 异步,避免一个out阻塞的时候影响其他out out[i] <- v wg.Done() }() } } }() }