flink sql1.18.0连接SASL_PLAINTEXT认证的kafka3.3.1

这篇具有很好参考价值的文章主要介绍了flink sql1.18.0连接SASL_PLAINTEXT认证的kafka3.3.1。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

阅读此文默认读者对docker、docker-compose有一定了解。

环境

docker-compose运行了一个jobmanager、一个taskmanager和一个sql-client。

如下:

version: "2.2"
services:
  jobmanager:
    image: flink:1.18.0-scala_2.12
    container_name: jobmanager
    ports:
      - "7081:8081"
    command: jobmanager
    volumes:
      - ./jobmanager:/opt/flink
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.18.0-scala_2.12
    container_name: taskmanager1
    depends_on:
      - jobmanager
    command: taskmanager
    volumes:
      - ./taskmanager1:/opt/flink
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 32
  sql-client:
    image: flink:1.18.0-scala_2.12
    container_name: sql-client-1
    command: bin/sql-client.sh
    volumes:
      - ./sql-client:/opt/flink
    depends_on:
      - jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.address: jobmanager

注意三个容器都映射了/opt/flink目录。需要先将/opt/flink目录拷贝到跟docker-compose.yml同一目录下,并分别重命名,如下图:

flink sql kafka sasl认证,Flink,Kafka,flink,大数据

三个文件夹内容是一样的,只是名字不一样。

以上环境介绍完毕。

添加fllink-connector-kafka驱动

在maven官网下载相应jar包,分别放入上述三个文件夹的lib目录下。例如jobmanager:

flink sql kafka sasl认证,Flink,Kafka,flink,大数据

启动sql-client,我用docker-compose启动的,因此命令为:docker-compose run sql-client.

创建kafka表

CREATE TABLE TestTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `presetBit` STRING,
  `imageTime` STRING,
  `imageName` STRING,
  `thumbnailWidth` BIGINT,
  `size` BIGINT,
  `thumbnailSize` BIGINT,
  `behavior` STRING,
  `imageUri` STRING,
  `presetId` STRING
) WITH (
  'connector'='kafka',
  'topic'='mytopic',
  'properties.bootstrap.servers'='localhost:9092',
  'properties.group.id'='testGroup',
  'scan.startup.mode'='earliest-offset',
  'format'='json',
  'properties.security.protocol'='SASL_PLAINTEXT',
  'properties.sasl.mechanism'='PLAIN',
  'properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="your-username" password="your-pwd";'
);

说明:

flink sql kafka sasl认证,Flink,Kafka,flink,大数据

①标识字段为kafka自带字段,topic中没有也会自带。

②标识的字段为topic中存储的字段,根据自己topic来。

③为自己的topic名称

④为kafka集群地址

⑤后面的username和password需要根据实际情况修改。

创建表以后执行select * from TestTable,可以看到类似下图的内容:

flink sql kafka sasl认证,Flink,Kafka,flink,大数据

flink官网留下的坑

坑主要出现在最后一行。

flink官网是这样写的

flink sql kafka sasl认证,Flink,Kafka,flink,大数据

首先指定的类不对,应该是org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule,如果按照官网写会报错:Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule。

其次,username和password的双引号不需要写反斜杠,写反斜杠反而会报错。文章来源地址https://www.toymoban.com/news/detail-769917.html

到了这里,关于flink sql1.18.0连接SASL_PLAINTEXT认证的kafka3.3.1的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka 开启 SASL/PLAINTEXT 认证及 ACL

    在之前的开发工作中,需要开发使用用户名密码的方式连接 Kafka 并对 Kafka 数据进行处理,但是客户并没有提供可以测试的环境,于是就自己着手搭建了一套单节点的 Kafka 并开启 SASL 认证。 1、组件版本 组件 版本 kafka 2.11-2.22 zookeeper 3.6.2 2、下载文件 KAFKA:下载地址 ZOOKEEPER:

    2023年04月08日
    浏览(50)
  • 决策树模型 IBM SPSS Modeler 18.0

    (该文为个人的一个记录,也许有错,可以参考下) 决策树模型建立 1.点击源、Excel,在空白处得到一个Excel 点击生成的Excel,导入要处理的数据,再点确定  PS:点击上图中的预览可以查看表格数据  2.点击字段选项、类型,在空白得到一个类型图标  点生成的类型图标,点

    2024年02月05日
    浏览(34)
  • FPGA实现PID控制器——基于Quartus prime 18.0

    目录  1. PID控制器和离散化PID控制器 1.1 PID控制器 1.1.1 P控制器 1.1.2 稳态误差和I控制器 1.1.3 超调和D控制器 1.2 离散式PID控制器——位置式PID控制器 2.PID控制系统Simulink仿真 3.Verilog代码编写和Modelsim仿真 3.1 误差计算模块和PID算法模块编写 3.1.1 误差计算模块 3.1.2 PID算法模块 3

    2024年02月03日
    浏览(44)
  • flink 1.18 sql demo

    更换flink-table-planner 为 flink-table-planner-loader pom.xml demo

    2024年01月18日
    浏览(42)
  • flink 1.18 sql gateway /sql gateway jdbc

    一 sql gateway 注意 之所以直接启动gateway 能知道yarn session 主要还是隐藏的配置文件,但是配置文件可以被覆盖,多个session 保留最新的applicationid 1 安装flink (略) 2 启动sql-gatway(sql-gateway 通过官网介绍只能运行在session 任务中) 2-1 启动gateway 之前先启动一个flink session ./bin/yarn-

    2024年01月16日
    浏览(48)
  • 【flink sql】kafka连接器

    Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。 前面已经介绍了flink sql创建表的语法及说明:【flink sql】创建表 这篇博客聊聊怎么通过flink sql连接kafka 以下的连接器元数据可以在表定义中通过元数据列的形式获取。 R/W 列定义了一个元数据是可读的(R)还是可写的(

    2024年02月08日
    浏览(52)
  • Flink系列之:Elasticsearch SQL 连接器

    Sink: Batch Sink: Streaming Append Upsert Mode Elasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中。本文档描述运行 SQL 查询时如何设置 Elasticsearch 连接器。 连接器可以工作在 upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。 如果 DDL 中没有定义主键,那么

    2024年02月04日
    浏览(56)
  • Flink系列之:JDBC SQL 连接器

    Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Append Upsert Mode JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。本文档描述了针对关系型数据库如何通过建立 JDBC 连接器来执行 SQL 查询。 如果在 DDL 中定义了主键,JDBC sink 将以 upsert 模式与外

    2024年02月02日
    浏览(48)
  • Flink系列之:Upsert Kafka SQL 连接器

    Scan Source: Unbounded 、 Sink: Streaming Upsert Mode Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。 作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一

    2024年01月16日
    浏览(45)
  • Flink系列之:Apache Kafka SQL 连接器

    Scan Source: Unbounded Sink: Streaming Append Mode Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。 以下示例展示了如何创建 Kafka 表: 以下的连接器元数据可以在表定义中通过元数据列的形式获取。 R/W 列定义了一个元数据是可读的(R)还是可写的(W)。 只读列必须声明为 VI

    2024年02月01日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包