BlockingQueue实现简易消息队列处理器 可分区顺序消费

这篇具有很好参考价值的文章主要介绍了BlockingQueue实现简易消息队列处理器 可分区顺序消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

大家好,最近在巩固JUC并发包,突然想到如果自己的应用体量不大,但有需要消息队列来实现应用解耦和削峰来缓解服务器突增压力,比如抢票时,突然有比较用户同时抢票,就容易造成服务器同时连接数较多,拒绝其他用户的使用,就想着可以用消息队列来缓解,但是体量有不大,还没必要用MQ框架,那就直接自己写一个,这样,抢票请求来了就直接丢给队列处理器,然后再延迟查询处理结果,这样能减轻不少压力,老样子,先看下实现效果


BlockingQueue实现简易消息队列处理器 可分区顺序消费,java,架构方案,消息队列
然后看下测试代码:

public class TestOptional {
    @Test
    public void doTestOptional(){

        MxMQ<Message> mxMQ = MxMQ.getInstance();

        /**
         * 添加分区 无消息一直阻塞
         */
        mxMQ.addPartion("test", new MQHandler<Message>() {
            @Override
            public void hand(Message message) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(message.getMessage());
            }
        });
        /**
         * 添加分区 无消息且等待时长超过20秒自动移除该分区
         */
        mxMQ.addPartionAutoRemove("test2", new MQHandler<Message>() {
            @Override
            public void hand(Message message) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(message.getMessage());
            }
        });

        for(int index = 0;index < 20;index++){
            int finalIndex = index;
            Message message = new Message("test_" + finalIndex);
            Message message2 = new Message("test2_" + finalIndex);
            try {
                mxMQ.sendMessage("test",message);
                mxMQ.sendMessage("test2",message2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        while (true){}

    }
}

还可以自定义不同分区不同的处理器,逻辑自由定义,下面看下几个关键类:
MxMQRunnable:

package com.mx.mxmq;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MxMQRunnable<T> implements Runnable{

    boolean isRun = false;
    ArrayBlockingQueue<T> arrayBlockingQueue = null;
    MQHandler<T> mqHandler = null;
    int state = 0;

    MxMQ.QueueEmpty queueEmpty = null;

    public void setQueueEmpty(MxMQ.QueueEmpty queueEmpty) {
        this.queueEmpty = queueEmpty;
    }

    public MxMQRunnable(MQHandler<T> mqHandler){
        isRun = true;
        arrayBlockingQueue = new ArrayBlockingQueue(50);
        this.mqHandler = mqHandler;
        state = MxMQ.STATE_WAIT;
    }

    public MxMQRunnable(int number,MQHandler<T> mqHandler){
        arrayBlockingQueue = new ArrayBlockingQueue(number);
        this.mqHandler = mqHandler;
        state = MxMQ.STATE_WAIT;
    }

    public void setState(int state) {
        this.state = state;
    }

    @Override
    public void run() {
        while (isRun){
            try {
                T t = null;
                if(state == MxMQ.STATE_WAIT){
                   t = arrayBlockingQueue.take();
                } else {
                   t = arrayBlockingQueue.poll(20,TimeUnit.SECONDS);
                   if(t == null){
                       close();
                       queueEmpty.empty(this);
                       break;
                   }
                }
                if(mqHandler != null){
                    mqHandler.hand(t);
                }
            } catch (Exception e) {
                 e.printStackTrace();
            }
        }
    }

    public boolean sendMessage(T t) throws InterruptedException {
        return arrayBlockingQueue.offer(t,20, TimeUnit.SECONDS);
    }

    public boolean removeMessage(T t){
        return arrayBlockingQueue.remove(t);
    }

    public void close(){
        isRun = false;
    }

}


MxMQ:

package com.mx.mxmq;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MxMQ<T> {

    public static final int STATE_WAIT = 0;
    public static final int STATE_REMOVE = 1;

    private MxMQ(){
        executors = Executors.newCachedThreadPool();
        partionRunMap = new ConcurrentHashMap<>();
    }

    public static MxMQ getInstance() {
        if(instance == null){
            synchronized (MxMQ.class){
                if(instance == null){
                    instance = new MxMQ();
                }
            }
        }
        return instance;
    }

    private static volatile MxMQ instance = null;

    private ConcurrentHashMap<String,MxMQRunnable<T>> partionRunMap = null;

    private ExecutorService executors =  null;

    /**
     * 添加分区
     * @param partion 分区
     * @param mxHandler 处理器
     * @return
     */
    public boolean addPartion(String partion,MQHandler<T> mxHandler){
        if(partionRunMap.get(partion) == null){
            MxMQRunnable<T> curMxMQRunnable = new MxMQRunnable<T>(mxHandler);
            partionRunMap.put(partion,curMxMQRunnable);
            executors.execute(curMxMQRunnable);
            System.out.println(partion+"被添加");
            return true;
        }
        return false;
    }

    /**
     * 当分区里面没有任务超过20秒后就会自动移除分区
     * @param partion 分区
     * @param mxHandler 处理器
     * @return
     */
    public boolean addPartionAutoRemove(String partion,MQHandler<T> mxHandler){
        if(partionRunMap.get(partion) == null){
            MxMQRunnable<T> curMxMQRunnable = new MxMQRunnable<T>(mxHandler);
            curMxMQRunnable.setState(STATE_REMOVE);
            curMxMQRunnable.setQueueEmpty(new QueueEmpty() {
                @Override
                public void empty(MxMQRunnable mxMQRunnable) {
                    removePartion(partion);
                }
            });
            partionRunMap.put(partion,curMxMQRunnable);
            executors.execute(curMxMQRunnable);
            System.out.println(partion+"被添加");
            return true;
        }
        return false;
    }

    public boolean removePartion(String partion){
        if(partionRunMap.get(partion) != null){
            MxMQRunnable<T> remove = partionRunMap.remove(partion);
            remove.close();
            System.out.println(partion+"被移除");
            return true;
        }
        return false;
    }

    public boolean sendMessage(String partion,T t) throws InterruptedException {
        MxMQRunnable<T> tMxMQRunnable = partionRunMap.get(partion);
        if(tMxMQRunnable != null){
            tMxMQRunnable.sendMessage(t);
            return true;
        }
        return false;
    }

    public boolean removeMessage(String partion,T t){
        MxMQRunnable<T> tMxMQRunnable = partionRunMap.get(partion);
        if(tMxMQRunnable != null){
            return tMxMQRunnable.removeMessage(t);
        }
        return false;
    }

    interface QueueEmpty{
        void empty(MxMQRunnable mxMQRunnable);
    }

}


MQHandler:

package com.mx.mxmq;

public interface MQHandler<T> {
    void hand(T t);
}

Message:

package com.mx.mxmq;

public class Message {
    String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Message(String message){
        this.message = message;
    }
}


好了,收,大概就是这样子,主要应用场景为:需要轻量级的顺序队列消费 应用场景文章来源地址https://www.toymoban.com/news/detail-743357.html

到了这里,关于BlockingQueue实现简易消息队列处理器 可分区顺序消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • protues仿真微处理器8086实现交通灯

    protues仿真微处理器8086实现交通灯

    (1)下载安装仿真环境protues。 (2)搭建8086开发环境,我使用的是 emu8086 。自行下载安装即可。 有需要相关安装包可以私信 (1)选用74LS373与74LS245来实现8086地址数据总线的拆分。 (2)选用 8259可编程中断控制器 用于管理8086系列微机系统的外部中断请求,实现优先权的排

    2024年02月06日
    浏览(10)
  • RISC-V处理器的设计与实现(一)—— 基本指令集

    RISC-V处理器的设计与实现(一)—— 基本指令集

    RISC-V处理器的设计与实现(一)—— 基本指令集_Patarw_Li的博客-CSDN博客 RISC-V处理器的设计与实现(二)—— CPU框架设计_Patarw_Li的博客-CSDN博客 RISC-V处理器的设计与实现(三)—— 上板验证_Patarw_Li的博客-CSDN博客 RISC-V处理器设计(四)—— Verilog 代码设计-CSDN博客  RISC-V处

    2024年02月05日
    浏览(9)
  • 上海山景SH-ARC DSP音频处理器低通滤波算法实现

      +hezkz17进入数字音频答疑 上海山景DSP音频处理器介绍:  上海山景DSP音频处理器是一种数字信号处理器,专门用于音频信号的处理和增强。它采用先进的数字信号处理技术和算法,能够对音频信号进行实时处理,并且具有高效、稳定、可靠等特点。 该处理器可以应用于各种

    2024年02月13日
    浏览(8)
  • RISC-V处理器的设计与实现(三)—— 上板验证(基于野火征途Pro开发板)

    RISC-V处理器的设计与实现(三)—— 上板验证(基于野火征途Pro开发板)

    目录 文章传送门 一、添加串口 二、上板验证 三、总结与思考 RISC-V处理器的设计与实现(一)—— 基本指令集_Patarw_Li的博客-CSDN博客 RISC-V处理器的设计与实现(二)—— CPU框架设计_Patarw_Li的博客-CSDN博客 RISC-V处理器的设计与实现(三)—— 上板验证_Patarw_Li的博客-CSDN博客

    2024年02月11日
    浏览(8)
  • Spring MVC异常处理【单个控制异常处理器、全局异常处理器、自定义异常处理器】

    Spring MVC异常处理【单个控制异常处理器、全局异常处理器、自定义异常处理器】

    目录 一、单个控制器异常处理 1.1 控制器方法 1.2 编写出错页面 1.3 测试结果 二、全局异常处理 2.1 一个有异常的控制器类 2.2 全局异常处理器类 2.3 测试结果  三、自定义异常处理器 3.1 自定义异常处理器 3.2 测试结果 往期专栏文章相关导读  1. Maven系列专栏文章 2. Mybatis系列

    2024年02月16日
    浏览(10)
  • Jmeter前置处理器和后置处理器

    Jmeter前置处理器和后置处理器

    1. 后置处理器(Post Processor) 本质上是⼀种对sampler发出请求后接受到的响应数据进⾏处理 (后处理)的⽅法  正则表达式后置处理器 (1)引⽤名称:下⼀个请求要引⽤的参数名称,如填写title,则可⽤${title}引⽤它 (2)正则表达式: ():括起来的部分就是要提取的。 .:匹配

    2023年04月21日
    浏览(9)
  • DP读书:鲲鹏处理器 架构与编程(八)3.1鲲鹏处理器片上系统与Taishan处理器内核架构

    DP读书:鲲鹏处理器 架构与编程(八)3.1鲲鹏处理器片上系统与Taishan处理器内核架构

    处理器体系结构,是一个偏底层的内容,但这是任一计算机系统的底层。 系统的性能、生态和功能很大程度上都依赖于计算机系统底层——处理器体系结构。任何一个系统程序员、固件设计者、应用程序员 甚至 服务器管理员,如果想要充分利用现代高性能处理器的硬件性能

    2024年02月12日
    浏览(10)
  • SkyEye处理器仿真系列:龙芯2K1000处理器

    SkyEye处理器仿真系列:龙芯2K1000处理器

    天目全数字实时仿真软件SkyEye作为基于可视化建模的硬件行为级仿真平台,能够为嵌入式软件提供虚拟化运行环境,开发、测试人员可在该虚拟运行环境上进行软件开发、软件测试和软件验证活动。小到芯片,大到系统,SkyEye均可进行模拟。 1936年,被誉为“计算机科学与人

    2024年02月12日
    浏览(9)
  • DP读书:鲲鹏处理器 架构与编程(九)鲲鹏920处理器片上系统

    停更了两天,我做了一个本专业相关的孤岛问题的论文复现,可并没有什么太大进展,就像当初最开始跑Aspen一样,我要面对的是一个相当复杂的多参系统,这种情况下只能啃着技术文档一步一步的去调。 再次返回我的鲲鹏920处理器,无疑是舒服的所以我只能尽我所能的在做

    2024年02月12日
    浏览(15)
  • 第三十二章 开发Productions - ObjectScript Productions - 定义警报处理器 - 使用路由警报处理器

    如果需要通过多种输出机制联系用户,警报处理器应该是一个业务流程,用于确定如何在消息中路由 Ens.AlertReques 。在这种情况下, Productions 必须为每个输出机制包含一个额外的业务操作,并且警报处理器将消息转发到这些业务操作。 要将警报处理器定义为路由流程,请创建

    2024年02月08日
    浏览(9)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包