rust学习-异步流

这篇具有很好参考价值的文章主要介绍了rust学习-异步流。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Stream trait

trait Stream {
    // 由 `stream` 产生的值的类型
    type Item;

    // 尝试解析 `stream` 中的下一项
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}

Stream trait 是一个定义了异步流(asynchronous stream)的接口。
异步流表示一个异步可迭代集合,每个元素都可由异步代码异步生成
该模式常用于流式处理或与数据吞吐量无关的处理,带来性能提升。

Stream 定义了一个具有一个相关联类型 Self::Item 的抽象类型
这个类型是流内部元素的类型。
通过将此类型与 Option 结合使用,可以有效地表示流终止的状态。

Stream trait 的最重要的方法是 poll_next()
该方法是一个异步实现的访问器,它用于从流中获取下一个元素
在异步编程中,self 参数必须是 Pin<&mut Self> 类型
以确保安全地存储 stream 的内存,同时保持它们可变性和引用关系。

poll_next() 中,需要使用 Context 类型来跟踪和管理对流的异步访问。
Context 是一个包含有关异步上下文的上下文,例如调度器、唤醒器和运行程序的线程等。
了解上下文非常重要,因为它是在执行异步任务期间进行重要决策的关键因素。

poll_next() 方法返回一个 Poll<OptionSelf::Item> 枚举值
它表示将下一个元素轮询到结果的状态:

  • 如果流仍具有元素,则返回 Poll::Ready(Some(element))
  • 如果流结束,则返回 Poll::Ready(None)
  • 如果无法轮询下一个元素,则返回 Poll::Pending

Stream trait 还定义了许多其他方法,如 map() 和 filter() 等
它们与 Iterator trait 相似,并允许转换、过滤和操作流中的元素。

例如
可以使用如下代码来实现 Stream trait,从而使类型成为异步流:


use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Stream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}

struct MyStream {
    // implement fields...
}

impl Stream for MyStream {
    type Item = String;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>> {
        // implement poll_next() method
    }
}

在这个例子中,定义了一个 MyStream 类型,它实现了 Stream trait 并包含了一个 String 类型的元素。
使用大多数异步流通常具备的 poll_next() 方法,该方法将 String 类型元素作为下一个生成项返回。

示例

[dependencies]
futures = "0.3"
tokio = { version = "1.16.0", features = ["full"] }
tokio-stream = "0.1.14"

生成1~9的序列

use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_stream::Stream; // 引入第三方 crate `tokio_stream`
use tokio_stream::StreamExt;
use tokio::main;

struct MyStream {
    value: i32,
}

impl Stream for MyStream {
    type Item = i32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // 模拟一些计算
        let dst = self.value + 1;
        // 这里不需要返回值
	    std::mem::replace(&mut self.value, dst);

        // 如果值小于 10,返回它,否则返回 None
        if self.value < 10 {
            Poll::Ready(Some(self.value))
        } else {
            Poll::Ready(None)
        }
    }
}

#[tokio::main]
async fn main() {
    let stream = MyStream { value: 0 };

    println!("main begin");
    // 使用异步方式处理每个生成项
    tokio::pin!(stream); // 将流钉住以确保安全性

	// await必须要在async块中
    while let Some(val) = stream.next().await {
        println!("Got {}", val);
    }
}

迭代

有很多不同的方法可以迭代处理 Stream 中的值
map,filter 和 fold,以及“遇错即断”版本 try_map,try_filter 和 try_fold
for 循环不能用在 Stream 上,while let 以及 next/try_next 函数还可以使用

计算 stream 中所有元素的和

async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
	// StreamExt trait 中包含了一些非常有用的方法
	// 例如 fold、fold_while、 for_each 和 next 等
	// 这里是用next
    use futures::stream::StreamExt;
    let mut sum = 0;
    
    // 迭代 stream 中的所有元素,并将它们相加
    // 每次迭代时调用 next 方法返回下一个元素,并将其存储在 item 的变量中
    // 如果 next 方法返回了 None,则循环将结束
    // 这是异步代码,必须使用 await 关键字来等待 next 方法的完成
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

Futures 和 Streams 是 Rust 异步编程的础。
Futures 表示异步计算的结果,使用 async 和 await 将它们组合在一起,进行非阻塞的异步计算。
Streams 表示异步生成的一系值,使用 Stream trait 对它们进行操作,并逐渐生成结果。

使用try版本

// 该函数可能返回计算过程中的 IO 错误
async fn sum_with_try_next(mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // 对于 `try_next`
    let mut sum = 0;
	// 在这里使用了 try_next 方法
	// 因为 stream 的元素类型是 Result<i32, io::Error>
	// 这个方法可以返回 Err,进而把错误传递到函数的调用方 
	//
	// 如果 try_next 方法返回一个 Ok 值
	// 则 item 变量会包含值
	// 如果返回值是一个 Err
	// 则该错误通过 ? 操作符传播,导致整个函数退出并将错误返回给调用方
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}

并发

如果每次只处理一个元素,如上文,就会失去并发的机会
为了并发处理一个 Stream 的多个值,使用 for_each_concurrent 或 try_for_each_concurrent 方法

使用异步并发处理繁重的、耗时的工作,可以更高效地处理大型计算,提高代码的可伸缩性。
但是过度并发也会导致程序性能下降或崩溃

// 参数指向流的可变引用,用于在异步流上提供异步迭代器功能
async fn jump_around(mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>) -> Result<(), io::Error> {
	// 对于 `try_for_each_concurrent`
    use futures::stream::TryStreamExt;
    // 限制最大并发队列长度
    const MAX_CONCURRENT_JUMPERS: usize = 100;

	// try_for_each_concurrent 方法需要一个函数作为参数
	// 该函数接受 num 参数,并返回一个 Future 对象
	// 使用 async move {...} 
	// 来桥接 jump_n_times 和 report_n_jumps 两个异步函数
    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
    	// 在异步代码中,必须使用异步执行
		// 因此这里也使用 await 关键字来异步等待该函数的执行结果
		
    	// 模拟一个人跳跃 num 次
        jump_n_times(num).await?;

		// 报告一个人跳跃了 num 次。
        report_n_jumps(num).await?;

		// 没有IO错误
        Ok(())
    }).await?;

    Ok(())
}

附录

#[tokio::main]

Rust 宏(macro),用于将一个异步函数标记为使用 Tokio 运行时(runtime)执行。
Tokio 是一种支持异步 I/O 和非阻塞 IO 操作的 Rust 异步编程框架,可以轻松处理并发任务和事件驱动的编程模型。

async fn 指的是一个异步函数,它返回一个 Future 对象,可以在执行期间轻松地挂起和恢复。而异步 I/O 和非阻塞 IO 操作可以提高程序性能,因为它们可以在等待某些操作完成时,同时处理其他任务。

#[tokio::main] 宏可以用于标记一个异步函数作为程序的主函数,并在其中使用 Tokio runtime 来执行异步任务。这个宏用于创建一个可执行程序,就像普通的 main 函数一样,但在 Tokio 的异步环境中运行异步函数。

示例

#[tokio::main]
async fn main() {
    // 异步代码
}

定义了一个函数 main 并使用 #[tokio::main] 标记它
告诉编译器,这个函数应该被编译为一个异步函数,并使用 Tokio 运行时来执行异步任务。
在函数体内,可以包任意的异步代码,例如调用异步函数、执行异步 I/O 操作、进行并行或流式处理等等。

注意:
在一个 Rust 项目中,只允许一个 tokio::main 函数存在,且必须作为程序入口点使用。
如果编写库,可以使用 #[tokio::main(flavor = “current_thread”)] 注解
以确保库不会创建任何运行时。
此时,调用者代码将需要负责在适当的时间创建和启动任何需要的运行时。

Tokio 只是 Rust 异步编程的其中一种解决方案,并不能解决所有异步编程问题。
其他常见的方案包括 async-std 和 futures-rs。

std::mem::replace

Rust 的标准库 std 中的一个函数,用于在不丢失内存所有权的情况下更改变量的值。replace 函数通常用于对可变变量进行非常规的修改操作,例如交换变量值,插入或移除元素,或在特定条件下强制赋值

replace 函数采用两个参数:第一个参数是待修改的变量名,第二个参数是新值。该函数会返回旧值,同时将新值赋给变量。

pub fn replace<T>(dest: &mut T, mut src: T) -> T

交换两个变量的值:

let mut a = String::from("hello");
let mut b = String::from("world");

// 把world赋值给a,然后将hello返回赋值给temp
let temp = std::mem::replace(&mut a, b);
// 然后b的值为temp
let b = temp;

println!("a: {}", a);
println!("b: {}", b);

使用 std::mem::replace 函数交换 a 和 b 的值,将变量 b 中的值存储在一个临时变量 temp 中,再将 temp 中的值存储回变量 b 中。

replace 函数会将旧值从变量 dest 中取出,然后将新值 src 存储在该变量中。
因此,它需要完成变量类型之间的复制或移动操作。
在处理大型结构体或复杂数据类型时,这可能会导致性能和内存问题。
此外,replace 函数只适用于可变引用。
如果使用的是非可变引用,则需要使用其他方法来修改变量的值。

Rust 编程语言中的一个语法糖,用于简化错误处理代码的编写。
它一般用于函数的返回值类型为 Result 或 Option 的情况下,可以把一系列可能的错误和状态转换的判断语句,缩减为一行代码。

在 Rust 中,当遇到错误时,常规的处理方式是通过 match 或 if let 等语句进行模式匹配和错误处理。这样会导致每次检查错误的代码严重膨胀,并且会淹没主流程的本质。
同时 Rust 还提供了内置的 Error 类型 Result<T, E>,以及在标准库中为此类型提供的一些方法,例如 unwrap、unwrap_or、map、and_then、or 等。这些方法可以很好地处理错误,但其缺点是将错误的检查和处理条件与主流程代码混为一体。

在这种情况下,? 操作符就是一种更加简单和精简语法,它的作用是将当前函数返回值的 Result 或 Option 类型的内容进行检查,如果是 Ok 则返回 Ok 中的值。如果是 Err,则直接将这个 Err 返回出去,并中断当前函数的执行。

?操作符在某种程度上类似于其他编程语言中的异常的处理机制,但由于 Rust 非常强制性地提供了对错误的抽象和处理,使得最终 Rust 可以避免异常所导致的问题。

use std::fs::File;
use std::io::Read;

fn read_file(path: &str) -> Result<String, std::io::Error> {
    let mut file = File::open(path)?;
    let mut contents = String::new();
    file.read_to_string(&mut contents)?;
    Ok(contents)
}

定义一个 read_file 函数,它打开指定路径的文件,并读取文件内容。

函数的返回类型为 Result<String, std::io::Error>,表示该函数可能会返回一个字符串,或是一个包含 IO 错误的 Err。

使用 ? 操作符,可以将每个 I/O 操作的返回结果都检查一遍,以确保函数总是在遇到 IO 错误时返回错误类型。

在 Rust 中,? 操作符只能在 Result 或 Option 类型中使用,不能在其他类型中使用。

如果错误类型不是这些类型的任何一种,可以将其包装到一个 Result 或 Option 中,然后再使用 ? 进行处理。文章来源地址https://www.toymoban.com/news/detail-630346.html

到了这里,关于rust学习-异步流的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 如何用rust实现一个异步channel

    使用通信来共享内存,而不是通过共享内存来通信 上面这句话,是每个go开发者在 处理多线程通信时 的座右铭,go甚至把实现这个理念的channel直接焊在编译器里,几乎所有的go程序里都有channel的身影。 rust的异步和go的goroutine有异曲同工之妙,甚至可以把 tokio::spawn 理解为g

    2024年02月11日
    浏览(72)
  • 【Rust学习】安装Rust环境

    本笔记为了记录学习Rust过程,内容如有错误请大佬指教 使用IDE:vs code 参考教程:菜鸟教程链接: 菜鸟教程链接: 因为我已经安装过VSCode了,所以VSCode的安装方法在此处就不多介绍了,接下来就是安装Rust的编译工具。 Rust 编译工具 可以点击跳转下载Rust 编译工具 新建文件夹,

    2024年01月17日
    浏览(62)
  • 【Rust】Rust学习 第十七章Rust 的面向对象特性

    面向对象编程(Object-Oriented Programming,OOP)是一种模式化编程方式。对象(Object)来源于 20 世纪 60 年代的 Simula 编程语言。这些对象影响了 Alan Kay 的编程架构中对象之间的消息传递。他在 1967 年创造了  面向对象编程  这个术语来描述这种架构。关于 OOP 是什么有很多相互矛

    2024年02月11日
    浏览(45)
  • 【Rust】Rust学习 第九章错误处理

    Rust 将错误组合成两个主要类别: 可恢复错误 ( recoverable )和  不可恢复错误 ( unrecoverable )。可恢复错误通常代表向用户报告错误和重试操作是合理的情况,比如未找到文件。不可恢复错误通常是 bug 的同义词,比如尝试访问超过数组结尾的位置。 大部分语言并不区分这

    2024年02月13日
    浏览(37)
  • 【Rust】Rust学习 第八章常见集合

    Rust 标准库中包含一系列被称为  集合 ( collections )的非常有用的数据结构。大部分其他数据类型都代表一个特定的值,不过集合可以包含多个值。不同于内建的数组和元组类型,这些集合指向的数据是储存在堆上的,这意味着数据的数量不必在编译时就已知,并且还可以随

    2024年02月13日
    浏览(40)
  • 【Rust】Rust学习 第十五章智能指针

    指针  ( pointer )是一个包含内存地址的变量的通用概念。这个地址引用,或 “指向”(points at)一些其他数据。Rust 中最常见的指针是第四章介绍的  引用 ( reference )。引用以   符号为标志并借用了他们所指向的值。除了引用数据没有任何其他特殊功能。它们也没有任

    2024年02月12日
    浏览(33)
  • 【Rust】Rust学习 第十四章智能指针

    指针  ( pointer )是一个包含内存地址的变量的通用概念。这个地址引用,或 “指向”(points at)一些其他数据。Rust 中最常见的指针是第四章介绍的  引用 ( reference )。引用以   符号为标志并借用了他们所指向的值。除了引用数据没有任何其他特殊功能。它们也没有任

    2024年02月12日
    浏览(41)
  • 【Rust】Rust学习 第十六章无畏并发

    安全且高效的处理并发编程是 Rust 的另一个主要目标。 并发编程( Concurrent programming ),代表程序的不同部分相互独立的执行,而 并行编程( parallel programming )代表程序不同部分于同时执行 ,这两个概念随着计算机越来越多的利用多处理器的优势时显得愈发重要。由于历

    2024年02月12日
    浏览(41)
  • 【Rust】Rust学习 第十九章高级特征

    现在我们已经学习了 Rust 编程语言中最常用的部分。在第二十章开始另一个新项目之前,让我们聊聊一些总有一天你会遇上的部分内容。你可以将本章作为不经意间遇到未知的内容时的参考。本章将要学习的功能在一些非常特定的场景下很有用处。虽然很少会碰到它们,我们

    2024年02月11日
    浏览(43)
  • C#基础学习--异步编程

    目录 什么是异步 async/await 特性的结构  什么是异步方法  异步方法的控制流 await 表达式 启动程序时,系统会在内存中创建一个新的 进程 。进程是构成运行程序的资源的集合。进程是构成运行程序的资源的集合。这些资源包括虚地址空间,文件句柄和许多其他程序运行所需

    2023年04月25日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包