Flink DataStream之创建执行环境

这篇具有很好参考价值的文章主要介绍了Flink DataStream之创建执行环境。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

新建project:

  • pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test.wyh</groupId>
    <artifactId>Flink117_Test_01</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>

        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

</project>
  • Main类(先用批的方式测试)
package test01;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class EnvDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //指定端口,默认8081
        conf.set(RestOptions.BIND_PORT, "8082");

        //会自动识别是远程集群还是本地IDEA环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        //设置流/批,默认是流
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.readTextFile("test_input/test_word.txt")
                .flatMap(
                        (String value, Collector< Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words){
                                out.collect(Tuple2.of(word, 1));
                            }
                        })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1)
                .print();

        env.execute();
    }
}
  • 创建测试文件

Flink DataStream之创建执行环境

  • 运行程序

Flink DataStream之创建执行环境

------------------------------------------------- 

  •  Main类(用批的方式测试)
package test01;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class EnvDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //指定端口,默认8081
        conf.set(RestOptions.BIND_PORT, "8082");

        //会自动识别是远程集群还是本地IDEA环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        //设置流/批,默认是流
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.readTextFile("test_input/test_word.txt")
                .flatMap(
                        (String value, Collector< Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words){
                                out.collect(Tuple2.of(word, 1));
                            }
                        })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1)
                .print();

        env.execute();
    }
}
  • 运行程序

Flink DataStream之创建执行环境

 文章来源地址https://www.toymoban.com/news/detail-509841.html

到了这里,关于Flink DataStream之创建执行环境的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink学习——DataStream API

            一个flink程序,其实就是对DataStream的各种转换。具体可以分成以下几个部分: 获取执行环境(Execution Environment) 读取数据源(Source) 定义基于数据的转换操作(Transformations) 定义计算结果的输出位置(Sink) 触发程序执行(Execute)         flink 程序可以在各种上

    2024年02月05日
    浏览(31)
  • Flink DataStream API详解

    参考:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html Data Sources Source是程序读取其输入的位置,您可以使用 env.addSource(sourceFunction) 将Source附加到程序中。Flink内置了许多预先实现的SourceFunction,但是您始终可以通过实现SourceFunction(non-parallel sources)来编写自定

    2024年02月14日
    浏览(29)
  • flink的datastream基本转换

     

    2023年04月13日
    浏览(30)
  • 【Apache Flink】Flink DataStream API的基本使用

    Flink DataStream API的基本使用 Flink DataStream API主要用于处理无界和有界数据流 。 无界数据流 是一个持续生成数据的数据源,它没有明确的结束点,例如实时的交易数据或传感器数据。这种类型的数据流需要使用Apache Flink的实时处理功能来连续地处理和分析。 有界数据流 是一个

    2024年02月06日
    浏览(30)
  • Flink|《Flink 官方文档 - DataStream API - 概览》学习笔记

    学习文档:Flink 官方文档 - DataStream API - 概览 学习笔记如下: Flink 的 DataStream API: 数据里的起始是各种 source,例如消息队列、socket 流、文件等; 对数据流进行转换,例如过滤、更新状态、定义窗口、聚合等; 结果通过 sink 返回,例如可以将数据写入文件或标准输出。 Da

    2024年01月23日
    浏览(42)
  • Flink基础之DataStream API

    union联合:被unioin的流中的数据类型必须一致 connect连接:合并的两条流的数据类型可以不一致 connec后,得到的是ConnectedStreams 合并后需要根据数据流是否经过keyby分区 coConnect: 将两条数据流合并为同一数据类型 keyedConnect 目前所使用的大多数Sink, 都是基于2PC的方式来保证状态

    2024年02月05日
    浏览(33)
  • Flink|《Flink 官方文档 - DataStream API - 算子 - 窗口》学习笔记

    学习文档:《Flink 官方文档 - DataStream API - 算子 - 窗口》 学习笔记如下: 窗口(Window):窗口是处理无界流的关键所在。窗口可以将数据流装入大小有限的 “桶” 中,再对每个 “桶” 加以处理。 Keyed Windows 在 Keyed Windows 上使用窗口时,要调用 keyBy(...) 而后再调用 window(..

    2024年01月18日
    浏览(37)
  • Flink DataStream之从Kafka读数据

    搭建Kafka 参考:centos7下kafka2.12-2.1.0的安装及使用_kafka2.12-2.1.0 steam_QYHuiiQ的博客-CSDN博客  启动zookeeper 启动kafka 查看进程  创建topic 查看topic列表 导入pom依赖 新建类 启动程序 在终端向kafka生产数据,同时观察程序控制台flink的读取情况  如图说明flink从kafka成功读取数据。

    2024年02月13日
    浏览(40)
  • Flink DataStream之Union合并流

    新建类 运行程序 方式一:  方式二:

    2024年02月16日
    浏览(33)
  • flink cdc DataStream api 时区问题

    以postgrsql 作为数据源时,Date和timesatmp等类型cdc同步读出来时,会发现一下几个问题: 源表: sink 表: 解决方案:在自定义序列化时进行处理。 java code scala code mysql cdc也会出现上述时区问题,Debezium默认将MySQL中datetime类型转成UTC的时间戳({@link io.debezium.time.Timestamp}),时区是

    2024年02月07日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包