kafka-顺序消息实现

这篇具有很好参考价值的文章主要介绍了kafka-顺序消息实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

kafka-顺序消息实现

场景

在购物付款的时候,订单会有不同的订单状态,对应不同的状态事件,比如:待支付,支付成功,支付失败等等,我们会将这些消息推送给消息队列 ,后续的服务会根据订单状态进行不同的业务处理,这就要求订单状态推送就要有状态的保证

解决方案

  • 生产者将相同的key的订单状态事件推送到kafka的同一分区
  • kafka 消费者接收消息
  • 消费者将消息提交给线程池
  • 线程池根据接收到的消息,将订单状态事件使用路由策略选择其中一个线程,将具有相同路由key的事件发送到同一个线程的阻塞队列中
  • 单个线程不停的从阻塞队列获取订单状态消息消费

kafka-顺序消息实现,kafka,kafka

代码实现

引入依赖
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.2</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>boot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot-kafka</name>
<description>boot-kafka</description>
<properties>
    <java.version>17</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.39</version>
    </dependency>
</dependencies>

使用到的DTO
@Data
public class InterOrderDto extends OrderDto implements OrderMessage{

    /**
     * 属于哪个分区
     */
    private String partition;

    @Override
    public String getUniqueNo() {
        return getOrderNo();
    }
}


@Data
public class InterOrderDto extends OrderDto implements OrderMessage{

    /**
     * 属于哪个分区
     */
    private String partition;

    @Override
    public String getUniqueNo() {
        return getOrderNo();
    }
}

public interface OrderMessage {

    /**
     * 线程池路由key
     * @return
     */
    String getUniqueNo();

}
定义topic

这里是 3个分区,2个副本

@Configuration
public class KafkaConfiguration {

    @Bean
    public NewTopic topic(){
        return new NewTopic(Constants.TOPIC_ORDER,3,(short) 2);
    }
}

public interface Constants {

     String TOPIC_ORDER = "order";
}
消费者

消费者:OrderListener

@Component
@Slf4j
public class OrderListener {

    @Autowired
    private OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool;

    @KafkaListener(topics = Constants.TOPIC_ORDER, groupId = "orderGroup", concurrency = "3")
    public void logListener(ConsumerRecord<String, String> record) {
        log.debug("> receive log event: {}-{}", record.partition(), record.value());
        try {
            OrderDto orderDto = JSON.parseObject(record.value(), OrderDto.class);
            InterOrderDto interOrderDto = new InterOrderDto();
            BeanUtils.copyProperties(orderDto, interOrderDto);
            interOrderDto.setPartition(record.partition() + "");
            orderThreadPool.dispatch(interOrderDto);
        } catch (Exception e) {
            log.error("# kafka log listener error: {}", record.value(), e);
        }
    }

}

线程池: OrderThreadPool

/**
 * @Date: 2024/1/24 10:23
 * 线程池实现
 *
 * @param W: worker
 * @param D: message
 */
@Slf4j
public class OrderThreadPool<W extends SingleThreadWorker<D>, D extends OrderMessage> {
    private List<W> workers;
    private int size;

    public OrderThreadPool(int size, Supplier<W> provider) {
        this.size = size;
        workers = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            workers.add(provider.get());
        }
        if (CollectionUtils.isEmpty(workers)) {
            throw new RuntimeException("worker size is 0");
        }
        start();
    }

    /**
     * route message to single thread
     *
     * @param data
     */
    public void dispatch(D data) {
        W w = getUniqueQueue(data.getUniqueNo());
        w.offer(data);
    }

    private W getUniqueQueue(String uniqueNo) {
        int queueNo = uniqueNo.hashCode() % size;
        for (W worker : workers) {
            if (queueNo == worker.getQueueNo()) {
                return worker;
            }
        }
        throw new RuntimeException("worker 路由失败");
    }

    /**
     * start worker, only start once
     */
    private void start() {
        for (W worker : workers) {
            new Thread(worker, "OWorder-" + worker.getQueueNo()).start();
        }
    }

    /**
     * 关闭所有 workder, 等待所有任务执行完
     */
    public void shutdown() {
        for (W worker : workers) {
            worker.shutdown();
        }
    }

}

工作线程:SingleThreadWorker, 内部使用阻塞队列使其串行化

/**
 * @Date: 2024/1/24 10:58
 * single thread with a blocking-queue
 */
@Slf4j
public abstract class SingleThreadWorker<T> implements Runnable {

    private static AtomicInteger cnt = new AtomicInteger(0);
    private BlockingQueue<T> queue;
    private boolean started = true;

    /**
     * worker 唯一id
     */
    @Getter
    private int queueNo;

    public SingleThreadWorker(int size) {
        this.queue = new LinkedBlockingQueue<>(size);
        this.queueNo = cnt.getAndIncrement();
        log.info("init worker {}", this.queueNo);
    }

    /**
     * 提交消息
     *
     * @param data
     */
    public void offer(T data) {
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            log.info("{} offer error: {}", Thread.currentThread().getName(), JSON.toJSONString(data), e);
        }
    }

    @Override
    public void run() {
        log.info("{} worker start take ", Thread.currentThread().getName());
        while (started) {
            try {
                T data = queue.take();
                doConsumer(data);
            } catch (InterruptedException e) {
                log.error("queue take error", e);
            }
        }
    }

    /**
     * do real consume message
     *
     * @param data
     */
    protected abstract void doConsumer(T data);

    /**
     * consume rest of message in the queue when thread-pool shutdown
     */
    public void shutdown() {
        this.started = false;
        ArrayList<T> rest = new ArrayList<>();
        int i = queue.drainTo(rest);
        if (i > 0) {
            log.info("{} has rest in queue {}", Thread.currentThread().getName(), i);
            for (T t : rest) {
                doConsumer(t);
            }
        }
    }


}

工作线程实现:OrderWorker, 这里就单独处理订单事件

/**
 * @Date: 2024/1/24 13:42
 * 具体消费者
 */
@Slf4j
public class OrderWorker extends SingleThreadWorker<InterOrderDto>{
    public OrderWorker(int size) {
        super(size);
    }

    @Override
    protected void doConsumer(InterOrderDto data) {
        log.info("{} consume msg: {}", Thread.currentThread().getName(), JSON.toJSONString(data));

    }
}

生产者

生产者:OrderController, 模拟发送不同的事件类型的订单

@RestController
public class OrderController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send")
    public String send() throws InterruptedException {
        int size = 1000;
        for (int i = 0; i < size; i++) {
            OrderDto orderDto = new InterOrderDto();
            orderDto.setOrderNo(i + "");
            orderDto.setPayStatus(getStatus(0));
            orderDto.setTimestamp(System.currentTimeMillis());
            //相同的key发送到相同的分区
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
            TimeUnit.MILLISECONDS.sleep(10);
            orderDto.setPayStatus(getStatus(1));
            orderDto.setTimestamp(System.currentTimeMillis());
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
            TimeUnit.MILLISECONDS.sleep(10);
            orderDto.setPayStatus(getStatus(2));
            orderDto.setTimestamp(System.currentTimeMillis());
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
        }
        return "success";
    }

    private String getStatus(int status){
        return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";
    }
}

application.properties 配置

# kafka地址
spring.kafka.bootstrap-servers=192.168.x.x:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
启动类
@Slf4j
@SpringBootApplication
public class BootKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(BootKafkaApplication.class, args);
    }

    /**
     * 配置线程池
     * @return
     */
    @Bean
    public OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool(){
        OrderThreadPool<OrderWorker, InterOrderDto> threadPool =
            new OrderThreadPool<>(3, () -> new OrderWorker(100));
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("shutdown orderThreadPool");
            //容器关闭时让工作线程中的任务都被消费完
            threadPool.shutdown();
        }));
        return threadPool;
    }

}

测试

访问: http://localhost:8080/send, 结果:

OWorder-0 worker start take 
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"待支付","timestamp":1706084482134,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"已支付","timestamp":1706084482271,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"支付失败","timestamp":1706084482282,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"待支付","timestamp":1706084482326,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"已支付","timestamp":1706084482336,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"支付失败","timestamp":1706084482347,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"待支付","timestamp":1706084482391,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"已支付","timestamp":1706084482401,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"支付失败","timestamp":1706084482412,"uniqueNo":"6"}

可以发现,在我们工作线程中,事件消费是有序的

good luck!文章来源地址https://www.toymoban.com/news/detail-824509.html

到了这里,关于kafka-顺序消息实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka顺序消费以及消息积压问题

    什么场景下需要顺序消费? 比如说:订单有很多状态,比如:下单(未支付)、完成(已支付)、撤销等,不可能下单的消息都没读取到,就先读取支付或撤销的消息吧,要保证消息顺序消费 如何保证顺序消费? kafka的topic是无序的,但是一个topic包含多个partition, 每个pa

    2024年04月29日
    浏览(41)
  • 【kafka面试题2】如何保证kafka消息的顺序性

    如何保证kafka消息的顺序性呢,其实整体的策略就是:我们 让需要有序的消息发送到同一个分区Partition。 为什么说让有序的消息发送到同一个分区Partition就行呢,,下面我们来详细分析一下子。 首先 ,我们知道kafka消息的收发是基于Topic(主题),消息通过Topic进行分类。单

    2024年02月13日
    浏览(39)
  • 保证消息顺序性:Kafka 的策略与挑战

    目录 1. 为什么消息顺序性很重要? 2. Kafka 的消息顺序性挑战 2.1 分区与并行性 2.2 生产者与网络延迟 2.3 消费者群组 3. 保证消息顺序性的策略 3.1 单分区单线程 3.2 顺序 ID 3.3 单一消费者 4. 最佳实践与注意事项 4.1 合理的分区设计 4.2 避免重分区 4.3 监控和测试 5. 结论      

    2024年02月03日
    浏览(37)
  • Kafka 如何保证消息消费的全局顺序性

    哈喽大家好,我是咸鱼 今天我们继续来讲一讲 Kafka 当有消息被生产出来的时候,如果没有指定分区或者指定 key ,那么消费会按照【轮询】的方式均匀地分配到所有可用分区中,但不一定按照分区顺序来分配 我们知道,在 Kafka 中消费者可以订阅一个或多个主题,并被分配一

    2024年02月05日
    浏览(45)
  • 高可用环境kafka消息未按顺序消费问题

    目录 1、背景 2、问题排查 3、问题解决 质检任务是异步执行,正常情况下任务状态扭转是    等待中》运行中》成功(失败) 。在质量平台生成任务实例,此时状态是等待中,生成实例之后把具体的任务sql给到大数据平台执行,大数据平台会发运行中、成功、失败状态的k

    2024年04月29日
    浏览(35)
  • Kafka、RocketMQ、RabbitMQ如何保证消息的顺序消费?

    一、1个Topic(主题)只创建1个Partition (分区),这样生产者的所有数据都发送到了一个Partition (分区),保证了消息的消费顺序; 二、生产者在发送消息的时候指定要发送到哪个 Partition,这样同一个 Partition 的数据会被同一个消费者消费,从而保证了消息的消费顺序。 实现思路

    2024年02月09日
    浏览(46)
  • kafka 分布式的情况下,如何保证消息的顺序消费?

    目录 一、什么是分布式 二、kafka介绍 三、消息的顺序消费 四、如何保证消息的顺序消费   分布式是指将计算任务分散到多个计算节点上进行并行处理的一种计算模型。在分布式系统中,多台计算机通过网络互联,共同协作完成任务。每个计算节点都可以独立运行,并且可以

    2024年02月10日
    浏览(52)
  • Kafka如何保证消息的消费顺序【全局有序、局部有序】、Kafka如何保证消息不被重复消费、Kafka为什么这么快?【重点】、Kafka常见问题汇总【史上最全】

    目录 Kafka消息生产 一个Topic对应一个Partition 一个Topic对应多个Partition Kafka消息的顺序性保证(Producer、Consumer) 全局有序 局部有序  max.in.flight.requests.per.connection参数详解 Kafka的多副本机制 Kafka的follower从leader同步数据的流程 Kafka的follower为什么不能用于消息消费 Kafka的多分区

    2024年04月11日
    浏览(51)
  • 阿里二面:Kafka中如何保证消息的顺序性?这周被问到两次了

    在现代分布式系统中,消息顺序消费扮演着至关重要的角色。特别是在涉及事务处理、日志追踪、状态机更新等场景时,消息的处理顺序直接影响着系统的正确性和一致性。例如,金融交易系统中,账户间的转账操作必须严格按照发出请求的顺序进行处理,否则可能导致资金

    2024年03月20日
    浏览(34)
  • BlockingQueue实现简易消息队列处理器 可分区顺序消费

    : 然后看下测试代码: 还可以自定义不同分区不同的处理器,逻辑自由定义,下面看下几个关键类: MxMQRunnable: MxMQ: MQHandler: Message: 好了,收,大概就是这样子,主要应用场景为:需要轻量级的顺序队列消费 应用场景

    2024年02月05日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包