java使用assign订阅,使用SASL_SSL协议的SCRAM-SHA-256加密方式消费kafka数据

这篇具有很好参考价值的文章主要介绍了java使用assign订阅,使用SASL_SSL协议的SCRAM-SHA-256加密方式消费kafka数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

如果不是有要求或者kafka生产者没有消费者群组,就不要用assign方式订阅,还是用subscribe订阅主题,我是被生产者坑了,开始给我说没有消费者群组,所有我只能用assign订阅指定分区,后来才给我说有消费者群组。
import com.alibaba.fastjson2.JSON;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.*;


@Component
public class KafkaConsumerAssign implements
        CommandLineRunner {

    @Value("${ss.pubTopic}")
    private String pubTopic = "topic";
    @Value("${ss.kafkaAddress}")
    private String kafkaAddress = "xx.xx.xxx.xx:8093,xx.xxx.xxx.xx:8093,xx.xxx.xxx.xx:8093";

    public void autoCommit() {
        ConsumerDict consumerDict = new ConsumerDict();
        Properties properties = new Properties();
        // 指定key与value的反序列化器
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("enable.auto.commit", false);//手动提交提交
        properties.put("bootstrap.servers", kafkaAddress);//kafka连接地址
        //消费者群组,如果没有群组的话可以写通,若果有消费者组不写会,后面提交偏移量的时候会报错
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");//消费者组
//        properties.put("max.poll.records",50);//单次最大记录数
//        properties.put("session.timeout.ms","50000");//消费者连接的超时时间
        properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='用户名' password='密码';"); 
        properties.put("security.protocol", "SASL_SSL");//安全协议
        properties.put("sasl.mechanism", "SCRAM-SHA-256");//加密方式
        //指定truststore文件
        properties.put("ssl.truststore.location", "D:/xxx/xx/xxx/xxxxxx.jks");
        //truststore文件密码
        properties.put("ssl.truststore.password", "aaaaaa");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        //使用partitionsFor获取该topic下所有的分区
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(pubTopic);
        for (PartitionInfo partitionInfo : partitionInfos) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        //使用assign方式订阅kafka
        consumer.assign(topicPartitions);
        operationKafkaMessage(consumer);
    }
    //启动程序后自动启动此kafka客户端
    @Override
    public void run(String... args) {
        new KafkaConsumerAssign().autoCommit();
    }
    private void operationKafkaMessage(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);//100ms 自动获取一次数据,消费者主动发起请求
            //循环所有的分区
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                //获取每个分区中的所有数据
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(record.offset() + ": " + record.value());                
                }
                //当前的消费到的位置
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                //处理完每个分区中的消息后,提交偏移量。
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
    public static void main(String[] args) {
        //new KafkaConsumerAssign().autoCommit();
    }
}

文章来源地址https://www.toymoban.com/news/detail-572148.html

到了这里,关于java使用assign订阅,使用SASL_SSL协议的SCRAM-SHA-256加密方式消费kafka数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Kafka】手把手SASL,SSL教学

    Kafka支持以下SASL机制:GSSAPI 、PLAIN、 SCRAM-SHA-256、 SCRAM-SHA-512、 OAUTHBEARER。 本指南主要以SCRAM机制配置为主。 当使用SCRAM机制时,Kafka使用Zookeeper存储用户加密后的凭证,所以需要先使用Kafka提供的脚本进行用户的创建。 比如创建用户名为kafkaAdmin,密码为admin用户的操作命令如

    2024年01月17日
    浏览(39)
  • Kafka 中 SASL ACL SSL 到底分别代表什么意思

    auth: huangyichun date: 2023-5-11 看各类帖子都没能指出这些到底是什么意思,他们是冲突的,还是互相作用的,还是隔离的?本文讲解 kafka 中 SASL 、 ACL 、 SSL 他们分别的作用以及含义。 SASL 是用来认证 C/S 模式也就是服务器与客户端的一种认证机制,全称 Simple Authentication and Secu

    2024年02月15日
    浏览(42)
  • Kafka3.4 SASL/kerberos/ACL 证以及 SSL 加密连接

    前面我们使用 kafka3.3.1 on zookeeper 的模式进行多网段监听的 kafka 集群,顺便搭建起 kafkaui 后发现一些问题,我们 kafka 集群没有连接认证,万一谁知道了我们的 kafka 连接地址,岂不是随随便便就能消费数据、清空数据、胡乱修改数据了吗? 所以本章节进行认证连接的搭建,参

    2024年02月14日
    浏览(37)
  • Java 001:通过OPC UA协议连接KepServerEx进行读、写、订阅操作

    参考前辈的踩坑记录https://blog.csdn.net/weixin_45411740/article/details/124275985?spm=1001.2014.3001.5502,我Hyb在2023-3-15调通了自己的JavaOpcUaDemo。具体Java代码和KepServerEX延时补丁都在资源中。 第1步:安装激活KepServer,补丁在资源中,不详述。 第2步:在KepServer中做OpcUa配置。 2.1 先看桌面右下

    2024年02月08日
    浏览(46)
  • 【MQTT协议】使用Mosquitto实现mqtt协议(二):编写视频帧的发布/订阅服务

    更多内容详见 【MQTT协议】使用c++实现mqtt协议(Mosquitto源码编译) MQTT协议中的QoS(Quality of Service)表示消息传输的服务质量等级,它是MQTT协议中非常重要的一个概念。 MQTT协议中定义了三个不同等级的QoS: QoS 0:最多一次(At most once)传输。消息发布者只发送一次消息,不

    2023年04月14日
    浏览(43)
  • vue2.0使用mqtt协议订阅阿里云物联网获取实时数据

    最近在公司要开发物联网项目,需要使用mqtt协议监听设备实时数据,因为要采用到后台展示不是很方便,可以使用阿里云服务器来做简单演示。 当然使用mqtt时需要两个软件结合使用,一个是 技小新MQTT编程工具 ,一个是 mqtt.fx 软件,为了方便大家下载,我把软件已经整理好

    2024年04月27日
    浏览(42)
  • QtMqtt使用SSL加密协议传输教程

    Qt开发MQTT程序有两种方式,一个是Qt官方提供的基于MQTT的封装,一个是第三方(EMQ)开发的用于Qt调用MQTT的接口,二者使用方法大同小异,并且均提供了源码。最好使用Qt官方提供的封装来使用MQTT。 Qt官方在github上提供了源代码,地址:https://github.com/qt/qtmqtt mqtt源码版本要跟

    2024年01月15日
    浏览(43)
  • HTTP协议 和 HTTPS协议的区别(4点) && HTTPS如何使用SSL/TLS协议加密过程 && CA证书干啥的

      1. HTTP协议的端口号是80, HTTPS协议的端口号是443 2. HTTP协议使用的URL是以 http:// 开头,HTTPS协议使用的URL是以https://开头 3. HTTP协议和HTTPS协议最主要的区别是: HTTP协议所生成的HTTP请求报文被TCP协议 以明文形式透明传输,同时 客户端与服务器之间无法核验对方的身份(不晓

    2024年02月14日
    浏览(43)
  • 使用Java实现微信小程序订阅消息

    首先到微信小程序的官网,选择合适自己的订阅消息模板。 寻找到适合自己的模板之后,记住模板ID,点开详情,记住每个字段id 微信小程序订阅消息官网文档介绍地址:小程序订阅消息 | 微信开放文档 (qq.com) 微信小程序订阅消息接口:发送订阅消息 | 微信开放文档 (qq.com

    2024年02月03日
    浏览(46)
  • c++使用OpenSSL基于socket实现tcp双向认证ssl(使用TSL协议)代码实现

    相信各位对OpenSSL库已经不陌生了,目前笔者使用这个库实现了RSA、AES加解密和tcp的双向认证功能,下面来看tcp的双向认证。 简单说双向认证就是:客户端认证服务端是否合法,服务端认证客户端是否合法 。 可以借助于HTTPS来说明,http网络传输协议是超文本的明文协议,也就

    2024年02月06日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包