Flink 定时加载数据源

这篇具有很好参考价值的文章主要介绍了Flink 定时加载数据源。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、简介

flink 自定义实时数据源使用流处理比较简单,比如 Kafka、MQ 等,如果使用 MySQL、redis 批处理也比较简单

如果需要定时加载数据作为 flink 数据源使用流处理,比如定时从 mysql 或者 redis 获取一批数据,传入 flink 做处理,如下简单实现

二、pom.xml 文件

注意 flink 好多包从 1.15.0 开始不需要指定 Scala 版本,内部自带
Flink 定时加载数据源

下面 pom 文件有 flink 两个版本 1.16.0 和 1.12.7(Scala:2.12)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ye</groupId>
    <artifactId>flink-study</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.16.0</flink.version>
        <!--<flink.version>1.12.7</flink.version>-->
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
<!--
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Add connector dependencies here. They must be in the default scope (compile). -->

        <!-- Example:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.ye.DataStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>

                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

三、自定义数据源

使用 Timer 定时任务(当然也可以使用线程池 Executors)自定义数据源,每过五秒随机生成一串字符串

public class TimerSinkRich extends RichSourceFunction<String> {

    private  ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

    private boolean flag = true;
    private Timer timer;
    private TimerTask timerTask;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        timerTask = new TimerTask() {
            @Override
            public void run() {
            // 可以在这块获取 MySQL、redis 等连接并查询数据
                Random random = new Random();
                StringBuilder str = new StringBuilder();
                for (int i = 0; i < 10; i++) {
                    char ranLowLetter = (char) ((random.nextInt(26) + 97));
                    str.append(ranLowLetter);
                }
                queue.add(str.toString());
            }
        };
        timer = new Timer();
        // 延时和执行周期参数可以通过构造方法传递
        timer.schedule(timerTask,1000,5000);
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (flag){
            if(queue.size()>0){
                ctx.collect(queue.remove());
            }
        }
    }


    @Override
    public void cancel() {
        if(null!=timer) timer.cancel();
        if(null!=timerTask) timerTask.cancel();
        // 撤销任务时,flink 默认 180s 间隔 30s 执行一次 cancel 方法,(不同 flink 版本可能不同)尝试关闭数据源,关闭失败 TaskManager 不能释放 slot,最终导致失败
        if(queue.size()<=0) flag = false;
    }
}

2023-07-21 日更新
上面的有一个很大的问题就是 while(true),会导致 CPU 过高,排查 CPU 过高问题,可参考我另一篇文章https://yixiu.blog.csdn.net/article/details/131842679?spm=1001.2014.3001.5502

目前我的解决办法是在 while(true) 里面睡眠可大幅度降低 CPU 使用率,避免空跑,将上面的 run 方法改为如下即可,其中 Thread.sleep(100) 睡眠时间可根据数据量密度自行改变

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
         while (true) {
            Thread.sleep(100);
            String data;
            while (null != (data = queue.poll())) {
                ctx.collect(data );
            }
        }
    }

建议:对于上面的 cancel 方法,是否需要使用 if(queue.size()<=0) 去判断需自行决断

  • 如果加该判断,假如默认时间该队列 queue 未消费完,可能导致任务关闭失败,可以修改 flink 配置修改默认 cancel 任务时间
  • 如果不加该判断,会导致如果队列还有数据,会直接关闭任务丢弃 queue 未处理数据

Flink 定时加载数据源

四、flink 加载数据源并启动
public class TimerSinkStreamJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        DataStreamSource<String> streamSource = executionEnvironment.addSource(new TimerSinkRich());
        streamSource.print();
        executionEnvironment.execute("TimerSinkStreamJob 定时任务打印数据");
    }
}

本地测试成功

Flink 定时加载数据源

五、上传 flink 集群
1、flink 1.16.0

启动成功
Flink 定时加载数据源
撤销任务成功
Flink 定时加载数据源
solt 也成功释放
Flink 定时加载数据源

2、flink 1.12.7

启动成功
Flink 定时加载数据源
撤销任务当然也没问题,同样能正常释放 slot
Flink 定时加载数据源

当然你也可以不要 open() 方法

public class DiySinkRich extends RichSourceFunction<String> {

    private TimerTask timerTask;
    private Timer timer;
    private boolean flag = true;
    private ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        timerTask = new TimerTask() {
            @Override
            public void run() {
                Random random = new Random();
                StringBuilder str = new StringBuilder();
                for (int i = 0; i < 10; i++) {
                    char ranLowLetter = (char) ((random.nextInt(26) + 97));
                    str.append(ranLowLetter);
                }
                queue.add(str.toString());
            }
        };
        timer = new Timer();
        timer.schedule(timerTask, 1000, 5000);
        while (flag) {
            if (queue.size() > 0) {
                ctx.collect(queue.remove());
            }
        }
    }


    @Override
    public void cancel() {
        if (timer != null) timer.cancel();
        if (timerTask != null) timerTask.cancel();
        if (queue.size() == 0) flag = false;
    }
}

以上就是 flink 定时加载数据源的简单实例文章来源地址https://www.toymoban.com/news/detail-450073.html

到了这里,关于Flink 定时加载数据源的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包