2.3 如何使用FlinkSQL读取&写入到JDBC(MySQL)

这篇具有很好参考价值的文章主要介绍了2.3 如何使用FlinkSQL读取&写入到JDBC(MySQL)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、JDBC SQL 连接器

FlinkSQL允许使用 JDBC连接器,向任意类型的关系型数据库读取或者写入数据

添加Maven依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc</artifactId>
  <version>3.1.0-1.17</version>
</dependency>

注意:如果使用 sql-client客户端,需保证 flink-1.17.1/lib 目录下 存在相应的jar包

 相关jar可以通过官网下载:JDBC SQL 连接器 

2.3 如何使用FlinkSQL读取&写入到JDBC(MySQL),# FlinkSQL 使用技巧,mysql,java,数据库


2、读取 MySQL

FlinkSQL读取MySQL表时,为批式处理,在流式计算任务中,通常被做维表来使用

-- 在FlinkSQL中创建 MySQL Source 表
drop table mysql_source_table;
CREATE TABLE mysql_source_table (
  `id` INT,
  `title` STRING,
  `author` STRING,
  `price` DOUBLE,
  `qty` INT
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://worker01/flink',
   'driver' = 'com.mysql.jdbc.Driver',  -- 【可选】不设置时,将自动从url中推导
   'username' = 'xxxx',
   'password' = 'xxxx',
   'table-name' = 'books'
);

-- 批式 sql,查看 JDBC 表中的数据
select * from mysql_source_table;

运行结果:

2.3 如何使用FlinkSQL读取&写入到JDBC(MySQL),# FlinkSQL 使用技巧,mysql,java,数据库


3、写入MySQL

3.1 何时批量写入MySQL呢?

FlinkSQL往MySQL写入数据时,默认会在客户端缓存数据,当触发设置的阈值后,才会向服务端发送数据

2.3 如何使用FlinkSQL读取&写入到JDBC(MySQL),# FlinkSQL 使用技巧,mysql,java,数据库

开启checkpoint :

# TODO 开启checkpoint,当checkpoint后,会触发jdbc的flush操作
set execution.checkpointing.interval=300sec;

设置 flush 前缓存记录的最大值 、flush 间隔时间:

-- TODO 创建sink mysql table
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (
  `id` INT,
  `title` STRING,
  `author` STRING,
  `price` DOUBLE,
  `qty` INT
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',
   'username' = 'xxxx',
   'password' = 'xxxx',
   'table-name' = 'books',
   'sink.buffer-flush.max-rows' = '100', -- flush 前缓存记录的最大值,默认值为100,设置为0时,表示不缓存数据(来一条写入一条)
   'sink.buffer-flush.interval' = '50s' -- flush 间隔时间,超过该时间后异步线程将 flush 数据。默认为1s
);

使用说明:

FLinkSQL写入MySQL时,常通过 sink.buffer-flush.max-rows、sink.buffer-flush.interval 来控制写入数据的延迟程度

        当 对写入实时性要求较高时,可以将 sink.buffer-flush.max-rows = 0 ,表示到来一条数据后立即写入MySQL,但带来的后果是 长时间占有mysql连接

        当 数据量大且对实时要求不高时,可根据业务需求调大配置,可使实时行和性能最优


3.2 sink mysql table 中主键的作用

在FLinkSQL中创建sink mysql table时,如果表中定义了主键,则连接器将以 upsert 模式工作

否则连接器将以 append 模式工作

         upsert 模式:Flink 将根据主键判断插入新行或者更新已存在的行

                               使用这种模式时,确保MySQL中的底表定义主键和添加唯一性约束

       append 模式:对MySQL库中底表做insert操作

 upsert 模式:

-- TODO 创建MySQL 表
CREATE TABLE `books` (
  `id` int(11) NOT NULL,
  `title` varchar(99) DEFAULT NULL,
  `author` varchar(99) DEFAULT NULL,
  `price` double DEFAULT NULL,
  `qty` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- TODO 创建FLinkSQL表(sink mysql table)
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (
  `id` INT,
  `title` STRING,
  `author` STRING,
  `price` DOUBLE,
  `qty` INT,
  PRIMARY KEY (id) NOT ENFORCED -- 指定主键字段
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',
   'username' = 'root',
   'password' = 'xxxx',
   'table-name' = 'books',
   'sink.buffer-flush.max-rows' = '0' -- 实时写入
);

-- TODO 往 mysql中写入数据(相同key的数据写入后,会做upsert操作)
insert into mysql_sink_table
SELECT * FROM (VALUES
  (5,'A Dream in Red Mansions','y', 3.0,1)
, (6,'Journey to the West','y', 3.0,1)
, (7,'Water Margin','y', 3.0,1)
) AS books (id, title,author,price,qty);

append 模式:

-- TODO 创建FLinkSQL表(sink mysql table)
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (
  `id` INT,
  `title` STRING,
  `author` STRING,
  `price` DOUBLE,
  `qty` INT
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',
   'username' = 'root',
   'password' = 'xxx',
   'table-name' = 'books',
   'sink.buffer-flush.max-rows' = '0' -- 实时写入
);

-- TODO 往 mysql中写入数据(相同key的数据写入后,会做操作)
insert into mysql_sink_table
SELECT * FROM (VALUES
  (5,'A Dream in Red Mansions','y', 3.0,1)
, (6,'Journey to the West','y', 3.0,1)
, (7,'Water Margin','y', 3.0,1)
) AS books (id, title,author,price,qty);

注意:使用 append模式时,如果MySQL底表中存在主键或唯一性约束时,INSERT 插入可能会失败

insert into 失败:

2.3 如何使用FlinkSQL读取&写入到JDBC(MySQL),# FlinkSQL 使用技巧,mysql,java,数据库文章来源地址https://www.toymoban.com/news/detail-735003.html

到了这里,关于2.3 如何使用FlinkSQL读取&写入到JDBC(MySQL)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Java】IntelliJ IDEA使用JDBC连接MySQL数据库并写入数据

    在 IntelliJ IDEA 中连接 MySQL 数据库并将数据存储在数据表中,使用 Java 和 JDBC(Java Database Connectivity)实现。 下载并安装 IntelliJ IDEA 下载并安装 MySQL 数据库 下载 MySQL Connector/J 驱动程序(JDBC 驱动程序) 使用 Navicat 创建一个 MySQL 数据库 打开 IntelliJ IDEA。 选择 \\\"File\\\"→ “New” →

    2024年02月05日
    浏览(77)
  • 【Flink】FlinkSQL读取Mysql表中时间字段相差13个小时

    问题:Flink版本1.13,在我们使用FlinkSQL读取Mysql中数据的时候,发现读取出来的时间字段中的数据和Mysql表中的数据相差13个小时,Mysql建表语句及插入的数据如下; CREATE TABLE `mysql_example` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT \\\'自增ID\\\', `name` varchar(64) DEFAULT NULL COMMENT \\\'姓名\\\'

    2024年01月19日
    浏览(35)
  • Flink将数据写入MySQL(JDBC)

    在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。 2.1 版本说明 2.2 导入相关依赖 2.3 连接数据库,创建表 2.4 创建POJO类 2.5 自定义map函数 2.5 Flink2MySQL 2.6 启动necat、Flink,观察数据库写

    2024年02月07日
    浏览(41)
  • JDBC MySQL任意文件读取分析

    文章首发于知识星球-赛博回忆录。给主管打个广告,嘿嘿。 在渗透测试中,有些发起mysql测试流程(或者说mysql探针)的地方,可能会存在漏洞。在连接测试的时候通过添加allowLoadLocalInfileInPath,allowLoadLocalInfile,allowUrlInLocalInfile与伪造的服务器进行通信,造成任意文件读取。 完整

    2024年02月07日
    浏览(35)
  • flink:通过table api把文件中读取的数据写入MySQL

    当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作 文件info.txt

    2024年03月15日
    浏览(47)
  • python如何操作excel,在excel中读取和写入数据

    Excel 是 Microsoft 为使用 Windows 和 Apple Macintosh 操作系统的电脑编写的一款电子表格软件。直观的界面、出色的计算功能和图表工具,再加上成功的市场营销,使 Excel 成为最流行的个人计算机数据处理软件。在 1993 年,作为 Microsoft Office 的组件发布了5.0版之后, Excel 就开始成为

    2024年02月03日
    浏览(58)
  • 使用EasyExcel读取和写入表格

    首先我创建了一个Excel的文件,数据如下 读取excel在EasyExcel中是很简单的一件事情,首先,我们需要有对象去存储一条条数据,所以这里我建立了一个Student对象,有如下四个字段,并分别赋予了get和set方法 @ExcelProperty该注解就是映射表头的,value的值就是Excel中实际的表头数据

    2024年02月02日
    浏览(34)
  • 2.4 如何在FlinkSQL使用DataGen(数据生成器)

    FLinkSQL中可以使用内置的DataGen SQL 连接器来生成测试数据 官网链接:DataGen SQL 连接器 随机数数据生成器支持随机生成 char、varchar、binary、varbinary、string 类型的数据 它是一个无界流的数据生成器 运行结果: 序列数据生成器,可以根据指定的起始值和结束值生成连续的整数数

    2024年02月05日
    浏览(37)
  • MySQL特殊函数使用技巧

    使用group_concat函数,可以轻松的把分组后,name 相同的数据拼接到一起,组成一个字符串,用逗号分隔。 通过该函数就能获取字符长度。 在某个字符串中的位置 将字符串中的字符 A 替换成 B。REPLACE(name,‘A’,‘B’) 获取当前时间 这样就能将 order 表中的部分数据,非常轻松插

    2024年02月04日
    浏览(34)
  • C#【必备技能篇】使用NPOI实现对excel的读取和写入

    依次执行下图中的1-6按钮 , 可以通过查看程序文件夹中的excel文件来加深理解。 链接:https://pan.baidu.com/s/19PgQMhCKviw9aBAjmJHSww 提取码:2omi 需要在源码中增加如下引用。相应的dll已更新到 5 的下载地址中。

    2023年04月13日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包