Flink CDC系列之:TiDB CDC 导入 Elasticsearch

这篇具有很好参考价值的文章主要介绍了Flink CDC系列之:TiDB CDC 导入 Elasticsearch。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、通过docker 来启动 TiDB 集群

git clone https://github.com/pingcap/tidb-docker-compose.git

替换目录 tidb-docker-compose 里面的 docker-compose.yml 文件,内容如下所示:

version: "2.1"

services:
  pd:
    image: pingcap/pd:v5.3.1
    ports:
      - "2379:2379"
    volumes:
      - ./config/pd.toml:/pd.toml
      - ./logs:/logs
    command:
      - --client-urls=http://0.0.0.0:2379
      - --peer-urls=http://0.0.0.0:2380
      - --advertise-client-urls=http://pd:2379
      - --advertise-peer-urls=http://pd:2380
      - --initial-cluster=pd=http://pd:2380
      - --data-dir=/data/pd
      - --config=/pd.toml
      - --log-file=/logs/pd.log
    restart: on-failure

  tikv:
    image: pingcap/tikv:v5.3.1
    ports:
      - "20160:20160"
    volumes:
      - ./config/tikv.toml:/tikv.toml 
      - ./logs:/logs           
    command:
      - --addr=0.0.0.0:20160
      - --advertise-addr=tikv:20160
      - --data-dir=/data/tikv
      - --pd=pd:2379
      - --config=/tikv.toml
      - --log-file=/logs/tikv.log
    depends_on:
      - "pd"
    restart: on-failure

  tidb:
    image: pingcap/tidb:v5.3.1
    ports:
      - "4000:4000"
    volumes:
      - ./config/tidb.toml:/tidb.toml
      - ./logs:/logs
    command:
      - --store=tikv
      - --path=pd:2379
      - --config=/tidb.toml
      - --log-file=/logs/tidb.log
      - --advertise-address=tidb
    depends_on:
      - "tikv"
    restart: on-failure
    
  elasticsearch:
     image: elastic/elasticsearch:7.6.0
     container_name: elasticsearch
     environment:
       - cluster.name=docker-cluster
       - bootstrap.memory_lock=true
       - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
       - discovery.type=single-node
     ports:
       - "9200:9200"
       - "9300:9300"
     ulimits:
       memlock:
         soft: -1
         hard: -1
       nofile:
         soft: 65536
         hard: 65536
         
  kibana:
     image: elastic/kibana:7.6.0
     container_name: kibana
     ports:
       - "5601:5601"
     volumes:
       - /var/run/docker.sock:/var/run/docker.sock

该 Docker Compose 中包含的容器有:

  • TiDB 集群: tikv、pd、tidb。
  • Elasticsearch:orders 表将和 products 表进行 join,join 的结果写入 Elasticsearch 中。
  • Kibana:可视化 Elasticsearch 中的数据。

本机添加 host 映射 pd 和 tikv 映射 127.0.0.1。 在 docker-compose.yml 所在目录下运行如下命令以启动所有容器:

docker-compose up -d
mysql -h 127.0.0.1 -P 4000 -u root # Just test tidb cluster is ready,if you have install mysql local.

该命令会以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。 你可以通过 docker ps 来观察上述的容器是否正常启动了。 也可以访问 http://localhost:5601/ 来查看 Kibana 是否运行正常。

另外可以通过如下命令停止并删除所有的容器:

docker-compose down

二、下载 Flink 和所需要的依赖包

下载 Flink 1.17.1 并将其解压至目录 flink-1.17.1

https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz

下载下面列出的依赖包,并将它们放到目录 flink-1.17.1/lib/ 下:

  • flink-connector-tidb-cdc-2.4.1.jar
  • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar

三、在TiDB数据库中创建表和准备数据

创建数据库和表 products,orders,并插入数据:

-- TiDB
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
                         id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
                         name VARCHAR(255) NOT NULL,
                         description VARCHAR(512)
) AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
      (default,"car battery","12V car battery"),
      (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
      (default,"hammer","12oz carpenter's hammer"),
      (default,"hammer","14oz carpenter's hammer"),
      (default,"hammer","16oz carpenter's hammer"),
      (default,"rocks","box of assorted rocks"),
      (default,"jacket","water resistent black wind breaker"),
      (default,"spare tire","24 inch spare tire");

CREATE TABLE orders (
                       order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
                       order_date DATETIME NOT NULL,
                       customer_name VARCHAR(255) NOT NULL,
                       price DECIMAL(10, 5) NOT NULL,
                       product_id INTEGER NOT NULL,
                       order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
      (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
      (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

四、启动Flink 集群,再启动 SQL CLI

使用下面的命令跳转至 Flink 目录下

cd flink-1.17.1

使用下面的命令启动 Flink 集群

./bin/start-cluster.sh

启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:
Flink CDC系列之:TiDB CDC 导入 Elasticsearch,日常分享专栏,Flink CDC系列,TiDB CDC,Elasticsearch
使用下面的命令启动 Flink SQL CLI

./bin/sql-client.sh

启动成功后,可以看到如下的页面:

Flink CDC系列之:TiDB CDC 导入 Elasticsearch,日常分享专栏,Flink CDC系列,TiDB CDC,Elasticsearch

五、在 Flink SQL CLI 中使用 Flink DDL 创建表

首先,开启 checkpoint,每隔3秒做一次 checkpoint

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据

Flink SQL> CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'tidb-cdc',
    'tikv.grpc.timeout_in_ms' = '20000',
    'pd-addresses' = '127.0.0.1:2379',
    'database-name' = 'mydb',
    'table-name' = 'products'
  );

Flink SQL> CREATE TABLE orders (
   order_id INT,
   order_date TIMESTAMP(3),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
    'connector' = 'tidb-cdc',
    'tikv.grpc.timeout_in_ms' = '20000',
    'pd-addresses' = '127.0.0.1:2379',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);

Flink SQL> CREATE TABLE enriched_orders (
   order_id INT,
   order_date DATE,
   customer_name STRING,
   order_status BOOLEAN,
   product_name STRING,
   product_description STRING,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://localhost:9200',
     'index' = 'enriched_orders_1'
 );

将关联后的数据插入到ElasticSearch

Flink SQL> INSERT INTO enriched_orders
  SELECT o.order_id, o.order_date, o.customer_name, o.order_status, p.name, p.description
  FROM orders AS o
  LEFT JOIN products AS p ON o.product_id = p.id;

六、Kibana查看ElasticSearch数据

检查最终的结果是否写入 ElasticSearch 中,可以在 Kibana 看到 ElasticSearch 中的数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders.

Flink CDC系列之:TiDB CDC 导入 Elasticsearch,日常分享专栏,Flink CDC系列,TiDB CDC,Elasticsearch
然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了.
Flink CDC系列之:TiDB CDC 导入 Elasticsearch,日常分享专栏,Flink CDC系列,TiDB CDC,Elasticsearch

七、在 TiDB增删改数据,观察 ElasticSearch 中的结果

通过如下的 SQL 语句对 TiDB 数据库进行一些修改,然后就可以看到每执行一条 SQL 语句,Elasticsearch 中的数据都会实时更新。文章来源地址https://www.toymoban.com/news/detail-655800.html

INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

UPDATE orders SET order_status = true WHERE order_id = 10004;

DELETE FROM orders WHERE order_id = 10004;

到了这里,关于Flink CDC系列之:TiDB CDC 导入 Elasticsearch的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink CDC系列之:Oracle CDC Connector

    2023年08月23日
    浏览(49)
  • 第3.4章:StarRocks数据导入--Flink Connector与CDC秒级数据同步

    Flink作为当前流行的流式计算框架,在对接StarRocks时,若直接使用JDBC的方式“流式”写入数据,对StarRocks是不友好的,StarRocks作为一款MVCC的数据库,其导入的核心思想还是“攒微批+降频率”。为此,StarRocks单独开发了flink-connector-starrocks,其内部实现仍是通过对数据缓存攒批

    2023年04月15日
    浏览(77)
  • flink-cdc同步mysql数据到elasticsearch

    CDC是(Change Data Capture 变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 cdc项目地址:https://github.com/ver

    2024年02月13日
    浏览(80)
  • 最新版Flink CDC MySQL同步Elasticsearch(一)

    首先我们要基于Flink CDC MySQL同步MySQL的环境基础上(flink-1.17.1、Java8、MySQL8)搭建Elasticsearch7-17-10和Kibana 7.17.10。笔者已经搭建好环境,这里不做具体演示了,如果需要Es的搭建教程情况笔者其他博客 注意: 建议生产环境统一使用稳定版本Flink1.16.*。笔者这里只是作为教程编写

    2024年02月13日
    浏览(35)
  • 实战:大数据Flink CDC同步Mysql数据到ElasticSearch

    前面的博文我们分享了大数据分布式流处理计算框架Flink和其基础环境的搭建,相信各位看官都已经搭建好了自己的运行环境。那么,今天就来实战一把使用Flink CDC同步Mysql数据导Elasticsearch。 CDC简介 CDC 的全称是 Change Data Capture(变更数据捕获技术) ,在广义的概念上,只要

    2024年02月09日
    浏览(43)
  • Spring Boot+Flink CDC —— MySQL 同步 Elasticsearch (DataStream方式)

    对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下 授权链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant 基于jdk1.8 + springboot2.7.x + elasticsearch7.x 到此就大功告成啦!代码地址:https://gitee.com/qianxkun/lakudouzi-components/tree/

    2024年02月16日
    浏览(39)
  • 基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 数据到 Elasticsearch、Kafka

    Dinky 是一个开箱即用的一站式实时计算平台以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架致力于流批一体和湖仓一体的建设与实践。本文以此为FlinkSQL可视化工具。 Flink SQL 使得使用标准 SQL 开发流式应用变得简单,免去代码开发。 Flink CDC 本文使用 MySQL CDC 连接器 允许从

    2024年02月16日
    浏览(43)
  • 60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月19日
    浏览(47)
  • Flink系列之:使用Flink CDC从数据库采集数据,设置checkpoint支持数据采集中断恢复,保证数据不丢失

    博主相关技术博客: Flink系列之:Debezium采集Mysql数据库表数据到Kafka Topic,同步kafka topic数据到StarRocks数据库 Flink系列之:使用Flink Mysql CDC基于Flink SQL同步mysql数据到StarRocks数据库

    2024年02月11日
    浏览(71)
  • TiDB实战篇-TiDB Lightning 导入数据

    使用TiDB Lightning 导入数据。 它是使用物理导入的模式,将SQL文件直接导入到TiKV中,它是一种初始化的导入,也就是说目标的数据库和表都是不能够存在的(注意事项,在这种方式导入的时候TiKV要切换到导入模式才行) 。      先导入数据,然后在导入索性。   Logical Impo

    2024年02月04日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包