Decrusting the tokio crate
CAUTION
主要分为四个部分:
- runtime,tokio 是如何执行
Future的 - resources,与真实世界交互的内容
- utilities,其他的 tokio 提供的功能
- best practice
运行时
主要包含 event loop、timer、scheduler。
先说 scheduler,它的任务是获取输入的 Future 然后调用它们的 poll 方法。
poll 方法返回的是一个枚举,如果调用后返回 Ready(T),那么 scheduler 就不会做什么;如果返回了 Pending,那么就会把它再放回去,等待下一次再被 poll。
tokio 有两个 scheduler,多线程版本和当前线程版本。大多数情况你应该选择多线程版本,它默认会为每个 CPU 核心创建一个线程,这也是比较好的选择。
最最简单的 API 就是 block_on,会阻塞当前线程,直到 Future resolved,它会把值返回给你。
另外一个 API 是 spawn,会把你的 Future 放到异步运行时上,然后返回给你 JoinHandle。Drop JoinHandle 时并不会调用 abort(),它什么都不会做。
你向 spawn 输入一个 Future,它生成的是一个 task。你的每一个 Future 可能都会链式调用很多 Future,只有最上面一层暴露出来的 Future 才是 task,tokio 只关心 task。
对于当前线程版本的 spawn,什么都不会发生,你需要调用 block_on 让 tokio 执行它。
对于多线程版本的 scheduler,每个 CPU 线程都会对应一个线程,每一个线程都维护一个 local task queue,还有一个所有线程可见的全局 queue。
他们的行为都是差不多的,检查 queue,如果有 Pending 的 Future,就调用 poll 执行;此外,对于多线程版本的 scheduler,在 local queue 没有内容时,它会检查全局 queue,如果也没有等待的 Future,那么可以从其他线程的 local queue steal 一些 Future 执行。所以它是一个 work stealing scheduler。(当然 Future 必须实现 Send)
这里有详细的策略介绍:tokio::runtime - Rust
你还可以对 runtime 使用 shutdown_background 或者 shutdown_timeout
阻塞
不要在异步上下文中随意做一个会阻塞的操作,比如使用了同步的 io 接口,这会导致操作系统层面的这个线程被阻塞,它上面的所有 Future 自然也没法执行。
实际上你应该避免长时间占用一个 worker thread. 比如做阻塞操作、运行一个时间很长的计算。
在你要做一个“阻塞”操作时,你应该告诉 tokio。
一个解决方案是使用运行时的 spawn_blocking 方法。tokio 实际上维护了两个线程池,一个用于异步任务,一个用于完成阻塞计算,后者的数量上限很高,而且它就是被设计出来做阻塞操作的。
不过这种方法要求你的闭包满足 Send。
还有一个方案是 tokio::task::block_in_place,这个函数传入的闭包不需要 Send,此时这个线程会被阻塞,并且把它的所有异步任务都交给一个新的 worker thread.(实际上要更聪明一点)
这种方法使用时要仔细阅读文档。
LocalSet
tokio 提供了 task::LocalSet,这里面的 task 并不会被发送到其他线程执行。但你并不能在 Future 链的深处使用。
锁
tokio 的 Mutex 和 std Mutex 有什么区别?
最重要的区别是 tokio 的 Mutex 是异步的。看起来很简单,但实际上着涉及到一些本质问题。标准库的 Mutex 实际上非常高效,而 tokio 为了让 Mutex 成为异步的牺牲了一些性能。
提供异步锁的原因并不是因为上锁的时候调用 .await 很酷,一个原因是因为你的 worker thread 持有标准库的同步锁太久并不好,另一个重要的原因是,如果你持有锁,并且在持有锁的期间使用了 .await 而你没有释放锁,在此期间如果 Future 被移动到其他线程,那就会有问题。正因此,标准库的锁并不是 Send 的,这就导致你的 Future 也不会是 Send,这时你应该使用 tokio 的 Mutex。
IO 资源
目前只介绍了 runtime 的内容,它只负责执行 Future,而你需要和现实世界交互。
tokio 提供了很多工具来与现实世界交互,例如 TcpStream、文件系统等等。
如果一个对象可以在异步语境下被读取,那么 tokio 会为它实现 AsyncRead trait,它是异步的 primitive。
实际上可以这样思考,通常在 Future 链组成的任务的最底层,你会与真实世界交互,而 Future 不能执行下去的原因(返回 Pending)是因为这些 IO 资源还没有准备好。
使用
.await实际上是 desugar,会把你的异步上下文等内容传入函数,异步上下文只有Waker重要,它定义在标准库里。如果一个Future返回Pending那么 tokio 会让他处于等待队列,之后某个组件调用了wake(),则是通知 tokio,这个Future可以继续运行了。谁调用的wake()? 通常是底层的 I/O 事件循环。tokio 会在顶层调用任务时创建Waker,你的异步上下文会一路传递到调用链的最底层,等 I/O 事件循环通知你完成后,再调用wake()。
Waker的实现使用了 vtable,为什么它不是 trait?因为它要实现Clonetrait,而Clone不是类型安全的。实际上某些结构的方法也允许你自己调用
wake()
为什么 tokio 没有使用标准库的
Readtrait ? 因为Readtrait 需要的是&mut Self而AsyncRead需要的是Pin<&mut Self>,你需要Pin。此外需要Poll<T>表明操作到底是成功还是Pending,标准库的Read返回的是WouldBlockErr,很难受。
fs
这些 IO 设施看起来和标准库差不多,当然除了 tokio 提供的方法都是异步的。
但也有一些差别。
tokio::fs 是异步版本的文件系统,但实际上有点蛋疼,因为很多系统根本不提供异步访问文件的方式。tokio 目前使用 spawn_blocking 在后台完成这些操作。
process
tokio 跟标准库的进程相似的地方是,当你 drop 掉子线程的 handle,子线程并不会停止。
你生成的子进程并不是一个 Future,你只能调用 wait 来等待他。
Ext
除了 AsyncRead, AsyncWrite 外,实际上还提供了 AysncReadExt, AsyncWriteEct,这两个 trait 提供了很多工具函数很方便。Buf 版本同理。
此外,最好不要让你的 IO 资源上锁,你可以选择用 channel 来组织多线程通信。
Stream & Sink
AsyncRead 和 AsyncWrite 是你想读取/写入的独立的一系列的字节。
Stream 读取一系列收到的元素,类似于迭代器,每次调用 next() 都会给你一个元素;AsyncRead 是读取一系列的字节,并不在乎 frame。
Sink 同理,跟 AsyncWrite 呼应,二者的关系跟 Stream / AsyncRead 类似。
Stream / Sink 就像是异步版本的迭代器一样。
那怎么知道读取到的字节能不能组成一个元素?tokio_util 提供了 codec 用于完成这个工作。
但要注意的是,tokio 的基础类型不会实现 Steam / Sink trait,因为不会引入公共依赖。但提供了 tokio_stream::wrappers。
Stream可以常用于Websocket
同步
sync 模块提供了很多同步 Future 的设施。
常见的通用的 MPSC channel 自然可以解决很多问题,不过通常会有更好的方法。
首先就是 oneshot,它的特点是提供了一个 blocking_recv(),此外,发送方的 send 也不是异步的。oneshot 可以连接同步和异步的世界。
然后就是不太常用的 broadcast,广播。但注意的是 broadcast 受制于接受消息最慢的那个客户端,你要仔细考虑程序的设计,并且程序被拖累时执行正确的行为。
比较酷的是 watch channel,是一种 broadcast,但容量只有 1,也就是说它只保留最后一个值。这样订阅者就只能看到最新的一个值了。
Notify 是一个比较底层的设施,在 Future 中等待被通知,然后你可以在同步的语境下调用 notify_one(),即可通知别人,你可以考虑在实现自己的 IO 资源中使用这个工具。这样你就不用蛋疼的使用底层的轮询设施了。它也可以跨越同步和异步的界限。
最后一个值得介绍的是 Semaphore,你通常不应该直接使用它。但你可以了解一下,比如锁就是一个只有 1 的信号量。通过它你可以限制你的服务器的访问人数,或者请求数量等等。
JoinSet
你生成了一大堆 Future 然后想要等待他们完成,你可能会想用 vector 来做这个事情,但实际上使用 JoinSet 更好,它并不会像循环一样受制于访问 Future 的顺序,而是以他们完成的顺序返回。你只需要调用 join_next()
tokio 还有
join!,可以并发的等待固定个数的Future。
select! 以及 cancellation
select! 代表你有一堆 Future,但你只想等待其中一个完成;比如等待 TCP 连接的同时等待用户按 ctrl-c。
使用 select! 时,每个分支都要求是一个 Future,他们会并发执行,如果其中一个 Future 结束了,就会运行他那个分支的代码块的其余代码,之后释放其他的 Future。
**要特别注意的是,如果你的 select! 在循环中,那么其分支的 Future 需要允许被安全的取消。**因为你的 Future 内部可能维护了一些状态,需要确保他们可以被安全的取消。(换句话说你要确保你需要的状态是可控的)
tokio 的文档列出了组件是否 cancel safety;还有一种办法是,使用 tokio::pin! 来 pin 住不安全的 Future,然后在 select! 的分支里使用 &mut fut,这样做行得通是因为 pin 住后你就不会在 select! 的作用域里取消 Future 了。
当然,如果你的 select! 不在循环里,那就简单了,因为通常你也无所谓 Future 被取消之后的状态。
Cancellation Token
tokio_util crate 中有很多有意思的工具,比如 CancellationToken,如果你创建了一大堆 Future 在后台运行,然后你想一并取消他们,那这个工具可以派上用场。
你可以创建一个 CancellationToken,然后调用 .cancelled(),这个方法只会在 token 的 cancle() 被调用后才会返回,然后再配合 select!。
它有点像一个 Notify 一样。
所有的 CancellationToken 副本都是相同的,所有持有它的都会收到取消通知。
这个东西不在标准库,所以如果是库作者就很难受了,只能选择实现两套 API,一个多接受 CancellationToken,允许用户取消;另一个则是阻塞/永远等待。