自定义Flink SourceFunction定时读取数据库

这篇具有很好参考价值的文章主要介绍了自定义Flink SourceFunction定时读取数据库。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


前言

Source 是Flink获取数据输入的地方,可以用StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source,也可以通过实现继承 RichSourceFunction 类编写自定义的 sources。Flink提供了多种预定义的 stream source:基于文件、 套接字、集合等source;但没用提供数据库相关的Source。

一、自定义Flink SourceFunction定时读取数据库

有些场景需要定时的读取不断变化的数据库数据作为流数据。本文中的代码实现适用于所有关系数据库。

  • 在构造方法中传递数据库连接参数、定时周期等信息
  • run:在run中定时读取数据库数据并emit到发送到下一节点。
  • cancel: 取消一个 source,running状态改为false将 run 中的循环 emit 元素的行为终止。

二、java代码实现

/**
 * 关系库流数据源 
 *
 */
public class DbSourceFunction extends RichSourceFunction<Row> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(DbSourceFunction.class);
    private volatile boolean isRunning = true;
    private String driver = null;
    //执行周期(秒)
    private Long period = null;
    private JSONObject conf;
    private DataBaseType baseType;

    public DbFullSourceFunction(JSONObject conf, DataBaseType baseType) {
        this.conf = conf;
        this.baseType = baseType;
        this.driver = baseType.getDriverClassName();
        // 执行周期
        period = conf.getLong("period");
        //周期单位
        String unit = conf.getString("executionWay", "seconds");

        if (period != null && period > 0) {
            //根据时间单位转换为秒
            period = FuntionUtil.getSeconds(unit, period);
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        while (isRunning) {
            String querySql = conf.getString(Key.QUERY_SQL);
            List<JSONObject> columnList = conf.getList(Key.COLUMN);
            int len = columnList.size();
            Connection connect = null;
            PreparedStatement ps = null;
            ResultSet rs = null;
            try {
                while (connect == null) {
                    try {
                        connect = getConnection();
                        if (connect != null) {
                            break;
                        }
                    } catch (Exception w) {
                        LOG.error("获取连接异常", w.getMessage());
                    }

                }
                ps = connect.prepareStatement(querySql);
                try {
                    rs = ps.executeQuery();
                    while (rs.next()) {
                        Row row = new Row(len);
                        for (int i = 0; i < len; i++) {
                            JSONObject column = columnList.get(i);
                            Integer columnType = column.getInt(Key.COLUMN_TYPE);
							//将ResultSet数据转换为Flink Row
                            RowSetFieldUtil.rowSetFieldResultSet(row, rs, i, columnType, baseType);
                        }
                        // 发送结果
                        ctx.collect(row);
                    }
                } catch (Exception e) {
                    LOG.error("查询出现异常",e);
                    if (ps != null) {
                        ps.close();
                    }
                    if (connect != null) {
                        connect.close();
                    }
                }
            } catch (Exception e) {
                LOG.error("查询数据异常", e);
                throw e;
            } finally {
                if (rs != null) {
                    rs.close();
                }
                if (ps != null) {
                    ps.close();
                }
                if (connect != null) {
                    connect.close();
                }
            }
            if (period == null || period <= 0) {
                isRunning = false;
            } else {
                Long takeTime = (end - start) / 1000;
                //去掉执行消耗时间
                LOG.error("sleep time:" + (period - takeTime));
                TimeUnit.SECONDS.sleep(period - takeTime);
            }

        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    private Connection getConnection() {
        Connection connection = null;
        try {
            String username = conf.getString(Key.USERNAME);
            String password = conf.getString(Key.PASSWORD);
            password = PubFunction.decryptStr(password);
            String jdbcUrl = conf.getString(String.format("%s[0]", Key.JDBC_URL));
            // 创建连接
            connection = DriverManager.getConnection(jdbcUrl, username, password);
        } catch (Exception e) {
            LOG.error("get connection occur exception", e);
            throw new RuntimeException("get connection occur exception", e);
        }
        return connection;
    }
}

总结

完整代码请点击下载自定义Flink SourceFunction定时读取数据库java代码下载文章来源地址https://www.toymoban.com/news/detail-783149.html

到了这里,关于自定义Flink SourceFunction定时读取数据库的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 定时备份MySQL数据库

    MySQL 提供了 mysqldump 命令来实现导出数据库,命令用法如下: 在 Linux 服务器中结合 crontab 定时命令实现定时备份数据库,同时支持压缩、备份日志、定期清理等功能。 新建 mysql 的备份命令脚本 复制以下内容: 至此,执行该脚本,可以生成压缩的数据库.sql文件 给 shell 设置

    2024年02月07日
    浏览(65)
  • mysql数据库定时备份

    1  环境检查 1. 执行本手册前,请正确安装Mysql数据库,并知晓数据库用户名和密码; 2. 执行本手册前,请先确定Mysql数据库数据要储存的天数;默认30 2  批处理文件配置 当前步骤主要实现以下目标配置: Ø 配置Mysql安装目录,定义Mysql目录下BIN目录路径,用于定时导出数

    2024年02月08日
    浏览(62)
  • 数据库定时备份linux篇

    目录 1 序言 2 crond 相关知识点 2.1 crond 是什么? 2.2 crontab 进行定时任务设置 2.2.1 crontab 指令选项说明 2.2.2 crontab 指令使用格式 2.2.3 特定时间执行任务例子 2.2.4 crontab 设置步骤 3 各个数据库备份脚本 3.1 Oracle数据库 3.2 Mysql数据库 3.3 postgresql数据库 3.4 mongoDB数据库 4 定时备份数据

    2024年02月04日
    浏览(60)
  • PostgreSQL数据库定时备份脚本

          大多数数据库管理系统都提供了自带的备份工具,可以使用这些工具来进行备份操作。     例如: MySQL:使用 mysqldump 命令进行备份。 PostgreSQL:使用 pg_dump 命令进行备份。       以下是一个用于定时备份 PostgreSQL 数据库的示例脚本。这个脚本将使用 pg_dump 工具来创建

    2024年02月11日
    浏览(52)
  • 【数据处理】建立数据库索引并定时重建索引

    给表 建立索引能加速查询 (我的习惯是给经常查询的列建立索引,如果经常查询的是id列,我会给将id设置为主键),长时间查询后会变慢(具体原因目前不清楚),公司前辈说 定期重建索引就可以解决问题 ,我就在 Microsoft Sql Server Management Studio 里设置了定时“自动重建索

    2024年01月25日
    浏览(48)
  • 数据库定时备份winserver2012篇

    目录 1 序言 2 任务计划相关知识点介绍 2.1 任务计划 是什么? 2.2 批处理文件 2.2.1 批处理文件简介 2.2.2 批处理常用命令介绍 3 各个数据库备份脚本 3.1 Oracle数据库 3.2 Mysql数据库 3.3 postgresql数据库 3.4 mongoDB数据库 4 添加任务计划定时备份数据库 数据是无价的,所以生产环境中

    2024年02月04日
    浏览(52)
  • Flink读取数据的5种方式(文件,Socket,Kafka,MySQL,自定义数据源)

    这是最简单的数据读取方式。当需要进行功能测试时,可以将数据保存在文件中,读取后验证流处理的逻辑是否符合预期。 程序代码: 输出结果 用于验证一些通过Socket传输数据的场景非常方便。 程序代码: 测试时,需要先在 172.16.3.6 的服务器上启动 nc ,然后再启动Flink读

    2024年02月16日
    浏览(44)
  • Java读取数据库表

    Java自带的日志。 常见用法如下,打印日志信息: logger.error(\\\"数据库连接失败\\\",e) logger.info(\\\"tableName:{},comment:{}\\\",tableName,comment),{}是占位符 指定类初始化日志对象,在日志输出的时候,可以打印出日志信息所在类。 Connection 对象用于打开与数据源的连接。 加载驱动程序。 获取

    2024年02月02日
    浏览(34)
  • Flash读取数据库中的数据

     Flash读取数据库中的数据 要读取数据库的记录,首先需要建立一个数据库,并输入一些数据。数据库建立完毕后,由Flash向ASP提交请求,ASP根据请求对数据库进行操作后将结果返回给Flash,Flash以某种方式把结果显示出来。 1.启动Access2003,新建一名为“userInfo.mdb”的数据库,

    2024年01月25日
    浏览(48)
  • 使用脚本定时备份MySql数据库文件

    如果mysql不在环境变量中,请先将mysql放入环境变量   新建一个脚本 脚本内容: 需要给这个脚本文件执行权限: 自动执行 查看crontab服务状态: 手动启动crontab服务: 查看crontab服务是否已设置为开机启动,执行命令: 加入开机自动启动:

    2024年04月26日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包