Java集合之Disruptor 介绍

这篇具有很好参考价值的文章主要介绍了Java集合之Disruptor 介绍。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1 Disruptor

1.1 简介

1.1.1 定义

Disruptor 是一个开源的高性能内存队列,由英国外汇交易公司 LMAX 开发的,获得了 2011 年的 Oracle 官方的 Duke’s Choice Awards(Duke 选择大奖)。

Disruptor 提供的功能类似于 KafkaRocketMQ 这类分布式队列,不过,其作为范围是 JVM(内存),Disruptor 解决了 JDK 内置线程安全队列的性能和内存安全问题,Disruptor 有个最大的优点就是快

Disruptor被设计用于在生产者消费者producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟
Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到 Disruptor,它可以带来显著的性能提升。其实 Disruptor 与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在并发、缓冲区、生产者—消费者模型、事务处理这些元素的程序来说,Disruptor 提出了一种大幅提升性能(TPS)的方案。

github 地址

Github 地址:https://github.com/LMAX-Exchange/disruptor
官方教程:https://lmax-exchange.github.io/disruptor/user-guide/index.html

1.1.2 Java中线程安全队列

JDK 中常见的线程安全的队列如下:

队列名字 是否有界
ArrayBlockingQueue 加锁(ReentrantLock) 有界
LinkedBlockingQueue 加锁(ReentrantLock) 有界
LinkedTransferQueue 无锁(CAS) 无界
ConcurrentLinkedQueue 无锁(CAS) 无界

从上表中可以看出:这些队列要不就是加锁有界,要不就是无锁无界。而加锁的的队列势必会影响性能,无界的队列又存在内存溢出的风险。
因此,一般情况下,我们都是不建议使用 JDK 内置线程安全队列。
Disruptor 就不一样了!它在无锁的情况下还能保证队列有界,并且还是线程安全的。

1.1.3 Disruptor 核心概念

Disruptor 核心概念:

  • Event:可以把 Event 理解为存放在队列中等待消费的消息对象。
    Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
  • EventFactory:事件工厂用于生产事件,我们在初始化 Disruptor 类的时候需要用到。
  • EventHandlerEvent 在对应的 Handler 中被处理,你可以将其理解为生产消费者模型中的消费者。
    Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现
  • EventProcessorEventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)
  • Disruptor:事件的生产和消费需要用到 Disruptor 对象。
  • RingBufferRingBuffer(环形数组)用于保存事件
    如其名,环形的缓冲区。曾经 RingBufferDisruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
  • WaitStrategy:等待策略。决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。定义 Consumer 如何进行等待下一个事件的策略。(注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
  • Producer:生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型
  • ProducerType:指定是单个事件发布者模式还是多个事件发布者模式(发布者和生产者的意思类似)。
  • SequencerSequencerDisruptor 的真正核心。此接口有两个实现类 - SingleProducerSequencerMultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
  • Sequence Disruptor:通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。
    虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的 CPU 缓存伪共享(Flase Sharing)问题。(注:这是 Disruptor 实现高性能的关键点之一)
  • Sequence Barrier:用于保持对 RingBuffermain published SequenceConsumer 依赖的其它 ConsumerSequence 的引用。Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。

Java集合之Disruptor 介绍,# Java集合,java,开发语言

1.2 操作

1.2.1 坐标依赖

pom.xml

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

Gradle:

implementation 'com.lmax:disruptor:3.4.4'

1.2.2 创建事件

我们先来定义一个代表日志事件的类:LogEvent 。

事件中包含了一些和事件相关的属性,比如我们这里定义的 LogEvent 对象中就有一个用来表示日志消息内容的属性:message。

@Data
public class LogEvent {
    private String message;
}

我们这里只是为了演示,实际项目中,一个标准日志事件对象所包含的属性肯定不是只有一个 message

1.2.3 创建事件工厂

创建一个工厂类 LogEventFactory 用来创建 LogEvent 对象。
LogEventFactory 继承 EventFactory 接口并实现了 newInstance() 方法 。

public class LogEventFactory implements EventFactory<LogEvent> {
    @Override
    public LogEvent newInstance() {
        return new LogEvent();
    }
}

1.2.4 创建处理事件Handler–消费者

创建一个用于处理后续发布的事件的类:LogEventHandler 。
LogEventHandler 继承 EventHandler 接口并实现了 onEvent() 方法 。

public class LogEventHandler implements EventHandler<LogEvent> {
    @Override
    public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) throws Exception {
        System.out.println(logEvent.getMessage());
    }
}

EventHandler 接口的 onEvent() 方法共有 3 个参数:

  • event:待消费/处理的事件
  • sequence:正在处理的事件在环形数组(RingBuffer)中的位置
  • endOfBatch:表示这是否是来自环形数组(RingBuffer)中一个批次的最后一个事件(批量处理事件)

1.2.5 初始化 Disruptor

1.2.5.1 静态类

我们这里定义一个方法用于获取 Disruptor 对象

private static Disruptor<LogEvent> getLogEventDisruptor() {
    // 创建 LogEvent 的工厂
    LogEventFactory logEventFactory = new LogEventFactory();
    // Disruptor 的 RingBuffer 缓存大小
    int bufferSize = 1024 * 1024;
    // 生产者的线程工厂
    ThreadFactory threadFactory = new ThreadFactory() {
        final AtomicInteger threadNum = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
        }
    };
    //实例化 Disruptor
    return new Disruptor<>(
            logEventFactory,
            bufferSize,
            threadFactory,
            // 单生产者
            ProducerType.SINGLE,
            // 阻塞等待策略
            new BlockingWaitStrategy());
}
1.2.5.2 配置类

使用配置类的方式

@Configuration
public class MQManager {

    @Bean("messageModel")
    public RingBuffer<LogEvent> messageModelRingBuffer() {
        //定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理
        // 生产者的线程工厂
    ThreadFactory threadFactory = new ThreadFactory() {
        final AtomicInteger threadNum = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
        }
    };

        //指定事件工厂
        LogEventFactory factory = new LogEventFactory();

        //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
        int bufferSize = 1024 * 256;

        //单线程模式,获取额外的性能
        Disruptor<LogEvent> disruptor = new Disruptor<>(factory,
        	 bufferSize, 
        	 threadFactory,
        	 ProducerType.SINGLE, 
        	 new BlockingWaitStrategy());

        //设置事件业务处理器---消费者
        //Disruptor 的 handleEventsWith 方法来绑定处理事件的 Handler 对象。
       
        disruptor.handleEventsWith(new LogEventHandler ());
      // Disruptor 可以设置多个处理事件的 Handler,并且可以灵活的设置消费者的处理顺序,串行,并行都是可以的。
       //就比如下面的代码表示 Handler1 和 Handler2 是并行执行,最后再执行 Handler3 。
       //disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3());
       
        // 启动disruptor线程
        disruptor.start();

        //获取ringbuffer环,用于接取生产者生产的事件
        RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();

        return ringBuffer;
    }
1.2.5.3 Disruptor 构造函数讲解

Disruptor 的推荐使用的构造函数如下:

public class Disruptor<T> {
  public Disruptor(
          final EventFactory<T> eventFactory,
          final int ringBufferSize,
          final ThreadFactory threadFactory,
          final ProducerType producerType,
          final WaitStrategy waitStrategy)
  {
      this(
          RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
          new BasicExecutor(threadFactory));
  }

......
}

我们需要传递 5 个参数:

  • eventFactory:我们自定义的事件工厂。
  • ringBufferSize:指定 RingBuffer 的容量大小。
  • threadFactory:自定义的线程工厂。Disruptor 的默认线程池是自定义的,我们只需要传入线程工厂即可。
  • producerType:指定是单个事件发布者模式还是多个事件发布者模式(发布者和生产者的意思类似,我个人比较喜欢用发布者)。
  • waitStrategy:等待策略,决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。

ProducerType 的源码如下,它是一个包含两个变量的枚举类型

  • SINGLE:单个事件发布者模式,不需要保证线程安全。
  • MULTI:多个事件发布者模式,基于 CAS 来保证线程安全。

WaitStrategy (等待策略)接口的实现类中只有两个方法:

  • waitFor() :等待新事件的到来。
  • signalAllWhenBlocking():唤醒所有等待的消费者。
public interface WaitStrategy
{
    long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException;
    void signalAllWhenBlocking();
}

WaitStrategy 的实现类共有 8 个,也就是说共有 8 种等待策略可供选择。
Java集合之Disruptor 介绍,# Java集合,java,开发语言

除了上面介绍的这个构造函数之外,Disruptor 还有一个只有 3 个参数构造函数。

使用这个构造函数创建的 Disruptor 对象会默认使用 ProducerType.MULTI(多个事件发布者模式)和 BlockingWaitStrategy(阻塞等待策略) 。文章来源地址https://www.toymoban.com/news/detail-518864.html

public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
    this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}

1.2.6 发布事件

1.2.6.1 main方法测试
//获取 Disruptor 对象
Disruptor<LogEvent> disruptor = getLogEventDisruptor();
//绑定处理事件的Handler对象
disruptor.handleEventsWith(new LogEventHandler());
//启动 Disruptor
disruptor.start();
//获取保存事件的环形数组(RingBuffer)
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//发布 10w 个事件
for (int i = 1; i <= 100000; i++) {
    // 通过调用 RingBuffer 的 next() 方法获取下一个空闲事件槽的序号
    long sequence = ringBuffer.next();
    try {
        LogEvent logEvent = ringBuffer.get(sequence);
        // 初始化 Event,对其赋值
        logEvent.setMessage("这是第%d条日志消息".formatted(i));
    } finally {
        // 发布事件
        ringBuffer.publish(sequence);
    }
}
// 关闭 Disruptor
disruptor.shutdown();
1.2.6.2 使用配置方式
public interface DisruptorMqService {

    /**
     * 消息
     * @param message
     */
    void sayHelloMq(String message);
}

@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {

    @Autowired
    private RingBuffer<LogEvent> messageModelRingBuffer;

    @Override
    public void sayHelloMq(String message) {
        log.info("record the message: {}",message);
        //获取下一个Event槽的下标
        long sequence = messageModelRingBuffer.next();
        try {
            //给Event填充数据
            MessageModel event = messageModelRingBuffer.get(sequence);
            event.setMessage(message);
            log.info("往消息队列中添加消息:{}", event);
        } catch (Exception e) {
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
        } finally {
            //发布Event,激活观察者去消费,将sequence传递给改消费者
            //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
            messageModelRingBuffer.publish(sequence);
        }
    }
}

到了这里,关于Java集合之Disruptor 介绍的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【JAVA语言-第15话】集合框架(二)——List、ArrayList、LinkedList、Vector集合

    目录 List集合 1.1 概述 1.2 特点 1.3 常用方法 1.4 ArrayList集合 1.4.1 概述  1.4.2 练习 1.5 LinkedList集合  1.5.1 概述 1.5.2 特点 1.5.3 常用方法 1.5.4 练习 1.6 Vector类 1.6.1 概述 1.6.2 练习 1.7 List实现类的异同点         java.util.List: List是一个接口,它继承自Collection接口。 常用的实现

    2024年01月25日
    浏览(57)
  • 【Java集合类面试二十六】、介绍一下ArrayList的数据结构?

    文章底部有个人公众号: 热爱技术的小郑 。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享? 踩过的坑没必要让别人在再踩,自己复盘也能加深记忆。利己利人、所谓双赢。 面试官:介绍一下ArrayList的数据结构? 参考答案: ArrayList的底

    2024年02月08日
    浏览(44)
  • Java开发基础系列(十三):集合对象(Set接口)

    😊 @ 作者: 一恍过去 💖 @ 主页: https://blog.csdn.net/zhuocailing3390 🎊 @ 社区: Java技术栈交流 🎉 @ 主题: Java开发基础系列(十三):集合对象(Set接口) ⏱️ @ 创作时间: 2023年07月27日 HashSet: 基于哈希表实现的集合,不保证元素的顺序。 LinkedHashSet: 基于哈希表和双向链表实现的

    2024年02月15日
    浏览(37)
  • 【JAVA学习笔记】 56 - 开发中如何选择集合实现类,以及Collection工具类

    https://github.com/yinhai1114/Java_Learning_Code/blob/main/IDEA_Chapter14/src/com/yinhai/Collections_.java 目录 项目代码 Collections工具类 一、Collections工具类介绍 1.排序操作: (均为static方法) 2.查找、替换 在开发中,选择什么集合实现类,主要取决于业务操作特点,然后根据集合实现类特性进行 选择

    2024年02月06日
    浏览(42)
  • Java语言介绍

    Java 是一种广泛使用的计算机编程语言,由 Sun Microsystems 公司于 1995 年推出。它是一个健壮的、面向对象的、跨平台的语言,被用于开发各种应用程序和系统,包括 Web 应用程序、移动应用程序、桌面应用程序、游戏以及企业级系统等。 Java 具有许多优点,包括可移植性、安全

    2024年02月01日
    浏览(31)
  • 【Py/Java/C++三种语言详解】LeetCode每日一题240117【哈希集合】LeetCode2744、最大字符串匹配数目

    LeetCode2744、最大字符串匹配数目 给你一个下标从 0 开始的数组 words ,数组中包含 互不相同 的字符串。 如果字符串 words[i] 与字符串 words[j] 满足以下条件,我们称它们可以匹配: 字符串 words[i] 等于 words[j] 的反转字符串。 0 = i j words.length 请你返回数组 words 中的 最大 匹配数

    2024年01月18日
    浏览(56)
  • java 开发英文自我介绍

       My name is XXX, I am 21 years old, I graduated from the xx PLA information engineering university computer science and technology major, have a solid core Java foundation, good programming style; Familiar with JSP + servlet + javabean pattern web development; Familiar with the open source framework, struts, hibernate, spring, etc. Familiar with tomcat,

    2024年02月13日
    浏览(59)
  • Java开发框架:Spring介绍

    Spring 是 Java EE 编程领域中的一个轻量级开源框架,由 Rod Johnson 在 2002 年最早提出并随后创建,目的是解决企业级编程开发中的复杂性,实现敏捷开发的应用型框架 。其中,轻量级表现在 Spring 是非侵入式的,即开发应用中的对象可以不依赖于 Spring 的 API 类。另外,Spring 针对

    2024年02月08日
    浏览(55)
  • 开发语言漫谈-Java

            由于C++过于复杂,Java诞生了。与C++相比,Java更易于学习和使用,它去掉C++中的指针和解决了内存管理问题。Java提供了垃圾自动回收机制,自动管理不再使用的内存。Python又进一步简化,使得语法更简洁,更易于阅读和编写。当然随着简化,性能就越来越低。    

    2024年04月10日
    浏览(49)
  • Java 的简要介绍及开发环境的搭建(超级详细)

    图片来源于互联网 目录 | CONTENT Java 简介 一、什么是 Java 二、认识 Java 版本 三、选择哪个版本比较好 搭建 Java 开发环境 一、下载 Java 软件开发工具包 JDK  二、配置环境变量 自动配置 手动配置 三、下载合适的 IDE IntelliJ IDEA Visual Studio Code Eclipse NetBeans​​​​​​ JDevelope

    2024年02月05日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包