async/await 和 Stream 流处理
在入门章节中,我们简单学习了该如何使用 async/.await
, 同时在后面也了解了一些底层原理,现在是时候继续深入了。
async/.await
是 Rust 语法的一部分,它在遇到阻塞操作时( 例如 IO )会让出当前线程的所有权而不是阻塞当前线程,这样就允许当前线程继续去执行其它代码,最终实现并发。
有两种方式可以使用 async
: async fn
用于声明函数,async { ... }
用于声明语句块,它们会返回一个实现 Future
特征的值:
async
是懒惰的,直到被执行器 poll
或者 .await
后才会开始运行,其中后者是最常用的运行 Future
的方法。 当 .await
被调用时,它会尝试运行 Future
直到完成,但是若该 Future
进入阻塞,那就会让出当前线程的控制权。当 Future
后面准备再一次被运行时(例如从 socket
中读取到了数据),执行器会得到通知,并再次运行该 Future
,如此循环,直到完成。
以上过程只是一个简述,详细内容在底层探秘中已经被深入讲解过,因此这里不再赘述。
async
的生命周期
async fn
函数如果拥有引用类型的参数,那它返回的 Future
的生命周期就会被这些参数的生命周期所限制:
意味着 async fn
函数返回的 Future
必须满足以下条件: 当 x
依然有效时, 该 Future
就必须继续等待( .await
), 也就是说 x
必须比 Future
活得更久。
在一般情况下,在函数调用后就立即 .await
不会存在任何问题,例如foo(&x).await
。但是,若 Future
被先存起来或发送到另一个任务或者线程,就可能存在问题了:
以上代码会报错,因为 x
的生命周期只到 bad
函数的结尾。 但是 Future
显然会活得更久:
error[E0597]: `x` does not live long enough
--> src/main.rs:4:14
|
4 | borrow_x(&x) // ERROR: `x` does not live long enough
| ---------^^-
| | |
| | borrowed value does not live long enough
| argument requires that `x` is borrowed for `'static`
5 | }
| - `x` dropped here while still borrowed
其中一个常用的解决方法就是将具有引用参数的 async fn
函数转变成一个具有 'static
生命周期的 Future
。 以上解决方法可以通过将参数和对 async fn
的调用放在同一个 async
语句块来实现:
如上所示,通过将参数移动到 async
语句块内, 我们将它的生命周期扩展到 'static
, 并跟返回的 Future
保持了一致。
async move
async
允许我们使用 move
关键字来将环境中变量的所有权转移到语句块内,就像闭包那样,好处是你不再发愁该如何解决借用生命周期的问题,坏处就是无法跟其它代码实现对变量的共享:
当.await 遇见多线程执行器
需要注意的是,当使用多线程 Future
执行器( executor
)时, Future
可能会在线程间被移动,因此 async
语句块中的变量必须要能在线程间传递。 至于 Future
会在线程间移动的原因是:它内部的任何.await
都可能导致它被切换到一个新线程上去执行。
由于需要在多线程环境使用,意味着 Rc
、 RefCell
、没有实现 Send
的所有权类型、没有实现 Sync
的引用类型,它们都是不安全的,因此无法被使用
需要注意!实际上它们还是有可能被使用的,只要在
.await
调用期间,它们没有在作用域范围内。
类似的原因,在 .await
时使用普通的锁也不安全,例如 Mutex
。原因是,它可能会导致线程池被锁:当一个任务获取锁 A
后,若它将线程的控制权还给执行器,然后执行器又调度运行另一个任务,该任务也去尝试获取了锁 A
,结果当前线程会直接卡死,最终陷入死锁中。
因此,为了避免这种情况的发生,我们需要使用 futures
包下的锁 futures::lock
来替代 Mutex
完成任务。
Stream 流处理
Stream
特征类似于 Future
特征,但是前者在完成前可以生成多个值,这种行为跟标准库中的 Iterator
特征倒是颇为相似。
关于 Stream
的一个常见例子是消息通道( futures
包中的)的消费者 Receiver
。每次有消息从 Send
端发送后,它都可以接收到一个 Some(val)
值, 一旦 Send
端关闭( drop
),且消息通道中没有消息后,它会接收到一个 None
值。
迭代和并发
跟迭代器类似,我们也可以迭代一个 Stream
。 例如使用 map
,filter
,fold
方法,以及它们的遇到错误提前返回的版本: try_map
,try_filter
,try_fold
。
但是跟迭代器又有所不同,for
循环无法在这里使用,但是命令式风格的循环while let
是可以用的,同时还可以使用next
和 try_next
方法:
上面代码是一次处理一个值的模式,但是需要注意的是:如果你选择一次处理一个值的模式,可能会造成无法并发,这就失去了异步编程的意义。 因此,如果可以的话我们还是要选择从一个 Stream
并发处理多个值的方式,通过 for_each_concurrent
或 try_for_each_concurrent
方法来实现: