Rust异步编程之Future初探

文章目录

  1. 1. 状态机
  2. 2. 调度
  3. 3. 运行时
  4. 4. async
  5. 5. pin

RustFuture是用来实现异步编程的。今天我们围绕其了解下Rust的异步编程是如何构建。

Rustasync就能轻松创建开销很小的可异步执行的函数,在await时其才会被调度执行。

其比较轻量级,有别于异步多线程,依托在操作系统线程之上,构建大量并发则需要大量的线程资源,对资源的消耗比较大。

比如下边用async构建异步任务:

1
2
3
4
5
6
7
8
async fn async_fn() {
// handle async logic
}

#[tokio::main]
async fn main() {
async_fn().await
}

状态机

async其实也是帮你自动实现了下边的Future trait,用结构体维护了一个状态机

1
2
3
4
5
6
7
trait Future {
type Output;
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output>;
}

Future定义一个poll方法,可以查询异步任务状态。对于异步任务,有PendingReady两种状态,Pending时会让出控制,等待可以处理时再被唤醒继续处理,如此重复,直到Ready

我们来尝试通过实现一个DelayFuture了解这个状态流转的过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
when: Instant,
}

impl Future for Delay {
type Output = &'static str;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
if Instant::now() >= self.when {
Poll::Ready("done")
} else {
// 还未ready,注册下一次唤醒
cx.waker().wake_by_ref();
Poll::Pending
}
}
}

#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(3);
let future = Delay { when };

let out = future.await;
assert_eq!(out, "done");
}

Delay每次poll时会检查,时间是否满足,满足则Ready,否则schedule下一次执行并返回Pending

状态机是有了,Future怎么调度呢?

调度

Rust需要运行时runtime来调度异步任务taskruntime负责调度,检查future的状态。

调度一般在Pending时会交出task的控制,并schedule下一次什么时候唤醒(wake)。

流程处理展开来说,常规Ready处理:

graph TD
    executor --> task
    task -- poll --> future
    future -- ready --> task

Pending时, future要被schedule下一次唤醒,而每次唤醒可能不会都是在同一个task上执行。
这里用于唤醒的waker会在每次poll时以context传递下去,

graph TD
    executor --> task
    executor --> waker
    executor -.-> task
    task -- poll with context (waker) --> future
    future -- update --> waker
    future -- pending --> task
    waker -- wake --> executor

运行时

了解了调度,我们再展开说下运行时。rust的运行时没在标准库中实现,需要依赖第三方的运行时,常用的有tokio

就比如如下的tokio宏实际是添加了一个多线程(multi thread)的运行时,会阻塞当前线程直到异步任务完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#[tokio::main]
async fn main() {
println!("hello");
}

// tranform to
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
println!("Hello world");
})
}

当然也可以用单线程的运行时(current thread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#[tokio::main(flavor = "current_thread")]
async fn main() {
println!("Hello world");
}
// tranform to
fn main() {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
println!("Hello world");
})
}

async

其实一般很少直接去实现Future trait, 直接使用async去自动实现Future trait就足够了。上边Delay完全可以这么实现,简洁且高效

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use tokio::sync::Notify;

async fn delay(dur: Duration) {
let when = Instant::now() + dur;
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();

thread::spawn(move || {
let now = Instant::now();

if now < when {
thread::sleep(when - now);
}

notify_clone.notify_one();
});

notify.notified().await;
}

#[tokio::main]
async fn main() {
delay(Duration::from_secs(1)).await;
}

pin

还记得future trait上参数有个Pin<&mut Self>, 为什么要Pin future的引用?

来看下边一段代码:

1
2
3
4
5
6
7
8
9
10
11
async fn my_async_fn() {
// async logic here
}

#[tokio::main]
async fn main() {
let mut future = my_async_fn();
(&mut future).await;
// error:
// within `impl Future<Output = ()>`, the trait `Unpin` is not implemented for `[async fn body@src/main.rs:1:24: 3:2]`
}

当尝试执行一个异步函数的引用时,编译器会报错要求其是Unpin trait

为什么呢?

future本质是一个封装的状态机结构体,调度时会被移动,如果其包含引用,引用的地址要能保证生命周期至少在其完成前还存活,不然就会出现引用一个已失效的地址。

所以Rust引入了Unpin trait。 这个Unpin是代表其不需要固定地址,可以安全引用。

常规的类型一般都是实现了的。对于未实现的!Unpin类型,一般可以将其Box::pin到堆上或用宏pin!到栈上来确保其地址在future移动期间是有效的。

代码如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
use tokio::pin;

async fn my_async_fn() {
// async logic here
}

#[tokio::main]
async fn main() {
let future = my_async_fn();
// option 1
pin!(future);
(&mut future).await;

// option 2
// let pinned_fut = Box::pin(future);
// pinned_fut.await;
}

好了,今天就聊到这里,下一篇我们再聊聊多个异步同时怎么处理。

Pin感兴趣可以看看官方更详细的文档:Pinning

异步编程更深入了解的话也推荐看下tokio的这篇:Async in depth

如有疑问,请文末留言交流或邮件:newbvirgil@gmail.com 本文链接 : https://newbmiao.github.io/2024/01/06/rust-async-future.html