使用FlinkCDC从mysql同步数据到ES,并实现数据检索

这篇具有很好参考价值的文章主要介绍了使用FlinkCDC从mysql同步数据到ES,并实现数据检索。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、背景

随着公司的业务量越来越大,查询需求越来越复杂,mysql已经不支持变化多样的复杂查询了。

于是,使用cdc捕获MySQL的数据变化,同步到ES中,进行数据的检索。

一、环境准备

1、创建ES索引

// 创建索引并指定映射
PUT /course
{
	"mappings": {
		"properties": {
			"id": {
				"type": "keyword"
			},
			"name": {
				"type": "text"
			},
			"label": {
				"type": "text"
			},
			"content": {
			  "type": "text"
			}
		}
	}
}

// 查询course下所有数据(备用)
GET /course/_search
// 删除索引及数据(备用)
DELETE /course

2、创建mysql数据表

CREATE TABLE `course` (
  `id` varchar(32) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `label` varchar(255) DEFAULT NULL,
  `content` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

二、使用FlinkCDC同步数据

1、导包

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.18.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.18.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.0</version>
</dependency>

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>3.0.0</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>1.18.0</version>
</dependency>



2、demo

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;


/**
 * cdc
 */
public class CDCTest {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.56.10")
                .port(3306)
                .databaseList("mytest")
                .tableList("mytest.course")
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 开启检查点
        env.enableCheckpointing(3000);

        env
            .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            // 1个并行任务
            .setParallelism(1)
            .addSink(new RichSinkFunction<String>() {
                private final static ElasticSearchUtil es = new ElasticSearchUtil("192.168.56.10");
                @Override
                public void invoke(String value, Context context) throws Exception {
                    super.invoke(value, context);
                    JSONObject jsonObject = JSON.parseObject(value);
                    DataInfo dataInfo = new DataInfo();
                    dataInfo.setOp(jsonObject.getString("op"));
                    dataInfo.setBefore(jsonObject.getJSONObject("before"));
                    dataInfo.setAfter(jsonObject.getJSONObject("after"));
                    dataInfo.setDb(jsonObject.getJSONObject("source").getString("db"));
                    dataInfo.setTable(jsonObject.getJSONObject("source").getString("table"));

                    if (dataInfo.getDb().equals("mytest") && dataInfo.getTable().equals("course")) {

                        String id = dataInfo.getAfter().get("id").toString();
                        if(dataInfo.getOp().equals("d")) {
                            es.deleteById("course", id);
                        } else {
                            es.put(dataInfo.getAfter(), "course", id);
                        }
                    }
                }
            })

            .setParallelism(1); // 对接收器使用并行性1来保持消息顺序

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import java.util.Map;

/**
 * 收集的数据类型
 * @author cuixiangfei
 * @since 20234-03-20
 */
public class DataInfo {

    // 操作 c是create;u是update;d是delete;r是read
    private String op;

    private String db;

    private String table;

    private Map<String, Object> before;

    private Map<String, Object> after;


    public String getOp() {
        return op;
    }

    public void setOp(String op) {
        this.op = op;
    }

    public String getDb() {
        return db;
    }

    public void setDb(String db) {
        this.db = db;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public Map<String, Object> getBefore() {
        return before;
    }

    public void setBefore(Map<String, Object> before) {
        this.before = before;
    }

    public Map<String, Object> getAfter() {
        return after;
    }

    public void setAfter(Map<String, Object> after) {
        this.after = after;
    }

    public boolean checkOpt() {
        if (this.op.equals("r")) {
            return false;
        }
        return true;
    }

    @Override
    public String toString() {
        return "DataInfo{" +
                "op='" + op + '\'' +
                ", db='" + db + '\'' +
                ", table='" + table + '\'' +
                ", before=" + before +
                ", after=" + after +
                '}';
    }

    public static void main(String[] args) {
        String value = "{\"before\":{\"id\":\"333\",\"name\":\"333\",\"label\":\"333\",\"content\":\"3333\"},\"after\":{\"id\":\"333\",\"name\":\"33322\",\"label\":\"333\",\"content\":\"3333\"},\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1710923957000,\"snapshot\":\"false\",\"db\":\"mytest\",\"sequence\":null,\"table\":\"course\",\"server_id\":1,\"gtid\":null,\"file\":\"mysql-bin.000008\",\"pos\":1318,\"row\":0,\"thread\":9,\"query\":null},\"op\":\"u\",\"ts_ms\":1710923957825,\"transaction\":null}";
        JSONObject jsonObject = JSON.parseObject(value);

        System.out.println(jsonObject.get("op"));
        System.out.println(jsonObject.get("before"));
        System.out.println(jsonObject.get("after"));
        System.out.println(jsonObject.getJSONObject("source").get("db"));
        System.out.println(jsonObject.getJSONObject("source").get("table"));
    }
}

3、es工具类

springboot集成elasticSearch(附带工具类)

三、测试

1、先创建几条数据

INSERT INTO `mytest`.`course`(`id`, `name`, `label`, `content`) VALUES ('1', '11', '111', '1111');
INSERT INTO `mytest`.`course`(`id`, `name`, `label`, `content`) VALUES ('2', '22 33', '222 333', '2222 3333');
INSERT INTO `mytest`.`course`(`id`, `name`, `label`, `content`) VALUES ('3', '33 44', '33 444', '3333 4444');

2、启动cdc

3、查询es

cdc mysql es,大数据:Flink,mysql,elasticsearch,数据库文章来源地址https://www.toymoban.com/news/detail-850257.html

4、增删改几条数据进行测验

到了这里,关于使用FlinkCDC从mysql同步数据到ES,并实现数据检索的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ES大量数据条件检索准确性问题

    事例:如查询0~1000的结果集时,分页查询乱序,会搜索出来,也会搜索不出来,目前查询到的结果是分片不一致导致的,需要指定唯一分片查询 ES中基于分片的搜索方式,是分2个阶段进行的,即Query阶段和Fetch阶段。 ES的搜索类型有2种; query then fetch(默认的搜索方式) 基于

    2024年02月04日
    浏览(54)
  • 向量数据库入坑:传统文本检索方式的降维打击,使用 Faiss 实现向量语义检索

    在上一篇文章《聊聊来自元宇宙大厂 Meta 的相似度检索技术 Faiss》中,我们有聊到如何快速入门向量检索技术,借助 Meta AI(Facebook Research)出品的 faiss 实现“最基础的文本内容相似度检索工具”,初步接触到了“语义检索”这种对于传统文本检索方式具备“降维打击”的新

    2024年02月16日
    浏览(46)
  • 向量数据库:usearch的简单使用+实现图片检索应用

    usearch是快速开源搜索和聚类引擎×,用于C++、C、Python、JavaScript、Rust、Java、Objective-C、Swift、C#、GoLang和Wolfram 🔍中的向量和🔜字符串× 一个简单的例子(注:本例子在运行时向index中不断添加项目,并将最后的index持久化为一个文件,在运行时由于添加项目内存占用会不断增

    2024年02月02日
    浏览(49)
  • MySQL检索数据和排序数据

    目录 一、select语句 1.检索单个列(SELECT 列名 FROM 表名;) 2.检索多个列(SELECT 列名1,列名2,列名3  FROM 表名;)  3.检索所有的列(SELECT * FROM 表名;) 4.检索不同的行(SELECT 列名 FROM 表名;) 5.限制结果(SELECT 列名 FROM 表名 LIMIT 行数;) 6.使用完全限定的表名(SELECT 表名.列名 F

    2024年02月15日
    浏览(39)
  • Spring AI - 使用向量数据库实现检索式AI对话

     Spring AI 并不仅限于针对大语言模型对话API进行了统一封装,它还可以通过简单的方式实现LangChain的一些功能。本篇将带领读者实现一个简单的检索式AI对话接口。  在一些场景下,我们想让AI根据我们提供的数据进行回复。因为对话有最大Token的限制,因此很多场景下我们

    2024年04月14日
    浏览(49)
  • 深入学习MYSQL-数据检索

    前言 由于大部分基础知识都已经学过了,这里只把觉得应该记录一下的知识点做个笔记。然后以下笔记和sql均来自书籍(MYSQL必会知识),会根据看的其它书记继续调整和优化笔记。 LIMIT 注:这个平时的SQL查询没有什么区别,我主要展示一下在命令行里面怎么展示结果。 总共8条

    2024年02月05日
    浏览(32)
  • MySQL正则表达式检索数据

    目录 一、使用正则表达式进行基本字符匹配 1.使用regexp 2.使用正则表达式  .  二、进行OR匹配 1.为搜索两个串之一,使用   |   2.匹配几个字符之一[] 3.匹配范围  4.匹配特殊字符 过滤数据允许使用 匹配、比较、通配符 操作来寻找数据,但是随着过滤条件的复杂性增

    2024年02月14日
    浏览(34)
  • 【MySQL】一文带你了解检索数据

    🎬 博客主页:博主链接 🎥 本文由 M malloc 原创,首发于 CSDN🙉 🎄 学习专栏推荐:LeetCode刷题集! 🏅 欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正! 📆 未来很长,值得我们全力奔赴更美好的生活✨ ------------------❤️分割线❤️------------------------- —————————

    2024年02月09日
    浏览(46)
  • 【MySQL多表查询】:让你的数据检索更高效

    前言 ✨ 欢迎来到小K的MySQL专栏,本节将为大家带来 MySQL 中多表查询相关知识的讲解 一、多表关系 ✨项目开发中,在进行数据库表结构设计时,会根据业务需求及业务模块之间的关系,分析并设计表结构,由于业务之间相互关联,所以各个表结构之间也存在着各种联系,基

    2024年02月09日
    浏览(35)
  • 这些年Web前端面试的那些套路,优化后,ES-做到了几十亿数据检索-3-秒返回,前端音频框架

    默认情况下 routing参数是文档ID (murmurhash3),可通过 URL中的 _routing 参数指定数据分布在同一个分片中,index和search的时候都需要一致才能找到数据。 如果能明确根据_routing进行数据分区,则可减少分片的检索工作,以提高性能 。 在我们的案例中,查询字段都是固定的,不提供全

    2024年04月26日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包