flink执行环境和读取kafka以及自定义数据源操作

这篇具有很好参考价值的文章主要介绍了flink执行环境和读取kafka以及自定义数据源操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

创建执行环境

1. getExecutionEnvironment

2. createLocalEnvironment

3. createRemoteEnvironment

 执行模式(Execution Mode)

1. BATCH 模式的配置方法

2. 什么时候选择 BATCH 模式

触发程序执行

数据源操作

读取kafka数据源操作 

自定义Source


 

创建执行环境

        编 写 Flink 程 序 的 第 一 步 , 就 是 创 建 执 行 环 境 。 我 们 要 获 取 的 执 行 环 境 , 是
StreamExecutionEnvironment 类的对象,这是所有 Flink 程序的基础。在代码中创建执行环境的
方式,就是调用这个类的静态方法,具体有以下三种。

1. getExecutionEnvironment

        最简单的方式,就是直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文
直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了 jar
包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方
法会根据当前运行的方式,自行决定该返回什么样的运行环境。
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
这种“智能”的方式不需要我们额外做判断,用起来简单高效,是最常用的一种创建执行环境的方式。

2. createLocalEnvironment

        这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果
不传入,则默认并行度就是本地的 CPU 核心数。
StreamExecutionEnvironment localEnv =
StreamExecutionEnvironment.createLocalEnvironment();

3. createRemoteEnvironment

        这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定
要在集群中运行的 Jar 包。
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
.createRemoteEnvironment(
"host", // JobManager 主机名
1234, // JobManager 进程端口号
"path/to/jarFile.jar" // 提交给 JobManager JAR
);
        在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程
序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。关于时间语义和容错
机制,我们会在后续的章节介绍。

 执行模式(Execution Mode)

        上节中我们获取到的执行环境,是一个 StreamExecutionEnvironment ,顾名思义它应该是
做流处理的。那对于批处理,又应该怎么获取执行环境呢?
        在之前的 Flink 版本中,批处理的执行环境与流处理类似,是调用类 ExecutionEnvironment
的静态方法,返回它的对象:
// 批处理环境
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
// 流处理环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        基于 ExecutionEnvironment 读入数据创建的数据集合,就是 DataSet ;对应的调用的一整
套转换方法,就是 DataSet API 。这些我们在第二章的批处理 word count 程序中已经有了基本
了解。
        而从 1.12.0 版本起, Flink 实现了 API 上的流批统一。 DataStream API 新增了一个重要特
性:可以支持不同的“执行模式”( execution mode ),通过简单的设置就可以让一段 Flink 程序
在流处理和批处理之间切换。这样一来, DataSet API 也就没有存在的必要了。
  流执行模式(STREAMING
        这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是 STREAMING 执行模式。
 批执行模式(BATCH
        专门用于批处理的执行模式, 这种模式下, Flink 处理作业的方式类似于 MapReduce 框架。
对于不会持续计算的有界数据,我们用这种模式处理会更方便。
自动模式(AUTOMATIC
        在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

1. BATCH 模式的配置方法

        由于 Flink 程序默认是 STREAMING 模式,我们这里重点介绍一下 BATCH 模式的配置。
主要有两种方式:
1 )通过命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...
        在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH
(2)通过代码配置
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        在代码中,直接基于执行环境调用 setRuntimeMode 方法,传入 BATCH 模式。
        建议: 不要在代码中配置,而是使用命令行。这同设置并行度是类似的:在提交作业时指
定参数可以更加灵活,同一段应用程序写好之后,既可以用于批处理也可以用于流处理。而在
代码中硬编码( hard code )的方式可扩展性比较差,一般都不推荐。

2. 什么时候选择 BATCH 模式

        我们知道,Flink 本身持有的就是流处理的世界观,即使是批量数据,也可以看作“有界
流”来进行处理。所以 STREAMING 执行模式对于有界数据和无界数据都是有效的;而 BATCH
模式仅能用于有界数据。
        看起来 BATCH 模式似乎被 STREAMING 模式全覆盖了,那还有必要存在吗?我们能不
能所有情况下都用流处理模式呢?
        当然是可以的,但是这样有时不够高效。
        我们可以仔细回忆一下 word count 程序中, 批处理和流处理输出的不同:在 STREAMING
模式下,每来一条数据,就会输出一次结果(即使输入数据是有界的);而 BATCH 模式下,
只有数据全部处理完之后,才会一次性输出结果。 最终的结果两者是一致的,但是流处理模式
会将更多的中间结果输出。在本来输入有界、只希望通过批处理得到最终的结果的场景下,
STREAMING 模式的逐个输出结果就没有必要了。
        所以总结起来,一个简单的原则就是:用 BATCH 模式处理批量数据,用 STREAMING
模式处理流式数据。因为数据有界的时候,直接输出结果会更加高效;而当数据无界的时候 ,
们没得选择——只有 STREAMING 模式才能处理持续的数据流.
        当然,在后面的示例代码中,即使是有界的数据源,我们也会统一用 STREAMING 模式
处理。这是因为我们的主要目标还是构建实时处理流数据的程序,有界数据源也只是我们用来
测试的手段。

触发程序执行

        有了执行环境,我们就可以构建程序的处理流程了:基于环境读取数据源,进而进行各种
转换操作,最后输出结果到外部系统。
        需要注意的是,写完输出(sink )操作并不代表程序已经结束。因为当 main() 方法被调用
时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据
——因为数据可能还没来。 Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算,
这也被称为“延迟执行”或“懒执行”( lazy execution )。
        所以我们需要显式地调用执行环境的 execute() 方法,来触发程序执行。 execute() 方法将一
直等待作业完成,然后返回一个执行结果( JobExecutionResult )。
env.execute();

数据源操作

读取kafka数据源操作 

        Kafka 作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。而消息队列的传
输方式,恰恰和流处理是完全一致的。所以可以说 Kafka Flink 天生一对,是当前处理流式
数据的双子星。在如今的实时流处理应用中,由 Kafka 进行数据的收集和传输, Flink 进行分
析计算,这样的架构已经成为众多企业的首选,如图 5-3 所示。

flink执行环境和读取kafka以及自定义数据源操作文章来源地址https://www.toymoban.com/news/detail-409158.html

        略微遗憾的是,与 Kafka 的连接比较复杂, Flink 内部并没有提供预实现的方法。所以我
们只能采用通用的 addSource 方式、实现一个 SourceFunction 了。
        好在Kafka Flink 确实是非常契合,所以 Flink 官方提供了连接工具 flink-connector-kafka
直接帮我们实现了一个消费者 FlinkKafkaConsumer ,它就是用来读取 Kafka 数据的
SourceFunction
        所以想要以 Kafka 作为数据源获取数据,我们只需要引入 Kafka 连接器的依赖。 Flink
方提供的是一个通用的 Kafka 连接器,它会自动跟踪最新版本的 Kafka 客户端。目前最新版本
只支持 0.10.0 版本以上的 Kafka ,读者使用时可以根据自己安装的 Kafka 版本选定连接器的依
赖版本。这里我们需要导入的依赖如下。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
然后调用 env.addSource() ,传入 FlinkKafkaConsumer 的对象实例就可以了。
 
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class SourceKafkaTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop102:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
DataStreamSource<String> stream = env.addSource(new
FlinkKafkaConsumer<String>(
"clicks",
new SimpleStringSchema(),
properties
));
stream.print("Kafka");
 env.execute();
 }
}
创建 FlinkKafkaConsumer 时需要传入三个参数:
         第一个参数 topic ,定义了从哪些主题中读取数据。可以是一个 topic ,也可以是 topic
列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据
时, Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条流中去。
         第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema Kafka
息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。上面代码中
使用的 SimpleStringSchema ,是一个内置的 DeserializationSchema ,它只是将字节数
组简单地反序列化成字符串。 DeserializationSchema KeyedDeserializationSchema
公共接口,所以我们也可以自定义反序列化逻辑。
         第三个参数是一个 Properties 对象,设置了 Kafka 客户端的一些属性。

自定义Source

        大多数情况下,前面的数据源已经能够满足需要。但是凡事总有例外,如果遇到特殊情况,
我们想要读取的数据源来自某个外部系统,而 flink 既没有预实现的方法、也没有提供连接器,
又该怎么办呢?
        那就只好自定义实现 SourceFunction 了。
        接下来我们创建一个自定义的数据源,实现 SourceFunction 接口。主要重写两个关键方法:
run() cancel()
         run() 方法:使用运行时上下文对象( SourceContext )向下游发送数据;
⚫        cancel() 方法:通过标识位控制退出循环,来达到中断数据源的效果。
代码如下:
我们先来自定义一下数据源:
package com.atmk.stream.app;

import com.atmk.stream.entity.Event;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

/**
 * @author:lss
 * @date:2022/11/3 17:18
 * @description:some
 */
public class ClickSource implements SourceFunction<Event> {
    //声明一个变量,作为控制数据生成的标识位
    private Boolean running = true;
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
       //在指定数据集中随机选取数据
        Random random = new Random();
        String[] users = {"Mary","Bob","Alice","Cary"};
        String[] urls = {"./home","./cart","./fav","./prod?id=1"};
        while (running){
            ctx.collect(new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()
            ));
            //隔一秒生成一个点击事件,方面观测
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
        这个数据源,我们后面会频繁使用,所以在后面的代码中涉及到 ClickSource() 数据源,使
用上面的代码就可以了。
        下面的代码我们来读取一下自定义的数据源。有了自定义的 source function ,接下来只要
调用 addSource() 就可以了:
env.addSource(new ClickSource())
下面是完整的代码:
package com.atmk.stream.app;

import com.atmk.stream.entity.Event;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author:lss
 * @date:2022/11/3 17:26
 * @description:some
 */
public class SourceCustom {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //使用自定义的source function,调用addSource方法
        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        stream.print("SourceCustom");
        env.execute();
    }
}
        这里要注意的是 SourceFunction 接口定义的数据源,并行度只能设置为 1 ,如果数据源设
置为大于 1 的并行度,则会抛出异常。如下程序所示:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
public class SourceThrowException {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.addSource(new ClickSource()).setParallelism(2).print();
 env.execute();
 }
}
输出的异常如下:
Exception in thread "main" java.lang.IllegalArgumentException: The parallelism
of non parallel operator must be 1.
所以如果我们想要自定义并行的数据源的话,需要使用 ParallelSourceFunction ,示例程序
如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;
public class ParallelSourceExample {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.addSource(new CustomSource()).setParallelism(2).print();
 env.execute();
 }
 public static class CustomSource implements ParallelSourceFunction<Integer> 
{
 private boolean running = true;
 private Random random = new Random();
 @Override
 public void run(SourceContext<Integer> sourceContext) throws Exception {
 while (running) {
 sourceContext.collect(random.nextInt());
 }
 }
 @Override
 public void cancel() {
 running = false;
 }
 }
}
输出结果如下:
2> -686169047
2> 429515397
2> -223516288
2> 1137907312
2> -380165730
2> 2082090389

到了这里,关于flink执行环境和读取kafka以及自定义数据源操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink如何初始化kafka数据源的消费偏移

    我们知道在日常非flink场景中消费kafka主题时,我们只要指定了消费者组,下次程序重新消费时是可以从上次消费停止时的消费偏移开始继续消费的,这得益于kafka的_offset_主题保存的关于消费者组和topic偏移位置的具体偏移信息,那么flink应用中重启flink应用时,flink是从topic的什

    2024年02月16日
    浏览(48)
  • 基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源

    目标 : 了解数据源的格式及实现模拟数据的生成 路径 step1:数据格式 step2:数据生成 实施 数据格式 消息时间 发件人昵称 发件人账号 发件人性别 发件人IP 发件人系统 发件人手机型号 发件人网络制式 发件人GPS 收件人昵称 收件人IP 收件人账号 收件人系统 收件人手机型号

    2024年02月04日
    浏览(43)
  • Flink读取kafka数据报错

    报错如下: 解决办法: 修改/usr/local/wyh/kafka/kafka_2.12-2.8.1/config下面的server.properties,默认该配置是被注释掉的额,所以需要放开注释并且配置Host:

    2024年02月13日
    浏览(49)
  • flink 从kafka读取数据报错

    报错: Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandl

    2024年02月02日
    浏览(45)
  • 4.3、Flink任务怎样读取Kafka中的数据

    目录 1、添加pom依赖 2、API使用说明 3、这是一个完整的入门案例 4、Kafka消息应该如何解析 4.1、只获取Kafka消息的value部分 ​4.2、获取完整Kafka消息(key、value、Metadata) 4.3、自定义Kafka消息解析器 5、起始消费位点应该如何设置 ​5.1、earliest() 5.2、latest() 5.3、timestamp() 6、Kafka分区

    2024年02月13日
    浏览(41)
  • 源码解析Flink源节点数据读取是如何与checkpoint串行执行

    源码解析Flink源节点数据读取是如何与checkpoint串行执行 Flink版本:1.13.6 前置知识:源节点的Checkpoint是由Checkpointcoordinate触发,具体是通过RPC调用TaskManager中对应的Task的StreamTask类的performChecpoint方法执行Checkpoint。 本文思路:本文先分析checkpoint阶段,然后再分析数据读取阶段,

    2024年02月14日
    浏览(54)
  • 自定义Flink SourceFunction定时读取数据库

    Source 是Flink获取数据输入的地方,可以用StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source,也可以通过实现继承 RichSourceFunction 类编写自定义的

    2024年02月02日
    浏览(36)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定义 Sink 消费 Kafka 数据写入 RocketMQ

    这里的 maven 依赖比较冗余,推荐大家都加上,后面陆续优化。 注意: 1、此程序中所有的相关配置都是通过 Mysql 读取的(生产环境中没有直接写死的,都是通过配置文件动态配置),大家实际测试过程中可以将相关配置信息写死。 2、此程序中 Kafka 涉及到了 Kerberos 认证操作

    2024年02月03日
    浏览(51)
  • Flink学习之旅:(三)Flink源算子(数据源)

            Flink可以从各种数据源获取数据,然后构建DataStream 进行处理转换。source就是整个数据处理程序的输入端。 数据集合 数据文件 Socket数据 kafka数据 自定义Source         创建 FlinkSource_List 类,再创建个 Student 类(姓名、年龄、性别三个属性就行,反正测试用) 运行结果

    2024年02月06日
    浏览(44)
  • SpringBoot从数据库读取数据数据源配置信息,动态切换数据源

            首先准备多个数据库,主库smiling-datasource,其它库test1、test2、test3         接下来,我们在主库smiling-datasource中,创建表databasesource,用于存储多数据源相关信息。表结构设计如下         创建好表之后,向表databasesource中存储test1、test2、test3三个数据库的相关配置

    2024年01月16日
    浏览(68)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包