Stream trait
trait Stream {
// 由 `stream` 产生的值的类型
type Item;
// 尝试解析 `stream` 中的下一项
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>;
}
Stream trait 是一个定义了异步流(asynchronous stream)的接口。
异步流表示一个异步可迭代集合,每个元素都可由异步代码异步生成
该模式常用于流式处理或与数据吞吐量无关的处理,带来性能提升。
Stream 定义了一个具有一个相关联类型 Self::Item 的抽象类型
这个类型是流内部元素的类型。
通过将此类型与 Option 结合使用,可以有效地表示流终止的状态。
Stream trait 的最重要的方法是 poll_next()
该方法是一个异步实现的访问器,它用于从流中获取下一个元素
在异步编程中,self 参数必须是 Pin<&mut Self> 类型
以确保安全地存储 stream 的内存,同时保持它们可变性和引用关系。
poll_next() 中,需要使用 Context 类型来跟踪和管理对流的异步访问。
Context 是一个包含有关异步上下文的上下文,例如调度器、唤醒器和运行程序的线程等。
了解上下文非常重要,因为它是在执行异步任务期间进行重要决策的关键因素。
poll_next() 方法返回一个 Poll<OptionSelf::Item> 枚举值
它表示将下一个元素轮询到结果的状态:
- 如果流仍具有元素,则返回 Poll::Ready(Some(element))
- 如果流结束,则返回 Poll::Ready(None)
- 如果无法轮询下一个元素,则返回 Poll::Pending
Stream trait 还定义了许多其他方法,如 map() 和 filter() 等
它们与 Iterator trait 相似,并允许转换、过滤和操作流中的元素。
例如
可以使用如下代码来实现 Stream trait,从而使类型成为异步流:
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>;
}
struct MyStream {
// implement fields...
}
impl Stream for MyStream {
type Item = String;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>> {
// implement poll_next() method
}
}
在这个例子中,定义了一个 MyStream 类型,它实现了 Stream trait 并包含了一个 String 类型的元素。
使用大多数异步流通常具备的 poll_next() 方法,该方法将 String 类型元素作为下一个生成项返回。
示例
[dependencies]
futures = "0.3"
tokio = { version = "1.16.0", features = ["full"] }
tokio-stream = "0.1.14"
生成1~9的序列
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_stream::Stream; // 引入第三方 crate `tokio_stream`
use tokio_stream::StreamExt;
use tokio::main;
struct MyStream {
value: i32,
}
impl Stream for MyStream {
type Item = i32;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// 模拟一些计算
let dst = self.value + 1;
// 这里不需要返回值
std::mem::replace(&mut self.value, dst);
// 如果值小于 10,返回它,否则返回 None
if self.value < 10 {
Poll::Ready(Some(self.value))
} else {
Poll::Ready(None)
}
}
}
#[tokio::main]
async fn main() {
let stream = MyStream { value: 0 };
println!("main begin");
// 使用异步方式处理每个生成项
tokio::pin!(stream); // 将流钉住以确保安全性
// await必须要在async块中
while let Some(val) = stream.next().await {
println!("Got {}", val);
}
}
迭代
有很多不同的方法可以迭代处理 Stream 中的值
map,filter 和 fold,以及“遇错即断”版本 try_map,try_filter 和 try_fold
for 循环不能用在 Stream 上,while let 以及 next/try_next 函数还可以使用
计算 stream 中所有元素的和
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
// StreamExt trait 中包含了一些非常有用的方法
// 例如 fold、fold_while、 for_each 和 next 等
// 这里是用next
use futures::stream::StreamExt;
let mut sum = 0;
// 迭代 stream 中的所有元素,并将它们相加
// 每次迭代时调用 next 方法返回下一个元素,并将其存储在 item 的变量中
// 如果 next 方法返回了 None,则循环将结束
// 这是异步代码,必须使用 await 关键字来等待 next 方法的完成
while let Some(item) = stream.next().await {
sum += item;
}
sum
}
Futures 和 Streams 是 Rust 异步编程的础。
Futures 表示异步计算的结果,使用 async 和 await 将它们组合在一起,进行非阻塞的异步计算。
Streams 表示异步生成的一系值,使用 Stream trait 对它们进行操作,并逐渐生成结果。
使用try版本
// 该函数可能返回计算过程中的 IO 错误
async fn sum_with_try_next(mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>) -> Result<i32, io::Error> {
use futures::stream::TryStreamExt; // 对于 `try_next`
let mut sum = 0;
// 在这里使用了 try_next 方法
// 因为 stream 的元素类型是 Result<i32, io::Error>
// 这个方法可以返回 Err,进而把错误传递到函数的调用方
//
// 如果 try_next 方法返回一个 Ok 值
// 则 item 变量会包含值
// 如果返回值是一个 Err
// 则该错误通过 ? 操作符传播,导致整个函数退出并将错误返回给调用方
while let Some(item) = stream.try_next().await? {
sum += item;
}
Ok(sum)
}
并发
如果每次只处理一个元素,如上文,就会失去并发的机会
为了并发处理一个 Stream 的多个值,使用 for_each_concurrent 或 try_for_each_concurrent 方法
使用异步并发处理繁重的、耗时的工作,可以更高效地处理大型计算,提高代码的可伸缩性。
但是过度并发也会导致程序性能下降或崩溃
// 参数指向流的可变引用,用于在异步流上提供异步迭代器功能
async fn jump_around(mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>) -> Result<(), io::Error> {
// 对于 `try_for_each_concurrent`
use futures::stream::TryStreamExt;
// 限制最大并发队列长度
const MAX_CONCURRENT_JUMPERS: usize = 100;
// try_for_each_concurrent 方法需要一个函数作为参数
// 该函数接受 num 参数,并返回一个 Future 对象
// 使用 async move {...}
// 来桥接 jump_n_times 和 report_n_jumps 两个异步函数
stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
// 在异步代码中,必须使用异步执行
// 因此这里也使用 await 关键字来异步等待该函数的执行结果
// 模拟一个人跳跃 num 次
jump_n_times(num).await?;
// 报告一个人跳跃了 num 次。
report_n_jumps(num).await?;
// 没有IO错误
Ok(())
}).await?;
Ok(())
}
附录
#[tokio::main]
Rust 宏(macro),用于将一个异步函数标记为使用 Tokio 运行时(runtime)执行。
Tokio 是一种支持异步 I/O 和非阻塞 IO 操作的 Rust 异步编程框架,可以轻松处理并发任务和事件驱动的编程模型。
async fn 指的是一个异步函数,它返回一个 Future 对象,可以在执行期间轻松地挂起和恢复。而异步 I/O 和非阻塞 IO 操作可以提高程序性能,因为它们可以在等待某些操作完成时,同时处理其他任务。
#[tokio::main] 宏可以用于标记一个异步函数作为程序的主函数,并在其中使用 Tokio runtime 来执行异步任务。这个宏用于创建一个可执行程序,就像普通的 main 函数一样,但在 Tokio 的异步环境中运行异步函数。
示例
#[tokio::main]
async fn main() {
// 异步代码
}
定义了一个函数 main 并使用 #[tokio::main] 标记它
告诉编译器,这个函数应该被编译为一个异步函数,并使用 Tokio 运行时来执行异步任务。
在函数体内,可以包任意的异步代码,例如调用异步函数、执行异步 I/O 操作、进行并行或流式处理等等。
注意:
在一个 Rust 项目中,只允许一个 tokio::main 函数存在,且必须作为程序入口点使用。
如果编写库,可以使用 #[tokio::main(flavor = “current_thread”)] 注解
以确保库不会创建任何运行时。
此时,调用者代码将需要负责在适当的时间创建和启动任何需要的运行时。
Tokio 只是 Rust 异步编程的其中一种解决方案,并不能解决所有异步编程问题。
其他常见的方案包括 async-std 和 futures-rs。
std::mem::replace
Rust 的标准库 std 中的一个函数,用于在不丢失内存所有权的情况下更改变量的值。replace 函数通常用于对可变变量进行非常规的修改操作,例如交换变量值,插入或移除元素,或在特定条件下强制赋值。
replace 函数采用两个参数:第一个参数是待修改的变量名,第二个参数是新值。该函数会返回旧值,同时将新值赋给变量。
pub fn replace<T>(dest: &mut T, mut src: T) -> T
交换两个变量的值:
let mut a = String::from("hello");
let mut b = String::from("world");
// 把world赋值给a,然后将hello返回赋值给temp
let temp = std::mem::replace(&mut a, b);
// 然后b的值为temp
let b = temp;
println!("a: {}", a);
println!("b: {}", b);
使用 std::mem::replace 函数交换 a 和 b 的值,将变量 b 中的值存储在一个临时变量 temp 中,再将 temp 中的值存储回变量 b 中。
replace 函数会将旧值从变量 dest 中取出,然后将新值 src 存储在该变量中。
因此,它需要完成变量类型之间的复制或移动操作。
在处理大型结构体或复杂数据类型时,这可能会导致性能和内存问题。
此外,replace 函数只适用于可变引用。
如果使用的是非可变引用,则需要使用其他方法来修改变量的值。
?
Rust 编程语言中的一个语法糖,用于简化错误处理代码的编写。
它一般用于函数的返回值类型为 Result 或 Option 的情况下,可以把一系列可能的错误和状态转换的判断语句,缩减为一行代码。
在 Rust 中,当遇到错误时,常规的处理方式是通过 match 或 if let 等语句进行模式匹配和错误处理。这样会导致每次检查错误的代码严重膨胀,并且会淹没主流程的本质。
同时 Rust 还提供了内置的 Error 类型 Result<T, E>,以及在标准库中为此类型提供的一些方法,例如 unwrap、unwrap_or、map、and_then、or 等。这些方法可以很好地处理错误,但其缺点是将错误的检查和处理条件与主流程代码混为一体。
在这种情况下,? 操作符就是一种更加简单和精简语法,它的作用是将当前函数返回值的 Result 或 Option 类型的内容进行检查,如果是 Ok 则返回 Ok 中的值。如果是 Err,则直接将这个 Err 返回出去,并中断当前函数的执行。
?操作符在某种程度上类似于其他编程语言中的异常的处理机制,但由于 Rust 非常强制性地提供了对错误的抽象和处理,使得最终 Rust 可以避免异常所导致的问题。
use std::fs::File;
use std::io::Read;
fn read_file(path: &str) -> Result<String, std::io::Error> {
let mut file = File::open(path)?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
Ok(contents)
}
定义一个 read_file 函数,它打开指定路径的文件,并读取文件内容。
函数的返回类型为 Result<String, std::io::Error>,表示该函数可能会返回一个字符串,或是一个包含 IO 错误的 Err。
使用 ? 操作符,可以将每个 I/O 操作的返回结果都检查一遍,以确保函数总是在遇到 IO 错误时返回错误类型。
在 Rust 中,? 操作符只能在 Result 或 Option 类型中使用,不能在其他类型中使用。文章来源:https://www.toymoban.com/news/detail-630346.html
如果错误类型不是这些类型的任何一种,可以将其包装到一个 Result 或 Option 中,然后再使用 ? 进行处理。文章来源地址https://www.toymoban.com/news/detail-630346.html
到了这里,关于rust学习-异步流的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!