BlockingQueue实现简易消息队列处理器 可分区顺序消费
这篇具有很好参考价值的文章主要介绍了BlockingQueue实现简易消息队列处理器 可分区顺序消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。
大家好,最近在巩固JUC并发包,突然想到如果自己的应用体量不大,但有需要消息队列来实现应用解耦和削峰来缓解服务器突增压力,比如抢票时,突然有比较用户同时抢票,就容易造成服务器同时连接数较多,拒绝其他用户的使用,就想着可以用消息队列来缓解,但是体量有不大,还没必要用MQ框架,那就直接自己写一个,这样,抢票请求来了就直接丢给队列处理器,然后再延迟查询处理结果,这样能减轻不少压力,老样子,先看下实现效果
:
然后看下测试代码:
public class TestOptional {
@Test
public void doTestOptional(){
MxMQ<Message> mxMQ = MxMQ.getInstance();
/**
* 添加分区 无消息一直阻塞
*/
mxMQ.addPartion("test", new MQHandler<Message>() {
@Override
public void hand(Message message) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(message.getMessage());
}
});
/**
* 添加分区 无消息且等待时长超过20秒自动移除该分区
*/
mxMQ.addPartionAutoRemove("test2", new MQHandler<Message>() {
@Override
public void hand(Message message) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(message.getMessage());
}
});
for(int index = 0;index < 20;index++){
int finalIndex = index;
Message message = new Message("test_" + finalIndex);
Message message2 = new Message("test2_" + finalIndex);
try {
mxMQ.sendMessage("test",message);
mxMQ.sendMessage("test2",message2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
while (true){}
}
}
还可以自定义不同分区不同的处理器,逻辑自由定义,下面看下几个关键类:
MxMQRunnable:
package com.mx.mxmq;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class MxMQRunnable<T> implements Runnable{
boolean isRun = false;
ArrayBlockingQueue<T> arrayBlockingQueue = null;
MQHandler<T> mqHandler = null;
int state = 0;
MxMQ.QueueEmpty queueEmpty = null;
public void setQueueEmpty(MxMQ.QueueEmpty queueEmpty) {
this.queueEmpty = queueEmpty;
}
public MxMQRunnable(MQHandler<T> mqHandler){
isRun = true;
arrayBlockingQueue = new ArrayBlockingQueue(50);
this.mqHandler = mqHandler;
state = MxMQ.STATE_WAIT;
}
public MxMQRunnable(int number,MQHandler<T> mqHandler){
arrayBlockingQueue = new ArrayBlockingQueue(number);
this.mqHandler = mqHandler;
state = MxMQ.STATE_WAIT;
}
public void setState(int state) {
this.state = state;
}
@Override
public void run() {
while (isRun){
try {
T t = null;
if(state == MxMQ.STATE_WAIT){
t = arrayBlockingQueue.take();
} else {
t = arrayBlockingQueue.poll(20,TimeUnit.SECONDS);
if(t == null){
close();
queueEmpty.empty(this);
break;
}
}
if(mqHandler != null){
mqHandler.hand(t);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public boolean sendMessage(T t) throws InterruptedException {
return arrayBlockingQueue.offer(t,20, TimeUnit.SECONDS);
}
public boolean removeMessage(T t){
return arrayBlockingQueue.remove(t);
}
public void close(){
isRun = false;
}
}
MxMQ:
package com.mx.mxmq;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MxMQ<T> {
public static final int STATE_WAIT = 0;
public static final int STATE_REMOVE = 1;
private MxMQ(){
executors = Executors.newCachedThreadPool();
partionRunMap = new ConcurrentHashMap<>();
}
public static MxMQ getInstance() {
if(instance == null){
synchronized (MxMQ.class){
if(instance == null){
instance = new MxMQ();
}
}
}
return instance;
}
private static volatile MxMQ instance = null;
private ConcurrentHashMap<String,MxMQRunnable<T>> partionRunMap = null;
private ExecutorService executors = null;
/**
* 添加分区
* @param partion 分区
* @param mxHandler 处理器
* @return
*/
public boolean addPartion(String partion,MQHandler<T> mxHandler){
if(partionRunMap.get(partion) == null){
MxMQRunnable<T> curMxMQRunnable = new MxMQRunnable<T>(mxHandler);
partionRunMap.put(partion,curMxMQRunnable);
executors.execute(curMxMQRunnable);
System.out.println(partion+"被添加");
return true;
}
return false;
}
/**
* 当分区里面没有任务超过20秒后就会自动移除分区
* @param partion 分区
* @param mxHandler 处理器
* @return
*/
public boolean addPartionAutoRemove(String partion,MQHandler<T> mxHandler){
if(partionRunMap.get(partion) == null){
MxMQRunnable<T> curMxMQRunnable = new MxMQRunnable<T>(mxHandler);
curMxMQRunnable.setState(STATE_REMOVE);
curMxMQRunnable.setQueueEmpty(new QueueEmpty() {
@Override
public void empty(MxMQRunnable mxMQRunnable) {
removePartion(partion);
}
});
partionRunMap.put(partion,curMxMQRunnable);
executors.execute(curMxMQRunnable);
System.out.println(partion+"被添加");
return true;
}
return false;
}
public boolean removePartion(String partion){
if(partionRunMap.get(partion) != null){
MxMQRunnable<T> remove = partionRunMap.remove(partion);
remove.close();
System.out.println(partion+"被移除");
return true;
}
return false;
}
public boolean sendMessage(String partion,T t) throws InterruptedException {
MxMQRunnable<T> tMxMQRunnable = partionRunMap.get(partion);
if(tMxMQRunnable != null){
tMxMQRunnable.sendMessage(t);
return true;
}
return false;
}
public boolean removeMessage(String partion,T t){
MxMQRunnable<T> tMxMQRunnable = partionRunMap.get(partion);
if(tMxMQRunnable != null){
return tMxMQRunnable.removeMessage(t);
}
return false;
}
interface QueueEmpty{
void empty(MxMQRunnable mxMQRunnable);
}
}
MQHandler:
package com.mx.mxmq;
public interface MQHandler<T> {
void hand(T t);
}
Message:文章来源:https://www.toymoban.com/news/detail-743357.html
package com.mx.mxmq;
public class Message {
String message;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Message(String message){
this.message = message;
}
}
好了,收,大概就是这样子,主要应用场景为:需要轻量级的顺序队列消费 应用场景文章来源地址https://www.toymoban.com/news/detail-743357.html
到了这里,关于BlockingQueue实现简易消息队列处理器 可分区顺序消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!