flink重温笔记(六):Flink 流批一体 API 开发—— 数据输出 sink

这篇具有很好参考价值的文章主要介绍了flink重温笔记(六):Flink 流批一体 API 开发—— 数据输出 sink。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink学习笔记

前言:今天是学习 flink 的第七天啦!学习了 flink 中 sink(数据槽) 部分知识点,这一部分只要是解决数据处理之后,数据到哪里去的问题,我觉得 flink 知识点虽然比较难理解,但是代码跑通后,逻辑还是比较有趣的!

Tips:毛爷爷说过:“宜将剩勇追穷寇,不可沽名学霸王!”明天周日除了复习前面知识点之外,也要继续努力学习接下来的知识点,继续加油!

二、Flink 流批一体 API 开发

4. 数据输出 Sink

4.1 print 打印

打印是最简单的一个Sink,通常是用来做实验和测试时使用。

实例:socket 数据源,查看进程编号最终输出 sink 之 print 打印

package cn.itcast.day06.sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lql
 * @time 2024-02-17 22:27:48
 * @description TODO:print
 */
public class PrintSinkDemo {
    public static void main(String[] args) throws Exception {
        //local模式默认的并行度是当前机器的逻辑核的数量
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        int parallelism0 = env.getParallelism();
        System.out.println("执行环境默认的并行度:" + parallelism0);
        // socket 数据源
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // 获取 lines 数据源并行度
        int parallelism = lines.getParallelism();
        System.out.println("SocketSource的并行度:" + parallelism);

        lines.print();
        lines.addSink(new MyPrintSink()).name("my-print-sink");
        env.execute();
    }

    private static class MyPrintSink extends RichSinkFunction<String> {
        // 这一处定义很重要,不然 indexOfThisSubtask 只能在一个方法中使用!
        private int indexOfThisSubtask;
        @Override
        public void open(Configuration parameters) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println(indexOfThisSubtask + 1 + "> " + value);
        }
    }
}

结果:

执行环境默认的并行度:8
SocketSource的并行度:1
6> hadoop
1> hadoop
1> hadoop
7> hadoop

总结:

  • 打印输出,也是一种 sink
4.2 writeAsText 以文本格式输出

该方法是将数据以文本格式实时的写入到指定的目录中,本质上使用的是 TextOutputFormat 格式写入的。

实例:socket 数据源,将数据输出到文本 Text 中

package cn.itcast.day06.sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-17 22:40:48
 * @description TODO:writeAsText
 */
public class WriteSinkDemo {
    public static void main(String[] args) throws Exception {
        //local模式默认的并行度是当前机器的逻辑核的数量
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        int parallelism0 = env.getParallelism();
        System.out.println("执行环境默认的并行度:" + parallelism0);
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        //获取DataStream的并行度
        int parallelism = lines.getParallelism();
        System.out.println("SocketSource的并行度:" + parallelism);

        lines.writeAsText("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output",FileSystem.WriteMode.OVERWRITE);
        env.execute();

    }
}

结果:

output 文件夹下出现以数字命名的文件
内容为 socket 数据源输出,加上了 \n 换行符
目录中的文件名称是该 Sink 所在 subtask 的 Index + 1

总结:

  • 1- writeAsText 输出数据以小文件方式,文件命名为 subtask 的 Index + 1
  • 2- FileSystem.WriteMode 有两种,一种是 OVERWRITE,可以覆盖同名文件,一种是 NO_OVERWRITE,同名文件就报错
4.3 writeAsCsv 以 csv 格式输出

该方法是将数据以 csv 格式写入到指定的目录中,本质上使用的是 CsvOutputFormat 格式写入的。

实例:socket 数据源,将数据输出到文本 csv 中

package cn.itcast.day06.sink;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.core.fs.FileSystem;
/**
 * @author lql
 * @time 2024-02-17 22:52:12
 * @description TODO:将DataSet数据写入到csv文件中
 */
public class CsvSink {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //需先建立文件
        String filePath = "D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output\\user.csv";
        //添加数据
        Tuple7<Integer, String, Integer, Integer, String, String, Long> row = new Tuple7<>(15, "zhangsan", 40, 1, "CN", "2020-09-08 00:00:00", 1599494400000L);

        //转换为dataSet,利用 数据源中 fromElements 可以接受 [列表或元组] 的属性
        DataSource<Tuple7<Integer, String, Integer, Integer, String, String, Long>> dataSet = (DataSource<Tuple7<Integer, String, Integer, Integer, String, String, Long>>) env.fromElements(row);

        //将内容写入到File中,如果文件已存在,将会被复盖
        dataSet.writeAsCsv(filePath,FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        env.execute();
    }
}

结果:

在指定文件中,生成了 csv 数据

总结:

  • 1- 首先需要定义数据存放的位置,精确到 .scv
  • 2- 最终需要将并行度设置为 1,才能生成一个完整的文件
4.4 writeUsingOutputFormat 指定格式输出

该方法是将数据已指定的格式写入到指定目录中

package cn.itcast.day06.sink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.core.fs.Path;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-17 23:03:24
 * @description TODO:将数据已指定的格式写入到指定目录中
 */
public class writeUsingOutputFormatSink {
    public static void main(String[] args) throws Exception {
        //1:获取流处理运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //调用env的fromElements创建一个非并行的DataStreamSource
        DataStreamSource<String> words = env.fromElements(
                "hadoop","spark","flink","hbase","flink","spark"
        );

        // 对拆分后的单词,每个单词记一次数
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);

        result.writeUsingOutputFormat(new TextOutputFormat<>(new Path("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output\\wordcount")));
        env.execute();

    }
}

结果:

在指定目录下,生成 n(电脑并行度数量) 个文本文件

总结:

  • 1- writeAsText 和 writeAsCsv 方法底层都是调用了 writeUsingOutputFormat 方法
  • 2- 这种方法更加灵活
4.5 writeToSocket 输出到网络端口

该方法是将数据输出到指定的Socket网络地址端口。

实例:socket 数据源,node1:9999 写数据到 node1:8888

package cn.itcast.day06.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-17 23:12:03
 * @description TODO:writeToSocket
 */
public class WriteToSocketDemo {
    public static void main(String[] args) throws Exception {
        //local模式默认的并行度是当前机器的逻辑核的数量
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        int parallelism0 = env.getParallelism();
        System.out.println("执行环境默认的并行度:" + parallelism0);
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
        
        //获取DataStream的并行度
        int parallelism = lines.getParallelism();
        System.out.println("SocketSource的并行度:" + parallelism);
        
        // 第三个参数是数据输出的序列化格式 SerializationSchema
        lines.writeToSocket("node1",8888,new SimpleStringSchema());
        env.execute();
    }
}

结果:

node1:8888 实时接收到 node1:9999 写入的数据

总结:

  • 端口号需要提前开启
4.6 基于本地集合的 Sink

数据分类集合输出

实例:数据打印输出,error 输出,可以输出到:Stdout,Stderr,采集为本地集合

package cn.itcast.day06.sink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-17 23:18:12
 * @description TODO:数据可以输出到:Stdout,Stderr,采集为本地集合
 */
public class CollectionDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<Integer, String>> dataSource = env.fromElements(
                Tuple2.of(1, "zhangsan"),
                Tuple2.of(2, "lisi"),
                Tuple2.of(3, "wangwu"),
                Tuple2.of(4, "zhaoliu")
        );

        //2.sink
        dataSource.print();
        dataSource.printToErr();

        env.execute();
    }
}

结果:

黑色字体输出:
6> (3,wangwu)
7> (4,zhaoliu)
4> (1,zhangsan)
5> (2,lisi)

红色字体输出:
8> (3,wangwu)
7> (2,lisi)
1> (4,zhaoliu)
6> (1,zhangsan)

总结:文章来源地址https://www.toymoban.com/news/detail-833949.html

  • 1- printToErr 可以进行分类输出
  • 2- 并行度是1 能输出文件
  • 3- 并行度是n 能输出文件夹

到了这里,关于flink重温笔记(六):Flink 流批一体 API 开发—— 数据输出 sink的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink流批一体计算(10):PyFlink Tabel API

    简述 PyFlink 是 Apache Flink 的 Python API ,你可以使用它构建可扩展的批处理和流处理任务,例如实时数据处理管道、大规模探索性数据分析、机器学习( ML )管道和 ETL 处理。 如果你对 Python 和 Pandas 等库已经比较熟悉,那么 PyFlink 可以让你更轻松地利用 Flink 生态系统的全部功

    2024年02月11日
    浏览(43)
  • Flink流批一体计算(16):PyFlink DataStream API

    目录 概述 Pipeline Dataflow 代码示例WorldCount.py 执行脚本WorldCount.py 概述 Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。 用户实现的Flink程序是由Stream和Transformation这两个基本构建块组

    2024年02月11日
    浏览(45)
  • Flink流批一体计算(20):DataStream API和Table API互转

    目录 举个例子 连接器 下载连接器(connector)和格式(format)jar 包 依赖管理  如何使用连接器 举个例子 StreamExecutionEnvironment 集成了DataStream API,通过额外的函数扩展了TableEnvironment。 下面代码演示两种API如何互转 TableEnvironment 将采用StreamExecutionEnvironment所有的配置选项。 建

    2024年02月10日
    浏览(41)
  • Flink流批一体计算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table SQL connectors创建 StreamExecutionEnvironment 编写一个 Flink Python DataSt

    2024年02月11日
    浏览(44)
  • Flink流批一体计算(11):PyFlink Tabel API之TableEnvironment

    目录 概述 设置重启策略 什么是flink的重启策略(Restartstrategy) flink的重启策略(Restartstrategy)实战 flink的4种重启策略 FixedDelayRestartstrategy(固定延时重启策略) FailureRateRestartstrategy(故障率重启策略) NoRestartstrategy(不重启策略) 配置State Backends 以及 Checkpointing Checkpoint 启用和配置

    2024年02月13日
    浏览(60)
  • Flink流批一体计算(12):PyFlink Tabel API之构建作业

    目录 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表 2. 创建一个作业 3. 提交作业Submitting PyFlink Jobs 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    浏览(43)
  • Flink流批一体计算(19):PyFlink DataStream API之State

    目录 keyed state Keyed DataStream 使用 Keyed State 实现了一个简单的计数窗口 状态有效期 (TTL) 过期数据的清理 全量快照时进行清理 增量数据清理 在 RocksDB 压缩时清理 Operator State算子状态 Broadcast State广播状态 keyed state Keyed DataStream 使用 keyed state,首先需要为DataStream指定 key(主键)

    2024年02月10日
    浏览(37)
  • Flink流批一体计算(14):PyFlink Tabel API之SQL查询

    举个例子 查询 source 表,同时执行计算 Table API 查询 Table 对象有许多方法,可以用于进行关系操作。 这些方法返回新的 Table 对象,表示对输入 Table 应用关系操作之后的结果。 这些关系操作可以由多个方法调用组成,例如 table.group_by(...).select(...)。 Table API 文档描述了流和批

    2024年02月12日
    浏览(42)
  • Flink流批一体计算(18):PyFlink DataStream API之计算和Sink

    目录 1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。 2. File Sink File Sink Format Types  Row-encoded Formats  Bulk-encoded Formats  桶分配 滚动策略 3. 如何输出结果 Print 集合数据到客户端,execute_and_collect方法将收集数据到客户端内存 将结果发送到DataStream sink conne

    2024年02月11日
    浏览(38)
  • Flink流批一体计算(13):PyFlink Tabel API之SQL DDL

    1. TableEnvironment 创建 TableEnvironment TableEnvironment 是 Table API 和 SQL 集成的核心概念。 TableEnvironment 可以用来: ·创建 Table ·将 Table 注册成临时表 ·执行 SQL 查询 ·注册用户自定义的 (标量,表值,或者聚合) 函数 ·配置作业 ·管理 Python 依赖 ·提交作业执行 创建 source 表 创建 sink

    2024年02月12日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包