上次提到的Barrier 用到了Rust的condvar和mutex,今天来看下condvar的用法。
condvar即condition variable(条件变量),是一种线程同步的方式,用于线程间的通信。它可以阻塞(wait)线程,期间不消耗CPU,直到某个时间发生唤醒(notify)线程。
代码举例来说:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 use  std::sync::{Arc, Condvar, Mutex};use  std::thread;fn  main     let  pair = Arc::new((Mutex::new(false ), Condvar::new()));     let  pair2 = Arc::clone(&pair);     thread::spawn(move  || {         let  (lock, cvar) = &*pair2;         let  mut  started = lock.lock().unwrap();         *started = true ;                  cvar.notify_one();     });          let  (lock, cvar) = &*pair;     let  mut  started = lock.lock().unwrap();     while  !*started {         started = cvar.wait(started).unwrap();     } } 
代码中,创建一个线程在修改started变量后唤醒等待的线程。main中等待的的线程会一直阻塞(wait)直到started的值被修改。
其中wait会需要一个锁的MutexGuard来配合,wait会自动释放锁,并阻塞当前线程,直到被唤醒时重新获取锁,并返回锁的MutexGuard,来获取锁当前保护的值
Tips: MutexGuard实现了销毁时自动释放锁和可以通过解引用(deref)到它保护的值
 
这里有两个有意思的点:
为什么要和mutex一起使用? 
为什么唤醒时要检查条件是否满足? 
 
这个要从condvar唤醒的机制说起。
唤醒顺序不保证 先来看下唤醒的顺序,我们起两批同样数目的线程,一批线程每个线程会修改一次变量并唤醒一个另一批等待的线程,为了观测唤醒顺序,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 use  std::sync::{Arc, Condvar, Mutex};use  std::thread::{self };struct  SharedData     counter: Mutex<usize >,     condvar: Condvar, } fn  main     let  shared_data = Arc::new(SharedData {         counter: Mutex::new(0 ),         condvar: Condvar::new(),     });     let  thread_num = 5 ;     let  mut  workers = Vec ::new();     let  mut  waits = Vec ::new();     for  i in  0 ..thread_num {         do_wait(i, Arc::clone(&shared_data), &mut  waits);     }     for  i in  0 ..thread_num {         do_work(i, Arc::clone(&shared_data), &mut  workers)     }     waits.into_iter().for_each(|w| w.join().unwrap());     workers.into_iter().for_each(|w| w.join().unwrap()); } fn  do_work i32 , data: Arc<SharedData>, workers: &mut  Vec <thread::JoinHandle<()>>) {    workers.push(thread::spawn(move  || {         let  SharedData { counter, condvar } = &*data;         let  mut  data = counter.lock().unwrap();         *data += 1 ;         println! ("Woker thread {} before notify: Counter {}" , i, data);         condvar.notify_one();     })); } fn  do_wait i32 , data: Arc<SharedData>, waits: &mut  Vec <thread::JoinHandle<()>>) {    waits.push(thread::spawn(move  || {         let  SharedData { counter, condvar } = &*data;         let  mut  data = counter.lock().unwrap();         data = condvar.wait(data).unwrap();         println! ("   Wait thread {} after wake up: Counter {}" , i, data);     })); } 
运行结果不唯一,比如如下结果,五次修改触发了五次唤醒,但是wait唤醒顺序不一定是按照worker修改顺序(而修改顺序是符合预期的,因为是加锁保证的):
1 2 3 4 5 6 7 8 9 10 Woker thread 0 before notify: Counter 1 Woker thread 4 before notify: Counter 2 Woker thread 2 before notify: Counter 3    Wait thread 1 after wake up: Counter 3    Wait thread 3 after wake up: Counter 3 Woker thread 3 before notify: Counter 4    Wait thread 0 after wake up: Counter 4 Woker thread 1 before notify: Counter 5    Wait thread 4 after wake up: Counter 5    Wait thread 2 after wake up: Counter 5 
甚至有可能是唤醒次数少于五次,导致有些线程一直阻塞,比如如下结果,只有四次唤醒,导致有1个线程一直阻塞:
1 2 3 4 5 6 7 8 9 10 Woker thread 1 before notify: Counter 1    Wait thread 2 after wake up: Counter 1 Woker thread 3 before notify: Counter 2 Woker thread 0 before notify: Counter 3    Wait thread 4 after wake up: Counter 3 Woker thread 4 before notify: Counter 4    Wait thread 3 after wake up: Counter 4    Wait thread 1 after wake up: Counter 4 Woker thread 2 before notify: Counter 5 
为什么顺序不保证呢?condvar实现是基于操作系统的条件变量实现,顺序取决于操作系统调度时当前可唤醒的线程是哪个,要保证唤醒顺序需要额外的开销,而这个开销是不必要的,因为唤醒顺序对于线程间的通信是没有意义的,所以底层实现并不保证唤醒顺序。这里 有相关讨论
所以多个线程等待同一条件变量时,notify_one唤醒和等待也不是一定是一对一的调用,每次唤醒也不能保证都是不同的等待线程。
至于为什么会有线程一直阻塞的情况,是因为唤醒次数少于等待次数,导致有些线程一直阻塞。
虚假唤醒 还有就是虚假唤醒,即wait返回时,条件由于并发原因已经不满足,还可能因为唤醒并不是由于显示的notify调用,这个听起来很奇怪,但不是一个bug,是底层操作系统实现导致的,具体看看wiki 上的说明吧。
综上这两点,condvar唤醒时是需要重新检查条件是否依旧满足,而且需要和mutex一起使用,来确保条件值获取的并发安全。
除此condvar还有一些方便的方法,比如提供了
notify_all来广播唤醒所有等待的线程; 
wait_while可以根据条件等待条件直到满足; 
wait_timeout只等待一段时间如果不能及时被唤醒。 
 
官方文档都有例子,就不展开了。
关于condvar比较实际的例子有WaitGroup,不需要像Barrier一样初始化时指定线程数量,而是在运行时动态增加线程数量,在crossbeam-utils 中有实现,代码很精炼,感兴趣可以看下
	
	
	如有疑问,请文末留言交流或邮件:newbvirgil@gmail.com 
  https://newbmiao.github.io/2023/11/18/rust-sync-condvar.html