Flink基础之DataStream API

这篇具有很好参考价值的文章主要介绍了Flink基础之DataStream API。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

流的合并

  1. union联合:被unioin的流中的数据类型必须一致
  2. connect连接:合并的两条流的数据类型可以不一致
    • connec后,得到的是ConnectedStreams
    • 合并后需要根据数据流是否经过keyby分区
      • coConnect: 将两条数据流合并为同一数据类型
      • keyedConnect
public class Flink09_UnionConnectStream {
    public static void main(String[] args) {
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(1);

        DataStreamSource<Integer> ds1 = env.fromElements(1, 2, 3, 4, 5, 6, 7);
        DataStreamSource<Integer> ds2 = env.fromElements(8, 9);
        DataStreamSource<String> ds3 = env.fromElements("a", "b", "c");

        DataStream<Integer> unionDs = ds1.union(ds2);
        unionDs.print();

        //connect
        ConnectedStreams<Integer, String> connectDs = ds1.connect(ds3);
        //处理
        connectDs.process(
                new CoProcessFunction<Integer, String, String>() {
                    @Override
                    public void processElement1(Integer value, CoProcessFunction<Integer, String, String>.Context ctx, Collector<String> out) throws Exception {
                        out.collect(value.toString());
                    }

                    @Override
                    public void processElement2(String value, CoProcessFunction<Integer, String, String>.Context ctx, Collector<String> out) throws Exception {
                        out.collect(value.toUpperCase());
                    }
                }
        ).print("connect");

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

Sink输出算子

目前所使用的大多数Sink, 都是基于2PC的方式来保证状态精确一次性。2PC 即 two face commit, 两阶段提交,该机制的实现必须要开启Flink的检查点。

  1. FileSink:fileSink = FileSink.<数据流泛型>forRowFormat(输出路径, 数据流编码器)
    • 文件滚动策略 .withRollingPolicy().builder()
      • 文件多大滚动.withMaxPartSize(MemorySize.parse(“10m”))
      • 多长时间滚动一次 .withRolloverInterval(Duration.ofSeconds(10))
      • 多久不活跃滚动 .withInactivityInterval(Duration.ofSeconds(5))
    • 目录滚动策略:一般设置为按照天或者小时或者其他时间间隔
    • 文件输出配置:可以设置输出文件的前缀和后缀
public class Flink01_FileSink {
    public static void main(String[] args) {
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(2000);

        //默认是最大并行度
        env.setParallelism(1);

        DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);

        //FileSink
        FileSink<String> stringFileSink = FileSink.<String>forRowFormat(new Path("output"),
                        new SimpleStringEncoder<>())
                .withRollingPolicy(//文件滚动策略
                        DefaultRollingPolicy.builder()
                                .withMaxPartSize(MemorySize.parse("10m"))//文件多大滚动
                                .withRolloverInterval(Duration.ofSeconds(10))//多久滚动
                                .withInactivityInterval(Duration.ofSeconds(5))//多久不活跃滚动
                                .build()
                )
                .withBucketAssigner(//目录滚动策略
                        new DateTimeBucketAssigner<>("yyyy-MM-dd HH-mm")
                )
                .withBucketCheckInterval(1000L)//检查的间隔
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("atguigu")
                        .withPartSuffix(".log")
                        .build())
                .build();

        ds.map(JSON::toJSONString).sinkTo(stringFileSink);

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
  1. Kafka Sink(重点)
    • 生产者对象:KafkaProducer
    • Kafka生产者分区策略:
      • 如果明确指定分区号,直接用
      • 如果没有指定分区号,但是Record中带了key,就按照key的hash值对分区数取余得到分区号
      • 如果没有指定相关分区号,使用粘性分区策略
    • 生产者相关配置
      • key.serializer : key的序列化器
      • value.serializer: value的序列化器
      • bootstrap.servers: 集群位置
      • retries: 重试次数
      • batch.size 批次大小
      • linger.ms 批次超时时间
      • acks 应答级别
      • transaction.id 事务ID
    • Shell中开启Kafka消费者的命令:kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
public class Flink02_KafkaSink {
    public static void main(String[] args) {
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(1);

        //开启检查点
        env.enableCheckpointing(5000);

        DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);

        //KafkaSink
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("first")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )

                //语义
                //AT_LEAST_ONCE:至少一次,表示数据可能重复,需要考虑去重操作
                //EXACTLY_ONCE:精确一次
                //kafka transaction timeout is larger than broker
                //kafka超时时间:1H
                //broker超时时间:15分钟

//                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)//数据传输的保障
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)//数据传输的保障
                .setTransactionalIdPrefix("flink"+ RandomUtils.nextInt(0,100000))
//                .setProperty(ProducerConfig.RETRIES_CONFIG,"10")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"600000")
                .build();

        ds.map(
                JSON::toJSONString
        ).sinkTo(kafkaSink);//写入到kafka 生产者

        //shell 消费者:kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first


        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

为了在Shell中开启消费者更为便捷,这里写了一个小脚本,用来动态的设置主题并开启相应的Kafka消费者,脚本名称为kc.sh.文章来源地址https://www.toymoban.com/news/detail-753746.html

#!/bin/bash

# 检查参数数量
if [ $# -lt 1 ]; then
  echo "Usage: $0 <topic>"
  exit 1
fi

# 从命令行参数获取主题
topic=$1

# Kafka配置
bootstrap_server="hadoop102:9092"

# 构建kafka-console-consumer命令
consumer_command="kafka-console-consumer.sh --bootstrap-server $bootstrap_server --topic $topic"

# 打印消费命令
echo "Running Kafka Consumer for topic: $topic"
echo "Command: $consumer_command"

# 执行消费命令
$consumer_command

到了这里,关于Flink基础之DataStream API的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink DataStream API CDC同步MySQL数据到StarRocks

    Flink:1.16.1 pom文件如下 Java代码 SourceAndSinkInfo 类,用于定义source和sink的IP、端口、账号、密码信息 DataCenterShine实体类,字段与数据库一一对应。 StarRocksPrimary 实体类 FieldInfo注解类,用于标记字段序号、是否为主键、是否为空,后续生成TableSchema需要使用到。 TableName 注解类,

    2024年02月03日
    浏览(75)
  • Flink DataStream API详解

    参考:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html Data Sources Source是程序读取其输入的位置,您可以使用 env.addSource(sourceFunction) 将Source附加到程序中。Flink内置了许多预先实现的SourceFunction,但是您始终可以通过实现SourceFunction(non-parallel sources)来编写自定

    2024年02月14日
    浏览(40)
  • Flink学习——DataStream API

            一个flink程序,其实就是对DataStream的各种转换。具体可以分成以下几个部分: 获取执行环境(Execution Environment) 读取数据源(Source) 定义基于数据的转换操作(Transformations) 定义计算结果的输出位置(Sink) 触发程序执行(Execute)         flink 程序可以在各种上

    2024年02月05日
    浏览(42)
  • 【Apache Flink】Flink DataStream API的基本使用

    Flink DataStream API的基本使用 Flink DataStream API主要用于处理无界和有界数据流 。 无界数据流 是一个持续生成数据的数据源,它没有明确的结束点,例如实时的交易数据或传感器数据。这种类型的数据流需要使用Apache Flink的实时处理功能来连续地处理和分析。 有界数据流 是一个

    2024年02月06日
    浏览(37)
  • Flink|《Flink 官方文档 - DataStream API - 概览》学习笔记

    学习文档:Flink 官方文档 - DataStream API - 概览 学习笔记如下: Flink 的 DataStream API: 数据里的起始是各种 source,例如消息队列、socket 流、文件等; 对数据流进行转换,例如过滤、更新状态、定义窗口、聚合等; 结果通过 sink 返回,例如可以将数据写入文件或标准输出。 Da

    2024年01月23日
    浏览(52)
  • Flink|《Flink 官方文档 - DataStream API - 算子 - 窗口》学习笔记

    学习文档:《Flink 官方文档 - DataStream API - 算子 - 窗口》 学习笔记如下: 窗口(Window):窗口是处理无界流的关键所在。窗口可以将数据流装入大小有限的 “桶” 中,再对每个 “桶” 加以处理。 Keyed Windows 在 Keyed Windows 上使用窗口时,要调用 keyBy(...) 而后再调用 window(..

    2024年01月18日
    浏览(45)
  • flink cdc DataStream api 时区问题

    以postgrsql 作为数据源时,Date和timesatmp等类型cdc同步读出来时,会发现一下几个问题: 源表: sink 表: 解决方案:在自定义序列化时进行处理。 java code scala code mysql cdc也会出现上述时区问题,Debezium默认将MySQL中datetime类型转成UTC的时间戳({@link io.debezium.time.Timestamp}),时区是

    2024年02月07日
    浏览(37)
  • 《Flink学习笔记》——第五章 DataStream API

    一个Flink程序,其实就是对DataStream的各种转换,代码基本可以由以下几部分构成: 获取执行环境 读取数据源 定义对DataStream的转换操作 输出 触发程序执行 获取执行环境和触发程序执行都属于对执行环境的操作,那么其构成可以用下图表示: 其核心部分就是Transform,对数据

    2024年02月10日
    浏览(41)
  • Flink|《Flink 官方文档 - DataStream API - 状态与容错 - 使用状态》学习笔记

    学习文档:Flink 官方文档 - DataStream API - 状态与容错 - 使用状态 相关文档: 有状态流处理背后的概念:Flink|《Flink 官方文档 - 概念透析 - 有状态流处理》学习笔记 Redis 过期 key 的删除机制:Redis|过期 key 的删除机制 学习笔记如下: 如果要使用键控状态,则必须要为 DataS

    2024年02月03日
    浏览(42)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(1)源算子(Source)

    DataStream API 是 Flink 的核心层 API。一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几部分构成: Flink 程序可以在各种上下文环境中运行:我们可以在本地 JVM 中执行程序,也可以提交到远程集群上运行。 不同的环境,代码的提交运行的过程会

    2024年01月22日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包