上篇文章我们知道,Rust
的Future
是异步执行,await
时是阻塞在当前的异步任务task
上,直到完成。
当多个异步任务执行时,如果只能都阻塞一个个执行,那就变成同步串行执行了,当然不是我们通常希望的并发处理方式,今天就来聊聊多个异步任务的一些并发处理方式。
join
多个异步任务执行时,如果希望全部执行完成后统一返回,可以让他们都并发去执行,等全部完成后再一起返回。join!
宏就可以实现它。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| async fn async_fn1() -> u32 { 1 }
async fn async_fn2() -> u32 { 2 }
#[tokio::main] async fn main() { let (first, second) = tokio::join!(async_fn1(), async_fn2()); assert_eq!(first, 1); assert_eq!(second, 2); }
|
try_join
如果其中有失败的话,也会返回失败的Err
。如果想一有失败就立马返回,不等待其他任务完成,可以使用try_join!
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| async fn async_fn1() -> Result<u32, &'static str> { Ok(1) }
async fn async_fn2() -> Result<u32, &'static str> { Err("async_fn2 failed") }
#[tokio::main] async fn main() { let res = tokio::try_join!(async_fn1(), async_fn2());
match res { Ok((first, second)) => { println!("first = {}, second = {}", first, second); } Err(err) => { println!("error: {}", err); } } }
|
spawn
上边join
虽然是让多个异步任务并发执行,但其实际还是在同一个task
上异步执行,如果想让每个异步任务都在一个新的task
上独立执行,可以用spawn
。
异步任务spawn
后会在后台立即开始运行,即便没有对其返回的JoinHandle
进行await
这就有点像多线程里的spawn
,只不过这里粒度不是线程,是task
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| use futures::future::join_all; use tokio::{join, task::JoinHandle};
async fn async_op(id: i32) -> String { let s = format!("Start task {}", id); println!("{}", s); s }
#[tokio::main] async fn main() { let ops = vec![1, 2, 3]; let mut tasks: Vec<JoinHandle<String>> = ops .into_iter() .map(|op| tokio::spawn(async_op(op))) .collect();
let outputs = join_all(tasks).await; println!("{:?}", outputs); }
|
select
如果是多个异步分支(branch
)有一个完成就返回,并取消(drop
来释放异步资源)其他异步分支的话,可以用select
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| async fn async_fn1() {}
async fn async_fn2() {}
#[tokio::main] async fn main() { tokio::select! { _ = async_fn1() => { println!("async_fn1() completed first") } _ = async_fn2() => { println!("async_fn2() completed first") } }; }
|
顺序执行
这里select
会对每个分支随机执行,顺序是不保证的。如果期望顺序执行,可以用biased
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| #[tokio::main] async fn main() { let mut count = 0u8;
loop { tokio::select! { biased;
_ = async {}, if count < 1 => { count += 1; assert_eq!(count, 1); } _ = async {}, if count < 2 => { count += 1; assert_eq!(count, 2); }
else => { break; } }; } }
|
precondition
上边例子中,分支使用了if precondition
,如果当前select
循环中运行到该分支,条件满足则执行;不满足的话会标记分支为失效(disabled
)本次select
中不会执行。
如果在loop
中,下一次进入select
循环会重新标记disabled
状态
另外当前循环如果所以分支都被标记为disabled
状态,就必须要有else
分支,使select
仍可运行。不然就会收到panic
: all branches are disabled and there is no else branch
.
分支修改
select
的分支也可修改, 比如下边通过Pin::set
来修改Pin
住的异步任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| use tokio::select;
async fn action(input: Option<i32>) -> Option<String> { match input { Some(input) => Some(input.to_string()), None => return None, } }
#[tokio::main] async fn main() { let (tx, mut rx) = tokio::sync::mpsc::channel(128);
let mut done = false; let operation = action(None); tokio::pin!(operation);
tokio::spawn(async move { let _ = tx.send(1).await; let _ = tx.send(3).await; let _ = tx.send(2).await; });
loop { select! { res = &mut operation, if !done => { println!("Got = {:?}", res); done = true;
if let Some(_) = res { return; } } Some(v) = rx.recv() => { if v % 2 == 0 { operation.set(action(Some(v))); done = false; } } } } }
|
这里第一个分支的precondition
是必须的,不然就会有可能出现多次执行一个已完成的异步任务,会panic
: async fn resumed after completion
。
cancel
最后在聊聊分支取消。
当select
有分支完成时,其他分支会被取消。取消依托于Drop
。当future
被drop
,其也会停止被异步调度。
比如下边代码,当oneshot::Receiver
被取消而Drop
时,会向Sender
发送close
通知,以便于清理sender
并中断其执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| use tokio::sync::oneshot;
#[tokio::main] async fn main() { let (mut tx1, rx1) = oneshot::channel::<u32>(); let (tx2, rx2) = oneshot::channel();
tokio::spawn(async move { tokio::select! { _ = tx1.closed() => { println!("tx1 closed"); } } }); tokio::spawn(async { let _ = tx2.send("two"); }); tokio::select! { val = rx1 => { println!("rx1 completed first with {:?}", val);
} val = rx2 => { println!("rx2 completed first with {:?}", val); } } }
|
如有疑问,请文末留言交流或邮件:newbvirgil@gmail.com
本文链接 : https://newbmiao.github.io/2024/01/08/rust-async-multi-future-concurrency.html