【跟小嘉学 Apache Flink】二、Flink 快速上手

这篇具有很好参考价值的文章主要介绍了【跟小嘉学 Apache Flink】二、Flink 快速上手。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

系列文章目录

【跟小嘉学 Apache Flink】一、Apache Flink 介绍
【跟小嘉学 Apache Flink】二、Flink 快速上手

一、创建工程

1.1、创建 Maven 工程

创建 maven 工程 并且添加如下依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.xiaojia</groupId>
    <artifactId>flinkdemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>19</maven.compiler.source>
        <maven.compiler.target>19</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.1</version>
        </dependency>


    </dependencies>

</project>

1.2、log4j 配置

在 resource 目录下创建 log4j.properties 文件,写入如下内容

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

二、批处理单词统计(DataSet API)

2.1、创建 BatchWordCount 类型

package org.xiaojia.demo.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) {
        // 1、创建执行环境
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

        // 2、 从文件读取数据
        DataSource<String> lineDataSource = executionEnvironment.readTextFile("input/words.txt");

        // 3、将每一行数据进行分词,转换为二元组类型
        FlatMapOperator<String, Tuple2<String, Long>>  wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4、按照 word 进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);

        // 5、分组内进行聚合统计
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);

        try {
            sum.print();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

2.4、运行结果

【跟小嘉学 Apache Flink】二、Flink 快速上手,跟小嘉学Apache Flink,apache,flink,大数据,数据仓库
实际上在 Flink 里面已经做到流批处理统一,官方推荐使用 DateStream API,在跳任务时通过执行模式设置为 Batch 来进行批处理

bin/fliink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

三、流处理单词统计(DataSet API)

使用 DataSet API可以很容易实现批处理。对于Flink而言,流处理才是处理逻辑的底层核心,所以流批统一之后的 DataStream API 更加强大,可以直接处理批处理和流处理的所有场景。

在 Flink 的视角里,一切数据都可以认为是流,流数据是无界流,而批数据是有界流。所以批处理,其实可以看作是有界流的处理。

3.1、读取文件流

3.1.1、过时的写法

package org.xiaojia.demo.wc.stream;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BoundStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1、创建流式的执行环境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、读取文件
        DataStreamSource<String> lineDataStreamSource = streamExecutionEnvironment.readTextFile("input/words.txt");

        // 3、将每一行数据进行分词,转换为二元组类型
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4、按照 word 进行分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyStream = wordAndOneTuple.keyBy((data) -> data.f0);

        // 5、求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyStream.sum(1);

        // 6、打印
        sum.print();

        // 7、执行等待
        streamExecutionEnvironment.execute();

    }
}

3.1.2、执行错误的处理

Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module
    @7ce6a65d

如果出现上述类似错误,解决方案,通过添加 VM参数打开对应模块的对应模块包

--add-opens java.base/java.lang=ALL-UNNAMED 
--add-opens java.base/java.util=ALL-UNNAMED

【跟小嘉学 Apache Flink】二、Flink 快速上手,跟小嘉学Apache Flink,apache,flink,大数据,数据仓库

3.1.3、执行结果

【跟小嘉学 Apache Flink】二、Flink 快速上手,跟小嘉学Apache Flink,apache,flink,大数据,数据仓库

3.1.4、readTextFile 过时问题

【跟小嘉学 Apache Flink】二、Flink 快速上手,跟小嘉学Apache Flink,apache,flink,大数据,数据仓库
解决方案可以按照提示给出的 使用 FileSource(需要用到Flink的连接器)


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-files</artifactId>
  <version>${flink.version}</version>
</dependency>

3.2、读取 socket 网络流

3.2.1、读取socket 流代码

package org.xiaojia.demo.wc.stream;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1、创建流式的执行环境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、读取socket流
        String hostname = "127.0.0.1";
        int port = 8888;
        DataStreamSource<String> lineDataStreamSource = streamExecutionEnvironment.socketTextStream(hostname, port);

        // 3、将每一行数据进行分词,转换为二元组类型
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4、按照 word 进行分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyStream = wordAndOneTuple.keyBy((data) -> data.f0);

        // 5、求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyStream.sum(1);

        // 6、打印
        sum.print();

        // 7、执行等待
        streamExecutionEnvironment.execute();
    }
}

3.2.2、使用 nc 监听端口

(base) xiaojiadeMacBook-Pro:~ xiaojia$ nc -lk 8888
hello java
hello flink
hello world

3.2.3、执行结果

【跟小嘉学 Apache Flink】二、Flink 快速上手,跟小嘉学Apache Flink,apache,flink,大数据,数据仓库
此时,只要有数据进来,就会统计

3.2.4、从命令行参数获取主机名和端口号

package org.xiaojia.demo.wc.stream;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1、创建流式的执行环境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、读取socket流

        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        String hostname = parameterTool.get("host");
        int port = parameterTool.getInt("port");
        DataStreamSource<String> lineDataStreamSource = streamExecutionEnvironment.socketTextStream(hostname, port);

        // 3、将每一行数据进行分词,转换为二元组类型
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4、按照 word 进行分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyStream = wordAndOneTuple.keyBy((data) -> data.f0);

        // 5、求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyStream.sum(1);

        // 6、打印
        sum.print();

        // 7、执行等待
        streamExecutionEnvironment.execute();
    }
}

命令行参数传递
【跟小嘉学 Apache Flink】二、Flink 快速上手,跟小嘉学Apache Flink,apache,flink,大数据,数据仓库文章来源地址https://www.toymoban.com/news/detail-707299.html

到了这里,关于【跟小嘉学 Apache Flink】二、Flink 快速上手的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 流数据湖平台Apache Paimon(三)Flink进阶使用

    2.9.1 写入性能 Paimon的写入性能与检查点密切相关,因此需要更大的写入吞吐量: 增加检查点间隔,或者仅使用批处理模式。 增加写入缓冲区大小。 启用写缓冲区溢出。 如果您使用固定存储桶模式,请重新调整存储桶数量。 2.9.1.1 并行度 建议sink的并行度小于等于bucket的数量

    2024年02月09日
    浏览(32)
  • 流数据湖平台Apache Paimon(二)集成 Flink 引擎

    Paimon目前支持Flink 1.17, 1.16, 1.15 和 1.14。本课程使用Flink 1.17.0。 环境准备 2.1.1 安装 Flink 1)上传并解压Flink安装包 tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/ 2)配置环境变量 2.1.2 上传 jar 包 1)下载并上传Paimon的jar包 jar包下载地址:https://repository.apache.org/snapshots/org/apache/pa

    2024年02月09日
    浏览(44)
  • 【大数据-实时流计算】图文详解 Apache Flink 架构原理

    目录 Apache Flink架构介绍 一、Flink组件栈 二、Flink运行时架构 在Flink的整个

    2024年02月02日
    浏览(41)
  • 【大数据】深入浅出 Apache Flink:架构、案例和优势

    Apache Flink 是一个强大的开源流处理框架,近年来在大数据社区大受欢迎。它允许用户实时处理和分析大量流式数据,使其成为 欺诈检测 、 股市分析 和 机器学习 等现代应用的理想选择。 在本文中,我们将详细介绍什么是 Apache Flink 以及如何使用它来为您的业务带来益处。

    2024年01月17日
    浏览(45)
  • Apache Hudi初探(三)(与flink的结合)--flink写hudi的操作(真正的写数据)

    在之前的文章中Apache Hudi初探(二)(与flink的结合)–flink写hudi的操作(JobManager端的提交操作) 有说到写hudi数据会涉及到 写hudi真实数据 以及 写hudi元数据 ,这篇文章来说一下具体的实现 这里的操作就是在 HoodieFlinkWriteClient.upsert 方法: initTable 初始化HoodieFlinkTable preWrite 在这里几乎没

    2024年02月10日
    浏览(34)
  • 怎么使用 Flink 向 Apache Doris 表中写 Bitmap 类型的数据

    Bitmap是一种经典的数据结构,用于高效地对大量的二进制数据进行压缩存储和快速查询。Doris支持bitmap数据类型,在Flink计算场景中,可以结合Flink doris Connector对bitmap数据做计算。 社区里很多小伙伴在是Doris Flink Connector的时候,不知道怎么写Bitmap类型的数据,本文将介绍如何

    2024年02月07日
    浏览(56)
  • 使用 Flink CDC 实现 MySQL 数据,表结构实时入 Apache Doris

    现有数据库:mysql 数据:库表较多,每个企业用户一个分库,每个企业下的表均不同,无法做到聚合,且表可以被用户随意改动,增删改列等,增加表 分析:用户自定义分析,通过拖拽定义图卡,要求实时,点击确认即出现相应结果,其中有无法预判的过滤 问题:随业务增长

    2023年04月08日
    浏览(53)
  • Kudu与Apache Flink的集成:实时数据处理的新方法

    随着数据的增长,实时数据处理变得越来越重要。传统的批处理系统已经不能满足现在的需求。因此,实时数据处理技术逐渐成为了研究的热点。Kudu和Apache Flink是两个非常重要的实时数据处理系统,它们各自具有独特的优势。Kudu是一个高性能的列式存储系统,适用于实时数

    2024年02月21日
    浏览(41)
  • 重磅!flink-table-store将作为独立数据湖项目重入apache

    数据湖是大数据近年来的网红项目,大家熟知的开源数据湖三剑客 Apache hudi、Apache iceberg 、Databricks delta 近年来野蛮生长,目前各自背后也都有商业公司支持,投入了大量的人力物力去做研发和宣传。然而今天我们要讲的是数据湖界的后起之秀 —— flink-table-store。 熟悉 Flin

    2024年02月08日
    浏览(42)
  • Apache Flink 和 Apache Kafka 两者之间的集成架构 Flink and Apache Kafka: A Winning Partnership

    作者:禅与计算机程序设计艺术 Apache Flink 和 Apache Kafka 是构建可靠、高吞吐量和低延迟的数据管道(data pipeline)的两个著名的开源项目。2019年4月,两者宣布合作共赢。在这次合作中,Apache Kafka 将提供强大的消息存储能力、Flink 将作为一个分布式数据流处理平台来对其进行

    2024年02月11日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包