springboot集成flink步骤,及demo

这篇具有很好参考价值的文章主要介绍了springboot集成flink步骤,及demo。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

springboot集成flink,写代码学习flink,集成步骤如下:

1、maven引入依赖:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-java</artifactId>
   <version>${flink.version}</version>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-clients_2.11</artifactId>
   <version>${flink.version}</version>
</dependency>

2、配置文件配置相关参数:

# Flink配置
flink.jobmanager.host=localhost
flink.jobmanager.port=6123
flink.parallelism=1

3、写测试类,代码如下 :文章来源地址https://www.toymoban.com/news/detail-488057.html

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;

import java.util.Random;



public class Demo {

    public static void main(String[] args) throws Exception {

        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建数据源
        DataStream<String> stream = env.addSource(new SourceFunction<String>() {
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                Random random = new Random();
                while (isRunning) {
                    Thread.sleep(10);
                    long timestamp = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    String str = "key" + random.nextInt(10) + "," + timestamp;
                    ctx.collectWithTimestamp(str, timestamp);
                    ctx.emitWatermark(new Watermark(timestamp));
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });


        // 将数据源解析成二元组(key, timestamp)
        DataStream<Tuple2<String, Long>> parsedStream = stream.map((String line)  -> {
            String[] parts = line.split(",");
            return new Tuple2<>((String)parts[0], Long.parseLong(parts[1]));
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 设置事件时间和水位线
        DataStream<Tuple2<String, Long>> withTimestampsAndWatermarks = parsedStream
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                        return element.f1;
                    }
                });

        // 按键值进行分组
        KeyedStream<Tuple2<String, Long>, Tuple> keyedStream = withTimestampsAndWatermarks.keyBy(0);

        // 每5秒钟统计最近一分钟的数据(使用滚动时间窗口)
        WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(1)));

        // 进行聚合计算
        DataStream<Tuple2<String, Long>> resultStream = windowedStream
                .reduce((Tuple2<String, Long> v1, Tuple2<String, Long> v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1));

        // 输出结果
        resultStream.print();

        // 启动作业
        env.execute("Demo");
    }
}

到了这里,关于springboot集成flink步骤,及demo的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • springboot集成kafka详细步骤(发送及监听消息示例)

    1、本机的kafka环境配置,不再赘述 2、添加 pom 文件 3、配置application.yml 4、复写kafka的相关配置类:生产、消费相关配置 5、生产、消费的伪代码 6、测试消息发送 经过测试!

    2024年02月11日
    浏览(41)
  • SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka

    最近做的一个项目,使用的是pg数据库,公司没有成熟的DCD组件,为了实现数据变更消息发布的功能,我使用SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka。 监听数据变化,进行异步通知,做系统内异步任务。 架构方案(懒得写了,看图吧): -- 创建pg 高线数据同步用

    2024年02月02日
    浏览(44)
  • springboot集成starrocks、以及采用flink实现mysql与starrocks亚秒级同步

    (因采用dynamic-datasource-spring-boot-starter动态数据源,所以才是以下配置文件的样式,像redis,druid根据自己情况导入依赖) 这个配置文件的场景是把starrocks当成slave库在用。某些大数据慢查询就走starrocks 就这样配置好后就可把starrocks当mysql用了 重点:采用这种方式有限制,插入

    2024年01月21日
    浏览(34)
  • Elasticsearch 集成--Flink 框架集成

           Apache Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。 Apache Spark 掀开了内存计算的先河,以内存作为赌注,赢得了内存计算的飞速发展。 但是在其火热的同时,开发人员发现,在 Spark 中,计算框架普遍存在的缺点和不足依然没 有完全解决,而这

    2024年02月09日
    浏览(33)
  • Flink与ApacheAirflow集成

    在大数据处理领域,流处理和批处理是两个非常重要的领域。Apache Flink 是一个流处理框架,Apache Airflow 是一个工作流管理器。在实际应用中,我们可能需要将这两个系统集成在一起,以实现更高效的数据处理和管理。本文将详细介绍 Flink 与 Airflow 的集成方法,并提供一些实

    2024年02月20日
    浏览(33)
  • Hudi集成Flink

    安装Maven 1)上传apache-maven-3.6.3-bin.tar.gz到/opt/software目录,并解压更名 tar -zxvf apache-maven-3.6. 3 -bin.tar.gz -C /opt/module/ mv   apache -maven-3.6. 3  maven 2)添加环境变量到/etc/profile中 sudo  vim /etc/profile #MAVEN_HOME export MAVEN_HOME=/opt/module/maven export PATH=$PATH:$MAVEN_HOME/bin 3)测试安装结果 sourc

    2023年04月13日
    浏览(33)
  • Hudi(四)集成Flink(2)

            当前表 默认是快照读取 ,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。 1、WITH参数 名称 Required 默认值 说明 read.streaming.enabled false false 设置

    2024年02月07日
    浏览(36)
  • Flink单机版安装教程 - 步骤详解

    本教程详细介绍了如何在单机环境下安装和启动Apache Flink 1.16.0版本。包括下载稳定版安装包,使用tar命令解压,以及通过start-cluster.sh脚本启动Flink集群。

    2024年02月11日
    浏览(47)
  • 基于Hadoop搭建Flink集群详细步骤

    目录 1.xftp上传flink压缩包至hadoop102的/opt/software/目录下 2.解压flink压缩包至/opt/module/目录下 3. 配置flink-conf.yaml 4.配置masters 5.配置workers 6.配置环境变量my_env.sh 7.重启环境变量 8.分发/opt/module/flink-1.13.0和/etc/profile.d/my_env.sh 9.另外两台重启环境变量 10.开启hadoop集群和flink集群 11.浏

    2024年02月09日
    浏览(68)
  • 第二章 Flink集成Iceberg的集成方式及基本SQL使用

    注意事项:一般都是用基于Flink的Hive Catalog,使用HMS存储表模型数据 1、集成方式 (1)下载jar包 下载地址 (2)启动FlinkSQL ①StandLone模式启动 ②Flink On Yarn模式启动 2、基本使用 2.1、创建catalog 核心:可创建hive、hadoop、自定义等目录,创建模板如下 type : 必须的 iceberg 。(必需

    2024年02月08日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包