Flink之JDBC Sink

这篇具有很好参考价值的文章主要介绍了Flink之JDBC Sink。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

这里介绍一下Flink Sink中jdbc sink的使用方法,以mysql为例,这里代码分为两种,事务和非事务文章来源地址https://www.toymoban.com/news/detail-627658.html

  • 非事务代码
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/8/2
 * @Description: 测试
 **/
public class FlinkJdbcSink {
    public static void main(String[] args) throws Exception {
        // 构建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可
        DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());
        // 构建jdbc sink
        SinkFunction<CustomizeBean> jdbcSink = JdbcSink.sink(
                "insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句
                new JdbcStatementBuilder<CustomizeBean>() {
                    @Override
                    public void accept(PreparedStatement pStmt, CustomizeBean customizeBean) throws SQLException {
                        pStmt.setString(1, customizeBean.getName());
                        pStmt.setInt(2, customizeBean.getAge());
                        pStmt.setString(3, customizeBean.getGender());
                        pStmt.setString(4, customizeBean.getHobbit());
                    }
                }, // 字段映射配置,这部分就和常规的java api差不多了
                JdbcExecutionOptions.builder()
                        .withBatchSize(10) // 批次大小,条数
                        .withBatchIntervalMs(5000) // 批次最大等待时间
                        .withMaxRetries(1) // 重复次数
                        .build(), // 写入参数配置
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false")
                        .withUsername("root")
                        .withPassword("password")
                        .build() // jdbc信息配置
        );
        // 添加jdbc sink
        customizeSource.addSink(jdbcSink);
        env.execute();
    }
}
  • 事务代码
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.function.SerializableSupplier;

import javax.sql.XADataSource;

/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/8/2
 * @Description: 测试
 **/
public class FlinkJdbcSink {
    public static void main(String[] args) throws Exception {
        // 构建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可
        DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());

        // 每20秒作为checkpoint的一个周期
        env.enableCheckpointing(20000);
        // 两次checkpoint间隔最少是10秒
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
        // 程序取消或者停止时不删除checkpoint
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // checkpoint必须在60秒结束,否则将丢弃
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 同一时间只能有一个checkpoint
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 设置EXACTLY_ONCE语义,默认就是这个
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // checkpoint存储位置
        env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
        // 构建ExactlyOne sink,要注意使用exactlyOnceSink需要开启checkpoint
        SinkFunction<CustomizeBean> exactlyOneJdbcSink = JdbcSink.exactlyOnceSink(
                "insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句
                (JdbcStatementBuilder<CustomizeBean>) (pStmt, customizeBean) -> {
                    pStmt.setString(1, customizeBean.getName());
                    pStmt.setInt(2, customizeBean.getAge());
                    pStmt.setString(3, customizeBean.getGender());
                    pStmt.setString(4, customizeBean.getHobbit());
                }, // 字段映射配置,这部分就和常规的java api差不多了
                JdbcExecutionOptions.builder()
                        .withMaxRetries(0) // 设置重复次数
                        .withBatchSize(25) // 设置批次大小,数据条数
                        .withBatchIntervalMs(1000) // 批次最大等待时间
                        .build(),
                JdbcExactlyOnceOptions.builder()
                        // 这里使用的mysql,所以要将这个参数设置为true,因为mysql不支持一个连接上开启多个事务,oracle是支持的
                        .withTransactionPerConnection(true)
                        .build(),
                (SerializableSupplier<XADataSource>) () -> {
                    // XADataSource 就是JDBC连接,不同的是它是支持分布式事务的连接
                    MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
                    mysqlXADataSource.setUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false"); // 设置url
                    mysqlXADataSource.setUser("root"); // 设置用户
                    mysqlXADataSource.setPassword("password"); // 设置密码
                    return mysqlXADataSource;
                }
        );
        // 添加jdbc sink
        customizeSource.addSink(exactlyOneJdbcSink);
        env.execute();
    }
}
  • pom依赖
        <!-- 在原有的依赖中加入下面两个内容 -->
        
        <!-- JDBC connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- mysql驱动 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
  • 结果
    Flink之JDBC Sink,FLink,flink,大数据
    jdbc sink的具体使用方式大概就这些内容,还是比较简单的,具体应用还要结合实际业务场景.

到了这里,关于Flink之JDBC Sink的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink - sink算子

    水善利万物而不争,处众人之所恶,故几于道💦   1. Kafka_Sink   2. Kafka_Sink - 自定义序列化器   3. Redis_Sink_String   4. Redis_Sink_list   5. Redis_Sink_set   6. Redis_Sink_hash   7. 有界流数据写入到ES   8. 无界流数据写入到ES   9. 自定义sink - mysql_Sink   10. Jdbc_Sink 官方

    2024年02月14日
    浏览(49)
  • Flink之Kafka Sink

    代码内容 结果数据

    2024年02月15日
    浏览(44)
  • Flink将数据写入MySQL(JDBC)

    在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。 2.1 版本说明 2.2 导入相关依赖 2.3 连接数据库,创建表 2.4 创建POJO类 2.5 自定义map函数 2.5 Flink2MySQL 2.6 启动necat、Flink,观察数据库写

    2024年02月07日
    浏览(44)
  • 轻松通关Flink第34讲:Flink 和 Redis 整合以及 Redis Sink 实现

    上一课时我们使用了 3 种方法进行了 PV 和 UV 的计算,分别是全窗口内存统计、使用分组和过期数据剔除、使用 BitMap / 布隆过滤器。到此为止我们已经讲了从数据清洗到水印、窗口设计,PV 和 UV 的计算,接下来需要把结果写入不同的目标库供前端查询使用。 下面我们分别讲

    2024年02月08日
    浏览(43)
  • flink 13.5 sink elasticsearch-7

    mysql 数据-- flink sql --es mysql flink elasticsearch 5.7.20-log 13.5 7.12.0 官网可以下载包 flink-sql-connector-elasticsearch7_2.11-1.13.6.jar https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/

    2024年02月14日
    浏览(40)
  • Flink创建Hudi的Sink动态表

    工厂类 HoodieTableFactory 提供的创建动态表接口 createDynamicTableSource 和 createDynamicTableSink,对应的源码文件为:https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java 。 1、检查是否设置了 path 选项(checkArgument),没有的话抛异常“

    2024年02月07日
    浏览(36)
  • Flink Table API/SQL 多分支sink

    在某个场景中,需要从Kafka中获取数据,经过转换处理后,需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错: 使用 StreamStatementSet. 具体参考官网: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-betwe

    2024年02月11日
    浏览(100)
  • Flink Table/Sql自定义Kudu Sink实战(其它Sink可参考)

    使用第三方的org.apache.bahir » flink-connector-kudu,batch模式写入数据到Kudu会有FlushMode相关问题 具体可以参考我的这篇博客通过Flink SQL操作创建Kudu表,并读写Kudu表数据 Flink的Dynamic table能够统一处理batch和streaming 实现自定义Source或Sink有两种方式: 通过对已有的connector进行拓展。比

    2024年02月14日
    浏览(47)
  • Flink(五)source、transformations、sink的详细示例(一)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月15日
    浏览(31)
  • 12、Flink source和sink 的 clickhouse 详细示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月13日
    浏览(81)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包