Rust异步编程之Future并发处理

文章目录

  1. 1. join
  2. 2. try_join
  3. 3. spawn
  4. 4. select
    1. 4.1. 顺序执行
    2. 4.2. precondition
    3. 4.3. 分支修改
    4. 4.4. cancel

上篇文章我们知道,RustFuture是异步执行,await时是阻塞在当前的异步任务task上,直到完成。

当多个异步任务执行时,如果只能都阻塞一个个执行,那就变成同步串行执行了,当然不是我们通常希望的并发处理方式,今天就来聊聊多个异步任务的一些并发处理方式。

join

多个异步任务执行时,如果希望全部执行完成后统一返回,可以让他们都并发去执行,等全部完成后再一起返回。join!宏就可以实现它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
async fn async_fn1() -> u32 {
1
}

async fn async_fn2() -> u32 {
2
}

#[tokio::main]
async fn main() {
let (first, second) = tokio::join!(async_fn1(), async_fn2());
assert_eq!(first, 1);
assert_eq!(second, 2);
}

try_join

如果其中有失败的话,也会返回失败的Err。如果想一有失败就立马返回,不等待其他任务完成,可以使用try_join!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
async fn async_fn1() -> Result<u32, &'static str> {
Ok(1)
}

async fn async_fn2() -> Result<u32, &'static str> {
Err("async_fn2 failed")
}

#[tokio::main]
async fn main() {
let res = tokio::try_join!(async_fn1(), async_fn2());

match res {
Ok((first, second)) => {
println!("first = {}, second = {}", first, second);
}
Err(err) => {
println!("error: {}", err);
}
}
}

spawn

上边join虽然是让多个异步任务并发执行,但其实际还是在同一个task上异步执行,如果想让每个异步任务都在一个新的task独立执行,可以用spawn

异步任务spawn后会在后台立即开始运行,即便没有对其返回的JoinHandle进行await

这就有点像多线程里的spawn,只不过这里粒度不是线程,是task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
use futures::future::join_all;
use tokio::{join, task::JoinHandle};

async fn async_op(id: i32) -> String {
let s = format!("Start task {}", id);
println!("{}", s);
s
}

#[tokio::main]
async fn main() {
let ops = vec![1, 2, 3];
let mut tasks: Vec<JoinHandle<String>> = ops
.into_iter()
.map(|op| tokio::spawn(async_op(op)))
.collect();

// option 1
// let outputs = join!(
// tasks.pop().unwrap(),
// tasks.pop().unwrap(),
// tasks.pop().unwrap()
// );

// println!("{:?}", outputs);
// tuple of results:
// (Ok("Start task 3"), Ok("Start task 2"), Ok("Start task 1"))

// option 2
let outputs = join_all(tasks).await;
println!("{:?}", outputs);
// vector of results:
// [Ok("Start task 1"), Ok("Start task 2"), Ok("Start task 3")]
}

select

如果是多个异步分支(branch)有一个完成就返回,并取消(drop来释放异步资源)其他异步分支的话,可以用select

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async fn async_fn1() {}

async fn async_fn2() {}

#[tokio::main]
async fn main() {
tokio::select! {
_ = async_fn1() => {
println!("async_fn1() completed first")
}
_ = async_fn2() => {
println!("async_fn2() completed first")
}
};
}

顺序执行

这里select会对每个分支随机执行,顺序是不保证的。如果期望顺序执行,可以用biased

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#[tokio::main]
async fn main() {
let mut count = 0u8;

loop {
tokio::select! {
// 顺序执行
biased;

_ = async {}, if count < 1 => {
count += 1;
assert_eq!(count, 1);
}
_ = async {}, if count < 2 => {
count += 1;
assert_eq!(count, 2);
}

else => {
break;
}
};
}
}

precondition

上边例子中,分支使用了if precondition,如果当前select循环中运行到该分支,条件满足则执行;不满足的话会标记分支为失效(disabled)本次select中不会执行。

如果在loop中,下一次进入select循环会重新标记disabled状态

另外当前循环如果所以分支都被标记为disabled状态,就必须要有else分支,使select仍可运行。不然就会收到panic: all branches are disabled and there is no else branch.

分支修改

select的分支也可修改, 比如下边通过Pin::set来修改Pin住的异步任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
use tokio::select;

async fn action(input: Option<i32>) -> Option<String> {
match input {
Some(input) => Some(input.to_string()),
None => return None,
}
}

#[tokio::main]
async fn main() {
let (tx, mut rx) = tokio::sync::mpsc::channel(128);

let mut done = false;
let operation = action(None);
tokio::pin!(operation);

tokio::spawn(async move {
let _ = tx.send(1).await;
let _ = tx.send(3).await;
let _ = tx.send(2).await;
});

loop {
select! {
res = &mut operation, if !done => {
println!("Got = {:?}", res);
done = true;

if let Some(_) = res {
return;
}
}
Some(v) = rx.recv() => {
if v % 2 == 0 {
// `.set` is a method on `Pin`.
operation.set(action(Some(v)));
done = false;
}
}
}
}
}

这里第一个分支的precondition是必须的,不然就会有可能出现多次执行一个已完成的异步任务,会panic: async fn resumed after completion

cancel

最后在聊聊分支取消。

select有分支完成时,其他分支会被取消。取消依托于Drop。当futuredrop,其也会停止被异步调度。

比如下边代码,当oneshot::Receiver被取消而Drop时,会向Sender发送close通知,以便于清理sender并中断其执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
let (mut tx1, rx1) = oneshot::channel::<u32>();
let (tx2, rx2) = oneshot::channel();

tokio::spawn(async move {
tokio::select! {
_ = tx1.closed() => {
// `val = rx1` is canceled
println!("tx1 closed");
}
}
});
tokio::spawn(async {
let _ = tx2.send("two");
});
tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);

}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
}
如有疑问,请文末留言交流或邮件:newbvirgil@gmail.com 本文链接 : https://newbmiao.github.io/2024/01/08/rust-async-multi-future-concurrency.html