通过延迟队列和线程池消费异步任务Demo

这篇具有很好参考价值的文章主要介绍了通过延迟队列和线程池消费异步任务Demo。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

仅记录学习笔记,如有错误欢迎指正。

最近项目需要把异步的获取其他系统传过来的文件list,然后下载到本地服务器,并保存一些信息,需要用异步去下载文件,多线程去然后处理任务。

DelayQueUtil

就是消息的producer,不过封装为通用的util了

 import delay.dto.DelayMessage;

import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
 * 类似与生产者的功能
 */

public class DelayQueUtil {
    private static final Map<String, Consumer<?>> CONSUMER_MAP = new ConcurrentHashMap<>();

    private static final AtomicBoolean STARTING = new AtomicBoolean();

    /**
     * 延迟队列
     */
    private static final DelayQueue<DelayMessage<?>> DELAY_QUEUE = new DelayQueue<>();


    private static final int CORE_POOL_SIZE = 4;
    private static final int MAXIMUM_POOL_SIZE = 10;
    private static final long KEEP_ALIVE_TIME = 20;
    private static final TimeUnit UNIT = TimeUnit.SECONDS;
    private static final int MAXIMUM_ARRAY_SIZE = 10;
//    private static final ThreadFactory NAMED_FACTORY = new ThreadFactoryBuilder().setNameFormat("java_delay_thread_%d").build();

    /**
     * 执行读取任务的线程池
     */
    private static final ExecutorService THREAD_POOL = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAXIMUM_POOL_SIZE,
            KEEP_ALIVE_TIME,
            UNIT,
            new ArrayBlockingQueue<>(MAXIMUM_ARRAY_SIZE)
//            NAMED_FACTORY
    );


    /**
     * 提交一个延迟消息
     * @param uuid 消息的uuid
     * @param msg 消息对象
     * @param consumer 延迟到期后回到方法
     * @param delayTime 延迟时间,毫秒
     * @param <T> 消息对象类型
     * @return true: 提交成功
     */
    public static <T> boolean submit(String uuid, T msg, Consumer<T> consumer, long delayTime) {
        DelayMessage<T> delayMessage = new DelayMessage<>(uuid, msg, delayTime);
        addTask(uuid, consumer);

        return DELAY_QUEUE.offer(delayMessage);
    }

    /**
     * 取消一个延迟消息
     * @param uuid 消息的uuid
     * @return true: 取消成功
     */
    public static boolean cancel(String uuid) {
        return CONSUMER_MAP.remove(uuid) != null;
    }

    /**
     * 添加任务,懒加载开启消费线程
     * @param uuid 消息的uuid
     * @param consumer 回调方法
     * @param <T> 消息对象类型
     */
    private static <T> void addTask(String uuid, Consumer<T> consumer) {
        CONSUMER_MAP.put(uuid, consumer);

        // STARTING 是false,则开启监听队列的线程
        if (!STARTING.compareAndSet(false, true)) {
            return;
        }
        THREAD_POOL.execute(() -> {
            while (STARTING.get()) {
                try {
                    DelayMessage<T> delayMessage = (DelayMessage<T>) DELAY_QUEUE.take();
                    // 只有当map里面有该uuid对应的消息,才执行回调方法
                    if (CONSUMER_MAP.containsKey(delayMessage.getUuid())) {
                        // 执行回调方法
                        execCall(consumer, delayMessage);
                    }
                } catch (InterruptedException e) {
                    STARTING.set(false);
                }
            }
        });
    }

    private static <T> void execCall(Consumer<T> consumer, DelayMessage<T> delayMessage) {
        CONSUMER_MAP.remove(delayMessage.getUuid());
        THREAD_POOL.execute(() -> consumer.accept(delayMessage.getBody()));
    }
}

DelayMessage

统一消息消费体;用来定义消息的执行顺序,以及获取具体的消费任务的内容,还可以添加其他字段来设置终止任务的字段。

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 统一消息消费体
 */

public class DelayMessage<T> implements Delayed {

    private static final AtomicLong atomic = new AtomicLong(0);

    private final long n;

    private String uuid;

    /**
     * 消息内容
     */
    private T body;

    /**
     * 终止条件 可以不要
     */
//    private int size;

    /**
     * 到期时间,这个是必须的属性因为要按照这个判断延时时长。
     */
    private long executeTime;

    /**
     * 延迟毫秒数
     */
    private long delayTime;

    public DelayMessage(String uuid, T body, long delayTime) {
        this.uuid = uuid;
        this.n = atomic.getAndIncrement();
        this.body = body;
        this.delayTime = delayTime;
        this.executeTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.executeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other == this) {
            return 0;
        }
        if (other instanceof DelayMessage) {
            DelayMessage x = (DelayMessage) other;
            long diff = executeTime - x.executeTime;
            if (diff < 0) {
                return -1;
            } else if (diff > 0) {
                return 1;
            } else if (n < x.n) {
                return -1;
            } else {
                return 1;
            }
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : (d < 0 ? -1 : 1);
    }

    public static AtomicLong getAtomic() {
        return atomic;
    }

    public long getN() {
        return n;
    }

    public String getUuid() {
        return uuid;
    }

    public void setUuid(String uuid) {
        this.uuid = uuid;
    }

    public T getBody() {
        return body;
    }

    public void setBody(T body) {
        this.body = body;
    }

    public long getExecuteTime() {
        return executeTime;
    }

    public void setExecuteTime(long executeTime) {
        this.executeTime = executeTime;
    }

    public long getDelayTime() {
        return delayTime;
    }

    public void setDelayTime(long delayTime) {
        this.delayTime = delayTime;
    }
}

DelayConsumer

消费者:执行任务的具体业务逻辑。。

  private static void handlerMessage(Object body){
        System.out.println("处理具体的业务逻辑"+body);
    }


    public static void main(String[] args) {
        for(int i = 0 ;i < 5; i++){
            String uuid = UUID.randomUUID().toString();
            DelayQueUtil.submit(uuid,"msg"+i,DelayConsumer::handlerMessage,2000);
        }

    }

}文章来源地址https://www.toymoban.com/news/detail-442044.html

到了这里,关于通过延迟队列和线程池消费异步任务Demo的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包