go的并发模式是一个很有意思的东西,这里先做个搬运工。
本文是go官方talk里对并发的定义及并发的模式的一些搜集
原文引自:
Advanced Go Concurrency PatternsGo Concurrency Patterns
Concurrency Is Not Parallelism
Concurrency Is Not Parallelism
Concurrency (并发) 是程序能组织执行过程使同时可以处理多件事
Parallelism (并行) 是程序能同时执行多件事
所以Rob Pike说:并发关乎结构,并行关乎执行
Concurrency
Concurrency is the composition of independently executing computations.
Concurrency is a way to structure software, particularly as a way to write clean code that interacts well with the real world.
Concurrency is about dealing with lots of things at once.
Parallelism
Parallelism is the simultaneous execution of (possibly related) computations.
Parallelism is about doing lots of things at once.
VS
Concurrency is about structure, parallelism is about execution.
Concurrency provides a way to structure a solution to solve a problem that may (but not necessarily) be parallelizable.
Go Concurrency Support
- concurrent execution (goroutines) 协程支持
- synchronization and messaging (channels) CSP的通信方式来共享内存
此处需了解go的内存模型 - multi-way concurrent control (select) 控制协程切换
- All channels are evaluated.
每个通道都会被评估 - Selection blocks until one communication can proceed, which then does.
没有可处理的通道时,select会一直阻塞 - If multiple can proceed, select chooses pseudo-randomly.
多个通道可执行是,select会伪随机选一个 - A default clause, if present, executes immediately if no channel is ready.
有default申明时,若没有可处理通道,则default立马执行
- All channels are evaluated.
Go Concurrency Patterns
Service Channel
channel是go里first class
值,想string这些类型一样。
用作函数返回时,通过返回的channel进行交互,可以起到服务一样的效果。
1 | func main() { |
Multiplexing
就是将多个输入处理成一个,常见实现是使用go启动多个协程去合并;或者使用for-select去合并
1 | func main() { |
fan-in: A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that’s closed when all the inputs are closed.
fan-out: Multiple functions can read from the same channel until that channel is closed)
pipeline
Sequencing
按顺序执行,用全局通道去发送接受来控制任务依次执行
运行
1 | type Message struct { |
for-select
简化go创建多个协程的声明方式
1 | func fanIn(input1, input2 <-chan string) <-chan string { |
timeout-select
对select可以增加超时通道,超时则返回,避免select一直阻塞
1 | func main() { |
quit channel
使用通道控制执行是否提前结束
- just quit
1
2
3
4
5
6
7
8
9
10
11
12//main
quit := make(chan struct{})
c := boring("Joe", quit)
for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
quit <- struct{}
select {
case c <- fmt.Sprintf("%s: %d", msg, i):
// do nothing
case <-quit:
return
} - use quit deliver msg
quit := make(chan string)
Daisy-chain
关于这个模式stackoverflow有个讨论,可以看看
1 | func f(left, right chan int) { |
Advanced Go Concurrency Patterns
Three techniques
- for-select loop
select防止loop阻塞在某一状态,便于调度多项任务 - service channel, reply channels (chan chan error)
service channel就是常见的pattern,不多说
reply channels实现了close操作中关闭和错误返回无data race
:
close通过chan chan error
向loop请求关闭,并等待其返回关闭前是否有错误
1 | func (s *sub) Close() error { |
- nil channels in select cases
向值为nil的channel发送和接收都会阻塞,在select中使用它可控制是否执行
此外,分享中提到一些点也值得注意:
other improvement
- limit:限制pending(处理任务队列)的数目,不要让其无限增大,避免过多请求及内存消耗
- async io:拆解fetch的请求和结果处理,使用状态channel同步,使fetch中不要阻塞其他处理进行
拆解前
1 | case <-startFetch: |
拆解后
startFetch和fetchDone同一时刻,只有一个不为nil
1 | type fetchResult struct{ fetched []Item; next time.Time; err error } |
这是对以上模式整合实现的一个rss聚合器demo,可以仔细研究下
运行
如有疑问,请文末留言交流或邮件:newbvirgil@gmail.com 本文链接 : https://newbmiao.github.io/2018/02/09/advanced-go-concurrency-patterns.html