Flink将数据写入MySQL(JDBC)

这篇具有很好参考价值的文章主要介绍了Flink将数据写入MySQL(JDBC)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、写在前面

在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。

二、代码示例

2.1 版本说明

        <flink.version>1.14.6</flink.version>
        <spark.version>2.4.3</spark.version>
        <hadoop.version>2.8.5</hadoop.version>
        <hbase.version>1.4.9</hbase.version>
        <hive.version>2.3.5</hive.version>
        <java.version>1.8</java.version>
        <scala.version>2.11.8</scala.version>
        <mysql.version>8.0.22</mysql.version>
        <scala.binary.version>2.11</scala.binary.version>

2.2 导入相关依赖

 <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!--mysql连接器依赖-->
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <version>8.0.22</version>
</dependency>

2.3 连接数据库,创建表

mysql> CREATE TABLE `ws` ( 
      `id` varchar(100) NOT NULL
      ,`ts` bigint(20) DEFAULT NULL
      ,`vc` int(11) DEFAULT NULL, PRIMARY KEY (`id`) 
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8

2.4 创建POJO类

package com.flink.POJOs;


import java.util.Objects;

/**
 * TODO POJO类的特点
 * 类是公有(public)的
 * 有一个无参的构造方法
 * 所有属性都是公有(public)的
 * 所有属性的类型都是可以序列化的
 */
public class WaterSensor {
    //类的公共属性
    public String id;
    public Long ts;
    public Integer vc;

    //无参构造方法
    public WaterSensor() {
        //System.out.println("调用了无参数的构造方法");
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    //生成get和set方法
    public void setId(String id) {
        this.id = id;
    }

    public void setTs(Long ts) {
        this.ts = ts;
    }

    public void setVc(Integer vc) {
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public Long getTs() {
        return ts;
    }

    public Integer getVc() {
        return vc;
    }

    //重写toString方法
    @Override
    public String toString() {
        return "WaterSensor{" +
                "id='" + id + '\'' +
                ", ts=" + ts +
                ", vc=" + vc +
                '}';
    }

    //重写equals和hasCode方法
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        WaterSensor that = (WaterSensor) o;
        return id.equals(that.id) && ts.equals(that.ts) && vc.equals(that.vc);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, ts, vc);
    }
}
//scala的case类?

2.5 自定义map函数

package com.flink.POJOs;

import org.apache.flink.api.common.functions.MapFunction;

public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {
    @Override
    public WaterSensor map(String value) throws Exception {
        String[] datas = value.split(",");
        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
    }
}

2.5 Flink2MySQL

package com.flink.DataStream.Sink;

import com.flink.POJOs.WaterSensor;
import com.flink.POJOs.WaterSensorMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * Flink 输出到 MySQL(JDBC)
 */
public class flinkSinkJdbc {
    public static void main(String[] args) throws Exception {
        //TODO 创建Flink上下文执行环境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setParallelism(1);
        //TODO Source
        DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);
        //TODO Transfer
        SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = dataStreamSource.map(new WaterSensorMapFunction());
        /**TODO 写入 mysql
         * 1、只能用老的 sink 写法
         * 2、JDBCSink 的 4 个参数:
         *   第一个参数: 执行的 sql,一般就是 insert into
         *   第二个参数: 预编译 sql, 对占位符填充值
         *   第三个参数: 执行选项 ---->攒批、重试
         *   第四个参数: 连接选项---->url、用户名、密码
         */
        SinkFunction<WaterSensor> sinkFunction = JdbcSink.sink("insert into ws values(?,?,?)",
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());
                        System.out.println("数据写入成功:"+'('+waterSensor.getId()+","+waterSensor.getTs()+","+waterSensor.getVc()+")");
                    }
                }
                , JdbcExecutionOptions
                        .builder()
                        .withMaxRetries(3)         // 重试次数
                        .withBatchSize(100)        // 批次的大小:条数
                        .withBatchIntervalMs(3000) // 批次的时间
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/dw?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("********")
                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                        .build()
        );
        //TODO 写入到Mysql
        waterSensorSingleOutputStreamOperator.addSink(sinkFunction);

        streamExecutionEnvironment.execute();
    }
}

2.6 启动necat、Flink,观察数据库写入情况

nc -lk 9999 #启动necat、并监听8888端口,写入数据

Flink将数据写入MySQL(JDBC),# Flink,flink,大数据
启动Flink程序
Flink将数据写入MySQL(JDBC),# Flink,flink,大数据
查看数据库写入是否正常
Flink将数据写入MySQL(JDBC),# Flink,flink,大数据文章来源地址https://www.toymoban.com/news/detail-730789.html

到了这里,关于Flink将数据写入MySQL(JDBC)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(2) - jdbc/mysql

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(37)
  • 6.2、Flink数据写入到Kafka

    目录 1、添加POM依赖 2、API使用说明 3、序列化器 3.1 使用预定义的序列化器 3.2 使用自定义的序列化器 4、容错保证级别 4.1 至少一次 的配置 4.2 精确一次 的配置 5、这是一个完整的入门案例 Apache Flink 集成了通用的 Kafka 连接器,使用时需要根据生产环境的版本引入相应的依赖

    2024年02月09日
    浏览(35)
  • 【Flink】【ClickHouse】写入流式数据到ClickHouse

    Flink 安装的教程就不在这里赘叙了,可以看一下以前的文章,这篇文章主要是把流式数据写入的OLAP(ClickHouse)中作查询分析 Flink 1.13.2, ClickHouse 22.1.3.7 这里直接使用docker安装,没有安装的同学可以使用homebreak来安装,执行下面的命令即可( 已经安装了docker的可以忽略 ) 四指

    2024年02月03日
    浏览(34)
  • 【Flink-Kafka-To-ClickHouse】使用 Flink 实现 Kafka 数据写入 ClickHouse

    需求描述: 1、数据从 Kafka 写入 ClickHouse。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、先在 ClickHouse 中创建表然后动态获取 ClickHouse 的表结构。 5、Kafka 数据为 Json 格式,通过 FlatMap 扁平

    2024年02月03日
    浏览(34)
  • 【Flink-Kafka-To-Hive】使用 Flink 实现 Kafka 数据写入 Hive

    需求描述: 1、数据从 Kafka 写入 Hive。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、Flink 集成 Kafka 写入 Hive 需要进行 checkpoint 才能落盘至 HDFS。 5、先在 Hive 中创建表然后动态获取 Hive 的表

    2024年02月03日
    浏览(43)
  • Flink之FileSink将数据写入parquet文件

    在使用FileSink将数据写入列式存储文件中时必须使用 forBulkFormat ,列式存储文件如 ORCFile 、 ParquetFile ,这里就以 ParquetFile 为例结合代码进行说明. 在Flink 1.15.3 中是通过构造 ParquetWriterFactory 然后调用 forBulkFormat 方法将构造好的 ParquetWriterFactory 传入,这里先讲一下构造 ParquetWriterF

    2024年02月03日
    浏览(31)
  • Flink将数据写入CSV文件后文件中没有数据

    Flink中有一个过时的 sink 方法: writeAsCsv ,这个方法是将数据写入 CSV 文件中,有时候我们会发现程序启动后,打开文件查看没有任何数据,日志信息中也没有任何报错,这里我们结合源码分析一下这个原因. 这里先看一下数据处理的代码 代码中我是使用的自定义数据源生产数据的方式

    2024年02月16日
    浏览(28)
  • 15_基于Flink将pulsar数据写入到ClickHouse

    编写Flink完成数据写入到ClickHouse操作, 后续基于CK完成指标统计操作 3.8.1.ClickHouse基本介绍 ClickHouse 是俄罗斯的Yandex于2016年开源的列式存储数据库(DBMS),使用C++语言编写,主要用于在线分析处理查询(OLAP),能够使用SQL查询实时生成分析数据报告。 结论: ClickHouse像很多OL

    2024年02月14日
    浏览(72)
  • 14_基于Flink将pulsar数据写入到HBase

    3.7.1.编写Flink完成数据写入到Hbase操作, 完成数据备份, 便于后续进行即席查询和离线分析 3.7.1.1.HBase基本介绍 hbase是基于Google发布bigTable论文产生一款软件, 是一款noSQL型数据, 不支持SQL. 不支持join的操作, 没有表关系, 不支持事务(多行事务),hbase是基于 HDFS的采用java 语言编写 查

    2024年02月13日
    浏览(25)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定义 Sink 消费 Kafka 数据写入 RocketMQ

    这里的 maven 依赖比较冗余,推荐大家都加上,后面陆续优化。 注意: 1、此程序中所有的相关配置都是通过 Mysql 读取的(生产环境中没有直接写死的,都是通过配置文件动态配置),大家实际测试过程中可以将相关配置信息写死。 2、此程序中 Kafka 涉及到了 Kerberos 认证操作

    2024年02月03日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包