使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

这篇具有很好参考价值的文章主要介绍了使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表文章来源地址https://www.toymoban.com/news/detail-680930.html

package flink;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


public class FlinkSQL_CDC {

    public static void main(String[] args) throws Exception {

//
//        Configuration conf = new Configuration();
//        conf.setInteger("rest.port",3335);
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);


        //2.创建Flink-MySQL-CDC的Source
        TableResult tableResult = tableEnv.executeSql("CREATE TABLE table_name (" +
                "  id INT primary key," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'hadoop102'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = 'xxxx'," +
                "  'database-name' = 'student'," +
                "  'table-name' = 'table_name'," +
                "'server-time-zone' = 'Asia/Shanghai'," +
                "'scan.startup.mode' = 'initial'" +
                ")"
        );

        // 2. 注册SinkTable: sink_sensor
//        tableEnv.executeSql("" +
//                "CREATE TABLE kafka_binlog ( " +
//                "  user_id INT, " +
//                "  user_name STRING, " +
//                "`proc_time` as PROCTIME()" +
//                ") WITH ( " +
//                "  'connector' = 'kafka', " +
//                "  'topic' = 'test2', " +
//                "  'properties.bootstrap.servers' = 'hadoop102:9092', " +
//                "  'format' = 'json' " +
//                ")" +
//                "");

        //upsert-kafka
        tableEnv.executeSql("" +
                "CREATE TABLE kafka_binlog ( " +
                "  user_id INT, " +
                "  user_name STRING, " +
                "`proc_time` as PROCTIME()," +
                "  PRIMARY KEY (user_id) NOT ENFORCED" +
                ") WITH ( " +
                "  'connector' = 'upsert-kafka', " +
                "  'topic' = 'test2', " +
                "  'properties.bootstrap.servers' = 'hadoop102:9092', " +
                "  'key.format' = 'json' ," +
                "  'value.format' = 'json' " +
                ")" +
                "");


        // 3. 从SourceTable 查询数据, 并写入到 SinkTable
         tableEnv.executeSql("insert into kafka_binlog select * from table_name");

         tableEnv.executeSql("select * from kafka_binlog").print();

        env.execute();
    }

}

到了这里,关于使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Canal+Kafka实现Mysql数据同步

    canal [kə\\\'næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。 canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。

    2024年02月12日
    浏览(47)
  • cancel框架同步mysql数据到kafka

    1、下载cancel 2、修改conf文件夹下的canal.properties配置文件 3、修改conf/example文件夹下的instance.properties配置文件 在sql查询show binary logs语句得到binlog日志 4、启动 在bin目录下执行 启动程序 注:MySQL需要创建新用户

    2024年02月15日
    浏览(53)
  • 通过kafka connector实现mysql数据自动同步es

    整体思路: 1、使用 io.debezium.connector.mysql.MySqlConnector 自动同步数据到kafka消息队列 2、通过listener监听消息队列,代码控制数据插入es ps:其实有更简单的方式:在此基础上使用ElasticsearchSinkConnector、ksql,完成数据的转换与自动同步es,全程无需代码控制,后续本地跑通流程后

    2024年02月08日
    浏览(45)
  • 从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

    Debezium 是一个开源的分布式平台,用于实时捕获和发布数据库更改事件。它可以将关系型数据库(如 MySQL、PostgreSQL、Oracle 等)的变更事件转化为可观察的流数据,以供其他应用程序实时消费和处理。 本文中我们将采用 Debezium 与 Kafka 组合的方式来实现从 MySQL 到 DolphinDB 的数

    2024年02月02日
    浏览(41)
  • 基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 数据到 Elasticsearch、Kafka

    Dinky 是一个开箱即用的一站式实时计算平台以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架致力于流批一体和湖仓一体的建设与实践。本文以此为FlinkSQL可视化工具。 Flink SQL 使得使用标准 SQL 开发流式应用变得简单,免去代码开发。 Flink CDC 本文使用 MySQL CDC 连接器 允许从

    2024年02月16日
    浏览(43)
  • Doris:MySQL数据同步到Doris的N种方式

    目录 1.CSV文件方式 1.1 导出mysql数据 1.2 导入数据 2.JDBC 编码方式 3.JDBC Catalog 方式 3.1 上传mysql驱动包 3.2 创建mysql catalog 3.3. 插入数据 4.Binlog Load 方式         当mysql与doris服务之间无法通过网络互联时,可以通过将mysql数据导出成csv文件,然后再在doris服务器导入csv文件的方

    2024年02月04日
    浏览(47)
  • 成功解决VScode每次只能打开一个文件,即只能打开一个编辑窗口。

    点击文件 -- 首选项 -- 设置 -- 工作台 -- 编辑管理 -- 取消勾选Enable Preview 如下图所示: 下拉,取消勾选Enable Preview

    2024年02月16日
    浏览(45)
  • Flink读取数据的5种方式(文件,Socket,Kafka,MySQL,自定义数据源)

    这是最简单的数据读取方式。当需要进行功能测试时,可以将数据保存在文件中,读取后验证流处理的逻辑是否符合预期。 程序代码: 输出结果 用于验证一些通过Socket传输数据的场景非常方便。 程序代码: 测试时,需要先在 172.16.3.6 的服务器上启动 nc ,然后再启动Flink读

    2024年02月16日
    浏览(41)
  • flink cdc数据同步,DataStream方式和SQL方式的简单使用

    目录 一、flink cdc介绍 1、什么是flink cdc 2、flink cdc能用来做什么 3、flink cdc的优点 二、flink cdc基础使用 1、使用flink cdc读取txt文本数据 2、DataStream的使用方式 3、SQL的方式 总结 flink cdc是一个由阿里研发的,一个可以直接从MySQL、PostgreSQL等数据库直接读取全量数据和增量变更数

    2024年02月13日
    浏览(40)
  • TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka

    快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群 创建 changefeed,将 TiDB 增量数据输出至 Kafka 使用 go-tpc 写入数据到上游 TiDB 使用 Kafka console consumer 观察数据被写入到指定的 Topic (可选)配置 Flink 集群消费 Kafka 内数据 部署包含 TiCDC 的 TiDB 集群 在实验或测试环境中,可以使用 TiU

    2024年02月12日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包