flink中使用外部定时器实现定时刷新

这篇具有很好参考价值的文章主要介绍了flink中使用外部定时器实现定时刷新。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景:

我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢?

外部定时器定时加载实现

1.在open函数中进行定时器的创建和定时加载,这个方法对于所有的RichFunction富函数都适用,包括RichMap,RichFilter,RichSink等,代码如下所示

package wikiedits.schedule;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduleRichMapFunction extends RichFlatMapFunction<String, String> {

    // 定时任务执行器
    private transient ScheduledExecutorService scheduledExecutorService;
    // 本地变量
    private int threshold;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 1.从db查询数据初始化本地变量
//        threshold = DBManager.SELECTSQL.getConfig("threshold");
        // 2.使用定时任务更新本地内存的配置信息以及更新本地变量threshold的值
        scheduledExecutorService = Executors.newScheduledThreadPool(10);
        scheduledExecutorService.scheduleWithFixedDelay(() -> {
            // 2.1 定时任务更新本地内存配置项
            // List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();
//            for(ConfigEntity entity : configList){
                ConfigEntityLocalCache.getInstance().update("key", "value");
//            }
            // 2.2 更新本地变量threshold的值
//            threshold = DBManager.SELECTSQL.getConfig("threshold");
        }, 0, 100, TimeUnit.SECONDS);

    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {

    }

    @Override
    public void close() throws Exception {
        ExecutorUtils.gracefulShutdown(100, TimeUnit.SECONDS, scheduledExecutorService);
    }


}

//本地缓存实现
package wikiedits.schedule;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

/**
 * 保存Config信息的本地缓存 ---定时同步DB配置表的数据
 */
public class ConfigEntityLocalCache {

    private static volatile ConfigEntityLocalCache instance = new ConfigEntityLocalCache();

    /**
     * 获取本地缓存实例
     */
    public static ConfigEntityLocalCache getInstance() {
        return instance;
    }

    /** 缓存内存配置项 */
    private static Cache<String, String> configCache =
            CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();


    /**
     * 更新本地缓存数据
     */
    public boolean update(String key, String value){
        configCache.put(key, value);
        return true;
    }


    /**
     * 更新本地缓存数据
     */
    public  String getByKey(String key){
        return configCache.getIfPresent(key);
    }

}


2.在静态类中通过static语句块创建定时器并定时加载,代码如下

package wikiedits.schedule;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

/**
 * 静态类定时加载DB配置表到本地内存中
 */
public class StaticLoadUtil {

    // 定时任务执行器
    private static transient ScheduledExecutorService scheduledExecutorService;

    public static final Cache<String, String> configCache =
            CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();

    // 通过定时执行器定时同步本地缓存和DB配置表
    static {
        scheduledExecutorService = Executors.newScheduledThreadPool(10);
        scheduledExecutorService.scheduleWithFixedDelay(() -> {
            // 2.1 定时任务更新本地内存配置项
            // List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();
            // for(ConfigEntity entity : configList){
            configCache.put("key", "value");
            // }
            // 2.2 更新本地变量threshold的值
            // threshold = DBManager.SELECTSQL.getConfig("threshold");
        }, 0, 100, TimeUnit.SECONDS);
    }

    /**
     * 获取本地缓存
     */
    public static Cache<String, String> getConfigCache() {
        return configCache;
    }


}

总结:

1.外部定时器可以通过在富函数的open中进行初始化并开始定时执行

2.外部定时器也可以通过创建一个单独的静态类,然后在static模块中进行初始化并开始定时执行文章来源地址https://www.toymoban.com/news/detail-736743.html

到了这里,关于flink中使用外部定时器实现定时刷新的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【STM32】STM32学习笔记-定时器定时中断 定时器外部时钟(14)

    1.1 TIM_InternalClockConfig 1.2 TIM_TimeBaseInit 1.3 TIM_TimeBaseInitTypeDef 1.4 TIM_ClearFlag 1.5 TIM_ITConfig 1.6 TIM_Cmd 1.7 中断服务函数 参考程序 1.8 TIM_ETRClockMode2Config timer.h timer.c main.c timer.h timer.c main.c 09-定时器定时中断.rar 10-定时器外部时钟.rar 参考: 【STM32】江科大STM32学习笔记汇总

    2024年02月03日
    浏览(38)
  • STM32单片机(六)TIM定时器 -> 第二节:TIM定时中断练习(定时器定时中断和定时器外部时钟)

    ❤️ 专栏简介:本专栏记录了从零学习单片机的过程,其中包括51单片机和STM32单片机两部分;建议先学习51单片机,其是STM32等高级单片机的基础;这样再学习STM32时才能融会贯通。 ☀️ 专栏适用人群 :适用于想要从零基础开始学习入门单片机,且有一定C语言基础的的童鞋

    2024年02月09日
    浏览(30)
  • STM32自学☞定时器外部时钟案例

    本案例主要是通过外部时钟实现对射式红外传感器的计次,在oled显示屏上显示CNT的次数 #include \\\"stm32f10x.h\\\" #include \\\"stm32f10x_tim.h\\\" #include \\\"timer_interrupt.h\\\" #include \\\"stdint.h\\\" //初始化函数 void Timer_Init(void) {  /*开启时钟*/  RCC_APB1PeriphClockCmd(RCC_APB1Periph_TIM2, ENABLE); //开启TIM2的时钟  RCC_A

    2024年02月22日
    浏览(33)
  • 江科大stm32视频学习笔记——TIM定时中断&定时器外部时钟

    目录 一、TIM(Timer)定时器简介  1.1 定时器类型 摘要 1.1.1 基本定时器 1.1.2 通用定时器 1.1.3 高级定时器  1.2 定时中断基本结构 1.2.1 结构框图 1.2.2 时序图 二、定时器定时中断定时器外部时钟 2.1 内部时钟闹钟代码 2.1.1 Timer.c 2.1.2 Buzzer.c加入间隔发声函数 2.1.3 main.c 2.1.4 实验视频

    2024年01月23日
    浏览(47)
  • 51 单片机【外部中断、定时器中断、回调函数】

    ​这里的外部中断类似监听器,时时刻刻监视某引脚的电平变化;这里的定时器中断类似于定时任务,可以定时执行某函数;这里将回调函数和中断结合起来,案例里有点设计模式的味道(忘了哪个了,也可能就是感觉,关于高层不能调用低层的解决),也有点函数式编程的

    2024年02月04日
    浏览(49)
  • 单片机学习 11-中断系统(定时器中断+外部中断)

    ​ 中断是为使单片机具有对外部或内部随机发生的事件实时处理而设置的,中断功能的存在,很大程度上提高了单片机处理外部或内部事件的能力。它也是单片机最重要的功能之一,是我们学习单片机必须要掌握的。很多初学者被困在中断中,学了很久仍然不知道中断究竟是

    2024年02月05日
    浏览(37)
  • 51单片机:中断系统(外部中断,定时器中断,串口通信)

    目录 中断系统简介: 中断的优先级和嵌套: 8个中断请求源及其优先级: 中断的分别介绍: 1、外部中断0:INT0   2、外部中断1  3、T0和 T1:定时计数器的功能 4、串口中断(串口为什么使用定时器后面讲) 中断寄存器 (1)中断允许控制(IE) (2)中断请求标志(TCON) (

    2024年01月25日
    浏览(32)
  • Flink timer定时器

    常见timer 基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行. Context和OnTimerContext 所持有的TimerService对象拥有以下方法: currentProcessingTime(): Long 返回当前处理时间 currentWatermark(): Long 返回当前watermark的时间戳 registerProcessingTimeTimer(timestamp: Lon

    2024年02月07日
    浏览(38)
  • STM32 学习笔记(六)定时器中断:内部时钟模式,外部时钟模式

    定时器是功能最强大,内容最复杂的32结构。 之前51用过的功能,定时产生中断。 输出比较,常用于产生 PWM 波形,驱动电机等。 输入捕获,测量方波频率。 编码器,读取正交编码器的波形。 最大定时时间:72M/65536/65536=中断频率,中断频率取倒数是最大定时时间。 定时器可

    2024年02月08日
    浏览(39)
  • Django框架使用定时器-APScheduler实现定时任务:django实现简单的定时任务

    系统:windows10 python: python==3.9.0 djnago==3.2.0 APScheduler==3.10.1 1、创建utils包,在包里面创建schedulers包 utils/schedulers/task.py utils/schedulers/scheduler.py utils/schedulers/__init__.py 2、项目配置文件settings.py

    2024年02月12日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包