rabbitmq延时队列自动解锁库存

这篇具有很好参考价值的文章主要介绍了rabbitmq延时队列自动解锁库存。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、库存服务自动解锁库存

rabbitmq延时队列自动解锁库存,rabbitmq,分布式

使用了最终一致性来解决分布式事务
当order服务出现异常回滚,此时ware服务无法回滚,怎么办?

使用seata全局事务虽然能在order服务出现异常导致回滚时使其他服务的也能同时回滚,但在流量大的情况下是使用加锁的方式,效率
低不适合并发量大的情况,也可以使用定时任务轮询去查看订单的状态,但是轮询的方式比较占资源和内存,所以选用最终一致性的方案,使用mq延时队列死信路由,然后做出补救方案,只要订单服务出现故障就通过mq定时去判断,只要能保证库存最终能解锁即可

延时队列自动解锁库存业务逻辑
ware服务在完成锁库存时就给mq发消息,把消息存到死信队列中,这个消息记录了那些商品锁定多少库存,当queue到达存活时间就会把消息交给死信路由交换机,死信路由交换机会把消息发到最终的队列,如果订单支付时间为30分钟,我们就把存活时间设置为40分钟,这样就能保证我们监听的消息一定是超过了支付的时间的,然后让ware库存服务去订阅监听最终的队列即可,只要有消息我们就去检验order订单服务,只要证明订单服务出现异常回滚或者订单超过支付时间未支付的订单我们就去做一个解锁还原库存的操作

1.库存锁定成功,给mq发消息

(1)保存 工作单(订单号)、工作单详情(商品锁了多少库存)
(2)把上面的数据给mq发一份

@Transactional
    @Override
    public boolean orderStock(OrderStockRequest orderStockRequest) {
        //保存工作单
        WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity();
        wareOrderTaskEntity.setOrderSn(orderStockRequest.getOrderSn());
        wareOrderTaskService.save(wareOrderTaskEntity);

        List<OrderItemVo> itemVos = orderStockRequest.getItemVos();
        List<SkuStockfromWare> collect = itemVos.stream().map(item -> {
            SkuStockfromWare skuStockfromWare = new SkuStockfromWare();
            skuStockfromWare.setSkuId(item.getSkuId());
            skuStockfromWare.setNum(item.getCount());
            //查询该商品在那些仓库有库存
            List<Long> wareId = wareSkuDao.skuStockfromWare(item.getSkuId());
            skuStockfromWare.setWareId(wareId);
            return skuStockfromWare;
        }).collect(Collectors.toList());

        //根据skuId遍历
        for (SkuStockfromWare skuStockfromWare : collect) {
            //判断是否锁定成功
            boolean flag = false;

            //判断该商品是否有仓库存在库存
            List<Long> wareIdList = skuStockfromWare.getWareId();
            if (wareIdList.size() < 0 || wareIdList == null){
                throw new NoWareStockException(skuStockfromWare.getSkuId());
            }
            for (Long wareId : wareIdList) {
                Long count = wareSkuDao.LockedStockFromWare(skuStockfromWare.getSkuId(),wareId,skuStockfromWare.getNum());
                if (count.equals(1L)){
                    //锁定成功
                    flag = true;

                    //保存工作单详情
                    WareOrderTaskDetailEntity wareOrderTaskDetailEntity = new WareOrderTaskDetailEntity();
                    wareOrderTaskDetailEntity.setSkuId(skuStockfromWare.getSkuId());
                    wareOrderTaskDetailEntity.setSkuNum(skuStockfromWare.getNum());
                    wareOrderTaskDetailEntity.setTaskId(wareOrderTaskEntity.getId());
                    wareOrderTaskDetailEntity.setWareId(wareId);
                    wareOrderTaskDetailEntity.setLockStatus(1);
                    wareOrderTaskDetailService.save(wareOrderTaskDetailEntity);
                    //TODO 库存锁定成功->发消息给交换机
                    StockLocked stockLocked = new StockLocked();
                    stockLocked.setTaskId(wareOrderTaskEntity.getId());
                    WareOrderTaskDetailTo wareOrderTaskDetailTo = new WareOrderTaskDetailTo();
                    BeanUtils.copyProperties(wareOrderTaskDetailEntity,wareOrderTaskDetailTo);
                    stockLocked.setDetailTo(wareOrderTaskDetailTo);
                    //convertAndSend(String exchange, String routingKey, Object object)
                    rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",);

                    //该商品锁定库存成功就执行下一个商品
                    break;
                }

            }

            //如果没有一个仓库扣成功,代表此skuId的库存不足
            if (!flag){
                throw new SkuNoStockException(skuStockfromWare.getSkuId());
            }

        }
        return true;
    }

2.监听队列,解锁库存

(1)判断工作单是否存在
不存在代表锁库存操作已回滚,不做处理
(2)查询订单是否存在
如果订单不存在,表示下订单操作已回滚,执行解锁库存操作
如果存在,查询订单状态是否为 4-已关闭,如果是 4-已关闭,执行解锁库存操作,订单其他状态不做处理
(3)解锁前判断工作单的状态是否为 1-已锁定,证明只做了锁定库存操作
(4)解锁库存,修改工作单详情状态为 已解锁

/**
 * 解锁库存
 */
@RabbitListener(queues = {"stock.release.stock.queue"})
@Service
public class UnLockStockListener {

    @Autowired
    WareSkuService wareSkuService;

    @RabbitHandler
    public void UnLockStock(StockLockedTo lockedTo, Channel channel, Message message) throws IOException {
        try {
            wareSkuService.unlockStock(lockedTo);
            //签收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            //拒签,让消息重新归队,等待服务器重启进行下一次解锁
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }

    }

}

解锁操作


    /**
     * 解锁库存
     *
     * (1).判断工作单是否存在
     *      不存在代表已回滚,不做处理
     *      (2).查询订单是否存在
     *          如果订单不存在,表示已回滚
     *             (3).执行解锁库存操作
     *          如果存在,查询订单状态是否为 4-已关闭
     *                如果是 4-已关闭,执行解锁库存操作,订单其他状态不做处理
     */
    @Override
    public void unlockStock(StockLockedTo lockedTo) {

        WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(lockedTo.getTaskId());
        //已回滚不做处理
        if (taskEntity != null){
            //查询订单是否存在
            R<OrderVo> r = orderFeignService.orderStatus(taskEntity.getOrderSn());
            if (r.getCode() == 0){
                OrderVo orderVo = r.getData(new TypeReference<OrderVo>() {
                });
                if (orderVo == null || orderVo.getStatus() == 4){
                    WareOrderTaskDetailTo detailTo = lockedTo.getDetailTo();
                    //判断工作单的状态是否为 1-已锁定,证明只做了锁定库存操作
                    if (detailTo.getLockStatus() == 1){
                        //恢复库存
                        unlock(detailTo.getId(),detailTo.getSkuNum(),detailTo.getSkuId(),detailTo.getWareId());
                    }
                }
            }else {
                throw new OrderFeignException();
            }

        }
    }

    /**
     * 解锁库存
     * UPDATE `wms_ware_sku` SET stock_locked = stock_locked - ?
     * WHERE sku_id = ? AND ware_id = ?
     */
    private void unlock(Long id,Integer skuNum, Long skuId, Long wareId) {
        wareSkuDao.unlock(skuNum,skuId,wareId);
        //修改状态为 已解锁
        WareOrderTaskDetailEntity wareOrderTaskDetailEntity = new WareOrderTaskDetailEntity();
        wareOrderTaskDetailEntity.setId(id);
        wareOrderTaskDetailEntity.setLockStatus(2);
    }

二、订单服务关闭订单同时也执行解锁库存操作

rabbitmq延时队列自动解锁库存,rabbitmq,分布式文章来源地址https://www.toymoban.com/news/detail-574769.html

到了这里,关于rabbitmq延时队列自动解锁库存的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式搜索引擎(Elastic Search)+消息队列(RabbitMQ)部署(商城4)

    1、全文搜索 Elastic search可以用于实现全文搜索功能,例如商城中对商品搜索、搜索、分类搜索、订单搜索、客户搜索等。它支持复杂的查询语句、中文分词、近似搜索等功能,可以快速地搜索并返回匹配的结果。 2、日志分析 Elastic search可以用于实现实时日志分析,例

    2024年02月04日
    浏览(48)
  • 分布式消息队列RabbitMQ-Linux下服务搭建,面试完腾讯我才发现这些知识点竟然没掌握全

    vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app 5.修改配置文件 这里面修改{loopback_users, [“guest”]}改为{loopback_users, []} {application, rabbit, %% - - erlang - - [{description, “RabbitMQ”}, {id, “RabbitMQ”}, {vsn, “3.6.5”}, {modules, [‘background_gc’,‘delegate’,‘delegate_sup’,‘dtree’,‘file_han

    2024年04月14日
    浏览(54)
  • RabbitMQ - 死信队列,延时队列

    死信队列: DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在 x-dead-letter-exchange 参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列

    2024年02月09日
    浏览(46)
  • Rabbitmq死信队列及延时队列实现

    问题:什么是延迟队列 我们常说的延迟队列是指消息进入队列后不会被立即消费,只有达到指定时间后才能被消费。 但RabbitMq中并 没有提供延迟队列功能 。那么RabbitMQ如何实现延迟队列 通过:死信队列 + RabbitMQ的TTL特性实现。 实现原理 给一个普通带有过期功能的队列绑定一

    2024年02月15日
    浏览(47)
  • RabbitMQ如何实现延时队列

    RabbitMQ是目前最为流行的消息队列之一,它的高可靠性、高可用性和高性能使得它成为众多应用场景下的首选。在实际应用中,我们经常需要实现延时队列来解决一些业务问题,比如订单超时未支付自动取消等。本文将介绍如何使用RabbitMQ实现延时队列。 1. 延时队列的概念 延

    2024年02月16日
    浏览(38)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(74)
  • 【技术分享】四、RabbitMQ “延时队列”

    延时的含义为 等待一段时间,应用到RabbitMQ 消息 发布/订阅 模型中的概念就是,拿到消息后不想立即消费,等待一段时间再执行。 ex: 定时任务:十分钟后执行某种操作。 批量发送短信:用户量过大,一次性发送短信卡死,可以将几万条消息分布在10分钟内随机发送完成。

    2024年02月08日
    浏览(53)
  • .NET中使用RabbitMQ延时队列和死信队列

    延时队列是RabbitMQ中的一种特殊队列,它可以在消息到达队列后延迟一段时间再被消费。 延时队列的实现原理是通过使用消息的过期时间和死信队列来实现。当消息被发送到延时队列时,可以为消息设置一个过期时间,这个过期时间决定了消息在延时队列中等待的时间。如果

    2024年02月15日
    浏览(40)
  • Golang RabbitMQ实现的延时队列

    之前做秒杀商城项目的时候使用到了延时队列来解决订单超时问题,本博客就总结一下Golang是如何利用RabbitMQ实现的延时队列的。 延迟队列是一种特殊类型的消息队列 ,用于在一定时间后将消息投递给消费者。它可以用于处理需要延迟执行的任务或者具有定时特性的业务场景

    2024年02月10日
    浏览(51)
  • 深入浅出RabbitMQ:顺序消费、死信队列和延时队列

    大家好,我是小❤,一个漂泊江湖多年的 985 非科班程序员,曾混迹于国企、互联网大厂和创业公司的后台开发攻城狮。 上篇文章(应对流量高峰的利器——消息中间件)中,我们已经介绍了消息中间件的用途,主要用作:解耦、削峰、异步通信、应用解耦,并介绍了业界常

    2024年02月03日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包