Flink定制化功能开发,demo代码

这篇具有很好参考价值的文章主要介绍了Flink定制化功能开发,demo代码。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言:

       这是一个Flink自定义开发的基础教学。本文将通过flink的DataStream模块API,以kafka为数据源,构建一个基础测试环境;包含一个kafka生产者线程工具,一个自定义FilterFunction算子,一个自定义MapFunction算子,用一个flink任务的代码逻辑,将实时读kafka并多层处理串起来;让读者体会通过Flink构建自定义函数的技巧。

一、Flink的开发模块分析

Flink提供四个基础模块:核心SDK开发API分别是处理实时计算的DataStream和处理离线计算的DataSet;基于这两个SDK,在其上包装了TableAPI开发模块的SDK;在Table API之上,定义了高度抽象可用SQL开发任务的FlinkSQL。在核心开发API之下,还有基础API的接口,可用于对时间,状态,算子等最细粒度的特性对象做操作,如包装自定义算子的ProcessWindowFunction和ProcessFunction等基础函数以及内置的对象状态StateTtlConfig;

FLINK开发API关系结构如下:

Flink定制化功能开发,demo代码,Flink,linq,c#

二、定制化开发Demo演示

2.1 场景介绍

Flink实时任务的的通用技术架构是消息队列中间件+Flink任务:

将数据采集到Kafka或pulser这类队列中间件的Topic,然后使用Flink内置的kafkaSource,监控Topic的数据情况,做实时处理。

  1. 这里提供一个kafka的生产者线程,可以自定义构建需要的数据和上传时间,用于控制写入kafka的数据源;
  2. 重写两个DataStream的基础算子:FilterFunction和MapFunction,用于让读者体会,如何对FLINK函数的重新包装,后续更基础的函数原理一样;我这里用String数据对象做处理,减少对象转换的SDK引入,通常要基于业务做数据polo的加工,这个自己处理,将对象换成业务对象;
  3. 然后使用Flink将整个业务串起来,从kafka读数据,经过两层处理,最终输出需要的结果;

2.2 本地demo演示

2.2.1 pom文件

这里以flink1.14.6+scala1.12版本为例:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.example</groupId>
        <artifactId>flinkCDC</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>flinkStream</artifactId>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink-version>1.14.6</flink-version>
        <scala-version>2.12</scala-version>
        <hadop-common-version>2.9.1</hadop-common-version>
        <elasticsearch.version>7.6.2</elasticsearch.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
            </snapshots>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala-version}</artifactId>
            <version>${flink-version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala-version}</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>myflinkml.DataStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>

                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
2.2.2 kafka生产者线程方法

Flink定制化功能开发,demo代码,Flink,linq,c#

package org.example.util;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.*;

/**
 * 向kafka生产数据
 *
 * @author i7杨
 * @date 2024/01/12 13:02:29
 */

public class KafkaProducerUtil extends Thread {

    private String topic;

    public KafkaProducerUtil(String topic) {
        super();
        this.topic = topic;
    }

    private static Producer<String, String> createProducer() {
        // 通过Properties类设置Producer的属性
        Properties properties = new Properties();
//        测试环境 kafka 配置
        properties.put("bootstrap.servers", "ip2:9092,ip:9092,ip3:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(properties);
    }

    @Override
    public void run() {
        Producer<String, String> producer = createProducer();
        Random random = new Random();
        Random random2 = new Random();

        while (true) {
            int nums = random.nextInt(10);
            int nums2 = random.nextInt(50);
//            double nums2 = random2.nextDouble();

            String time = new Date().getTime() / 1000 + 5 + "";
            String type = "pv";
            try {
                if (nums2 % 2 == 0) {
                    type = "pv";
                } else {
                    type = "uv";

                }
//                String info = "{\"user\":" + nums + ",\"item\":" + nums * 10 + ",\"category\":" + nums2 + ",\"pv\":" + nums2 * 5 + ",\"ts\":\"" + time + "\"}";
                String info = nums + "=" + nums2;

                System.out.println("message : " + info);
                producer.send(new ProducerRecord<String, String>(this.topic, info));
                
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("=========数据已经写入==========");
            
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) {
        new KafkaProducerUtil("test01").run();
    }
    
    public static void sendMessage(String topic, String message) {
        Producer<String, String> producer = createProducer();
        producer.send(new ProducerRecord<String, String>(topic, message));
    }
    
}
2.2.3 自定义基础函数

这里自定义了filter和map两个算子函数,测试逻辑按照数据结构变化:

自定义FilterFunction函数算子:阈值小于40的过滤掉

package org.example.funtion;

import org.apache.flink.api.common.functions.FilterFunction;

/**
 * FilterFunction重构
 *
 * @author i7杨
 * @date 2024/01/12 13:02:29
 */

public class InfoFilterFunction implements FilterFunction<String> {

    private double threshold;

    public InfoFilterFunction(double threshold) {
        this.threshold = threshold;
    }

    @Override
    public boolean filter(String value) throws Exception {

        if (value.split("=").length == 2)
            // 阈值过滤
            return Double.valueOf(value.split("=")[1]) > threshold;
        else return false;
    }
}

自定义MapFunction函数:后缀为2的,添加上特殊信息

package org.example.funtion;

import org.apache.flink.api.common.functions.MapFunction;

public class ActionMapFunction implements MapFunction<String, String> {

    @Override
    public String map(String value) throws Exception {
        System.out.println("value:" + value);
        if (value.endsWith("2"))
            return value.concat(":Special processing information");
        else return value;
    }
}

2.2.4 flink任务代码

任务逻辑:使用kafka工具产生数据,然后监控kafka的topic,讲几个函数串起来,输出结果;

package org.example.service;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.example.funtion.ActionMapFunction;
import org.example.funtion.InfoFilterFunction;

import java.util.*;

public class FlinkTestDemo {
    public static void main(String[] args) throws Exception {

        // 设置执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka 配置
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip1:9092,ip2:9092,ip3:9092");
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");
        kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 创建 Kafka 消费者
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "test01",// Kafka 主题名称
                new SimpleStringSchema(),
                kafkaProps);

        // 从 Kafka 中读取数据流
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
        env.disableOperatorChaining();

        kafkaStream
                .filter(new InfoFilterFunction(40))
                .map(new ActionMapFunction())
                .print("阈值大于40以上的message=");

        // 执行任务
        env.execute("This is a testing task");
    }


}

运行结果:

Flink定制化功能开发,demo代码,Flink,linq,c#文章来源地址https://www.toymoban.com/news/detail-814686.html

到了这里,关于Flink定制化功能开发,demo代码的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 示例代码:使用python进行flink开发

    以下是一个使用 Python 进行 Flink 开发的简单示例代码: 以上示例代码使用 PyFlink 库连接到 Flink 作业集群,并定义了一个输入流和一个输出流。然后,使用 UDF (User Defined Function)对输入数据进行处理,并将处理后的数据写入输出流。最后,执行作业并等待作业结束。 请注意

    2024年02月13日
    浏览(52)
  • 示例代码:使用golang进行flink开发

    以下是一个使用 Golang 进行 Flink 开发的简单示例代码: 以上示例代码使用 Flink 的 REST API 连接到 Flink 作业集群,并定义了一个输入数据流和一个输出数据流。然后,使用 Map 操作对输入数据进行处理,并将处理后的数据写入输出数据流。最后,执行作业并等待作业结束。 请注

    2024年02月12日
    浏览(45)
  • Qt开发技术:Q3D图表开发笔记(三):Q3DSurface三维曲面图介绍、Demo以及代码详解

      qt提供了q3d进行三维开发,虽然这个框架没有得到大量运用也不是那么成功,性能上也有很大的欠缺,但是普通的点到为止的应用展示还是可以的。   其中就包括华丽绚烂的三维图表,数据量不大的时候是可以使用的。   前面介绍了基础的q3d散点图、柱状图,本篇介

    2023年04月20日
    浏览(80)
  • Qt开发技术:Q3D图表开发笔记(二):Q3DBar三维柱状图介绍、Demo以及代码详解

    若该文为原创文章,转载请注明原文出处 本文章博客地址:https://hpzwl.blog.csdn.net/article/details/130150728 各位读者,知识无穷而人力有穷,要么改需求,要么找专业人士,要么自己研究 红胖子网络科技博文大全:开发技术集合(包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ff

    2023年04月22日
    浏览(90)
  • Flink+hadoop部署及Demo

    https://dlcdn.apache.org/hadoop/common/hadoop-3.2.4/hadoop-3.2.4.tar.gz 上传并解压到3台服务器 配置3台主机的hosts和免密登录 source .bash_profile hadoop version 查看hadoop下载版本 架构如下: hadoop01 hadoop02 hadoop03 HDFS NameNode,DataNode DataNode NameNode,DataNode YARN ResouceManager,NodeManager NodeManager ResouceManager,Nod

    2024年02月07日
    浏览(30)
  • flink 1.18 sql demo

    更换flink-table-planner 为 flink-table-planner-loader pom.xml demo

    2024年01月18日
    浏览(42)
  • springboot集成flink步骤,及demo

    springboot集成flink,写代码学习flink,集成步骤如下: 1、maven引入依赖: 2、配置文件配置相关参数: 3、写测试类,代码如下 :

    2024年02月09日
    浏览(35)
  • dinky+flink+doris实时架构全流程demo

    官网下载地址:https://archive.apache.org/dist/doris/1.2/1.2.3-rc02/   #doris单机部署 #配置FE  #配置 BE #下载地址:Index of /dist/flink/flink-1.14.6 #Flink单机部署 #相关依赖包下载 #启动 #访问 #下载  #Dinky单机部署 #配置dinky #添加依赖 依赖下载:https://download.csdn.net/download/qq_41060328/87817727 备份

    2024年02月15日
    浏览(40)
  • flink学习之广播流与合流操作demo

    广播流是什么? 将一条数据广播到所有的节点。使用 dataStream.broadCast() 广播流使用场景? 一般用于动态加载配置项。比如lol,每天不断有人再投诉举报,客服根本忙不过来,腾讯内部做了一个判断,只有vip3以上的客户的投诉才会有人工一对一回复,过了一段时间大家都发现

    2024年02月09日
    浏览(37)
  • 每个.NET开发都应掌握的linq知识点

    LINQ是C#3.0引入的特性,让处理对象就像执行SQL语句一样简单,对于提高C#开发效率有革命性的作用。 对于每个.NET开发者来说,掌握C#的LINQ知识点是非常重要的。LINQ是C#的一个强大的特性,它为数据查询和操作提供了简洁、统一的语法,使得数据处理变得更加直观和灵活。 以

    2024年02月14日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包