任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK

这篇具有很好参考价值的文章主要介绍了任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

任务调度框架

Java中如何实现定时任务?

比如:
1.每天早上6点定时执行
2.每月最后一个工作日,考勤统计
3.每个月25号信用卡还款
4.会员生日祝福
5.每隔3秒,自动提醒

10分钟的超时订单的自动取消,每隔30秒或1分钟查询一次订单,拿当前的时间上前推10分钟
定时任务,资源会有误差的存在,如果使用定时任务
定时任务,用于统计的时候最多。

自动统计考勤,一般0点之后开始统计,可以使用定时任务

nacos心跳

晚上要求和采购部门生成采购单,达到最低预警值的时候,去发给采购部门

我们可以通过任务调度框架实现上述的需求
任务调度框架,可以实现定时任务,实现间隔多少时间的重复执行,实现指定日期的重复执行

电商自动好评,间隔时间长的,误差几分钟,影响不大。

java中任务调度框架:
1、Spring Task spring自带的
2、Quartz 古老的框架
3、XXL-Job
4、第三方云平台-比如说:阿里云-SchedulerX等等
选择一个:Spring Task
2个注解+
包:task任务、job

使用步骤:
1、开关类,使用注解
@EnableScheduling // 开启任务调度
任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式
2、定义定时任务

@Scheduled(cron = "0/3 * * * ?")

CORN表达式:
秒 分 时 日 月 星期几 年 其中,只有年可以省略
定时任务,需要重复执行的方法

CORN表达式:
特殊字符串,主要用来描述时间的,用于任务调度等
https://cron.qqe2.com/

/ 间隔
- 是连续
, 枚举值
L 最后,星期、日中用
W 有效工作日
LW 某个月最后一个工作日
# 用于确定每个月第几个星期几,母亲节或父亲节
4#2 某个月的第二个星期三  4代表星期三,中文的时候有可能不影响

任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式

任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式

项目名:SpringTask01
Spring Web、Lombok

RabbitMq实现延迟:

死信+延迟消息处理

死信:RabbitMQ的队列中的消息,满足以下条件任意其一就会成为死信消息:
1.消息被拒绝
2.消息过期
3.队列满了
死信交换器:专门用来转发队列中的死信消息,将死信消息转发到指定的队列中

十分钟未支付,取消订单?
十分钟之后,消息会过期,过期后,通过死信交换器转发队列中的死信消息,将死信消息转发到指定的队列中,由消费者去处理。

我们可以通过死信+死信交换器实现延迟消息处理
RabbitMQ实现延迟消息处理有2种方式:
1、死信+死信交换器 代码实现(可控,更方便一些)
2、延迟消息插件

1、死信+死信交换器 代码实现(可控,更方便一些)
发送消息,到队列1,(产生延迟队列,产生死信)
任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式
一个消息,过一段时间,实现消费。

核心:
1.队列 是2个队列,第1个队列: 目的 产生死信(1.设置有效期2.不设置消费者),借助死信交换器,把产生的死信发到指定的队列中
第二个对了:目的 消费死信,这里获取的信息,时间就是延迟的,延迟的就是上面的有效期
2.1个交换器
死信交换器,整个系统一般就一个,可以用来转发各个功能产生的死信,是Direct类型的交换,通过RK进行消息匹配到对应的队列中。
RabbitMQ的消息的有效期有2种设置方式:
1.设置队列上的有效期,整个队列中所有消息都使用
2.可以在每个消息上设置有效期,这种适用于有多个不同有效期的消息

任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式
如果队列和消息都有有效期,谁短听谁的。

代码:
RabbitMQ02
实现:
1.pom

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--        RabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

2.代码:
config-RabbitMQConfig

package com.yd.rabbitmq02.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * @author MaoXiqi
 * @organization: Lucky
 * @create 2023-10-16 11:35:54
 * @description RabbitMQConfigApi
 */
@Configuration
public class RabbitMQConfig {
    // 1.创建2个队列
    @Bean
    public Queue createQ1() {
        // 1.设置队列 内部消息有效期 设置死信交换器 设置RK
        HashMap<String, Object> params = new HashMap<>();
        // 设置队列中每个消息的有效期 单位 毫秒
        params.put("x-message-ttl", 3000);
        // 设置对应的死信交换器
        params.put("x-dead-letter-exchange", "dead-ex-yd");
        // 设置交换器匹配的路由名称
        params.put("x-dead-letter-routing-key", "test");
        return QueueBuilder.durable("dl-q01").withArguments(params).build();
    }
    @Bean
    public Queue createQ2() {
        return new Queue("dl-q02");
    }
    // 2.创建1个交换器(1.fanout 2,direct 3.topic 4.header)-死信交换器direct类型
    @Bean
    public DirectExchange createDe() {
        return new DirectExchange("dead-ex-yd");
    }
    // 3.创建1个绑定
    @Bean
    public Binding createBd1(DirectExchange de) {
        return BindingBuilder.bind(createQ2()).to(de).with("test");
    }
}

任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式

2.1.创建两个队列
1、设置队列,内部消息有效期 设置死信交换器 设置RK 注意:参数名是固定的,值根据业务需求去改,值 单位是毫秒

2.2创建1个交换器(1.fanout 2.direct 3.tipic
4.header)-死信交换器direct类型

2.3.创建一个绑定

controller-DeadController

    @GetMapping("send")
    public String sendDead(String msg) {
        System.out.println("发送消息," + msg + ",发送时间:" + System.currentTimeMillis());
        template.convertAndSend("", "dl-q01", msg);
        return "ok";
    }

发送
任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式
监听消息-主要是为了消费:
listener-DeadListener

    @RabbitListener(queues = "dl-q02")
    public void handler(String m) {
        System.out.println("延迟消息,"+m+",接受时间:" +System.currentTimeMillis());
    }

任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式
yml:

spring:
  rabbitmq:
    host: 121.36.5.100
    port: 5672
    username: guest
    password: guest
server:
  port: 8082

测试:
任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式

RabbitMQ事务:

数据库中事务:保证数据一致性,特别是多个操作要么都成功,要么都失败
RabbitMQ事务:一次性发多条消息,需要开启事务,
RabbitMQ也有自己的事务,如果本次任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式

任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式

使用步骤:

源码:RabbitMQ02
1.创建配置类
2.使用基于事务发送
3.监听消息

1.创建配置类
config-RabbitMQTranConfig
1.准备一个队列
2.创建事务管理器

// 创建事务管理器
    @Bean
    public RabbitTransactionManager createTran(ConnectionFactory factory) {
        return new RabbitTransactionManager(factory);
    }

任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式

2.controller-TranController
开启事务
发送消息

    // 事务
   @Transactional // 需要开启SpringBoot的事务机制
   @GetMapping("sendmsg")
   public String sendMsg(String msg, int count) {
       // 开启 RabbitMQ的通道的事务
       template.setChannelTransacted(true);
       // 发送消息
       for (int i = 0; i < count; i++) {
           template.convertAndSend("", "yd-tran-q01", msg + "--" + count);
           // 出错,看看 事务是否生效
           if (i==2) {
               System.out.println(1/0);
           }
       }
       return "ok";
   }

任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式

3.listener-DeadListener

    // 事务
    @RabbitListener(queues = "yd-tran-q01")
    public void handler2(String msg) {
        System.out.println("监听消息"+msg);
        // 处理业务逻辑 出错了
    }

任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式

任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式

手动ACK

RabbitMQ怎么防止消息丢失:
1.发送端没有发送过去
解决:
1.用事务
2.confirm消息确认机制 万能:转人工处理
2.MQ服务器丢失,MQ服务器蹦了,
解决:开启持久化
3.消费端消息丢失:
解决:自动应答,改成开始手动ACK

消息确认机制:默认是自动确认

消息的发送和接收是异步

RabbitMQ如何防止消息丢失:
任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式
代码:
config-RabbitMqTranConfig

    //消费消息的⼿动应答
    @Bean
    public Queue createQ4() {
        return new Queue("yd-ack-q01");
    }

controller-DeadController

    // 事务
    @Transactional // 需要开启SpringBoot的事务机制
    @GetMapping("sendmsg")
    public String sendMsg(String msg, int count) {
        // 开启 RabbitMQ的通道的事务
        template.setChannelTransacted(true);
        // 发送消息
        for (int i = 0; i < count; i++) {
            template.convertAndSend("", "yd-tran-q01", msg + "--" + count);
            // 出错,看看 事务是否生效
            if (i==2) {
                System.out.println(1/0);
            }
        }
        return "ok";
    }

listener-DeadListener
一般设置个上限,比如最多三次

    @RabbitListener(queues = "yd-ack-q01")
    public void handler3(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        //消费者获取消息,默认采用的自动应答,就是获取就应答,这样MQ服务器就删除消息
        //还可以手动应答:结果:1.成功(MQ删除)2.失败(MQ消息)
        System.out.println("收到ACK消息,监听消息:" + msg);
        //拒绝消息 参数说明:1.消息id 2.结果 true 成功 false 拒绝 3.是否把消息添加回队列中
        channel.basicNack(tag,false,true);

        //成功消息 参数说明:1.消息id 2.结果 true 成功 false 拒绝
        //channel.basicAck(tag,true);
        // 处理业务逻辑 出错了
    }

任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式
任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式

消费消息的手动应答:
RabbitMQ默认的消费者消息获取模式采用的是手动应答
但是这种有缺陷,可能会出现,消息获取了但是业务出了问题,导致MQ也自动删除了消息,最终导致业务没有执行
所以为了解决这种问题,可以开启手动应答模式,结合自己的业务执行情况,如果业务执行成功,那么就成功应答,如果失败了,就拒绝消息,同时把消息再加回队列,这样就可以再次消息再次处理(加个上限)
任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式
测试
任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式

RabbitMQ如何保证消息的幂等性:
幂等性就是重复消费。
解决:
1.生成一个全局id,存入redis或者数据库,在消费者消费消息之前,查询一下该消息是否有小费过。
2.
任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式

用户充值,重复消费,相当如充值了多次,是一定要杜绝的。

任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK,cloud,rabbitmq,分布式

不要返回值的时候,可以用RabbitMQ替代OpenFegin,因为消费完就不回了。MQ默认是单向的
短信发信可以用MQ,这个业务要做,做起来可能会很耗时,中间要经过运营商,过程不可控

RabbitMQ应用场景
消息通信,发送消息和接收消息,是异步
1.实现服务通信
用在微服务中,实现2个服务的通信,这种不带返回值的,只是为了执行另一个服务的方法执行
2.解决耗时操作
比如:邮件、短信、第三方接口等,比较耗时
3.解耦
4.提升性能
5.重复代码封装
6.削峰填谷(订单先下到redis中,再通过MQ和延迟队列,慢慢的从redis搬到mysql中)

关键词:异步、解耦、延迟文章来源地址https://www.toymoban.com/news/detail-715311.html

到了这里,关于任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • CentOS上如何配置手动和定时任务自动进行时间同步

    Linux(Centos)上使用crontab实现定时任务(定时执行脚本): Linux(Centos)上使用crontab实现定时任务(定时执行脚本)_centos 定时任务-CSDN博客 Winserver上如何配置和开启NTP客户端进行时间同步: Winserver上如何配置和开启NTP客户端进行时间同步_配置windows ntp客户端-CSDN博客 在Centos上如何进行

    2024年02月20日
    浏览(41)
  • STM32 实现简单定时任务调度器,动态创建任务,两种思路实现流水灯

    代码实现和硬件没关系,所以并不限于STM32,Arduino 之类的其他地方也能用,只要有一个能获取时间的函数就行,或者说,只要有一个会随着时间自动增加的变量就行,时间单位无所谓,所以确实想的话,拿到电脑上也能用。后面会用跑马灯程序来说明定时任务的玩法,可以直

    2024年02月10日
    浏览(38)
  • 造个轮子-任务调度执行小框架-任务清单解析实现

    okey~每日编码一坤时,昨天的话我们已经实现了这个框架的IOC容器。通过这个IOC容器,我们就可以非常轻松地进行后续的操作,于是,我们接着这个工作,去完成这个任务清单的解析。 昨天的话,阐述了一下这个框架解决了哪些问题,那么接下来,是如何使用这个家伙。以及

    2024年02月13日
    浏览(44)
  • 造个轮子-任务调度执行小框架-任务清单执行器实现

    okey,上一篇文章我们提到了,如何实现它的一个清单的一个代理。这里的话我们来捋一捋我们的这个执行流程是啥: 所以的话,我们的我们这里今天要做的是这个执行器的一个执行。当然这里的话,我们也是分两个部分,因为这个执行器的话,是分两个部分的,一个是正常的

    2024年02月13日
    浏览(48)
  • Django框架使用定时器-APScheduler实现定时任务:django实现简单的定时任务

    系统:windows10 python: python==3.9.0 djnago==3.2.0 APScheduler==3.10.1 1、创建utils包,在包里面创建schedulers包 utils/schedulers/task.py utils/schedulers/scheduler.py utils/schedulers/__init__.py 2、项目配置文件settings.py

    2024年02月12日
    浏览(46)
  • 造个轮子-任务调度执行小框架-IOC容器实现

    忙里偷闲,今天终于是把概率论这块骨头干下来了。所以的话,留了点时间,把整个项目的结构和基本的功能给实现以下。通过昨天的一个功能的一个设计,我想应该可以明白我想干啥吧。这里的话,重复一下,那就是俺们要搞一个任务执行框架。 这个框架到底有啥用?举个

    2024年02月13日
    浏览(39)
  • Java定时任务、自动化任务调度

    Java提供了多种方式来实现定时任务,使得开发人员能够在指定的时间间隔或固定时间点执行特定的任务。本文将介绍Java中实现定时任务的几种常用方法,并探讨它们的优势和适用场景。 Java中的Timer类是最早引入的定时任务工具,它可以用于执行一次性或重复性的定时任务。

    2024年02月16日
    浏览(52)
  • Spring Boot 3 整合 xxl-job 实现分布式定时任务调度,结合 Docker 容器化部署(图文指南)

    xxl-job 是一个分布式任务调度平台,它提供了强大的任务调度和执行能力,可以帮助我们实现任务的自动化调度和执行。本文将介绍如何在 Docker 环境下部署 xxl-job,并将其与 Spring Boot 进行整合。 数据库脚本:tables_xxl_job-2.4.0.sql Docker 镜像地址: https://hub.docker.com/r/xuxueli/xxl-jo

    2024年02月06日
    浏览(74)
  • 分布式任务调度,定时任务的处理方案

    适用场景: Spring 定时任务是 Spring 框架提供的一种轻量级的任务调度方案,它的特点是简单易用、轻量级。Spring 定时任务的执行是在 单个节点 上进行的,如果需要分布式任务调度,需要自己实现相应的解决方案。 1.导入依赖版本自己控制 2.启动类加上@EnableScheduling 3.编写业

    2023年04月14日
    浏览(64)
  • Linux 定时任务调度(crontab)

    Crontab命令用于设置周期性被执行的指令。该命令从标准输入设备读取指令,并将其存放于“crontab”文件中,以供之后读取和执行。 可以使用Crontab定时处理离线任务,比如每天凌晨2点更新数据等,经常用于系统任务调度。 一般Linux系统中都会装有Crontab,如果没有安装可以使

    2024年02月07日
    浏览(75)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包