Rust官方sync
包中提供了mpsc
模式的 (多生产者,单消费者:multi-producer, single-consumer) channel,可以实现基于消息并发控制,而不是依赖控制内存共享(加锁)。这正是go语言作者 R. Pike
所推崇的方式:
Don’t communicate by sharing memory; share memory by communicating. (R. Pike)
今天就聊聊mpsc
提供的sync_channel
和channel
。
规则
首先一般channel机制都保证了
- 生产者(producer/sender) 可以发送(send)消息,消费者(consumer/receiver)可以接受(recv)消息,生产和消费的顺序一致(一般都有消息队列保证顺序
FIFO
) - 消费者在没有消息可接收前会阻塞等待,直到有消息或channel关闭
- channel可以限制同时可处理消息上限(buffer size)
- 生产者发送的消息累积到buffer上限时就要阻塞到有消息被消费
从这些规则中,可以看出,channel保证了生产总是先于消费,消息处理总是先进先出(FIFO
)。
sync_channel - spsc
buffer size 最特别的情况就是0,就是单生产者单消费者模式(mpsc
): send后会阻塞,直到有recv处理,才能再send下一个消息。
这就能很好的实现对并发顺序的控制, 比如下边代码,用两组channel实现1和2的交替打印。
不同channel的send和recv交叉等待,保证了打印的顺序,就像这中间持有锁一样
1 | use std::sync::mpsc::sync_channel; |
sync_channel - mpsc
buffer size增加,就是正常mpsc
摸式,可以控制同时能并发的上限(实际内部提前分配了数组来支持buffer)。
达到上限,sender就需要等待有receiver消费才能够继续发送消息。
当然没消息的话,别忘了drop也是可以结束recv一直等待消息的。
如下边代码所示:
1 | use std::sync::mpsc::sync_channel; |
channel
明白了sync_channel
,channel
就简单了,就是buffer size无限模式(实际是内部维护了一个链表自动扩容)。 所有的send都不会阻塞,只有recv在没消息时需要阻塞等待channel中产生新的消息。
1 | use std::sync::mpsc::{channel, sync_channel}; |
如果想及时check是否能recv消息时,可以用try_recv
TryRecvError::Empty
代表目前为空,但channel连接还在TryRecvError::Disconnected
则是连接已关闭,不可能再受到消息了
1 | use std::sync::mpsc::{channel, Receiver, RecvError, TryRecvError}; |
并发安全
1 | unsafe impl<T: Send> Send for Sender<T> {} |
最后来看看rust如何保证channel的并发安全
Sender<T>
同时支持Send
和Sync
,其维护的消息队列可以安全的在线程间传递所有权,也可以了共享引用,即可以被多个线程同时进行send操作。
其中T
需要实现Send
, 以确保消息可以在线程间安全传递所有权,避免竞争条件或使用已释放的内存
而Receiver<T>
只支持Send,只能在线程间传递自身所有权,但不能在线程间共享引用。同时只能有一个线程拥有其所有权,进而独占的去消费Sender<T>
的消息队列。
依旧是巧妙的通过Send
和Sync
标记trait保证了并发的安全,轻松实现无畏并发。
如有疑问,请文末留言交流或邮件:newbvirgil@gmail.com 本文链接 : https://newbmiao.github.io/2023/12/08/rust-sync-channel.html