Rust并发控制之Barrier

文章目录

Rust有很多种控制并发的方式,Barrier(屏障)是其中一种用来同步多线程计算的方式。

今天拿代码来简单看下。

比如我们要多线程计算,期望所有线程都计算完毕再输出最终结果。常规多线程代码示例可以用线程join来等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use std::sync::{Arc, Mutex};

fn main() {
let numthreads = 10;
let my_mutex = Arc::new(Mutex::new(0));
let mut handlers = Vec::with_capacity(numthreads);

for _ in 0..numthreads {
let my_lock = my_mutex.clone();
handlers.push(std::thread::spawn(move || {
let mut guard = my_lock.lock().unwrap();
*guard += 1;
}));
}
for handler in handlers {
handler.join().unwrap();
}

let answer = { *my_mutex.lock().unwrap() };
assert_eq!(answer, numthreads);
}

而如果用Barrier,我们可以这么写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
use std::sync::{Arc, Barrier, Mutex};

fn main() {
let numthreads = 10;
let my_mutex = Arc::new(Mutex::new(0));

// We use a barrier to ensure the readout happens after all writing
let barrier = Arc::new(Barrier::new(numthreads + 1));

for i in 0..numthreads {
let my_barrier = barrier.clone();
let my_lock = my_mutex.clone();
std::thread::spawn(move || {
let mut guard = my_lock.lock().unwrap();
*guard += 1;

// Release the lock to prevent a deadlock
drop(guard);
println!("thread {} is ready", i);
// Blocks the current thread until all threads have rendezvoused here.
my_barrier.wait();
println!("thread {} is done", i)
});
}

// A barrier will block `n`-1 threads which call [`wait()`] and then wake
// up all threads at once when the `n`th thread calls [`wait()`].
barrier.wait();

let answer = { *my_mutex.lock().unwrap() };
assert_eq!(answer, numthreads);
}

Barrier可以用wait来控制n个线程的同步,数量需要提前指明。
当调用wait时,如果不是第n个,就会一直阻塞当前线程,直到第n个wait调用,才能进行后续操作。

这种机制就像在多个线程中插入了一道屏障,当所有线程都执行到这里时,才能解除屏障继续向后执行。

当然这样实现相较于第一种,在线程数量大的时候也是会有比较明显的性能开销的,底层是使用condvar+mutex来实现的。这种组合也是一种有意思的并发控制方式,下次我们再聊聊它们。

如有疑问,请文末留言交流或邮件:newbvirgil@gmail.com 本文链接 : https://newbmiao.github.io/2023/11/12/rust-sync-barrier.html