Rust之tower如何构建请求中间件

文章目录

  1. 1. 初始请求
  2. 2. Service trait
  3. 3. 实现middleware
  4. 4. Layer trait
  5. 5. hyper Service trait

提到Rust请求中间件, 就不能不提tower

tower是一个请求协议无关的的中间件定义类库,主要定义了ServiceLayer两个trait来帮助实现可重用的请求处理中间件。

今天拿聊聊它如何巧妙构建起中间件。

初始请求

假设我们有一个请求handler, 用hyper官方的hello world例子代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
use http_body_util::Full;
use hyper::{
body::{Bytes, Incoming},
server::conn::http1,
Request, Response,
};
use hyper_util::rt::TokioIo;
use std::{convert::Infallible, net::SocketAddr};
use tokio::net::TcpListener;

async fn handler(_: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = TcpListener::bind(addr).await?;
loop {
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);
tokio::spawn(async move {
// 请求在这里转成了Service
let svc = hyper::service::service_fn(handler);
if let Err(err) = http1::Builder::new().serve_connection(io, svc).await {
eprintln!("server error: {}", err);
}
});
}
}

这里service_fnhandler转成了Service,也就是server启动时要的是一个实现了Service trait的请求处理函数,这是后边构建中间件的基础。

注意,在hyper 发布v1之后,这里的Service准确说不是tower的Service trait,但理念是一样,我们后边在讲他们接口的不同

这时如果想在处理上边加上LoggerTimeout两个流程来分别记录请求日志和超时约束, 能很灵活的按如下方式组织

tower-service

1
2
3
4
5
6
7
8
let svc = hyper::service::service_fn(handler);
// 增加两个layer中间件
let svc = ServiceBuilder::new()
.layer_fn(Logger::new)
.layer_fn(|s| Timeout::new(s, std::time::Duration::from_secs(5)))
.service(svc);
// 先忽略下边为了接口转换,后边在展开这里
// let svc = TowerToHyperService::new(svc);

Service trait

这样处理,就像是一个service链,一个service处理完,再调用下一个service

所以tower定义了如下Service trait:

1
2
3
4
5
6
7
8
9
pub trait Service<Request> {
type Response;
type Error;
type Future: Future<Output = Result<Self::Response, Self::Error>>;
// 请求是否可以处理
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
// 处理逻辑
fn call(&mut self, req: Request) -> Self::Future;
}

这里poll_ready是决定是否可以执行请求处理call前的判断。

call拿到请求,返回一个异步处理的结果,这样当请求执行耗时时不阻塞其他请求的处理。

说个题外话,你可能会好奇为什么这里要返回一个Future而不是用async

这是因为之前Rust不支持trait中定义异步函数。不过Rust 1.75开始支持了,如果后边换成下边的实现就不奇怪了

1
2
3
4
5
trait Service<Request> {
type Response;
type Error;
async fn call(&mut self, req: Request) -> Result<Self::Response, Self::Error>;
}

实现middleware

Logger middleware实现来看如何构建起service

注释及代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use tower::Service;

#[derive(Debug, Clone)]
pub struct Logger<S> {
inner: S,
}
impl<S> Logger<S> {
pub fn new(inner: S) -> Self {
Logger { inner }
}
}
type Req = hyper::Request<Incoming>;
impl<S> Service<Req> for Logger<S>
where
S: Service<Req> + Clone,
{
// Logger拿到的也是一个Service,返回类型也没有变化,直接指定即可
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;

fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
// 直接可以处理,无需额外满足条件
self.inner.poll_ready(cx)
}

fn call(&mut self, req: Req) -> Self::Future {
println!("processing request: {} {}", req.method(), req.uri().path());
// 处理完调用下一个Service
self.inner.call(req)
}
}

这样添加Logger可以这么添加

1
let svc = Logger::new(svc);

再加个Timeout

1
let svc = Timeout::new(svc, std::time::Duration::from_secs(5));

不够优雅,而且执行顺序也是反的,最后添加的中间件先执行,要是链式操作就好了,这时就到Layer trait显身手了

Layer trait

1
2
3
4
pub trait Layer<S> {
type Service;
fn layer(&self, inner: S) -> Self::Service;
}

只要实现以上Layer trait

1
2
3
4
5
6
impl<S> Layer<S> for Logger<S> {
type Service = Logger<S>;
fn layer(&self, inner: S) -> Self::Service {
Logger { inner }
}
}

然后就能用ServiceBuilder构建service

1
2
3
4
tower::ServiceBuilder::new()
.layer(LoggerLayer)
.layer_fn(|s| Timeout::new(s, std::time::Duration::from_secs(5)))
.service(svc);

当然layer_fn也可以直接将函数转为实现Layer trait

最重要的是顺序是按调用顺序。

hyper Service trait

hyper之前依赖了tower Service,但v1稳定版发布前替换成了自己的Service

一方面是tower还没有稳定版本

另一方面为了简化请求处理:

  • 移除了poll_ready

  • call也不再需要&mut self,即不再考虑通过其修改请求,如果需要的话可以加Arc<Mutex<_>>state

1
2
3
4
5
6
7
pub trait Service<Request> {
type Response;
type Error;
type Future: Future<Output = Result<Self::Response, Self::Error>>;

fn call(&self, req: Request) -> Self::Future;
}

这也就是为什么如果你想直接复用tower Serivice(如Timeout等)需要TowerToHyperService转一下:

1
let svc = TowerToHyperService::new(svc);

另外hyper作为比较底层的请求库,很多web框架(Axum, Actix web等)都依赖他。 也就也支持了tower, 使得tower实现的中间件就更容易复用了。

总的来说,tower能用Service trait构建一个请求中间件的规范,确实很神奇。从目前实现反推似乎很简单,但其实设计过程中还是有很多考虑的,尤其像需要处理返回的future时。推荐看看官方的这篇inventing-the-service-trait

想了解中间件实现过程的话也推荐看看David Pedersen的Rust live codingTower deep dive (看不了的同学可以B站找找…)

如有疑问,请文末留言交流或邮件:newbvirgil@gmail.com 本文链接 : https://newbmiao.github.io/2024/02/03/rust-tower-service.html