流批一体计算引擎-4-[Flink]消费kafka实时数据

这篇具有很好参考价值的文章主要介绍了流批一体计算引擎-4-[Flink]消费kafka实时数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Python3.6.9 Flink 1.15.2消费Kafaka Topic
PyFlink基础应用之kafka
通过PyFlink作业处理Kafka数据

1 环境准备

1.1 启动kafka

(1)启动zookeeper
zkServer.sh start

(2)启动kafka
cd /usr/local/kafka/
nohup ./bin/kafka-server-start.sh ./config/server.properties >> /tmp/kafkaoutput.log 2>&1 &
或者
./bin/kafka-server-start.sh -daemon ./config/server0.properties

(3)查看进程如下
jps
10101 QuorumPeerMain
11047 Kafka

(4)kafka tools配置
C:\Windows\System32\drivers\etc\hosts

(5)查看日志文件
/tmp/kafkaoutput.log或者/usr/local/kafka/logs

(6)创建Topic主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 -replication-factor 1 --partitions 1 --topic flink_kafakasource

(7)查看当前创建的Topic
bin/kafka-topics.sh --list --zookeeper localhost:2181

(8)查看kafka版本
kafka_2.12-2.2.0.jar
可以看出scala的版本是2.12,kafka的版本是2.2.0

1.2 启动Flink

(1)启动flink
start-cluster.sh

(2)查看是否启用成功
jps
4704 TaskManagerRunner
4443 StandaloneSessionClusterEntrypoint

(3)关闭Flink
stop-cluster.sh

1.3 安装PyFlink

PyFlink需要特定的Python版本,Python 3.6, 3.7, 3.8 or 3.9。

1.3.1 python3和pip3的配置

一、系统中安装了多个版本的python3

编译安装python时
其中–prefix选项是配置安装的路径,
若是不配置该选项,安装后可执行文件默认放在/usr/local/bin,
库文件默认放在/usr/local/lib,
配置文件默认放在/usr/local/etc,
其它的资源文件放在/usr /local/share,比较凌乱。

/usr/local/bin/python3.6m 
/usr/local/bin/python3.6m-config 
/usr/include/python3.6m

/usr/local/bin/python3.6 
/usr/local/bin/python3.6-config 
/usr/local/lib/python3.6 

/usr/local/bin/python3.10 
/usr/local/bin/python3.10-config
/usr/local/lib/python3.10 

二、环境变量path作用顺序

#echo $PATH
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/usr/local/jdk1.8.0_144/bin:/root/bin:/home/data/java/bin:/home/data/java/jre/bin
按照顺序进行显示

三、安装Pyflink

ln -s 源文件 目标文件
ln -s /usr/local/bin/python3.6 /usr/local/bin/python3
ln -s /usr/local/bin/pip3.6 /usr/local/bin/pip3


/usr/local/bin/python3.6 -m pip install --upgrade pip
pip3 install apache-flink==1.15.3 -i http://pypi.douban.com/simple --trusted-host pypi.douban.com

包文件安装后的位置
/usr/local/lib/python3.6/site-packages

1.3.2 配置Flink Kafka连接

(1)在https://mvnrepository.com/里输入flink kafka寻找对应版本的连接器
流批一体计算引擎-4-[Flink]消费kafka实时数据

(2)选择Flink对应的版本1.15.3,点击jar
流批一体计算引擎-4-[Flink]消费kafka实时数据
流批一体计算引擎-4-[Flink]消费kafka实时数据

(3)分别下载flink-connector-base和kafka-clients对应的jar包
流批一体计算引擎-4-[Flink]消费kafka实时数据

(4)将该jar包放置在python的lib目录下
/usr/local/lib/python3.6/dist-packages/pyflink/lib。
流批一体计算引擎-4-[Flink]消费kafka实时数据

(5)将该jar包放置在Flink的lib目录下
拷贝三个jar包到FLINK_HOME/lib下。
流批一体计算引擎-4-[Flink]消费kafka实时数据

2 消费kafka写入本地文件

2.1 flinkDemo.py

本应用采用pyflink+sql方式编写代码。

# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode, SqlDialect

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.enable_checkpointing(3000)
st_env = StreamTableEnvironment.create(s_env)  # , TableConfig())
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")
sourceKafkaDdl = """
create table sourceKafka(
id int,name varchar
)
with(
 'connector'='kafka',
 'topic'='flink_kafakasource',
 'properties.bootstrap.servers'='192.168.43.48:9092',
 'scan.startup.mode'='latest-offset',
 'format'='json'
)
"""
st_env.execute_sql(sourceKafkaDdl)
fieldNames = ["id", "name"]
fieldTypes = [DataTypes.INT(), DataTypes.STRING()]
csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result.csv", ",", 1, WriteMode.OVERWRITE)
st_env.register_table_sink("csvTableSink", csvSink)
st_env.execute_sql("""
    INSERT INTO csvTableSink
        select * from sourceKafka
""").wait()

2.2 执行方式

2.2.1 方式一:直接IDEA中运行

无需安装flink。

(1)安装pyflink
pip3 install apache-flink==1.15.3

(2)配置pycharm的flink环境:
首先最重要的是版本问题,这里给出我的相关版本配置
kafka:2.2.0
jdk:1.8.0_201
apache-flink: 1.15.3
相应的jar包版本。
flink-connector-base-1.15.3.jar
flink-connector-kafka-1.15.3.jar
kafka-clients-2.8.1.jar

将jar包放入External Libraries下的site-packages下的pyflink下的lib中。

(3)运行
python3 flinkDemo.py

2.2.2 方式二:命令行提交到Flink

/usr/local/flink-1.15.3/bin/flink run -py flinkDemo.py
或
/usr/local/flink-1.15.3/bin/flink run --python flinkDemo.py
显示如下:
Job has been submitted with JobID 1f3d2ffc0b0c5f9274040fd008a5ec17

流批一体计算引擎-4-[Flink]消费kafka实时数据

2.3 模拟数据

打开kafka生产者,通过客户端生产数据。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink_kafakasource
{"id":2,"name":"查询kafka后存储到cvs文件中"}

2.4 查看Flink侧结果

流批一体计算引擎-4-[Flink]消费kafka实时数据

3 消费kafka写入kafka

直接本地IDEA中运行即可。文章来源地址https://www.toymoban.com/news/detail-455176.html

# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode, SqlDialect

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.enable_checkpointing(3000)
st_env = StreamTableEnvironment.create(s_env)  # , TableConfig())
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")

sourceKafkaDdl = """
create table sourceKafka(
id int,name varchar
)
with(
 'connector'='kafka',
 'topic'='flink_kafakasource',
 'properties.bootstrap.servers'='192.168.43.48:9092',
 'scan.startup.mode'='latest-offset',
 'format'='json'
)
"""

sinkKafkaDdl = """
create table sinkKafka(
id int,name varchar
)
with(
 'connector'='kafka',
 'topic'='result',
 'properties.bootstrap.servers'='192.168.43.48:9092',
 'scan.startup.mode'='latest-offset',
 'format'='json'
)
"""
st_env.execute_sql(sourceKafkaDdl)
st_env.execute_sql(sinkKafkaDdl)

st_env.execute_sql("""
    INSERT INTO sinkKafka
        select * from sourceKafka
""").wait()

到了这里,关于流批一体计算引擎-4-[Flink]消费kafka实时数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink流批一体计算(2):Flink关键特性

    目录 Flink关键特性 流式处理 丰富的状态管理 丰富的时间语义支持    Data pipeline 容错机制 Flink SQL CEP in SQL Flink 应用程序可以消费来自消息队列或分布式日志这类流式数据源(例如 Apache Kafka 或 Kinesis )的实时数据,也可以从各种的数据源中消费有界的历史数据。同样, Fli

    2024年02月10日
    浏览(44)
  • Flink流批一体计算(9):Flink Python

    目录 使用Python依赖 使用自定义的Python虚拟环境 方式一:在集群中的某个节点创建Python虚拟环境 方式二:在本地开发机创建Python虚拟环境 使用JAR包 使用数据文件 使用Python依赖 通过以下场景为您介绍如何使用Python依赖: 使用自定义的Python虚拟环境 使用第三方Python包 使用J

    2024年02月12日
    浏览(39)
  • Flink流批一体计算(3):FLink作业调度

    架构 所有的分布式计算引擎都需要有集群的资源管理器,例如:可以把MapReduce、Spark程序运行在YARN集群中、或者是Mesos中。Flink也是一个分布式计算引擎,要运行Flink程序,也需要一个资源管理器。而学习每一种分布式计算引擎,首先需要搞清楚的就是:我们开发的分布式应用

    2024年02月10日
    浏览(46)
  • Flink流批一体计算(4):Flink功能模块

    目录 Flink功能架构 Flink输入输出 Flink功能架构 Flink是分层架构的分布式计算引擎,每层的实现依赖下层提供的服务,同时提供抽象的接口和服务供上层使用。 Flink 架构可以分为4层,包括Deploy部署层、Core核心层、API层和Library层 部署层:主要涉及Flink的部署模式。Flink支持多种

    2024年02月10日
    浏览(50)
  • Flink流批一体计算(5):部署运行模式

    目录 集群运行模式 1.local模式 2.standalone模式 3.Flink on YARN模式 本地模式 Standalone 模式 Flink on Yarn 模式 集群运行模式 类似于 Spark , Flink 也有各种运行模式,其中主要支持三种: local 模式、 standalone 模式以及 Flink on YARN 模式。 每种模式都有特定的使用场景,接下来一起了解一

    2024年02月10日
    浏览(41)
  • Flink流批一体计算(16):PyFlink DataStream API

    目录 概述 Pipeline Dataflow 代码示例WorldCount.py 执行脚本WorldCount.py 概述 Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。 用户实现的Flink程序是由Stream和Transformation这两个基本构建块组

    2024年02月11日
    浏览(45)
  • Flink流批一体计算(10):PyFlink Tabel API

    简述 PyFlink 是 Apache Flink 的 Python API ,你可以使用它构建可扩展的批处理和流处理任务,例如实时数据处理管道、大规模探索性数据分析、机器学习( ML )管道和 ETL 处理。 如果你对 Python 和 Pandas 等库已经比较熟悉,那么 PyFlink 可以让你更轻松地利用 Flink 生态系统的全部功

    2024年02月11日
    浏览(43)
  • 4、介绍Flink的流批一体、transformations的18种算子详细介绍、Flink与Kafka的source、sink介绍

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月13日
    浏览(60)
  • Flink流批一体计算(18):PyFlink DataStream API之计算和Sink

    目录 1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。 2. File Sink File Sink Format Types  Row-encoded Formats  Bulk-encoded Formats  桶分配 滚动策略 3. 如何输出结果 Print 集合数据到客户端,execute_and_collect方法将收集数据到客户端内存 将结果发送到DataStream sink conne

    2024年02月11日
    浏览(38)
  • Flink流批一体计算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table SQL connectors创建 StreamExecutionEnvironment 编写一个 Flink Python DataSt

    2024年02月11日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包