BlockingQueue实现简易消息队列处理器 可分区顺序消费

这篇具有很好参考价值的文章主要介绍了BlockingQueue实现简易消息队列处理器 可分区顺序消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

大家好,最近在巩固JUC并发包,突然想到如果自己的应用体量不大,但有需要消息队列来实现应用解耦和削峰来缓解服务器突增压力,比如抢票时,突然有比较用户同时抢票,就容易造成服务器同时连接数较多,拒绝其他用户的使用,就想着可以用消息队列来缓解,但是体量有不大,还没必要用MQ框架,那就直接自己写一个,这样,抢票请求来了就直接丢给队列处理器,然后再延迟查询处理结果,这样能减轻不少压力,老样子,先看下实现效果


BlockingQueue实现简易消息队列处理器 可分区顺序消费,java,架构方案,消息队列
然后看下测试代码:

public class TestOptional {
    @Test
    public void doTestOptional(){

        MxMQ<Message> mxMQ = MxMQ.getInstance();

        /**
         * 添加分区 无消息一直阻塞
         */
        mxMQ.addPartion("test", new MQHandler<Message>() {
            @Override
            public void hand(Message message) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(message.getMessage());
            }
        });
        /**
         * 添加分区 无消息且等待时长超过20秒自动移除该分区
         */
        mxMQ.addPartionAutoRemove("test2", new MQHandler<Message>() {
            @Override
            public void hand(Message message) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(message.getMessage());
            }
        });

        for(int index = 0;index < 20;index++){
            int finalIndex = index;
            Message message = new Message("test_" + finalIndex);
            Message message2 = new Message("test2_" + finalIndex);
            try {
                mxMQ.sendMessage("test",message);
                mxMQ.sendMessage("test2",message2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        while (true){}

    }
}

还可以自定义不同分区不同的处理器,逻辑自由定义,下面看下几个关键类:
MxMQRunnable:

package com.mx.mxmq;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MxMQRunnable<T> implements Runnable{

    boolean isRun = false;
    ArrayBlockingQueue<T> arrayBlockingQueue = null;
    MQHandler<T> mqHandler = null;
    int state = 0;

    MxMQ.QueueEmpty queueEmpty = null;

    public void setQueueEmpty(MxMQ.QueueEmpty queueEmpty) {
        this.queueEmpty = queueEmpty;
    }

    public MxMQRunnable(MQHandler<T> mqHandler){
        isRun = true;
        arrayBlockingQueue = new ArrayBlockingQueue(50);
        this.mqHandler = mqHandler;
        state = MxMQ.STATE_WAIT;
    }

    public MxMQRunnable(int number,MQHandler<T> mqHandler){
        arrayBlockingQueue = new ArrayBlockingQueue(number);
        this.mqHandler = mqHandler;
        state = MxMQ.STATE_WAIT;
    }

    public void setState(int state) {
        this.state = state;
    }

    @Override
    public void run() {
        while (isRun){
            try {
                T t = null;
                if(state == MxMQ.STATE_WAIT){
                   t = arrayBlockingQueue.take();
                } else {
                   t = arrayBlockingQueue.poll(20,TimeUnit.SECONDS);
                   if(t == null){
                       close();
                       queueEmpty.empty(this);
                       break;
                   }
                }
                if(mqHandler != null){
                    mqHandler.hand(t);
                }
            } catch (Exception e) {
                 e.printStackTrace();
            }
        }
    }

    public boolean sendMessage(T t) throws InterruptedException {
        return arrayBlockingQueue.offer(t,20, TimeUnit.SECONDS);
    }

    public boolean removeMessage(T t){
        return arrayBlockingQueue.remove(t);
    }

    public void close(){
        isRun = false;
    }

}


MxMQ:

package com.mx.mxmq;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MxMQ<T> {

    public static final int STATE_WAIT = 0;
    public static final int STATE_REMOVE = 1;

    private MxMQ(){
        executors = Executors.newCachedThreadPool();
        partionRunMap = new ConcurrentHashMap<>();
    }

    public static MxMQ getInstance() {
        if(instance == null){
            synchronized (MxMQ.class){
                if(instance == null){
                    instance = new MxMQ();
                }
            }
        }
        return instance;
    }

    private static volatile MxMQ instance = null;

    private ConcurrentHashMap<String,MxMQRunnable<T>> partionRunMap = null;

    private ExecutorService executors =  null;

    /**
     * 添加分区
     * @param partion 分区
     * @param mxHandler 处理器
     * @return
     */
    public boolean addPartion(String partion,MQHandler<T> mxHandler){
        if(partionRunMap.get(partion) == null){
            MxMQRunnable<T> curMxMQRunnable = new MxMQRunnable<T>(mxHandler);
            partionRunMap.put(partion,curMxMQRunnable);
            executors.execute(curMxMQRunnable);
            System.out.println(partion+"被添加");
            return true;
        }
        return false;
    }

    /**
     * 当分区里面没有任务超过20秒后就会自动移除分区
     * @param partion 分区
     * @param mxHandler 处理器
     * @return
     */
    public boolean addPartionAutoRemove(String partion,MQHandler<T> mxHandler){
        if(partionRunMap.get(partion) == null){
            MxMQRunnable<T> curMxMQRunnable = new MxMQRunnable<T>(mxHandler);
            curMxMQRunnable.setState(STATE_REMOVE);
            curMxMQRunnable.setQueueEmpty(new QueueEmpty() {
                @Override
                public void empty(MxMQRunnable mxMQRunnable) {
                    removePartion(partion);
                }
            });
            partionRunMap.put(partion,curMxMQRunnable);
            executors.execute(curMxMQRunnable);
            System.out.println(partion+"被添加");
            return true;
        }
        return false;
    }

    public boolean removePartion(String partion){
        if(partionRunMap.get(partion) != null){
            MxMQRunnable<T> remove = partionRunMap.remove(partion);
            remove.close();
            System.out.println(partion+"被移除");
            return true;
        }
        return false;
    }

    public boolean sendMessage(String partion,T t) throws InterruptedException {
        MxMQRunnable<T> tMxMQRunnable = partionRunMap.get(partion);
        if(tMxMQRunnable != null){
            tMxMQRunnable.sendMessage(t);
            return true;
        }
        return false;
    }

    public boolean removeMessage(String partion,T t){
        MxMQRunnable<T> tMxMQRunnable = partionRunMap.get(partion);
        if(tMxMQRunnable != null){
            return tMxMQRunnable.removeMessage(t);
        }
        return false;
    }

    interface QueueEmpty{
        void empty(MxMQRunnable mxMQRunnable);
    }

}


MQHandler:

package com.mx.mxmq;

public interface MQHandler<T> {
    void hand(T t);
}

Message:

package com.mx.mxmq;

public class Message {
    String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Message(String message){
        this.message = message;
    }
}


好了,收,大概就是这样子,主要应用场景为:需要轻量级的顺序队列消费 应用场景文章来源地址https://www.toymoban.com/news/detail-743357.html

到了这里,关于BlockingQueue实现简易消息队列处理器 可分区顺序消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • protues仿真微处理器8086实现交通灯

    (1)下载安装仿真环境protues。 (2)搭建8086开发环境,我使用的是 emu8086 。自行下载安装即可。 有需要相关安装包可以私信 (1)选用74LS373与74LS245来实现8086地址数据总线的拆分。 (2)选用 8259可编程中断控制器 用于管理8086系列微机系统的外部中断请求,实现优先权的排

    2024年02月06日
    浏览(45)
  • RISC-V处理器的设计与实现(一)—— 基本指令集

    RISC-V处理器的设计与实现(一)—— 基本指令集_Patarw_Li的博客-CSDN博客 RISC-V处理器的设计与实现(二)—— CPU框架设计_Patarw_Li的博客-CSDN博客 RISC-V处理器的设计与实现(三)—— 上板验证_Patarw_Li的博客-CSDN博客 RISC-V处理器设计(四)—— Verilog 代码设计-CSDN博客  RISC-V处

    2024年02月05日
    浏览(47)
  • 上海山景SH-ARC DSP音频处理器低通滤波算法实现

      +hezkz17进入数字音频答疑 上海山景DSP音频处理器介绍:  上海山景DSP音频处理器是一种数字信号处理器,专门用于音频信号的处理和增强。它采用先进的数字信号处理技术和算法,能够对音频信号进行实时处理,并且具有高效、稳定、可靠等特点。 该处理器可以应用于各种

    2024年02月13日
    浏览(32)
  • RISC-V处理器的设计与实现(三)—— 上板验证(基于野火征途Pro开发板)

    目录 文章传送门 一、添加串口 二、上板验证 三、总结与思考 RISC-V处理器的设计与实现(一)—— 基本指令集_Patarw_Li的博客-CSDN博客 RISC-V处理器的设计与实现(二)—— CPU框架设计_Patarw_Li的博客-CSDN博客 RISC-V处理器的设计与实现(三)—— 上板验证_Patarw_Li的博客-CSDN博客

    2024年02月11日
    浏览(56)
  • Spring MVC异常处理【单个控制异常处理器、全局异常处理器、自定义异常处理器】

    目录 一、单个控制器异常处理 1.1 控制器方法 1.2 编写出错页面 1.3 测试结果 二、全局异常处理 2.1 一个有异常的控制器类 2.2 全局异常处理器类 2.3 测试结果  三、自定义异常处理器 3.1 自定义异常处理器 3.2 测试结果 往期专栏文章相关导读  1. Maven系列专栏文章 2. Mybatis系列

    2024年02月16日
    浏览(45)
  • Jmeter前置处理器和后置处理器

    1. 后置处理器(Post Processor) 本质上是⼀种对sampler发出请求后接受到的响应数据进⾏处理 (后处理)的⽅法  正则表达式后置处理器 (1)引⽤名称:下⼀个请求要引⽤的参数名称,如填写title,则可⽤${title}引⽤它 (2)正则表达式: ():括起来的部分就是要提取的。 .:匹配

    2023年04月21日
    浏览(46)
  • DP读书:鲲鹏处理器 架构与编程(八)3.1鲲鹏处理器片上系统与Taishan处理器内核架构

    处理器体系结构,是一个偏底层的内容,但这是任一计算机系统的底层。 系统的性能、生态和功能很大程度上都依赖于计算机系统底层——处理器体系结构。任何一个系统程序员、固件设计者、应用程序员 甚至 服务器管理员,如果想要充分利用现代高性能处理器的硬件性能

    2024年02月12日
    浏览(55)
  • SkyEye处理器仿真系列:龙芯2K1000处理器

    天目全数字实时仿真软件SkyEye作为基于可视化建模的硬件行为级仿真平台,能够为嵌入式软件提供虚拟化运行环境,开发、测试人员可在该虚拟运行环境上进行软件开发、软件测试和软件验证活动。小到芯片,大到系统,SkyEye均可进行模拟。 1936年,被誉为“计算机科学与人

    2024年02月12日
    浏览(60)
  • DP读书:鲲鹏处理器 架构与编程(九)鲲鹏920处理器片上系统

    停更了两天,我做了一个本专业相关的孤岛问题的论文复现,可并没有什么太大进展,就像当初最开始跑Aspen一样,我要面对的是一个相当复杂的多参系统,这种情况下只能啃着技术文档一步一步的去调。 再次返回我的鲲鹏920处理器,无疑是舒服的所以我只能尽我所能的在做

    2024年02月12日
    浏览(55)
  • 第三十二章 开发Productions - ObjectScript Productions - 定义警报处理器 - 使用路由警报处理器

    如果需要通过多种输出机制联系用户,警报处理器应该是一个业务流程,用于确定如何在消息中路由 Ens.AlertReques 。在这种情况下, Productions 必须为每个输出机制包含一个额外的业务操作,并且警报处理器将消息转发到这些业务操作。 要将警报处理器定义为路由流程,请创建

    2024年02月08日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包