通过kafka connector实现mysql数据自动同步es

这篇具有很好参考价值的文章主要介绍了通过kafka connector实现mysql数据自动同步es。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

整体思路:

1、使用 io.debezium.connector.mysql.MySqlConnector 自动同步数据到kafka消息队列

2、通过listener监听消息队列,代码控制数据插入es

ps:其实有更简单的方式:在此基础上使用ElasticsearchSinkConnector、ksql,完成数据的转换与自动同步es,全程无需代码控制,后续本地跑通流程后再来记录

一、连接器的下载与配置

下载debezium mysql connector

在kafka中建立connect文件夹,并解压连接器

在kafka/config下的connect-distributed.properties文件中,修改plugin.path=连接器地址

启动连接器:

bin/connect-distributed.sh -daemon config/connect-distributed.properties

postman查询连接器是否配置成功

http://localhost:8083/connector-plugins

如果返回连接器,则表示配置成功

[
   
    {
        "class": "io.debezium.connector.mysql.MySqlConnector",
        "type": "source",
        "version": "2.1.2.Final"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "type": "source",
        "version": "3.3.2"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "type": "source",
        "version": "3.3.2"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "3.3.2"
    }
]

二、创建同步连接器实例

post请求地址:

http://localhost:8083/connectors

请求体:

{
    "name": "mysql-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "127.0.0.1", // 数据库ip
        "database.port": "3306", 
        "database.user": "root", // 数据库登陆用户名
        "database.password": "123456", // 登陆密码
        "database.server.id": "2", 
        "database.server.name": "hc",
        "database.include.list": "store", // 需要同步的库
        "table.include.list": "store.product", // 需要同步的表
        "database.history.kafka.bootstrap.servers": "localhost:9092", // kafka地址
        "database.history.kafka.topic": "schema-changes-inventory", 
        "topic.prefix": "pro",
        "include.schema.changes": "true",
        "transforms": "unwrap,Cast",
        "transforms.Cast.type": 
        "org.apache.kafka.connect.transforms.Cast$Value",
        "transforms.Cast.spec": "amount:float64,unit_price:float64",
        "transforms.unwrap.type": 
        "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false"
    }
}

查看是否建立成功:

get请求:

http://localhost:8083/connectors

返回结果:

[
    "mysql-connector"
]

三、代码里监听消息

@Component
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class HcCustomerListener {

    private final EsSearchService esSearchService;

    private final IEsRepository esRepository;

    private final String INDEX = "product";

    /**
     * 监听产品表
     * @param record
     */
    @KafkaListener(topics = "test.store.product")
    public void onMessage(ConsumerRecord<String, String> record) {

        String kafkaMessage = record.value();
        if (StrUtil.isBlank(kafkaMessage)) {
            return;
        }

        // 检查索引是否存在,没有则新建
        if (!esRepository.checkIndex(INDEX)) {
            if (!esRepository.createIndex(INDEX)) {
                log.error("建立索引失败!索引名:" + INDEX);
            }
        }

        // 数据转换为要存储的对象
        Product item = JSONObject.toJavaObject(JSONObject.parseObject(kafkaMessage), Product.class);

        // 数据同步
        if (!esRepository.dataSync(Product, INDEX, QueryBuilders.termQuery("code", Product.getCode()))) {
            log.error("产品信息同步es失败!产品编号:" + customer.getCode());
        }
    }
}

ps:关于数据存储es,数据查询es的具体方法,会写一篇专门的文章记录

中间也遇到了一些坑,比如建connector的时候一直报错缺少什么值,最后把jdk1.8改到jdk17就好了

比如同步数据一直报数据转换错误,才发现bigdecimal类型的字段需要在建connector时显示的去做转换:"transforms.Cast.spec": "unit_price:float64,amount:float64"这样就行文章来源地址https://www.toymoban.com/news/detail-721507.html

到了这里,关于通过kafka connector实现mysql数据自动同步es的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【kafka】JDBC connector进行表数据增量同步过程中的源表与目标表时间不一致问题解决...

    〇、参考资料 时间不一致,差了8个小时 (1)source (2)sink 即sink和source都加  \\\"db.timezone\\\": \\\"Asia/Shanghai\\\", 并需要保持一直

    2024年02月11日
    浏览(39)
  • 通过零代码ETLCloud实现金蝶云星空数据自动化同步

    金蝶云星空是一款基于云计算架构打造的全面财务管理软件,旨在为企业提供全方位、一站式的财务解决方案。其功能包括 财务核算、现金管理、应付应收管理、成本核算、固定资产管理、税务管理等,覆盖了财务管理的各个方面,可以帮助企业提高财务管理效率,降低财务

    2024年02月09日
    浏览(59)
  • 通过logstash实现mysql与es的双向数据同步

    参考题目 一种基于MySQL和Elasticsearch的数据同步方法及系统 基于MySQL和Elasticsearch的数据同步方法 一种基于MySQL和Elasticsearch的数据同步系统 基于MySQL和Elasticsearch的数据同步技术 目录 1【理论调研】 方案1:使用Logstash实现数据同步 方案2:使用Canal实现数据同步 方案3:使用Debe

    2024年02月15日
    浏览(34)
  • 60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月19日
    浏览(51)
  • MySQL FlinkCDC 通过Kafka实时同步到ClickHouse(自定义Debezium格式支持增加删除修改)

    MySQL FlinkCDC 通过Kafka实时同步到ClickHouse(自定义Debezium格式支持增加删除修改) 把MySQL多库多表的数据通过FlinkCDC DataStream的方式实时同步到同一个Kafka的Topic中,然后下游再写Flink SQL拆分把数据写入到ClickHouse,FlinkCDC DataStream通过自定义Debezium格式的序列化器,除了增加,还能进行

    2024年02月15日
    浏览(41)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(58)
  • Debezium同步Mysql数据到Kafka

    Kafka:3.3.2 mysql-connector:1.8.1 (0)前提是安装好mysql,开启binlog (1)下载kafka (2)下载mysql-connector插件 (3)编辑配置文件 (4)启动kafka自带的zk (5)启动kafka (6)启动connect (7)调用api 注意:当成功调用api,创建此连接器后会有如下主题产生:dbhistory.inventory、mysql1、

    2024年02月10日
    浏览(42)
  • cancel框架同步mysql数据到kafka

    1、下载cancel 2、修改conf文件夹下的canal.properties配置文件 3、修改conf/example文件夹下的instance.properties配置文件 在sql查询show binary logs语句得到binlog日志 4、启动 在bin目录下执行 启动程序 注:MySQL需要创建新用户

    2024年02月15日
    浏览(55)
  • 通过ETLCloud自动化数据处理:用友U8数据一键同步

    用友U8是一款成熟的企业管理软件,是一套适用于企业全面管理的ERP(Enterprise Resource Planning)软件。主要用于管理企业的财务、人力资源、供应链、生产制造等业务。它具有模块化设计和高度可定制化的特点,可以根据企业的实际需求进行配置和部署。同时,用友U8还提供了

    2024年02月09日
    浏览(41)
  • 从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

    Debezium 是一个开源的分布式平台,用于实时捕获和发布数据库更改事件。它可以将关系型数据库(如 MySQL、PostgreSQL、Oracle 等)的变更事件转化为可观察的流数据,以供其他应用程序实时消费和处理。 本文中我们将采用 Debezium 与 Kafka 组合的方式来实现从 MySQL 到 DolphinDB 的数

    2024年02月02日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包