Flink 流处理API

这篇具有很好参考价值的文章主要介绍了Flink 流处理API。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

 文章来源地址https://www.toymoban.com/news/detail-453759.html

一、环境

1.1getExecutionEnvironment

1.2createLocalEnvironment

1.3createRemoteEnvironment

二、从集合中读取数据

三、从文件中读取数据

四、从KafKa中读取数据

1.导入依赖

2.启动KafKa

3.java代码


 

一、环境

1.1getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

#批处理环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

#流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

设置并行度:如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认为1

 //设置并行度为8
 env.setParallelism(8);

1.2createLocalEnvironment

返回本地执行环境,需要在调用时指定默认的并行度

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); 

1.3createRemoteEnvironment

返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("IP",端口号,jar包路径)

二、从集合中读取数据


import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;

/**
 * @author : Ashiamd email: ashiamd@foxmail.com
 * @date : 2021/1/31 5:13 PM
 * 测试Flink从集合中获取数据
 */
public class SourceTest1_Collection {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置env并行度1,使得整个任务抢占同一个线程执行
        env.setParallelism(1);

        // Source: 从集合Collection中获取数据
        DataStream<SensorReading> dataStream = env.fromCollection(
                Arrays.asList(
                        new SensorReading("sensor_1", 1547718199L, 35.8),
                        new SensorReading("sensor_6", 1547718201L, 15.4),
                        new SensorReading("sensor_7", 1547718202L, 6.7),
                        new SensorReading("sensor_10", 1547718205L, 38.1)
                )
        );

        DataStream<Integer> intStream = env.fromElements(1,2,3,4,5,6,7,8,9);

        // 打印输出
        dataStream.print("SENSOR");
        intStream.print("INT");

        // 执行
        env.execute("JobName");

    }

}

三、从文件中读取数据

文件由自己创建一个txt文件

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author : Ashiamd email: ashiamd@foxmail.com
 * @date : 2021/1/31 5:26 PM
 * Flink从文件中获取数据
 */
public class SourceTest2_File {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 使得任务抢占同一个线程
        env.setParallelism(1);

        // 从文件中获取数据输出
        DataStream<String> dataStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");

        dataStream.print();

        env.execute();
    }
}

四、从KafKa中读取数据

1.导入依赖

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.10.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>1.10.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>1.10.1</version>
    </dependency>
    <!-- kafka -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>1.12.1</version>
    </dependency>

  </dependencies>

2.启动KafKa

启动Zookeeper

./bin/zookeeper-server-start.sh [config/zookeeper.properties]

启动KafKa服务

./bin/kafka-server-start.sh -daemon ./config/server.properties

启动KafKa生产者

./bin/kafka-console-producer.sh --broker-list localhost:9092  --topic sensor

3.java代码

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

/**
 * @author : Ashiamd email: ashiamd@foxmail.com
 * @date : 2021/1/31 5:44 PM
 */
public class SourceTest3_Kafka {

    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度1
        env.setParallelism(1);

        Properties properties = new Properties();
        //监听的kafka端口
        properties.setProperty("bootstrap.servers", "localhost:9092");
        // 下面这些次要参数
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // flink添加外部数据源
        DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>("sensor", new SimpleStringSchema(),properties));

        // 打印输出
        dataStream.print();

        env.execute();
    }
}

 

 

到了这里,关于Flink 流处理API的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【hadoop】centos7.6+hadoop3.1.1搭建分布式hadoop环境——包含各类问题解决方案

    本文针对centos7.4即以上版本的hadoop环境搭建,因为这部分搭建是个很复杂且很容易出错的内容,所以在结合了多种搭建方案后给出最适宜当前版本的搭建。 本教程适用于CentOS 7.4即以上版本,如果是Ubuntu等其它linux内核版本则不适合。 查看系统版本: 软件 版本 获取方法 Ope

    2024年02月16日
    浏览(44)
  • 机器环境无法访问GitHub情况下linux安装OpenCV执行cmake无法下载ADE文件v0.1.1f.zip

    ☞ ░ 前往老猿Python博客 ░ https://blog.csdn.net/LaoYuanPython 在CSDN的博文《构建VisualStudio2019+OpenCV4.3的C++ windows编译环境》中,老猿介绍了opencv版本的下载方法的方法,该方法下载OpenCV的代码不要上GitHub,国内可以直接访问,比较方便。 在linux安装OpenCV与上面博文不同的是版本选择

    2024年02月03日
    浏览(56)
  • Flink流处理API大合集:掌握所有flink流处理技术

    Flink流处理API大合集:掌握所有flink流处理技术,看这一篇就够了(1),2024年程序员学习,flink,大数据

    2024年04月16日
    浏览(22)
  • 解决conda创建环境,环境路径并非是conda安装目录下的envs或我们设置的目录

    有些同学可能遇到使用conda创建环境,环境的路径总是在C盘,但是明明conda安装在D盘,而且配置文件的默认路径也是D盘。其实原因很简单,因为设置的默认路径没有足够的权限。 1.怎么查看默认的目录呢 第一种方法: 找到 .condarc 文件,一般在 C:Usersadministrator.condarc ,以文

    2024年02月11日
    浏览(46)
  • Doris-05-集成Spark、Flink、Datax,以及数据湖分析(JDBC、ODBC、ES、Hive、多源数据目录Catalog)

    准备表和数据: Spark 读写 Doris Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。 代码库地址:https://github.com/apache/incubator-doris-spark-connector 支持从 Doris 中读取数据 支持 Spark DataFrame 批量/流式 写入 Doris 可以将 Doris 表映射为 DataFra

    2024年02月06日
    浏览(62)
  • Flink 处理函数(1)—— 基本处理函数

    在 Flink 的多层 API中,处理函数是最底层的API,是所有转换算子的一个概括性的表达,可以 自定义处理逻辑 在处理函数中,我们直面的就是数据流中最基本的元素: 数据事件(event)、状态(state)以及时间(time) 。这就相当于 对流有了完全的控制权 基本处理函数主要是定

    2024年01月18日
    浏览(42)
  • Flink处理函数(3)—— 窗口处理函数

    窗口处理函数包括:ProcessWindowFunction 和 ProcessAllWindowFunction 基础用法 这里的 MyProcessWindowFunction 就是 ProcessWindowFunction 的一个实现类; ProcessWindowFunction 是一个典型的全窗口函数,把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理 源码解析 类型参数如下: IN:i

    2024年01月20日
    浏览(43)
  • Flink高手之路:Flink的环境搭建

    本地单机模式,一般用于测试环境是否搭建成功,很少使用 flink 自带集群,开发测试使用 StandAloneHA :独立集群的高可用模式,也是 flink 自带,用于开发测试环境 计算资源统一由hadoop yarn管理,生产环境使用   上传到hadoop001 [root@hadoop001 software]# tar -xzvf flink-1.12.2-bin-scala_2.1

    2023年04月24日
    浏览(51)
  • Flink处理函数(2)—— 按键分区处理函数

     按键分区处理函数(KeyedProcessFunction):先进行分区,然后定义处理操作 定时器(timers)是处理函数中进行时间相关操作的主要机制 定时服务(TimerService)提供了注册定时器的功能 TimerService 是 Flink 关于时间和定时器的基础服务接口: 六个方法可以分成两大类:基于处理时

    2024年01月21日
    浏览(44)
  • Python获取与处理文件路径/目录路径

      1.1 获取当前文件的绝对路径 使用**os.path.abspath()**获取当前文件的绝对路径。 输出:   1.2.1 获取当前文件的所在目录 使用**os.path.dirname()**获取当前文件的所在目录。 输出:   1.2.2 获取当前文件的所在目录的上一级目录 使用多个**os.path.dirname()**嵌套以获取当前文件的所

    2024年01月17日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包