Decrusting the tokio crate

主要分为四个部分:

  1. runtime,tokio 是如何执行 Future
  2. resources,与真实世界交互的内容
  3. utilities,其他的 tokio 提供的功能
  4. best practice

运行时

tokio::runtime - Rust

主要包含 event loop、timer、scheduler。

先说 scheduler,它的任务是获取输入的 Future 然后调用它们的 poll 方法。

poll 方法返回的是一个枚举,如果调用后返回 Ready(T),那么 scheduler 就不会做什么;如果返回了 Pending,那么就会把它再放回去,等待下一次再被 poll

tokio 有两个 scheduler,多线程版本和当前线程版本。大多数情况你应该选择多线程版本,它默认会为每个 CPU 核心创建一个线程,这也是比较好的选择。

最最简单的 API 就是 block_on会阻塞当前线程,直到 Future resolved,它会把值返回给你。

另外一个 API 是 spawn,会把你的 Future 放到异步运行时上,然后返回给你 JoinHandleDrop JoinHandle 时并不会调用 abort(),它什么都不会做。

你向 spawn 输入一个 Future,它生成的是一个 task。你的每一个 Future 可能都会链式调用很多 Future,只有最上面一层暴露出来的 Future 才是 task,tokio 只关心 task

对于当前线程版本的 spawn,什么都不会发生,你需要调用 block_on 让 tokio 执行它。

对于多线程版本的 scheduler,每个 CPU 线程都会对应一个线程,每一个线程都维护一个 local task queue,还有一个所有线程可见的全局 queue。

他们的行为都是差不多的,检查 queue,如果有 PendingFuture,就调用 poll 执行;此外,对于多线程版本的 scheduler,在 local queue 没有内容时,它会检查全局 queue,如果也没有等待的 Future,那么可以从其他线程的 local queue steal 一些 Future 执行。所以它是一个 work stealing scheduler。(当然 Future 必须实现 Send

这里有详细的策略介绍:tokio::runtime - Rust

你还可以对 runtime 使用 shutdown_background 或者 shutdown_timeout

阻塞

不要在异步上下文中随意做一个会阻塞的操作,比如使用了同步的 io 接口,这会导致操作系统层面的这个线程被阻塞,它上面的所有 Future 自然也没法执行。

实际上你应该避免长时间占用一个 worker thread. 比如做阻塞操作、运行一个时间很长的计算。

在你要做一个“阻塞”操作时,你应该告诉 tokio

一个解决方案是使用运行时的 spawn_blocking 方法。tokio 实际上维护了两个线程池,一个用于异步任务,一个用于完成阻塞计算,后者的数量上限很高,而且它就是被设计出来做阻塞操作的。

不过这种方法要求你的闭包满足 Send

还有一个方案是 tokio::task::block_in_place,这个函数传入的闭包不需要 Send,此时这个线程会被阻塞,并且把它的所有异步任务都交给一个新的 worker thread.(实际上要更聪明一点)

这种方法使用时要仔细阅读文档。

block_in_place in tokio::task - Rust

LocalSet

tokio 提供了 task::LocalSet,这里面的 task 并不会被发送到其他线程执行。但你并不能在 Future 链的深处使用。

LocalSet in tokio::task - Rust

tokio 的 Mutex 和 std Mutex 有什么区别?

最重要的区别是 tokio 的 Mutex 是异步的。看起来很简单,但实际上着涉及到一些本质问题。标准库的 Mutex 实际上非常高效,而 tokio 为了让 Mutex 成为异步的牺牲了一些性能。

提供异步锁的原因并不是因为上锁的时候调用 .await 很酷,一个原因是因为你的 worker thread 持有标准库的同步锁太久并不好,另一个重要的原因是,如果你持有锁,并且在持有锁的期间使用了 .await 而你没有释放锁,在此期间如果 Future 被移动到其他线程,那就会有问题。正因此,标准库的锁并不是 Send 的,这就导致你的 Future 也不会是 Send,这时你应该使用 tokio 的 Mutex。

Mutex in tokio::sync - Rust

IO 资源

目前只介绍了 runtime 的内容,它只负责执行 Future,而你需要和现实世界交互。

tokio 提供了很多工具来与现实世界交互,例如 TcpStream、文件系统等等。

如果一个对象可以在异步语境下被读取,那么 tokio 会为它实现 AsyncRead trait,它是异步的 primitive。

实际上可以这样思考,通常在 Future 链组成的任务的最底层,你会与真实世界交互,而 Future 不能执行下去的原因(返回 Pending)是因为这些 IO 资源还没有准备好。

使用 .await 实际上是 desugar,会把你的异步上下文等内容传入函数,异步上下文只有 Waker 重要,它定义在标准库里。如果一个 Future 返回 Pending 那么 tokio 会让他处于等待队列,之后某个组件调用了 wake(),则是通知 tokio,这个 Future 可以继续运行了。谁调用的 wake() ? 通常是底层的 I/O 事件循环。tokio 会在顶层调用任务时创建 Waker,你的异步上下文会一路传递到调用链的最底层,等 I/O 事件循环通知你完成后,再调用 wake()

Waker 的实现使用了 vtable,为什么它不是 trait?因为它要实现 Clone trait,而 Clone 不是类型安全的。

实际上某些结构的方法也允许你自己调用 wake()

为什么 tokio 没有使用标准库的 Read trait ? 因为 Read trait 需要的是 &mut SelfAsyncRead 需要的是 Pin<&mut Self>,你需要 Pin。此外需要 Poll<T> 表明操作到底是成功还是 Pending,标准库的 Read 返回的是 WouldBlock Err,很难受。

fs

这些 IO 设施看起来和标准库差不多,当然除了 tokio 提供的方法都是异步的。

但也有一些差别。

tokio::fs 是异步版本的文件系统,但实际上有点蛋疼,因为很多系统根本不提供异步访问文件的方式。tokio 目前使用 spawn_blocking 在后台完成这些操作。

tokio::fs - Rust

process

tokio 跟标准库的进程相似的地方是,当你 drop 掉子线程的 handle,子线程并不会停止。

你生成的子进程并不是一个 Future,你只能调用 wait 来等待他。

tokio::process - Rust

Ext

除了 AsyncRead, AsyncWrite 外,实际上还提供了 AysncReadExt, AsyncWriteEct,这两个 trait 提供了很多工具函数很方便。Buf 版本同理。

此外,最好不要让你的 IO 资源上锁,你可以选择用 channel 来组织多线程通信。

Stream & Sink

AsyncReadAsyncWrite 是你想读取/写入的独立的一系列的字节。

Stream 读取一系列收到的元素,类似于迭代器,每次调用 next() 都会给你一个元素;AsyncRead 是读取一系列的字节,并不在乎 frame。

Sink 同理,跟 AsyncWrite 呼应,二者的关系跟 Stream / AsyncRead 类似。

Stream / Sink 就像是异步版本的迭代器一样。

那怎么知道读取到的字节能不能组成一个元素?tokio_util 提供了 codec 用于完成这个工作。

但要注意的是,tokio 的基础类型不会实现 Steam / Sink trait,因为不会引入公共依赖。但提供了 tokio_stream::wrappers

tokio_stream - Rust

tokio_util::codec - Rust

Stream 可以常用于 Websocket

同步

sync 模块提供了很多同步 Future 的设施。

常见的通用的 MPSC channel 自然可以解决很多问题,不过通常会有更好的方法。

首先就是 oneshot,它的特点是提供了一个 blocking_recv(),此外,发送方的 send 也不是异步的。oneshot 可以连接同步和异步的世界。

然后就是不太常用的 broadcast,广播。但注意的是 broadcast 受制于接受消息最慢的那个客户端,你要仔细考虑程序的设计,并且程序被拖累时执行正确的行为。

比较酷的是 watch channel,是一种 broadcast,但容量只有 1,也就是说它只保留最后一个值。这样订阅者就只能看到最新的一个值了。

Notify 是一个比较底层的设施,在 Future 中等待被通知,然后你可以在同步的语境下调用 notify_one(),即可通知别人,你可以考虑在实现自己的 IO 资源中使用这个工具。这样你就不用蛋疼的使用底层的轮询设施了。它也可以跨越同步和异步的界限。

最后一个值得介绍的是 Semaphore,你通常不应该直接使用它。但你可以了解一下,比如锁就是一个只有 1 的信号量。通过它你可以限制你的服务器的访问人数,或者请求数量等等

tokio::sync - Rust

JoinSet

你生成了一大堆 Future 然后想要等待他们完成,你可能会想用 vector 来做这个事情,但实际上使用 JoinSet 更好,它并不会像循环一样受制于访问 Future 的顺序,而是以他们完成的顺序返回。你只需要调用 join_next()

JoinSet in tokio::task - Rust

tokio 还有 join!,可以并发的等待固定个数的 Future

select! 以及 cancellation

select! 代表你有一堆 Future,但你只想等待其中一个完成;比如等待 TCP 连接的同时等待用户按 ctrl-c。

使用 select! 时,每个分支都要求是一个 Future,他们会并发执行,如果其中一个 Future 结束了,就会运行他那个分支的代码块的其余代码,之后释放其他的 Future

**要特别注意的是,如果你的 select! 在循环中,那么其分支的 Future 需要允许被安全的取消。**因为你的 Future 内部可能维护了一些状态,需要确保他们可以被安全的取消。(换句话说你要确保你需要的状态是可控的)

tokio 的文档列出了组件是否 cancel safety;还有一种办法是,使用 tokio::pin! 来 pin 住不安全的 Future,然后在 select! 的分支里使用 &mut fut,这样做行得通是因为 pin 住后你就不会在 select! 的作用域里取消 Future 了。

当然,如果你的 select! 不在循环里,那就简单了,因为通常你也无所谓 Future 被取消之后的状态。

select in tokio - Rust

Cancellation Token

tokio_util crate 中有很多有意思的工具,比如 CancellationToken,如果你创建了一大堆 Future 在后台运行,然后你想一并取消他们,那这个工具可以派上用场。

你可以创建一个 CancellationToken,然后调用 .cancelled(),这个方法只会在 token 的 cancle() 被调用后才会返回,然后再配合 select!

它有点像一个 Notify 一样。

所有的 CancellationToken 副本都是相同的,所有持有它的都会收到取消通知。

这个东西不在标准库,所以如果是库作者就很难受了,只能选择实现两套 API,一个多接受 CancellationToken,允许用户取消;另一个则是阻塞/永远等待。

CancellationToken in tokio_util::sync - Rust