如何用rust实现一个异步channel

这篇具有很好参考价值的文章主要介绍了如何用rust实现一个异步channel。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

使用通信来共享内存,而不是通过共享内存来通信

上面这句话,是每个go开发者在 处理多线程通信时 的座右铭,go甚至把实现这个理念的channel直接焊在编译器里,几乎所有的go程序里都有channel的身影。
rust的异步和go的goroutine有异曲同工之妙,甚至可以把 tokio::spawn 理解为go关键字。但在rust中好像并没有异步channel的实现。本着求人不如求己的原则,决定diy一个类似go的channel。

思路

先看一下发送流程

再看一下接收流程

总体来说流程清晰易懂,不管接收还是发送,都是先尝试从缓存队列中操作值,不成功则加入到对应队列,等待再次执行。反之则唤起相关任务,结束操作。

实现功能

  1. 首先需要实现一个存放值的环形缓冲区,并且每个单元应该是单独加锁的,从而避免全局锁。
  2. 需要两个任务队列,用来存放在饥饿模式(从缓存操作失败)下的 发送任务和接受任务。
  3. 按照rust习惯,将发送者和接受者拆开,并各自实现future
  4. 因为唤醒不是同步的,需要通过一个唤醒器来唤醒沉默的任务。
  5. 使用原子操作替代锁

代码实现

具体的就不写了,放在github上了
github地址:https://github.com/woshihaoren4/wd_tools/tree/main/src/channel

测试

这里主要和async-channel测试一下

  • async-channel 是最常见的异步channel,在crateio上有两千万的下载。

先引测试版包

cargo.toml

[dependencies]
tokio = {version = "1.22.0",features=["full"]}
wd_tools = {version = "0.8.3",features = ["sync","chan"]}
async-channel = "1.8.0"
  • wd_tools 是我们的channel,这里引用的sync chan两个feature,前者用于测试,后者是chan实现。

测试代码

测试场景:设置缓存长度为10,发100万数据,接100万数据。在1发送者1接受者,1发送者10接受者,10发送者1接受者,10发送者10接受者四种情况下的收发性能。

use std::fmt::Debug;
use wd_tools::channel as wd;
use async_channel as ac;

#[tokio::main]
async fn main(){
    let ts = TestService::new(10);
    println!("test start ------------> wd_tools");
    ts.send_to_recv("1-v-1",true,100_0000,1,100_0000,1,|x|x).await;
    ts.send_to_recv("1-v-10",true,100_0000,1,10_0000,10,|x|x).await;
    ts.send_to_recv("10-v-1",true,10_0000,10,100_0000,1,|x|x).await;
    ts.send_to_recv("10-v-10",true,10_0000,10,10_0000,10,|x|x).await;
    println!("wd_tools <------------- test over");
    println!("test start ------------> async-channel");
    ts.send_to_recv("1-v-1",false,100_0000,1,100_0000,1,|x|x).await;
    ts.send_to_recv("1-v-10",false,100_0000,1,10_0000,10,|x|x).await;
    ts.send_to_recv("10-v-1",false,10_0000,10,100_0000,1,|x|x).await;
    ts.send_to_recv("10-v-10",false,10_0000,10,10_0000,10,|x|x).await;
    println!("async-channel <------------ test over");
}

struct TestService<T>{
    wd_sender : wd::Sender<T>,
    wd_receiver : wd::Receiver<T>,
    ac_sender : ac::Sender<T>,
    ac_receiver : ac::Receiver<T>
}

impl<T:Unpin+Send+Sync+Debug+'static> TestService<T>{
    pub fn new(cap:usize)->TestService<T>{
        let (wd_sender,wd_receiver) = wd::Channel::new(cap);
        let (ac_sender,ac_receiver) = ac::bounded(cap);
        TestService{wd_sender,wd_receiver,ac_sender,ac_receiver}
    }

    pub fn send<G:Fn(usize)->T+Send+Sync+'static>(&self,wg:wd_tools::sync::WaitGroup,is_wd:bool,max:usize,generater:G){
        let wd_sender = self.wd_sender.clone();
        let ac_sender = self.ac_sender.clone();

        wg.defer_args1(|is_wd|async move{
            for i in 0..max {
                let t = generater(i);
                if is_wd {
                    wd_sender.send(t).await.expect(" 发送失败");
                }else{
                    ac_sender.send(t).await.expect(" 发送失败");
                }
            }
        },is_wd);
    }
    pub fn recv(&self,wg:wd_tools::sync::WaitGroup,is_wd:bool,max:usize){
        let wd_receiver = self.wd_receiver.clone();
        let ac_receiver = self.ac_receiver.clone();

        wg.defer_args1(|is_wd|async move{
            for _i in 0..max {
                if is_wd {
                    wd_receiver.recv().await.expect(" 接收失败");
                }else{
                    ac_receiver.recv().await.expect(" 接收失败");
                }
            }
        },is_wd);
    }

    pub async fn send_to_recv<G:Fn(usize)->T+Send+Sync+Clone+'static>(&self,info:&'static str, is_wd:bool, sbase:usize, sgroup:usize, rbase:usize, rgroup:usize, generater:G){
        let now = std::time::Instant::now();
        let wg = wd_tools::sync::WaitGroup::default();
        let wg_send = wd_tools::sync::WaitGroup::default();
        let wg_recv = wd_tools::sync::WaitGroup::default();

        for _ in 0..sgroup{
            self.send(wg_send.clone(),is_wd,sbase,generater.clone());
        }
        for _ in 0..rgroup{
            self.recv(wg_recv.clone(),is_wd,rbase);
        }

        wg.defer(move ||async move{
            let now = std::time::Instant::now();
            wg_send.wait().await;
            println!("test[{}] ---> send use time:{}ms",info,now.elapsed().as_millis());
        });
        wg.defer(move ||async move{
            let now = std::time::Instant::now();
            wg_recv.wait().await;
            println!("test[{}] ---> recv use time:{}ms",info,now.elapsed().as_millis());
        });

        wg.wait().await;
        println!("test[{}] ---> all use time:{}ms",info,now.elapsed().as_millis());
    }
}

结果与分析

测试10次,取平均值做表,如下
如何用rust实现一个异步channel
如上图,得结论

  • 在1发收者和10发收者的情况下,两种channel效率相差不多。
  • 在发送者和接受者数量不等时,wd_tools::channel的性能明显优于async-channel

思考

分析结论之前先看一下async-channel的实现。虽然async-channel也是异步,但它并不依赖某个异步运行时来进行任务的上线文切换,而是使用concurrent-queueevent-listener进行消息调度,底层依赖于std::thread::park_timeout

相比event-listener的调度方式,直接管理tokio的Context则更适用于异步环境。尤其是存在大量等待的场景。如上面测试,接受者和发送者数量不等,需要长时间等待的情况。实际开发中,接受者或者发送者可能长时间处于饥饿的情况下,wd_tools::channel不会产生多余的资源开销,毕竟上下文被挂起了,也就不会被cpu执行。

当然实际是复杂的,因情而异,使用的CPU数量(线程数),缓存长度,异步任务数同样会影响消息队列的性能,尤其是不需要等待的场景下async-channel性能更优。

wd_tools::channel则更适合tokio异步环境。并且不会引起线程park,而产生其他影响。

尾语

wd_tools::channel 目前只是一个初级版本,还有很多地方待优化,比如过多的状态判断,对缓存区直接轮训加锁,而没有采用优化算法, 唤醒器完全可以通过一定优化策略替换带。
但这个思路是没错的,欢迎有想法的同志加入进来。文章来源地址https://www.toymoban.com/news/detail-506367.html

到了这里,关于如何用rust实现一个异步channel的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 00-Rust前言

            问:为什么要近期想学习Rust?         答: Rust出来也是有一段时间了,从Microsoft吵着要重构他们的C++\\\"祖传代码\\\"开始,Rust就披着“高效,安全”的头衔。而自己决定要学习Rust,是因为近期发现:涉及工作的部分开源代码也在悄然的发现变化。         例如:Googl

    2024年01月20日
    浏览(27)
  • 【Rust 基础篇】Rust 通道(Channel)

    在 Rust 中,通道(Channel)是一种用于在多个线程之间传递数据的并发原语。通道提供了一种安全且高效的方式,允许线程之间进行通信和同步。本篇博客将详细介绍 Rust 中通道的使用方法,包含代码示例和对定义的详细解释。 在 Rust 中,我们可以使用 std::sync::mpsc 模块提供的

    2024年02月15日
    浏览(28)
  • 基于channel的异步事件总线

    通道是生成者/使用者概念编程模型的实现。 在此编程模型中,生成者异步生成数据,使用者异步使用该数据。 换句话说,此模型将数据从一方移交给另一方。 尝试将通道视为任何其他常见的泛型集合类型,例如 List 。 主要区别在于,此集合管理同步,并通过工厂创建选项

    2024年02月08日
    浏览(25)
  • System.Threading.Channels 高性能异步队列

    System.Threading.Channels 是.NET Core 3.0 后推出的新的集合类型, 具有异步API,高性能,线程安全等特点,它提供一个异步数据集合,可用于生产者和消费者之前的数据异步传递。 它提供如下方法: BoundedChannelOptions Provides options that control the behavior of bounded ChannelT instances. 提供通道的行

    2024年01月24日
    浏览(36)
  • Rust 并行库 crossbeam 的 Channel 示例

    一个不完整的示例: 该例子中,rx 可以被多个线程使用,是线程安全的。这就是所谓的 MPMC 模式。设想 channel 中有 10 个数据,MPMC 模式允许10个线程同时利用 rx 从 channel 中读取数据。 Rust 自带的 channel 是 MPSC 模式的,一次仅允许一个线程从 channel 读取数据。显然 crossbeam 效率

    2024年03月15日
    浏览(24)
  • Django Channels、WS协议及同步与异步详解

    在 Django 中,同步和异步主要涉及到请求处理的方式。这两种方式的主要区别在于它们如何处理多个并发请求: 同步(Synchronous):在同步模式下,Django 会为 每个请求 创建一个 单独 的线程或进程。这意味着,如果一个请求正在等待响应(例如,等待数据库查询返回结果),

    2024年02月07日
    浏览(27)
  • win10系统rust串口通信实现

    命令:cargo new comport 命令:cd ./comport cargo build 命令输入格式:cargo run COM6 921600 \\\"ATn\\\"

    2024年02月11日
    浏览(32)
  • 如何用Python实现一个简单的爬虫?

    作为一名程序员,我深知爬虫技术在现代互联网领域中的重要性。因此,今天我来分享一下如何用Python实现一个简单的爬虫。 简单来说,爬虫就是一种自动化程序,通过网络协议来获取特定网站的信息,例如图片、文字、视频等等。这些信息可以是公开数据,也可以是需要用

    2024年02月07日
    浏览(32)
  • Flutter实现CombineExecutor进行多个异步分组监听,监听第一个异步执行的开始和最后一个异步执行结束时机。

    1.场景 我们在调用接口时,很多时候会同时调用多个接口,接口都是异步执行,我们很难知道调用的多个接口哪个会最后执行完成,我们有时候需要对最后一个接口执行完成的时机监听,所以基于该需求,设计了CombineExecutor,对类似的需求进行监听。 2.代码 group_key.dart execu

    2024年02月09日
    浏览(21)
  • 如何用ReadWriteLock实现一个通用的缓存中心?

    摘要: 在并发场景中,Java SDK中提供了ReadWriteLock来满足读多写少的场景。 本文分享自华为云社区《【高并发】基于ReadWriteLock开了个一款高性能缓存》,作者:冰 河。 在实际工作中,有一种非常普遍的并发场景:那就是读多写少的场景。在这种场景下,为了优化程序的性能

    2024年02月06日
    浏览(7)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包