Rust 并行库 crossbeam 的 Channel 示例

这篇具有很好参考价值的文章主要介绍了Rust 并行库 crossbeam 的 Channel 示例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

示例1

一个不完整的示例:

let (tx, rx) = channel::unbounded::<Task>();
let mut handlers = vec![];

for _ in 0..number {
    let rx = rx.clone();
    let handle = thread::spawn(move || {
        while let Some(task) = rx.recv() {
            task.call_box();
        }
    });

    handlers.push(handle);
}

该例子中,rx 可以被多个线程使用,是线程安全的。这就是所谓的 MPMC 模式。设想 channel 中有 10 个数据,MPMC 模式允许10个线程同时利用 rx 从 channel 中读取数据。

Rust 自带的 channel 是 MPSC 模式的,一次仅允许一个线程从 channel 读取数据。显然 crossbeam 效率更高。

示例2

use crossbeam::channel;
use crossbeam::thread;
use std::thread::sleep;
use std::time::Duration;

// 定义Task结构体
struct Task {
    data: usize, // 假设任务包含一个数据字段
    call_box: Box<dyn FnMut()>, // 假设任务包含一个可调用对象的装箱指针
}

impl Task {
    fn new(data: usize, call_box: impl FnMut() + 'static) -> Self {
        Task {
            data,
            call_box: Box::new(call_box),
        }
    }

    // 实现call_box方法
    fn call_box(&mut self) {
        (self.call_box)();
    }
}

fn main() {
    const NUMBER_OF_WORKERS: usize = 4; // 假设有4个工作线程
    let (tx, rx) = channel::unbounded::<Task>();
    let mut handlers = vec![];

    // 启动工作线程
    for _ in 0..NUMBER_OF_WORKERS {
        let rx = rx.clone();
        let handle = thread::spawn(move || {
            while let Some(task) = rx.recv() {
                task.call_box(); // 执行任务
            }
        });
        handlers.push(handle);
    }

    // 发送任务到通道
    for i in 0..10 { // 假设发送10个任务
        let task = Task::new(i, || {
            println!("Executing task with data: {}", i);
            sleep(Duration::from_secs(1)); // 模拟耗时操作
            println!("Finished task with data: {}", i);
        });
        tx.send(task).unwrap();
    }

    // 关闭发送通道
    drop(tx);

    // 等待所有工作线程完成
    for handle in handlers {
        handle.join().unwrap();
    }

    println!("All tasks are processed.");
}

在这个程序中,我们定义了一个Task结构体,它包含一个data字段和一个call_box字段,后者是一个装箱的可调用对象。我们实现了call_box方法,它调用这个装箱的可调用对象。

main函数中,我们创建了一个无界通道,用于在工作线程和主线程之间传递Task实例。我们启动了NUMBER_OF_WORKERS个工作线程,它们不断地从通道接收Task实例并调用call_box方法执行它们。

然后,主线程创建了一些Task实例,并通过通道发送它们给工作线程。一旦所有任务都被发送,主线程通过drop(tx)关闭了发送通道,这样工作线程在尝试接收任务时,如果没有更多任务可用,将会得到一个None,从而退出循环。

最后,主线程等待所有工作线程完成,并打印出消息表示所有任务都已经处理完毕。

请注意,为了简化示例,我使用了Box<dyn FnMut()>来允许Task存储任何可调用对象的装箱指针。这意味着任务中的可调用对象必须能够单独编译成一个独立的、无状态的函数,这样才能安全地在多个线程之间共享。在实际应用中,你可能需要根据你的具体需求调整Task结构体的设计和使用方式。文章来源地址https://www.toymoban.com/news/detail-840029.html

到了这里,关于Rust 并行库 crossbeam 的 Channel 示例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • rust学习-线程

    Rust 标准库只提供了 1:1 线程模型 Rust 是较为底层的语言,如果愿意牺牲性能来换取抽象,以获得对线程运行更精细的控制及更低的上下文切换成本,使用实现了 M:N 线程模型的 crate 在参数列表前使用 move 强制闭包获取其使用的环境值的所有权。 这个技巧在创建新线程

    2024年02月16日
    浏览(24)
  • Rust多线程编程

    rust标准库中提供了线程相关支持,直接引用即可使用: 使用spawn方法创建一个线程。如: 创建后线程自动运行: 默认的spawn方法传递不了参数。如果要为线程传递参数,需要使用匿名函数作为spawn的参数,匿名函数也称为闭包。 闭包(匿名函数) 闭包的基本语法: 使用闭包

    2024年02月11日
    浏览(25)
  • Rust语言之多线程

    多线程 是一种并发执行的技术,它允许一个程序或进程同时执行多个线程。每个线程都是程序执行的一个独立路径,它们可以并行运行,共享进程的资源(如内存空间),但每个线程有自己的指令指针、堆栈和局部变量。多线程的主要目的是提高程序的执行效率,通过同时执

    2024年02月22日
    浏览(27)
  • [Rust GUI]eframe(egui框架)代码示例

    你可以使用egui的其他绑定,例如:egui-miniquad,bevy_egui,egui_sdl2_gl 等。 egui库相当于核心库,需要借助eframe框架写界面。 eframe使用egui_glow渲染,而egui_glow需要opengl2.0+。 1、访问微软官网下载生成工具 2、勾选这个 3、对比勾选细节 4、点击安装 5、安装完成 6、关闭 Visual Studio

    2024年02月08日
    浏览(35)
  • 【Rabbitmq】报错:ERROR CachingConnectionFactory Channel shutdown: channel error;

    报错内容 ERROR CachingConnectionFactory Channel shutdown: channel error; protocol method: #methodchannel.close(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80) 原因   默认是自动ack,然后你代码又手动ack,导致ack了两次,报了错。 解决办法  配置文件设置为手动ack

    2024年02月16日
    浏览(34)
  • 创建应用通道失败: create channel failed: create channel failed:

    创建应用通道失败: create channel failed: create channel failed: SendEnvelope failed: calling orderer ‘localhost:7050’ failed: Orderer Client Status Code: (2) CONNECTION_FAILED. Description: dialing connection on target [localhost:7050]: connection is in TRANSIENT_FAILURE 这个错误通常是因为客户端无法连接到Orderer节点导致的。一

    2024年02月03日
    浏览(42)
  • 面试某大厂,被Channel给吊打了,这次一次性通关channel!

    目录 一 前言 面试题 然后我们进行一下扩展,玩转Channel! 二 解决面试题 1. 介绍一下Channel 2. Channel在go中起什么作用 3. Channel为什么需要两个队列实现 4. Go为什么要开发Channel,而别的语言为什么没有 5. Channel底层是使用锁控制并发的,为什么不直接使用锁 三 扩展面试题 1.

    2023年04月13日
    浏览(24)
  • rust写一个多线程和协程的例子

    当涉及到多线程和协程时,Rust提供了一些非常强大的工具,其中最常用的库之一是 tokio ,它用于异步编程和协程。下面我将为你展示一个简单的Rust程序,演示如何使用多线程和协程。 首先,你需要在你的项目的 Cargo.toml 文件中添加 tokio 库的依赖: [dependencies] tokio = { versi

    2024年02月11日
    浏览(38)
  • Rust 实现线程安全的 Lock Free 计数器

    完整代码:https://github.com/chiehw/hello_rust/blob/main/crates/counter/src/lib.rs Trait 可以看作是一种 能力的抽象 ,和接口有点类似。Trait 还能作为 泛型约束条件 ,作为参数的限制条件。 使用测试驱动开发可以让目标更明确,这里先写个简单的测试案例。 直接封装 AtomicUsize 使用多线程

    2024年04月13日
    浏览(23)
  • C#基础--线程Thread和线程池ThreadPool

    1. 开启一个线程 ThreadStart 是属于System.Threading 中的一个内置委托 ParameterizedThreadStart 是属于System.Threading 中的一个内置委托 2. 暂停/恢复线程 无法实时的去 “暂停 ” 或者 “恢复” 线程,因为线程是由系统调度执行的,而且中间存在一个延时现象,不可能直接帮你执行 3. 终结

    2024年02月16日
    浏览(22)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包