使用Flink实现Kafka到MySQL的数据流转换:一个基于Flink的实践指南

这篇具有很好参考价值的文章主要介绍了使用Flink实现Kafka到MySQL的数据流转换:一个基于Flink的实践指南。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用Flink实现Kafka到MySQL的数据流转换

在现代数据处理架构中,Kafka和MySQL是两种非常流行的技术。Kafka作为一个高吞吐量的分布式消息系统,常用于构建实时数据流管道。而MySQL则是广泛使用的关系型数据库,适用于存储和查询数据。在某些场景下,我们需要将Kafka中的数据实时地写入到MySQL数据库中,本文将介绍如何使用Apache Flink来实现这一过程。

mysql kafka数据转换,flink,kafka,mysql,etl

环境准备

在开始之前,请确保你的开发环境中已经安装并配置了以下组件:
Apache Flink 准备相关pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>EastMoney</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
    </dependencies>

</project>

Kafka消息队列

1. 启动zookeeper
 zkServer start
2. 启动kafka服务
 kafka-server-start /opt/homebrew/etc/kafka/server.properties
3. 创建topic
 kafka-topics --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic east_money
6. 生产数据
 kafka-console-producer --broker-list localhost:9092 --topic east_money

MySQL数据库
初始化mysql表

CREATE TABLE `t_stock_code_price` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',
  `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',
  `close` double DEFAULT NULL COMMENT '最新价',
  `change_percent` double DEFAULT NULL COMMENT '涨跌幅',
  `change` double DEFAULT NULL COMMENT '涨跌额',
  `volume` double DEFAULT NULL COMMENT '成交量(手)',
  `amount` double DEFAULT NULL COMMENT '成交额',
  `amplitude` double DEFAULT NULL COMMENT '振幅',
  `turnover_rate` double DEFAULT NULL COMMENT '换手率',
  `peration` double DEFAULT NULL COMMENT '市盈率',
  `volume_rate` double DEFAULT NULL COMMENT '量比',
  `hign` double DEFAULT NULL COMMENT '最高',
  `low` double DEFAULT NULL COMMENT '最低',
  `open` double DEFAULT NULL COMMENT '今开',
  `previous_close` double DEFAULT NULL COMMENT '昨收',
  `pb` double DEFAULT NULL COMMENT '市净率',
  `create_time` varchar(64) NOT NULL COMMENT '写入时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5605 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

步骤解释

获取流执行环境:首先,我们通过StreamExecutionEnvironment.getExecutionEnvironment获取Flink的流执行环境,并设置其运行模式为流处理模式。

创建流表环境:接着,我们通过StreamTableEnvironment.create创建一个流表环境,这个环境允许我们使用SQL语句来操作数据流。

val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

定义Kafka数据源表:我们使用一个SQL语句创建了一个Kafka表re_stock_code_price_kafka,这个表代表了我们要从Kafka读取的数据结构和连接信息。

tEnv.executeSql(
      "CREATE TABLE re_stock_code_price_kafka (" +
        "`id` BIGINT," +
        "`code` STRING," +
        "`name` STRING," +
        "`close` DOUBLE NULL," +
        "`change_percent` DOUBLE," +
        "`change` DOUBLE," +
        "`volume` DOUBLE," +
        "`amount` DOUBLE," +
        "`amplitude` DOUBLE," +
        "`turnover_rate` DOUBLE," +
        "`operation` DOUBLE," +
        "`volume_rate` DOUBLE," +
        "`high` DOUBLE ," +
        "`low` DOUBLE," +
        "`open` DOUBLE," +
        "`previous_close` DOUBLE," +
        "`pb` DOUBLE," +
        "`create_time` STRING," +
        "rise int"+
        ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'east_money'," +
        "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
        "'properties.group.id' = 'mysql2kafka'," +
        "'scan.startup.mode' = 'earliest-offset'," +
        "'format' = 'csv'," +
        "'csv.field-delimiter' = ','" +
        ")"
    )

    val result = tEnv.executeSql("select * from re_stock_code_price_kafka")

定义MySQL目标表:然后,我们定义了一个MySQL表re_stock_code_price,指定了与MySQL的连接参数和表结构。

val sink_table: String =
      """
        |CREATE TEMPORARY TABLE re_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  rise int,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 're_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin
    tEnv.executeSql(sink_table)

数据转换和写入:最后,我们执行了一个插入操作,将从Kafka读取的数据转换并写入到MySQL中。

tEnv.executeSql("insert into re_stock_code_price select * from re_stock_code_price_kafka")

result.print()

全部代码

package org.east

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Kafka2Mysql {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    tEnv.executeSql(
      "CREATE TABLE re_stock_code_price_kafka (" +
        "`id` BIGINT," +
        "`code` STRING," +
        "`name` STRING," +
        "`close` DOUBLE NULL," +
        "`change_percent` DOUBLE," +
        "`change` DOUBLE," +
        "`volume` DOUBLE," +
        "`amount` DOUBLE," +
        "`amplitude` DOUBLE," +
        "`turnover_rate` DOUBLE," +
        "`operation` DOUBLE," +
        "`volume_rate` DOUBLE," +
        "`high` DOUBLE ," +
        "`low` DOUBLE," +
        "`open` DOUBLE," +
        "`previous_close` DOUBLE," +
        "`pb` DOUBLE," +
        "`create_time` STRING," +
        "rise int"+
        ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'east_money'," +
        "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
        "'properties.group.id' = 'mysql2kafka'," +
        "'scan.startup.mode' = 'earliest-offset'," +
        "'format' = 'csv'," +
        "'csv.field-delimiter' = ','" +
        ")"
    )

    val result = tEnv.executeSql("select * from re_stock_code_price_kafka")


    val sink_table: String =
      """
        |CREATE TEMPORARY TABLE re_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  rise int,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 're_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin
    tEnv.executeSql(sink_table)
    tEnv.executeSql("insert into re_stock_code_price select * from re_stock_code_price_kafka")


    result.print()
    print("数据打印完成!!!")
  }
}

如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。
mysql kafka数据转换,flink,kafka,mysql,etl文章来源地址https://www.toymoban.com/news/detail-851655.html

到了这里,关于使用Flink实现Kafka到MySQL的数据流转换:一个基于Flink的实践指南的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink数据流

    官网介绍 Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink 被设计为在所有常见的集群环境中运行,以内存中的速度和任何规模执行计算。 1.无限流有一个开始,但没有定义的结束。它们不会在生成数据时终止并提供数据。必须连续处

    2024年02月17日
    浏览(48)
  • 大数据Flink(六十):Flink 数据流和分层 API介绍

    文章目录 Flink 数据流和分层 API介绍 一、​​​​​​​​​​​​​​Flink 数据流

    2024年02月12日
    浏览(44)
  • Flink1.17.0数据流

    官网介绍 Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink 被设计为在所有常见的集群环境中运行,以内存中的速度和任何规模执行计算。 1.无限流有一个开始,但没有定义的结束。它们不会在生成数据时终止并提供数据。必须连续处

    2024年02月11日
    浏览(54)
  • 实时Flink数据流与ApacheHadoop集成

    在大数据时代,实时数据处理和批处理数据分析都是非常重要的。Apache Flink 和 Apache Hadoop 是两个非常受欢迎的大数据处理框架。Flink 是一个流处理框架,专注于实时数据处理,而 Hadoop 是一个批处理框架,专注于大规模数据存储和分析。在某些场景下,我们需要将 Flink 和 H

    2024年02月19日
    浏览(50)
  • 实时Flink数据流与ApacheHive集成

    在大数据时代,实时数据处理和批处理数据处理都是非常重要的。Apache Flink 是一个流处理框架,可以处理大规模的实时数据流,而 Apache Hive 是一个基于 Hadoop 的数据仓库工具,主要用于批处理数据处理。在实际应用中,我们可能需要将 Flink 与 Hive 集成,以实现流处理和批处

    2024年02月22日
    浏览(65)
  • ELK 将数据流转换回常规索引

    ELK 将数据流转换回常规索引 现象:创建索引模板是打开了数据流,导致不能创建常规索引,并且手动修改、删除索引模板失败 解决方法: 1、停止logstash不允许重新创建数据流的索引 2、kibana上删除数据流 3、修改索引模板将数据流转换回常规索引 4、重新启动logstash

    2024年02月14日
    浏览(36)
  • Spark Streaming + Kafka构建实时数据流

    1. 使用Apache Kafka构建实时数据流 参考文档链接:https://cloud.tencent.com/developer/article/1814030 2. 数据见UserBehavior.csv 数据解释:本次实战用到的数据集是CSV文件,里面是一百零四万条淘宝用户行为数据,该数据来源是阿里云天池公开数据集 根据这一csv文档运用Kafka模拟实时数据流,

    2024年02月12日
    浏览(44)
  • Kafka数据流的实时采集与统计机制

    随着大数据时代的到来,实时数据处理成为了众多企业和组织的关注焦点。为了满足这一需求,Apache Kafka成为了一个广泛采用的分布式流处理平台。Kafka以其高吞吐量、可扩展性和容错性而闻名,被广泛应用于日志收集、事件驱动架构和实时分析等场景。 在本文中,我们将探

    2024年02月07日
    浏览(40)
  • 后端返回数据流,前端进行转换blob文件流

    1. 首先相应的头里面请求改为 responseType: \\\'blob\\\' 2.           let res = await getPhotoVideoUrl() --此处为模拟的获取一个视频流的地址;         const img = new Blob([res], { type: \\\'image/png\\\' });         let imgUrl = window.URL.createObjectURL(img); 3.拿到流的地址后,先进行new Blob进行创建一个对象。

    2024年02月13日
    浏览(42)
  • 转换流-数据流-对象流-打印流-标准输入输出流

    把字节流转换为字符流,转换流是一种处理流。字节流有乱码的可能。 假设input.txt文件中存放了字符串 “abc中国” ,使用字节流读取会乱码,使用字符流读取是使用平台默认的编码格式读取的,如果文本存储是不是平台的编码格式,也会出现乱码。转换流本质上就是加了编

    2024年02月12日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包