初探Flink的Java实现流处理和批处理

这篇具有很好参考价值的文章主要介绍了初探Flink的Java实现流处理和批处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

端午假期,夏日炎炎,温度连续40度以上,在家学习Flink相关知识,记录下来,方便备查。
开发工具:IntelliJ Idea
Flink版本:1.13.0
本次主要用Flink实现批处理(DataSet API) 和 流处理(DataStream API)简单实现。

第一步、创建项目与添加依赖

1)新建项目
打开Idea,新建Maven项目,包和项目命名,点击确定进入项目。
初探Flink的Java实现流处理和批处理
2)引入依赖
pom.xml文件中添加依赖,即Flink-java、flink-streaming、slf4j等, 可参考以下代码。

<properties>
    <flink.version>1.13.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.2</slf4j.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 日志-->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.16.0</version>
    </dependency>
</dependencies>

3)添加日志文件
resource目录下添加日志文件log4j.properties,内容如下所示。

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=@-4r [%t] %-5p %c %x - %m%n
第二步、构造数据集

在项目下新建 input 文件夹,用于存放数据集,在其下新建 words.txt 文件,即测试的数据集,如下图所示。
初探Flink的Java实现流处理和批处理

第三步、编写业务代码

读取数据集中内容,并进行单词的字数统计。新建 BatchWordCout 类,引入分6个步骤实现数据集的读取与打印。
方式一、批处理 DataSet API
主要处理步骤为
1)创建执行环境;
2)从环境中读取数据;
3)将每行数据进行分词,转化成二元组类型 扁平映射;
4)按照word进行分组;
5)分组内进行聚合统计;
6)打印结果
批处理 DataSet API 写法如下所示。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        //1、创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2、从环境中读取数据
        DataSource<String> lineDataSource = env.readTextFile("input/words.txt");
        // 3、将每行数据进行分词,转化成二元组类型 扁平映射
        FlatMapOperator<String,Tuple2<String,Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String,Long>> out) -> {
            // 将每行文本进行拆分
            String[] words = line.split(" ");
            // 将每个单词转化成二元组
            for(String word : words){
                out.collect(Tuple2.of(word,1L));
            }
        }).returns(Types.TUPLE(Types.STRING,Types.LONG));
        // 4、按照word进行分组
         UnsortedGrouping<Tuple2<String,Long>> wordAndOneGroup =  wordAndOneTuple.groupBy(0);
         // 5、分组内进行聚合统计
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
        // 6、打印结果
        sum.print();

    }

控制台打印效果如下图所示。
初探Flink的Java实现流处理和批处理
在Flink 1.12 版本后,官方推荐做法是直接使用 DataSet API 即提交任务时将执行模式更改为BATCH来进行批处理
$bin/flink run -Dexecution.runtime-mode=BATCH batchWordCount.jar

方式二、流处理 DataStream API
流处理的处理步骤与批处理流程类似,主要区别是执行环境不一样。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BatchSteamWordCount {
    public static void main(String[] args) throws Exception {
        // 1、创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2、读取文件
        DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/words.txt");
        // 3、转换计算
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            // 将每行文本进行拆分
            String[] words = line.split(" ");
            // 将每个单词转化成二元组
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4、分组
        KeyedStream<Tuple2<String, Long>, Object> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        // 5、求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);
        // 6、打印结果
        sum.print();
        // 7、启动执行
        env.execute();
    }
}

控制台输出结果如下图所示。
初探Flink的Java实现流处理和批处理
从打印结果可以看出 多线程执行,结果是无序;第一列数字与本地运行环境的CPU核数有关;

参考文档

【1】https://www.bilibili.com/video/BV133411s7Sa?p=9&vd_source=c8717efb4869aaa507d74b272c5d90be文章来源地址https://www.toymoban.com/news/detail-499892.html

到了这里,关于初探Flink的Java实现流处理和批处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ‘java‘ 不是内部或外部命令,也不是可运行的程序 或批处理文件

    1.‘java’ 不是内部或外部命令,也不是可运行的程序 或批处理文件。 这种情况一般来说是没有配置环境变量或者是没有配置好 (1)找到安装java的位置 (2)进入控制面板==》系统与安全==》系统==》高级设置–》环境变量 2.开始配置 (1)系统变量中新建 变量名:JAVA_HOME 变

    2024年02月07日
    浏览(44)
  • React源码解析18(11)------ 实现多次setState的批处理

    在React中,如果涉及到了多次setState,组件render几次。setState是同步的还是异步的。这是一个很常见的面试题。 而本篇文章,就是主要实现React中,对于这部分的性能优化,我们称之为批处理。例如当我有下面的JSX。 对于当前的点击事件来说,只有最后的setNum(num + 3)是有效的。

    2024年02月11日
    浏览(44)
  • Spring Boot + Spring Batch 实现批处理任务,保姆级教程!(场景实战)

    来源:blog.csdn.net/qq_35387940/article/details/108193473 概念词就不多说了,我简单地介绍下 , spring batch 是一个 方便使用的 较健全的 批处理 框架。 为什么说是方便使用的,因为这是 基于spring的一个框架,接入简单、易理解、流程分明。 为什么说是较健全的, 因为它提供了往常我

    2024年02月11日
    浏览(42)
  • 批处理命令大全 | Windows批处理教程 - ChatGPT

    批处理以.bat或.cmd文件的形式存在,在Windows命令提示符下运行,也可以通过双击批处理文件来运行。批处理文件由一系列命令组成,可以按照顺序执行,也可以根据条件或循环控制选择性地执行。 在Windows上创建一个批处理文件非常简单,在编辑器中输入一系列命令并保存为

    2024年02月04日
    浏览(87)
  • Windows批处理

    @ echo off :关闭命令的回显功能,这样在执行脚本时不会显示每条命令的具体执行过程。建议将此行放在批处理脚本的首行。 rem :用于添加注释,后面可以跟上注释内容。注释的作用是对脚本进行说明或提醒,不会被执行。 pause :暂停批处理的运行,直到用户按下任意键才

    2024年02月07日
    浏览(51)
  • redis批处理优化

    一个命令在网络传输的时间往往是远大于在redis中执行命令的时间的,如果每条命令都要逐条经历网络传输,耗时将会大大增加,我们不妨将命令多量少次的传输给redis,这样就大大减少了因为网络传输时间,大大提高的效率 2.1.单机模式下的批处理 2.2.集群模式下的批处理 这

    2024年01月19日
    浏览(47)
  • JDBC p4 批处理

    基本介绍: 当需要成批插入或者更新记录时。可以采用Java的批量更新机制,这一机制允许多条语句一次性提交给数据库批量处理。通常情况下比单独提交处理更有效率。 JDBC的批量处理语句包括下面方法: addBatch():添加需要批量处理的SQL语句或参数; executeBatch():执行批量

    2024年02月15日
    浏览(42)
  • BAT 批处理脚本教程

    第一节 常用批处理内部命令简介 批处理定义:顾名思义,批处理文件是将一系列命令按一定的顺序集合为一个可执行的文本文件,其扩展名为BAT或者CMD。这些命令统称批处理命令。 小知识:可以在键盘上按下Ctrl+C组合键来强行终止一个批处理的执行过程。 了解了大概意思后

    2024年02月02日
    浏览(52)
  • 【bat】批处理脚本大全

    目录 1.概述 2.变量 3.运算符 3.2.重定向运算符 3.3.多命名运算符 3.4.管道运算符 4.命令 4.1.基本命令 4.2.参数传递 4.3.查看脚本内容 4.4.注释 4.5.日期和时间 4.6.启动脚本 4.7.调用其他bat 4.8.任务管理 4.8.1.任务列表查看 4.8.2.任务终止 4.9.文件夹 4.10.关机 4.11.环境变量 4.12.目录 4.12.1

    2024年02月04日
    浏览(55)
  • 大数据处理平台的架构演进:从批处理到实时流处理

    🎈个人主页:程序员 小侯 🎐CSDN新晋作者 🎉欢迎 👍点赞✍评论⭐收藏 ✨收录专栏:大数据系列 ✨文章内容:大数据框架演进 🤝希望作者的文章能对你有所帮助,有不足的地方请在评论区留言指正,大家一起学习交流!🤗 大数据处理平台的架构演进经历了从批处理到实

    2024年02月10日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包