java阻塞队列/kafka/spring整合kafka

这篇具有很好参考价值的文章主要介绍了java阻塞队列/kafka/spring整合kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

queue增加删除元素

  • 增加元素
    • add方法在添加元素的时候,若超出了度列的长度会直接抛出异常:
    • put方法,若向队尾添加元素的时候发现队列已经满了会发生阻塞一直等待空间,以加入元素
    • offer方法在添加元素时,如果发现队列已满无法添加的话,会直接返回false
  • 删除元素
    • poll: 若队列为空,返回null。
    • remove:若队列为空,抛出NoSuchElementException异常。
    • take:若队列为空,发生阻塞,等待有元素

BlockingQueue:

  • 解决线程通信的问题
  • 阻塞方法:put、take

其他实现类:

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue/ SynchronousQueue/ DelayQueue

BlockingQueue实例

package com.nowcoder.mycommunity;

import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTests {
    public static void main(String[] args) {
        BlockingQueue queue = new ArrayBlockingQueue(10);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }
}

class Producer implements Runnable{

    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for(int i = 0; i < 100; ++ i){
                queue.put(i);
                Thread.sleep(20);
                System.out.println(Thread.currentThread().getName() + "   producer" + queue.size());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable{

    public BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                queue.take();
                Thread.sleep(new Random().nextInt(1000));
                System.out.println(Thread.currentThread().getName() + "   consuer" + queue.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

kafka

  • kafka是一个分布式的流媒体平台
  • 主要应用:消息系统、日志收集、用户行为追踪、流式处理
  • 特点:高吞吐量、消息持久化(存放在磁盘上,btw,磁盘顺序读写速度并不慢)、高可靠性、高扩展性
Broker

kafka的服务器,每一台服务器称为一个Broker

Zookeeper

管理其他集群,包括kafka的集群。可以单独下载

Topic/ Partition/ Offset

消息队列可能是一对多的形式,生产者将一条消息放在多个队列中,然后消费者从各自的队列中取消息。
下图为一个Topic,Topic中可能会含有很多Partition,Offset为Partition的索引
java阻塞队列/kafka/spring整合kafka,java杂文,java,kafka,开发语言

Leader Replica/ Follower Replica

kafka的数据不止存储一份,他会存为多份,即使某一个分区坏了还可以有备份。
leader Replica(祖副本):当尝试从分区获取数据时,祖副本可以处理请求,返回数据
Follower Replica(随从副本):只能备份,不能响应请求
如果祖副本挂掉,集群会从Follower Replica中选一个作为新的leader

kafka命令

官方文档

配置

进入到configure目录下,修改consumer.properties

使用

进入到kafka的目录中

// 启动zookeeper
> ./bin/zookeeper-server-start.sh config/zookeeper.properties 

// 启动kafka
> ./bin/kafka-server-start.sh config/server.properties 

// --create:创建主题
// --bootstrap-server localhost:9092:在哪个服务器创建主题,kafka默认端口为9092
// --replication-factor 1:副本为1
// --partitions 1:分区为1
// --topic test:主题的名字
> ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Created topic test.

// 查看该服务器上的主题
> ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092            
test

// 创建生产者向某个服务器的某个主题中发消息
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test        
>hello
>world

// 创建一个消费者,读取某个服务器上某个主题下的消息队列,从头开始读取
> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello
world

Spring整合Kafka

引入依赖

pom.xml文章来源地址https://www.toymoban.com/news/detail-524053.html

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.0.7</version>
</dependency>
配置Kafka
  • 配置server
  • 配置consumer
# Kafka Properties
# 服务器地址
spring.kafka.bootstrap-servers==localhost:9092
#消费者id,可以在consumer.properties查看
spring.kafka.consumer.group.id=mycommunity-consumer-group
# 是否自动提交
spring.kafka.consumer.enable-auto-commit=true
# 自动提交的时间间隔,单位毫秒
spring.kafka.consumer.auto-commit-interval=3000
访问Kafka
  • producer
  • consumer

Spring整合Kafka的例子

package com.nowcoder.mycommunity;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = MyCommunityApplication.class)
public class KafkaTests {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void testKafka(){
        kafkaProducer.sendMessage("test", "hello");
        kafkaProducer.sendMessage("test", "world");

        try {
            Thread.sleep(1000*10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

@Component
class KafkaProducer{

    @Autowired
    public KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content){
        kafkaTemplate.send(topic, content);
    }
}

@Component
class KafkaConsumer{

	// 加上listener注解,Spring会自动注入
    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record){
        System.out.println(record.value());
    }
}

到了这里,关于java阻塞队列/kafka/spring整合kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 非阻塞重试与 Spring Kafka 的集成测试

            如何为启用重试和死信发布的消费者的 Spring Kafka 实现编写集成测试。 Kafka 中的非阻塞重试是通过为主主题配置重试主题来完成的。如果需要,还可以配置其他死信主题。如果所有重试均已用尽,事件将转发至 DLT。公共领域提供了大量资源来了解技术细节。  在

    2024年02月12日
    浏览(23)
  • 充分了解java阻塞队列机制

    BlockingQueue继承了Queue的接口,是队列的一种,并且和Queue相比,BlockingQueue是线程安全的,多用于并发+并行编程,对于线程安全问题可以很好的解决. 下面是实现BlockingQueue接口的类 怕大家理解不方便,俺通过思维导图的方式给大家呈现 阻塞队列的典型例子就是BlockingQueue接口的实现类

    2024年02月15日
    浏览(32)
  • kafka延时队列原理,Java开发中遇到最难的问题

    Dubbo中zookeeper做注册中心,如果注册中心集群都挂掉,发布者和订阅者之间还能通信么? Dubbo 的整体架构设计有哪些分层? 什么是 Spring Boot?以及Spring Boot的优劣势? 你如何理解 Spring Boot 中的 Starters? 服务注册和发现是什么意思?Spring Cloud 如何实现? Spring Cloud断路器的作用

    2024年03月21日
    浏览(37)
  • 使用 Spring Kafka 进行非阻塞重试的集成测试

    ​Kafka的非阻塞重试是通过为主题配置重试主题来实现的。如果需要,还可以配置额外的死信主题。如果所有重试都耗尽,事件将被转发到DLT。在公共领域中有很多资源可用于了解技术细节。对于代码中的重试机制编写集成测试确实是一项具有挑战性的工作。以下是一些测试

    2024年02月10日
    浏览(29)
  • 深入浅出Java多线程(十三):阻塞队列

    大家好,我是你们的老伙计秀才!今天带来的是[深入浅出Java多线程]系列的第十三篇内容:阻塞队列。大家觉得有用请点赞,喜欢请关注!秀才在此谢过大家了!!! 在多线程编程的世界里,生产者-消费者问题是一个经典且频繁出现的场景。设想这样一个情况:有一群持续

    2024年03月20日
    浏览(35)
  • java高并发系列 - 第25天:掌握JUC中的阻塞队列

    这是java高并发系列第25篇文章。 环境:jdk1.8。 本文内容 掌握Queue、BlockingQueue接口中常用的方法 介绍6中阻塞队列,及相关场景示例 重点掌握4种常用的阻塞队列 Queue接口 队列是一种先进先出(FIFO)的数据结构,java中用Queue接口来表示队列。 Queue接口中定义了6个方法:

    2024年02月14日
    浏览(29)
  • 【Java】多线程案例(单例模式,阻塞队列,定时器,线程池)

    ❤️ Author: 老九 ☕️ 个人博客:老九的CSDN博客 🙏 个人名言:不可控之事 乐观面对 😍 系列专栏: 单例模式是设计模式之一。代码当中的某个类,只能有一个实例,不能有多个。单例模式分为:饿汉模式和懒汉模式 饿汉模式表示很着急,就想吃完饭剩下很多碗,然后一

    2024年02月06日
    浏览(35)
  • SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列

    SpringBoot整合SpringCloudStream3.1+版本Kafka 添加死信队列配置文件,添加对应channel 通道绑定配置对应的channel位置添加重试配置 Kafka基本配置(application-mq.yml) 创建死信队列配置文件(application-dql.yml) 注意:这里的valueSerde使用了对象类型,需要搭配 application/json 使用,consumer接收

    2024年02月16日
    浏览(27)
  • Java 多线程系列Ⅳ(单例模式+阻塞式队列+定时器+线程池)

    设计模式就是软件开发中的“棋谱”,软件开发中也有很多常见的 “问题场景”。针对这些问题场景,大佬们总结出了一些固定的套路。按照这些套路来实现代码可能不会很好,但至少不会很差。当前阶段我们需要掌握两种设计模式: (1)单例模式 (2)工厂模式 概念/特征

    2024年02月09日
    浏览(42)
  • 【Java系列】多线程案例学习——基于阻塞队列实现生产者消费者模型

    个人主页:兜里有颗棉花糖 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 兜里有颗棉花糖 原创 收录于专栏【Java系列专栏】【JaveEE学习专栏】 本专栏旨在分享学习JavaEE的一点学习心得,欢迎大家在评论区交流讨论💌 什么是阻塞式队列(有两点): 第一点:当队列满的时候

    2024年02月04日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包