5、Flink 的 source、transformations、sink的详细示例(一)

这篇具有很好参考价值的文章主要介绍了5、Flink 的 source、transformations、sink的详细示例(一)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文介绍了source、transformations和sink的基本用法,下一篇将介绍各自的自定义用法。

一、source-transformations-sink示例

1、maven依赖

下文中所有示例都是用该maven依赖,除非有特殊说明的情况。

<properties>
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <flink.version>1.12.0</flink.version>
    </properties>

    <dependencies>
            <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- 日志 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>3.1.4</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>3.1.4</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>3.1.4</version>
		</dependency>
    </dependencies>

2、source示例

1)、基于Collection

import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class Source_Collection {

    /**
     * 一般用于学习测试时使用 
     * 1.env.fromElements(可变参数); 
     * 2.env.fromColletion(各种集合);
     * 3.env.generateSequence(开始,结束); 
     * 4.env.fromSequence(开始,结束);
     * 
     * @param args 基于集合
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> ds1 = env.fromElements("i am alanchan", "i like flink");
        DataStream<String> ds2 = env.fromCollection(Arrays.asList("i am alanchan", "i like flink"));
        DataStream<Long> ds3 = env.generateSequence(1, 10);//已过期,使用fromSequence方法
        DataStream<Long> ds4 = env.fromSequence(1, 10);

        // transformation

        // sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();

        // execute
        env.execute();
    }

}
//运行结果
3> 3
10> 10
6> 6
1> 1

9> 9

4> 4
2> 2

7> 7

5> 5

14> 1
14> 2
14> 3
14> 4
14> 5

14> 6
14> 7

11> 8
11> 9
11> 10

8> 8

11> i am alanchan
12> i like flink
10> i like flink
9> i am alanchan

2)、基于文件

//自己创建测试的文件,内容都会如实的展示了,故本示例不再提供
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class Source_File {

    /**
     * env.readTextFile(本地/HDFS文件/文件夹),压缩文件也可以
     * 
     * 
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> ds1 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.txt");
        DataStream<String> ds2 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/input/distribute_cache_student");
        DataStream<String> ds3 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.tar.gz");
        DataStream<String> ds4 = env.readTextFile("hdfs://server2:8020///flinktest/wc-1688627439219");

        // transformation

        // sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();

        // execute
        env.execute();

    }

}
// 运行结果
//单个文件
8> i am alanchan
12> i like flink
1> and you ?

//目录
15> 1, "英文", 90
1> 2, "数学", 70
11> and you ?
8> i am alanchan
9> i like flink
3> 3, "英文", 86
13> 1, "数学", 90
5> 1, "语文", 50

//tar.gz
12> i am alanchan
12> i like flink
12> and you ?

//hdfs
6> (hive,2)
10> (flink,4)
15> (hadoop,3)

3)、基于socket

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 *         在192.168.10.42上使用nc -lk 9999 向指定端口发送数据
 *         nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据 
 *         如果没有该命令可以下安装 yum install -y nc
 *         
 */
public class Source_Socket {

    /**
     * @param args
     * @throws Exception 
     */
    public static void main(String[] args) throws Exception {
        //env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);
        
        //transformation
        

        //sink
        lines.print();

        //execute
        env.execute();
    }

}

验证步骤:
1、先启动该程序
2、192.168.10.42上输入
5、Flink 的 source、transformations、sink的详细示例(一),# Flink专栏,flink,大数据,flink source,flink 转换处理,flink sink,流批一体,datastrean
3、观察应用程序的控制台输出
5、Flink 的 source、transformations、sink的详细示例(一),# Flink专栏,flink,大数据,flink source,flink 转换处理,flink sink,流批一体,datastrean

3、transformations示例

对流数据中的单词进行统计,排除敏感词vx:alanchanchn

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 *
 */
public class TransformationDemo {

    /**
     * @param args
     * @throws Exception 
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] arr = value.split(",");
                for (String word : arr) {
                    out.collect(word);
                }
            }
        });

        DataStream<String> filted = words.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return !value.equals("vx:alanchanchn");// 如果是vx:alanchanchn则返回false表示过滤掉
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filted
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                });
        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);



        // sink
        result.print();

        // execute
        env.execute();

    }

}

验证步骤
1、启动nc
nc -lk 9999
2、启动应用程序
3、在192.168.10.42中输入测试数据,如下
5、Flink 的 source、transformations、sink的详细示例(一),# Flink专栏,flink,大数据,flink source,flink 转换处理,flink sink,流批一体,datastrean
4、观察应用程序的控制台,截图如下
5、Flink 的 source、transformations、sink的详细示例(一),# Flink专栏,flink,大数据,flink source,flink 转换处理,flink sink,流批一体,datastrean

4、sink示例

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan 
 */
public class SinkDemo {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> ds = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.txt");
        System.setProperty("HADOOP_USER_NAME", "alanchan");
        // transformation
  
        // sink
//        ds.print();
//        ds.print("输出标识");

        // 并行度与写出的文件个数有关,一个并行度写一个文件,多个并行度写多个文件

        ds.writeAsText("hdfs://server2:8020///flinktest/words").setParallelism(1);

        // execute
        env.execute();
    }

}

并行度为1的输出,到hdfs看结果
5、Flink 的 source、transformations、sink的详细示例(一),# Flink专栏,flink,大数据,flink source,flink 转换处理,flink sink,流批一体,datastrean
并行度为2的时候,生成了2个文件,内容如下
5、Flink 的 source、transformations、sink的详细示例(一),# Flink专栏,flink,大数据,flink source,flink 转换处理,flink sink,流批一体,datastrean
5、Flink 的 source、transformations、sink的详细示例(一),# Flink专栏,flink,大数据,flink source,flink 转换处理,flink sink,流批一体,datastrean
以上,简单的介绍了source、transformations和sink的使用示例。文章来源地址https://www.toymoban.com/news/detail-628145.html

到了这里,关于5、Flink 的 source、transformations、sink的详细示例(一)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包