flink:通过table api把文件中读取的数据写入MySQL

这篇具有很好参考价值的文章主要介绍了flink:通过table api把文件中读取的数据写入MySQL。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作

package cn.edu.tju.demo2;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.types.Row;

public class Test41 {
    //demo 是MySQL中已经创建好的表
    //create table demo (userId varchar(50) not null,total bigint,avgVal double);
    private static String FILE_PATH = "info.txt";
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);



        tableEnv.connect(new FileSystem().path(FILE_PATH))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("userId", DataTypes.VARCHAR(50))
                        .field("ts", DataTypes.INT())
                        .field("val", DataTypes.DOUBLE()))
                .createTemporaryTable("input");



        Table dataTable = tableEnv.from("input");
        Table aggregateTable = dataTable
                .groupBy("userId")
                .select("userId, userId.count as total, val.avg as avgVal");


        String sql=

                "create table jdbcOutputTable (" +

                        " userId varchar(50) not null,total bigint,avgVal double " +

                        ") with (" +

                        " 'connector.type' = 'jdbc', " +

                        " 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/test', " +

                        " 'connector.table' = 'demo', " +

                        " 'connector.driver' = 'com.mysql.jdbc.Driver', " +

                        " 'connector.username' = 'root', " +

                        " 'connector.password' = 123456' )";

        tableEnv.sqlUpdate(sql);

        aggregateTable.insertInto("jdbcOutputTable");




        tableEnv.execute("my job");

    }
}

文件info.txt文章来源地址https://www.toymoban.com/news/detail-840347.html

user1,1680000890,31.6
user2,1681111900,38.3
user1,1680000890,34.9

到了这里,关于flink:通过table api把文件中读取的数据写入MySQL的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink 读写MySQL数据(DataStream和Table API)

    Flink提供了基于JDBC的方式,可以将读取到的数据写入到MySQL中;本文通过两种方式将数据下入到MySQL数据库,其他的基于JDBC的数据库类似,另外,Table API方式的Catalog指定为Hive Catalog方式,持久化DDL操作。 另外,JDBC 连接器允许使用 JDBC 驱动程序从任何关系数据库读取数据并将

    2023年04月09日
    浏览(46)
  • qt学习:json数据文件读取写入

    目录 什么是json 基本格式 例子  解析json文件数据到界面上 组合json数据文档对象 json是一种轻量级的数据交互格式,简单来说,json就是一种在各个编程语言中流通的数据格式,负责不同编程语言中的数据传递和交互 以键值对的形式存放 键-----字符串 值------基本数据类型,字

    2024年01月24日
    浏览(45)
  • Python读取写入数据到Excel文件

    【Linux干货教程】Ubuntu Linux 换源详细教程 大家好,我是洲洲,欢迎关注,一个爱听周杰伦的程序员。关注公众号【程序员洲洲】即可获得10G学习资料、面试笔记、大厂独家学习体系路线等…还可以加入技术交流群欢迎大家在CSDN后台私信我! Hello,各位看官老爷们好,洲洲已

    2024年02月12日
    浏览(83)
  • Flink将数据写入CSV文件后文件中没有数据

    Flink中有一个过时的 sink 方法: writeAsCsv ,这个方法是将数据写入 CSV 文件中,有时候我们会发现程序启动后,打开文件查看没有任何数据,日志信息中也没有任何报错,这里我们结合源码分析一下这个原因. 这里先看一下数据处理的代码 代码中我是使用的自定义数据源生产数据的方式

    2024年02月16日
    浏览(44)
  • Flink之FileSink将数据写入parquet文件

    在使用FileSink将数据写入列式存储文件中时必须使用 forBulkFormat ,列式存储文件如 ORCFile 、 ParquetFile ,这里就以 ParquetFile 为例结合代码进行说明. 在Flink 1.15.3 中是通过构造 ParquetWriterFactory 然后调用 forBulkFormat 方法将构造好的 ParquetWriterFactory 传入,这里先讲一下构造 ParquetWriterF

    2024年02月03日
    浏览(44)
  • 24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月04日
    浏览(60)
  • Python处理xlsx文件(读取、转为列表、新建、写入数据、保存)

    xlsxwriter**库对于xslx表的列数不做限制, xlrd 库不能写入超过65535行,256列的数据。 由于需要处理的数据行列数较多,遇到报错才发现库的限制问题,记录一下。

    2024年02月12日
    浏览(70)
  • 【flink番外篇】15、Flink维表实战之6种实现方式-通过Temporal table实现维表数据join

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月20日
    浏览(52)
  • R语言【utils】——write.table(),write.csv(),write.csv2():将数据写入文件

    Package  utils  version 4.2.0 参数【x】 :要写入的对象,最好是矩阵或数据帧。如果不是,则尝试将其强制转换为数据帧。 参数【file】 :命名文件的字符串或打开用于写入的连接。“”表示向控制台输出。 参数【append】 :逻辑值。只有当 参数【file】 是一个字符串时才相关。

    2024年01月22日
    浏览(47)
  • 17、Flink 之Table API: Table API 支持的操作(1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包