kerberos认证Flink的kafka connector和kafka client配置

这篇具有很好参考价值的文章主要介绍了kerberos认证Flink的kafka connector和kafka client配置。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

flink kafka kerberos,kafka,flink

一、flink-connector-kakfa

1. kafka配置文件

kafka jaas必须配置,如果缺少,则报一下错误。

Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set

对于Flink只能通过配置java.security.auth.login.config的方式。

jaas配置

1.1 方式一:

System.setProperty配置系统变量:

System.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");

kafka_client_jaas_keytab.conf文件内容如下:

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab = true   
    useTicketCache=false
    storeKey = true
    keyTab="D://configs//xxx.keytab"
    principal="xxx@XXXXXX.COM"
    serviceName="kafka";
};

1.2 方法二:在IDEA中添加jvm参数:

-Djava.security.auth.login.config=D:\\configs\\kafka_client_jaas_keytab.conf

flink kafka kerberos,kafka,flink

注意:将参数添加至kafka 的properties中是错误的。如下:

Properties properties = new Properties();
properties.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(topic, simpleStringSchema, properties);

2 配置Flink kerberos

2.1 Idea中配置jvm环境变量

idea配置

-Dsecurity.kerberos.krb5-conf.path=D:\configs\krb5.conf -Dsecurity.kerberos.login.keytab=D:\configs\xxx.keytab -Dsecurity.kerberos.login.principal=xxx@XXXXXX.COM
2.2 传递stream env

直接传递参数给flink StreamExecutionEnvironment

        Properties flinkProps = new Properties();
        flinkProps.setProperty("security.kerberos.krb5-conf.path", "D:\\configs\\krb5.conf");
        flinkProps.setProperty("security.kerberos.login.keytab", "D:\\configs\\xxx.keytab");
        flinkProps.setProperty("security.kerberos.login.principal", "xxx@XXXXXX.COM");
        flinkProps.setProperty("security.kerberos.login.contexts", "Client,KafkaClient");
        flinkProps.setProperty("state.backend", "hashmap");
        // Configuration flinkConfig = ConfigUtils.getFlinkConfig();
        Configuration flinkConfig = new Configuration();
        flinkConfig.addAllToProperties(flinkProps);
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);

3. 查看是否连接成功

kafka连接成功可以看到如下日志内容:

09:38:26.473 [Sink: Unnamed (6/8)#0] INFO  org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
... ...
09:38:27.534 [kafka-producer-network-thread | producer-3] INFO  org.apache.kafka.clients.Metadata - [Producer clientId=producer-3] Cluster ID: vj0AfElIS12S0Cp0WDBU7Q
... ...
09:38:27.618 [kafka-kerberos-refresh-thread-xxx@XXXXXX.COM] WARN  org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=xxx@XXXXXX.COM]: TGT renewal thread has been interrupted and will exit.

4. 配置成cache是不行的。

注意:设置成如下cache格式的,是不行的。
虽然flink已经设置了kerberos的principal和keytab 。

System.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_cache.conf");

kafka_client_jaas_cache.conf文件内容:

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true
renewTicket=true
serviceName="kafka";
};

会报如下错误:

Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user

附代码:

    @Test
    public void testWrite() throws Exception {
// jvm配置:-Dsecurity.kerberos.krb5-conf.path=D:\configs\krb5.conf -Dsecurity.kerberos.login.keytab=D:\configs\xxx.keytab -Dsecurity.kerberos.login.principal=xxx@XXXXXX.COM 
// System.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");
        Properties flinkProps = new Properties();
        flinkProps.setProperty("security.kerberos.krb5-conf.path", "D:\\configs\\krb5.conf");
        flinkProps.setProperty("security.kerberos.login.keytab", "D:\\configs\\xxx.keytab");
        flinkProps.setProperty("security.kerberos.login.principal", "xxx@XXXXXX.COM");
        flinkProps.setProperty("security.kerberos.login.contexts", "Client,KafkaClient");
        flinkProps.setProperty("state.backend", "hashmap");

        // Configuration flinkConfig = ConfigUtils.getFlinkConfig();
        Configuration flinkConfig = new Configuration();
        flinkConfig.addAllToProperties(flinkProps);
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);

        Properties properties = new Properties();
        properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667");
        properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.setProperty(SaslConfigs.SASL_MECHANISM, "GSSAPI");
        properties.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
        // flink-connector-kafka api中错误配置jaas的方法:properties.setProperty(SaslConfigs.SASL_JAAS_CONFIG,String.format(JAAS_CONFIG_KEYTAB_TEMPLATE, "D:\\configs\\xxx.keytab", "xxx@XXXXXX.COM"));
        String topic = "flinkcdc";
        SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(topic, simpleStringSchema, properties);
        senv.fromElements("hello world", "coming again").addSink(producer);
        senv.execute("test");
    }

二、kafka-client方式

1. kafka 的jaas配置

配置 java的 java.security.auth.login.config 或者 kafka 的sasl.jaas.config 都是可以的。
但注意jaas配置优先级
sasl.jaas.config > java.security.auth.login.config
所以如果配置了 sasl.jaas.config, 就会导致 java.security.auth.login.config 失效

上代码:
首先需要注意sasl.jaas.config 中的路径分隔符不能是 \\ 必须是 /

错误的:
D:\\configs\\kafka_client_jaas_keytab.conf
正确的:
D:/configs/kafka_client_jaas_keytab.conf
    private static final String JAAS_CONFIG_KEYTAB_TEMPLATE =
            "com.sun.security.auth.module.Krb5LoginModule required\n" +
                    "debug=true\n" +
                    "doNotPrompt=true\n" +
                    "storeKey=true\n" +
                    "useKeyTab=true\n" +
                    "keyTab=\"%s\"\n" +
                    "principal=\"%s\";";

    @Test
    public void testKafkaWrite() {
        Properties properties = new Properties();
        properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667");
        properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.setProperty(SaslConfigs.SASL_MECHANISM, "GSSAPI");
        properties.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 以下二者选其中之一就可以了。
        System.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");
        // properties.setProperty(SaslConfigs.SASL_JAAS_CONFIG, String.format(JAAS_CONFIG_KEYTAB_TEMPLATE, "D:/configs/xxx.keytab", "xxx@XXXXXX.COM"));
        try {
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            ProducerRecord<String, String> record1 = new ProducerRecord<>("flinkcdc", "hello kafka");
            ProducerRecord<String, String> record2 = new ProducerRecord<>("flinkcdc", "coming soon");
            Future<RecordMetadata> f1 = producer.send(record1);
            Future<RecordMetadata> f2 = producer.send(record2);

            producer.flush();
            List<Future<RecordMetadata>> fs = new ArrayList<>();
            fs.add(f1);
            fs.add(f2);
            for (Future<RecordMetadata> future : fs) {
                RecordMetadata metadata = future.get();
                System.out.println(metadata.toString());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

kafka_client_jaas_keytab.conf 文件内容和flink-conector-kakfka的一样。

三、kafka console 启动命令

console producer启动命令:

bin/kafka-console-producer.sh --bootstrap-server xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667 --topic flinkcdc --producer-property security.protocol=SASL_PLAINTEXT

console consumer启动命令:文章来源地址https://www.toymoban.com/news/detail-757697.html

bin/kafka-console-consumer.sh --bootstrap-server xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667 --topic flinkcdc --from-beginning  --consumer-property security.protocol=SASL_PLAINTEXT --group tester

到了这里,关于kerberos认证Flink的kafka connector和kafka client配置的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入理解Java GSS(含kerberos认证及在hadoop、flink案例场景举例)

    在当今的信息安全环境下,保护敏感数据和网络资源的安全至关重要。 Kerberos 认证协议作为一种强大的网络身份验证解决方案,被广泛应用于许多大型分布式系统中,如: Hadoop 。而 Java GSS ( Generic Security Services )作为 Java 提供的通用安全服务,与 Kerberos 认证密切相关。 本

    2024年02月08日
    浏览(44)
  • 大数据之Kerberos认证与kafka开启Kerberos配置

    数据安全 = 认证 + 授权 授权是指用户可以访问的资源,比如:授权用户张三不能访问ods层的表,可以访问dwd层和dws层的表。 再比如java中基于角色的身份认证RBAC(Role-Based Access Control)基于角色的权限控制。通过角色关联用户,角色关联权限的方式间接赋予。比如大数据中使

    2024年02月02日
    浏览(37)
  • Kerberos安全认证-连载12-Kafka Kerberos安全配置及访问

    目录 1. Kafka配置Kerberos 2. 客户端操作Kafka ​​​​​​​3. Java API操作Kafka 4. StructuredStreaming操作Kafka 5. Flink 操作Kafka 技术连载系列,前面内容请参考前面连载11内容:​​​​​​​​​​​​​​Kerberos安全认证-连载11-HBase Kerberos安全配置及访问_IT贫道的博客-CSDN博客 Kafk

    2024年02月12日
    浏览(51)
  • 40、Flink 的Apache Kafka connector(kafka source 和sink 说明及使用示例) 完整版

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月05日
    浏览(46)
  • Kafka配置Kerberos安全认证及与Java程序集成

    本文主要介绍在 Kafka 中如何配置 Kerberos 认证,以及 java 使用 JAAS 来进行 Kerberos 认证连接。 本文演示为单机版。 查看 Kerberos 版本命令: klist -V 软件名称 版本 jdk 1.8.0_202 kafka 2.12-2.2.1 kerberos 1.15.1 Kerberos 是一种由 MIT(麻省理工大学)提出的网络身份验证协议,它旨在通过使用密

    2024年01月22日
    浏览(61)
  • Kafka-配置Kerberos安全认证(JDK8、JDK11)

    一、相关配置 1、JAAS 配置文件 2、keytab 文件(kafka.service.keytab) 从 Kerberos 服务器上拷贝到目标机器 或 找运维人员要一份 3、Kerberos 配置文件(krb5.conf) krb5文件参数说明:krb5.conf(5) 从 Kerberos 服务器上拷贝到目标机器 或 找运维人员要一份 Tip: JDK11版本 sun.security.krb5.Config 类

    2024年02月15日
    浏览(54)
  • 使用flink的sql-client.sh,测试mysql-->kafka-->kafka-->mysql实时流

    目录 1. 环境介绍 2. mysql建表 3. flinksql建表 3.1 进入flinksql客户端  ​3.2 配置输出格式 ​3.3 flink建表 3.4 任务流配置 4. 测试 4.1 插入测试数据 4.2 查看结果表数据​ 4.3 新增测试数据 4.4 再次查看结果表数据 服务 版本 zookeeper 3.8.0 kafka 3.3.1 flink 1.13.5 mysql 5.7.34 jdk 1.8 scala 2.12 连接器

    2024年02月11日
    浏览(41)
  • flink sql1.18.0连接SASL_PLAINTEXT认证的kafka3.3.1

    阅读此文默认读者对docker、docker-compose有一定了解。 docker-compose运行了一个jobmanager、一个taskmanager和一个sql-client。 如下: 注意三个容器都映射了/opt/flink目录。需要先将/opt/flink目录拷贝到跟docker-compose.yml同一目录下,并分别重命名,如下图: 三个文件夹内容是一样的,只是

    2024年02月03日
    浏览(30)
  • Flink Connector 开发

    Flink 是新一代流 批统一的计算引擎 ,它需要从不同的第三方存储引擎中把数据读过来,进行处理,然后再写出到另外的存储引擎中。 Connector 的作用就相当于一个连接器 ,连接 Flink 计算引擎跟外界存储系统。 Flink 里有以下几种方式,当然也不限于这几种方式可以跟外界进行

    2024年02月03日
    浏览(38)
  • Flink RocketMQ Connector实现

    Flink内置了很多Connector,可以满足大部分场景。但是还是有一些场景无法满足,比如RocketMQ。需要消费RocketMQ的消息,需要自定时Source。 参考FlinkKafkaConsumer: 可以看到,自定义的Source,只需要实现SourceFunction。 创建FlinkRocketMQConsumer,实现SourceFunction,重写run()和cancel()方法 需要

    2024年02月11日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包