Rayon魔法:使Rust并行编程变得轻而易举

文章目录

  1. 1. 同步转并行
  2. 2. 背后的魔法
  3. 3. join
  4. 4. par_bridge

Rayon库是一个数据并行化(data-parallelism)的 Rust库。在并行编程里是一个很有趣的存在, 且非常的容易上手。它可以很轻松的将同步计算流程转化为并行计算。而且基本能保证编译通过就不会有data race

同步转并行

假设有个如下的求和的同步代码

1
2
3
4
5
6
7
8
9
10
11
fn main() {
let sum: i32 = (0..100)
.into_iter()
.map(|i| {
// Simulate some computation
sleep(Duration::from_nanos(1));
i
})
.sum();
assert_eq!(sum, 4950);
}

想要转成并行,只需要into_iter变成into_par_iter

Rayon会将同步的遍历转成并行的遍历,而且保证返回的顺序是一致的,瞬间并行是不是!

1
2
3
4
5
6
7
8
9
10
11
12
use rayon::prelude::*;
fn main() {
let sum: i32 = (0..100)
.into_par_iter() // 这里
.map(|i| {
// Simulate some computation
sleep(Duration::from_nanos(1));
i
})
.sum();
assert_eq!(sum, 4950);
}

divan在 10核的M1 pro上测试结果如下,一行改变让代码速度提升了不少。

Benchmark Fastest Slowest Median Mean Samples Iterations
iter 549.2 µs 1.244 ms 687.4 µs 738.5 µs 100 100
par_iter 195 µs 488.1 µs 315.1 µs 321.9 µs 100 100

背后的魔法

这个并行遍历是怎么处理的呢?

Rayon利用一个可伸缩线程池来执行并行任务,默认情况下,线程池的大小与系统的逻辑核心数量相匹配。

在进行并行任务时,Rayon将当前任务拆分成多个子任务(依据线程池大小),并尽可能地将它们分配给空闲的线程以执行,每个线程有自己的本地任务队列。

如果当前有空闲线程,但已分配的任务仍在等待其线程完成当前任务,空闲线程将尝试执行work stealing,从其他线程任务队列中中窃取一些任务来执行,以确保最大程度地利用CPU资源。

最终,将并行任务的结果进行两两合并,将线程结果全部汇总以完成整个并行计算过程。

这里任务拆分和work stealing就是将并行任务分而治之的精髓。

join

其底层很多使用了join, 将两个任务并行执行,并等待任务结果一起返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
use rayon::prelude::*;

fn main() {
let v1 = vec![1, 2, 3, 4, 5];
let v2 = vec![6, 7, 8, 9, 10];

let (sum1, sum2) = rayon::join(
|| v1.par_iter().sum::<i32>(),
|| v2.par_iter().sum::<i32>()
);

println!("sum1: {}, sum2: {}", sum1, sum2);
}

par_bridge

常规能很容易并行化拆分的par_iter就可以了,但是如果遇到不容易并行化的(有阻塞等待等),如channel或者文件、网络IO的操作, 则可以用par_bridge

性能会有些损耗,因为其执行的方式是每次获取下一个可遍历的内容,分发到线程池内可用线程上执行,同时也不保证结果返回的顺序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use rayon::iter::ParallelBridge;
use rayon::prelude::ParallelIterator;
use std::sync::mpsc::channel;

fn main() {
let rx = {
let (tx, rx) = channel();

(1..=3).into_iter().for_each(|i| {
let _ = tx.send(i);
});

rx
};

let mut output: Vec<i32> = rx.into_iter().par_bridge().collect();
output.sort_unstable(); // 重新保证顺序

assert_eq!(&*output, &[1, 2, 3]);
}

总之,对于串行化遍历任务,一般都可以用Rayon转化为并行处理,当然也要看有没有转化的必要,常规简单遍历自然是不需要并行化的,毕竟线程和任务并行调度也是有开销的。

想了解更多,推荐看看Rayon: data parallelism in Rust

如有疑问,请文末留言交流或邮件:newbvirgil@gmail.com 本文链接 : https://newbmiao.github.io/2024/01/13/rust-rayon-parallel.html