[大数据 Flink,Java实现不同数据库实时数据同步过程]

这篇具有很好参考价值的文章主要介绍了[大数据 Flink,Java实现不同数据库实时数据同步过程]。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

🌮前言:

🌮实现Mysql同步Es的过程包括以下步骤:

🌮配置Mysql数据库连接

🌮在Flink的配置文件中,添加Mysql数据库的连接信息。可以在flink-conf.yaml文件中添加如下配置:

🌮在Flink程序中,使用JDBCInputFormat来连接Mysql数据库,并定义查询语句,获取需要同步的数据。具体代码如下:

🌮最后,将步骤2中读取到的数据封装成一个Flink的DataStream程序,用于后续的数据处理和写入Es中。

🌮配置Elasticsearch连接

🌮在Flink的配置文件中,添加Elasticsearch的连接信息。可以在flink-conf.yaml文件中添加如下配置:

🌮在Flink程序中,使用ElasticsearchSinkFunction将数据写入Elasticsearch中。具体代码如下:

🌮实现数据的转换和处理

🌮实现数据的批量写入:

🌮实现实时同步:

🌮依赖:


🌮前言:

     🌮笔记

🌮实现Mysql同步Es的过程包括以下步骤:

  • 配置Mysql数据库连接: 使用Flink的JDBC连接器来连接Mysql数据库,并定义查询语句,获取需要同步的数据。同时,需要在Flink的配置文件中配置Mysql数据库的连接信息。

  • 配置Elasticsearch连接: 使用Flink的Elasticsearch连接器来连接Elasticsearch,并定义索引和类型,用于将同步的数据写入到指定的索引中。同时,需要在Flink的配置文件中配置Elasticsearch的连接信息。

  • 实现数据的转换和处理: 通过Flink的DataStream API,将从Mysql中查询到的数据转换为Elasticsearch中的文档格式,并进行相应的处理和处理,如去重、过滤等。

  • 实现数据的批量写入: 使用Flink的Elasticsearch连接器提供的批量写入接口,将转换后的数据批量写入到Elasticsearch中。

  • 实现实时同步: 将以上步骤组合成一个Flink Job,并通过Flink的DataStream API实现实时同步,即从Mysql数据库中读取到最新的数据,经过转换和处理后,实时写入到Elasticsearch中。

需要注意的是,在实现实时同步过程中,需要考虑到数据的幂等性和错误处理机制,以保证同步过程的稳定性和可靠性。同时,也需要考虑到数据的增量同步和全量同步的情况,以便根据实际需求进行调整和优化。

🌮配置Mysql数据库连接

需要使用Flink的JDBC连接器来连接Mysql数据库,并定义查询语句,获取需要同步的数据。同时,需要在Flink的配置文件中配置Mysql数据库的连接信息。

🌮在Flink的配置文件中,添加Mysql数据库的连接信息。可以在flink-conf.yaml文件中添加如下配置:

# Mysql数据库连接信息
env.java.opts: "-Dmysql.url=jdbc:mysql://localhost:3306/test -Dmysql.username=root -Dmysql.password=123456"
 

mysql.url表示Mysql数据库的连接地址,mysql.username表示Mysql数据库的用户名,mysql.password表示Mysql数据库的密码。

🌮在Flink程序中,使用JDBCInputFormat来连接Mysql数据库,并定义查询语句,获取需要同步的数据。具体代码如下:
// 定义Mysql数据库连接信息
String mysqlUrl = System.getProperty("mysql.url");
String mysqlUsername = System.getProperty("mysql.username");
String mysqlPassword = System.getProperty("mysql.password");

// 定义查询语句
String query = "SELECT * FROM user";

// 定义JDBC连接器
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl(mysqlUrl)
    .setUsername(mysqlUsername)
    .setPassword(mysqlPassword)
    .setQuery(query)
    .setRowTypeInfo(rowTypeInfo)
    .finish();

// 读取Mysql数据库中的数据
DataStream<Row> mysqlDataStream = env.createInput(jdbcInputFormat);

rowTypeInfo表示数据类型信息,需要根据Mysql数据库中的表结构来定义。

🌮最后,将步骤2中读取到的数据封装成一个Flink的DataStream程序,用于后续的数据处理和写入Es中。
// 将读取到的数据封装成一个Flink的DataStream程序
DataStream<String> jsonDataStream = mysqlDataStream.map(new MapFunction<Row, String>() {
    @Override
    public String map(Row row) throws Exception {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", row.getField(0));
        jsonObject.put("name", row.getField(1));
        jsonObject.put("age", row.getField(2));
        return jsonObject.toJSONString();
    }
});

🌮配置Elasticsearch连接

需要配置Elasticsearch连接,使用Flink的Elasticsearch连接器来连接Elasticsearch,并定义索引和类型,用于将同步的数据写入到指定的索引中。同时,需要在Flink的配置文件中配置Elasticsearch的连接信息。

🌮在Flink的配置文件中,添加Elasticsearch的连接信息。可以在flink-conf.yaml文件中添加如下配置:

# Elasticsearch连接信息
env.java.opts: "-Delasticsearch.hosts=http://localhost:9200"
 

🌮在Flink程序中,使用ElasticsearchSinkFunction将数据写入Elasticsearch中。具体代码如下:
// 定义Elasticsearch连接信息
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200, "http"));

// 定义ElasticsearchSinkFunction
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<String>() {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        IndexRequest indexRequest = Requests.indexRequest()
            .index("user")
            .type("_doc")
            .source(element, XContentType.JSON);
        indexer.add(indexRequest);
    }
});

// 将数据写入Elasticsearch中
jsonDataStream.addSink(esSinkBuilder.build());

httpHosts表示Elasticsearch的连接地址,ElasticsearchSinkFunction用于将数据写入Elasticsearch中。在ElasticsearchSinkFunction中,可以定义索引和类型,用于将数据写入到指定的索引中。

以上代码中,将数据写入到名为"user"的索引中,类型为"_doc"。同时,使用IndexRequest将数据写入Elasticsearch中。

🌮实现数据的转换和处理
  • 在第二步中,已经将从Mysql中查询到的数据转换成了JSON格式。接下来,需要将JSON格式的数据转换成Elasticsearch中的文档格式。可以使用Elasticsearch的Bulk API来实现。

  • 在转换成Elasticsearch中的文档格式之前,需要进行去重操作,避免重复写入相同的数据。可以使用Flink的KeyedStream API来实现。

// 将JSON格式的数据转换成Elasticsearch中的文档格式
DataStream<IndexRequest> esDataStream = jsonDataStream.map(new MapFunction<String, IndexRequest>() {
    @Override
    public IndexRequest map(String json) throws Exception {
        JSONObject jsonObject = JSON.parseObject(json);
        String id = jsonObject.getString("id");
        IndexRequest indexRequest = new IndexRequest("user", "_doc", id);
        indexRequest.source(json, XContentType.JSON);
        return indexRequest;
    }
});

// 进行去重操作
KeyedStream<IndexRequest, String> keyedStream = esDataStream.keyBy(new KeySelector<IndexRequest, String>() {
    @Override
    public String getKey(IndexRequest indexRequest) throws Exception {
        return indexRequest.id();
    }
});

// 将去重后的数据写入Elasticsearch中
keyedStream.addSink(esSinkBuilder.build());

使用MapFunction将JSON格式的数据转换成Elasticsearch中的文档格式。在转换成Elasticsearch中的文档格式之前,使用KeyedStream API进行去重操作,避免重复写入相同的数据。最后,将去重后的数据写入Elasticsearch中。

🌮实现数据的批量写入:

在第三步中已经使用了Elasticsearch的Bulk API来实现将转换后的数据批量写入到Elasticsearch中。具体代码如下:

// 将JSON格式的数据转换成Elasticsearch中的文档格式
DataStream<IndexRequest> esDataStream = jsonDataStream.map(new MapFunction<String, IndexRequest>() {
    @Override
    public IndexRequest map(String json) throws Exception {
        JSONObject jsonObject = JSON.parseObject(json);
        String id = jsonObject.getString("id");
        IndexRequest indexRequest = new IndexRequest("user", "_doc", id);
        indexRequest.source(json, XContentType.JSON);
        return indexRequest;
    }
});

// 进行去重操作
KeyedStream<IndexRequest, String> keyedStream = esDataStream.keyBy(new KeySelector<IndexRequest, String>() {
    @Override
    public String getKey(IndexRequest indexRequest) throws Exception {
        return indexRequest.id();
    }
});

// 将去重后的数据写入Elasticsearch中
ElasticsearchSink.Builder<IndexRequest> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<IndexRequest>() {
    @Override
    public void process(IndexRequest indexRequest, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(indexRequest);
    }
});

keyedStream.addSink(esSinkBuilder.build());

在ElasticsearchSinkFunction中,使用RequestIndexer将数据批量写入到Elasticsearch中。需要注意的是,ElasticsearchSinkFunction的泛型类型需要与KeyedStream的泛型类型保持一致。

以上代码中,使用KeyedStream API进行去重操作,避免重复写入相同的数据。最后,使用Elasticsearch的Bulk API将去重后的数据批量写入到Elasticsearch中。

🌮实现实时同步:
// 定义Mysql数据库连接信息
String mysqlUrl = System.getProperty("mysql.url");
String mysqlUsername = System.getProperty("mysql.username");
String mysqlPassword = System.getProperty("mysql.password");

// 定义查询语句
String query = "SELECT * FROM user";

// 定义JDBC连接器
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl(mysqlUrl)
    .setUsername(mysqlUsername)
    .setPassword(mysqlPassword)
    .setQuery(query)
    .setRowTypeInfo(rowTypeInfo)
    .finish();

// 读取Mysql数据库中的数据
DataStream<Row> mysqlDataStream = env.createInput(jdbcInputFormat);

// 将读取到的数据转换成JSON格式
DataStream<String> jsonDataStream = mysqlDataStream.map(new MapFunction<Row, String>() {
    @Override
    public String map(Row row) throws Exception {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", row.getField(0));
        jsonObject.put("name", row.getField(1));
        jsonObject.put("age", row.getField(2));
        return jsonObject.toJSONString();
    }
});

// 将JSON格式的数据转换成Elasticsearch中的文档格式
DataStream<IndexRequest> esDataStream = jsonDataStream.map(new MapFunction<String, IndexRequest>() {
    @Override
    public IndexRequest map(String json) throws Exception {
        JSONObject jsonObject = JSON.parseObject(json);
        String id = jsonObject.getString("id");
        IndexRequest indexRequest = new IndexRequest("user", "_doc", id);
        indexRequest.source(json, XContentType.JSON);
        return indexRequest;
    }
});

// 进行去重操作
KeyedStream<IndexRequest, String> keyedStream = esDataStream.keyBy(new KeySelector<IndexRequest, String>() {
    @Override
    public String getKey(IndexRequest indexRequest) throws Exception {
        return indexRequest.id();
    }
});

// 将去重后的数据写入Elasticsearch中
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200, "http"));

ElasticsearchSink.Builder<IndexRequest> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<IndexRequest>() {
    @Override
    public void process(IndexRequest indexRequest, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(indexRequest);
    }
});

keyedStream.addSink(esSinkBuilder.build());

// 执行Flink程序
env.execute("Mysql to Es");

🌮依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.76</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.15.0</version>
    </dependency>
</dependencies>

flink-java、flink-streaming-java_2.12、flink-connector-jdbc_2.12、flink-connector-elasticsearch7_2.12是Flink的核心依赖;fastjson是用于将数据转换成JSON格式的依赖;elasticsearch-rest-high-level-client是Elasticsearch的Java客户端依赖。文章来源地址https://www.toymoban.com/news/detail-689262.html

到了这里,关于[大数据 Flink,Java实现不同数据库实时数据同步过程]的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 实时Flink的数据库与Kafka集成优化案例

    在现代数据处理系统中,实时数据处理和分析是至关重要的。Apache Flink是一个流处理框架,可以用于实时数据处理和分析。在许多场景下,Flink需要与数据库和Kafka等消息系统进行集成,以实现更高效的数据处理。本文将讨论Flink与数据库和Kafka集成的优化案例,并提供实际示

    2024年02月20日
    浏览(32)
  • 【开发问题】flink-cdc不用数据库之间的,不同类型的转化

    我一开始是flink-cdc,oracle2Mysql,sql 我一开始直接用的oracle【date】类型,mysql【date】类型,sql的校验通过了,但是真正操作数据的时候报错,告诉我oracle的数据格式的日期数据,不可以直接插入到mysql格式的日期数据,说白了就是数据格式不一致导致的 我想的是既然格式不对

    2024年02月12日
    浏览(31)
  • flink cdc同步Oracle数据库资料到Doris问题集锦

    java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder at com.ververica.cdc.debezium.DebeziumSourceFunction.open(DebeziumSourceFunction.java:218) ~[flink-connector-debezium-2.2.0.jar:2.2.0] at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-co

    2024年02月16日
    浏览(31)
  • 基于Canal与Flink实现数据实时增量同步(一)

    vi conf/application.yml server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: kms-1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql:// s p r i n g . d a t a s o u r c e . a d d r e s s / {spring.datasource.address}/ s p r in g . d

    2024年04月13日
    浏览(34)
  • OceanBase X Flink 基于原生分布式数据库构建实时计算解决方案

    摘要:本文整理自 OceanBase 架构师周跃跃,在 Flink Forward Asia 2022 实时湖仓专场的分享。本篇内容主要分为四个部分: 分布式数据库 OceanBase 关键技术解读 生态对接以及典型应用场景 OceanBase X Flink 在游戏行业实践 未来展望 点击查看原文视频 演讲PPT 作为一款历经 12 年的纯自研

    2024年02月13日
    浏览(33)
  • 60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月19日
    浏览(30)
  • Spring Boot应用中如何动态指定数据库,实现不同用户不同数据库的场景

    当在 Spring Boot 应用程序中使用Spring Data JPA 进行数据库操作时,配置Schema名称是一种常见的做法。然而,在某些情况下,模式名称需要是动态的,可能会在应用程序运行时发生变化。比如:需要做数据隔离的SaaS应用。 所以,这篇博文将帮助您解决了在 Spring Boot 应用程序中如

    2024年04月26日
    浏览(28)
  • 基于Canal与Flink实现数据实时增量同步(一),计算机毕设源码要提交吗

    配置修改 修改conf/example/instance.properties,修改内容如下: canal.instance.mysql.slaveId = 1234 #position info,需要改成自己的数据库信息 canal.instance.master.address = kms-1.apache.com:3306 #username/password,需要改成自己的数据库信息 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.mq.topic

    2024年04月12日
    浏览(42)
  • Flink读取mysql数据库(java)

    代码如下: 运行结果如下:

    2024年02月12日
    浏览(24)
  • MySQL数据库实现主从同步

    安装MySQL数据库8.0.32 今天来学习数据库主从同步的原理及过程,数据库主要是用来存储WEB数据,在企业当中是极为重要的,下面一起来看下。 MySQL主从复制在中小企业,大型企业中广泛使用,MySQL主从复制的目的是实现数据库冗余备份,将master数据库数据定时同步到slave数据库

    2024年02月02日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包