FLink多表关联实时同步

这篇具有很好参考价值的文章主要介绍了FLink多表关联实时同步。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文目标

Oracle->Debezium->Kafka->Flink->PostgreSQL
Flink消费Kafka中客户、产品、订单(ID)三张表的数据合并为一张订单(NAME)表。

前置环境

Oracle内创建三张表

-- 客户信息表
CREATE TABLE "YINYX"."T_CUST" (
  "CUST_ID" NUMBER(9,0) VISIBLE NOT NULL,
  "CUST_NAME" VARCHAR2(32 BYTE) VISIBLE
);
ALTER TABLE "YINYX"."T_CUST" ADD CONSTRAINT "SYS_C007568" PRIMARY KEY ("CUST_ID");

-- 产品信息表
CREATE TABLE "YINYX"."T_PROD" (
  "PROD_ID" NUMBER(9,0) VISIBLE NOT NULL,
  "PROD_NAME" VARCHAR2(32 BYTE) VISIBLE
);
ALTER TABLE "YINYX"."T_PROD" ADD CONSTRAINT "SYS_C007569" PRIMARY KEY ("PROD_ID");

-- 订单信息表
CREATE TABLE "YINYX"."T_ORDER" (
  "ORDER_ID" NUMBER(9,0) VISIBLE NOT NULL,
  "CUST_ID" NUMBER(9,0) VISIBLE,
  "PROD_ID" NUMBER(9,0) VISIBLE,
  "AMOUNT" NUMBER(9,0) VISIBLE
);
ALTER TABLE "YINYX"."T_ORDER" ADD CONSTRAINT "SYS_C007570" PRIMARY KEY ("ORDER_ID");

PostgreSQL内创建一张表

CREATE TABLE "public"."t_order_out" (
  "order_id" int8 NOT NULL,
  "cust_name" varchar(50) COLLATE "pg_catalog"."default",
  "prod_name" varchar(50) COLLATE "pg_catalog"."default",
  "amount" int8
);
ALTER TABLE "public"."t_order_out" ADD CONSTRAINT "t_order_out_pkey" PRIMARY KEY ("order_id");

其他前置环境

Oracle、PostgreSQL、Kafka、FLink、Debezium-Server的部署参见本系列其他文章搭建。

Oracle内建表

采用前置条件中的语句建表即可,如果遇到日志抓取问题,可以用如下语句停止、启动、删除xstream出站服务器

sqlplus sys/manager@hostyyx:1521/ORCLCDB as sysdba
BEGIN
  DBMS_APPLY_ADM.START_APPLY(apply_name => 'xstrmout');
END;
/
BEGIN
  DBMS_APPLY_ADM.STOP_APPLY(apply_name => 'xstrmout');
END;
/
BEGIN
  DBMS_APPLY_ADM.DROP_APPLY(apply_name => 'xstrmout');
END;
/

PostgreSQL内建表

采用前置条件中的语句建表即可。

启动Kafka

./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
./kafka-server-start.sh -daemon ../config/server.properties
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic yyx.YINYX.T_CUST

启动Debezium-Server

[yinyx@hostyyx debezium-server]$ cat conf/application.properties
quarkus.http.port=8999
rkus.log.level=INFO
quarkus.log.console.json=false

debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector
debezium.source.offset.storage.file.filename=data/ora_offsets.dat
debezium.source.offset.flush.interval.ms=0

debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=1521
debezium.source.database.user=c##xstrm
debezium.source.database.password=xstrm
debezium.source.database.dbname=ORCLCDB
debezium.source.database.pdb.name=ORCLPDB1
debezium.source.database.connection.adapter=xstream
debezium.source.database.out.server.name=xstrmout
#debezium.source.snapshot.mode=schema_only
debezium.source.snapshot.mode=initial
debezium.source.schema.include.list=YINYX
debezium.source.table.include.list=YINYX.T1,YINYX.T_CUST,YINYX.T_PROD,YINYX.T_ORDER
debezium.source.log.mining.strategy=online_catalog
debezium.source.topic.prefix=yyx
debezium.source.key.converter.schemas.enable=false
debezium.source.value.converter.schemas.enable=false
debezium.source.schema.history.internal.kafka.bootstrap.servers=127.0.0.1:9092
debezium.source.schema.history.internal.kafka.topic=ora_schema_history

debezium.source.decimal.handling.mode=string
debezium.source.lob.enabled=true
debezium.source.database.history.skip.unparseable.ddl=true
debezium.source.tombstones.on.delete=false

debezium.sink.type=kafka
debezium.sink.kafka.producer.bootstrap.servers=127.0.0.1:9092
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer

debezium.format.key.schemas.enable=false
debezium.format.value.schemas.enable=false

[yinyx@hostyyx debezium-server]$ 

./run.sh

启动FLink

./start-cluster.sh
./sql-client.sh

FLink内创建任务

-- Kafka内T_CUST客户信息表变更信息映射表
CREATE TABLE tcust_kafka (
 CUST_ID BIGINT NOT NULL,
 CUST_NAME STRING NULL,
 PRIMARY KEY(CUST_ID) NOT ENFORCED
) WITH (
 'connector' = 'kafka',
 'topic' = 'yyx.YINYX.T_CUST',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'scan.startup.mode' = 'earliest-offset',
 'debezium-json.schema-include' = 'false',
 'properties.group.id' = 'gyyx',
 'format' = 'debezium-json'
 );
 -- Kafka内T_PROD产品信息表变更信息映射表
 CREATE TABLE tprod_kafka (
 PROD_ID BIGINT NOT NULL,
 PROD_NAME STRING NULL,
 PRIMARY KEY(PROD_ID) NOT ENFORCED
) WITH (
 'connector' = 'kafka',
 'topic' = 'yyx.YINYX.T_PROD',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'scan.startup.mode' = 'earliest-offset',
 'debezium-json.schema-include' = 'false',
 'properties.group.id' = 'gyyx',
 'format' = 'debezium-json'
 );
 -- Kafka内T_ORDER订单信息表变更信息映射表
 CREATE TABLE torder_kafka (
 ORDER_ID BIGINT NOT NULL,
 CUST_ID BIGINT NULL,
 PROD_ID BIGINT NULL,
 AMOUNT BIGINT NULL,
 PRIMARY KEY(ORDER_ID) NOT ENFORCED
) WITH (
 'connector' = 'kafka',
 'topic' = 'yyx.YINYX.T_ORDER',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'scan.startup.mode' = 'earliest-offset',
 'debezium-json.schema-include' = 'false',
 'properties.group.id' = 'gyyx',
 'format' = 'debezium-json'
 );
 -- 客户信息表在PG库内的同步测试,本案例没有用到
 CREATE TABLE tcust_pg (
 cust_id BIGINT NOT NULL,
 cust_name STRING NULL,
 PRIMARY KEY(cust_id) NOT ENFORCED
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:postgresql://127.0.0.1:6432/test',
 'username' = 'test',
 'password' = 'test',
 'driver' = 'org.postgresql.Driver',
 'table-name' = 't_cust'
);
-- 同步测试,用于验证基本同步通道是否正确
insert into tcust_pg(cust_id, cust_name) select CUST_ID, CUST_NAME from tcust_kafka;
-- PG库内的转换目标表,本案例的输出结果
 CREATE TABLE torderout_pg (
 order_id BIGINT NOT NULL,
 cust_name STRING NULL,
 prod_name STRING NULL,
 amount BIGINT NULL,
 PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:postgresql://127.0.0.1:6432/test',
 'username' = 'test',
 'password' = 'test',
 'driver' = 'org.postgresql.Driver',
 'table-name' = 't_order_out'
);
-- 提交转换任务给FLink,三个Kafka数据源关联同步到到一个PG目标表
insert into torderout_pg(order_id, cust_name, prod_name, amount) 
select o.ORDER_ID, c.CUST_NAME, p.PROD_NAME, o.AMOUNT, o.CUST_ID, o.PROD_ID from torder_kafka o 
inner join tcust_kafka c on o.CUST_ID=c.CUST_ID 
inner join tprod_kafka p on o.PROD_ID=p.PROD_ID;

相关功能验证

Oracle是否正常

用SQL语句检查表是否都正常

PostgreSQL是否正常

用SQL语句检查表是否都正常

debezium是否正常

启动后出现版本信息并不能立刻抓到日志,还需要等1到5分钟才开始抓,正常在Oracle内修改后2秒左后可以抓到日志
2022-12-29 22:28:26,568 INFO  [io.deb.con.ora.OracleConnection] (debezium-oracleconnector-yyx-change-event-source-coordinator) Database Version: Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production
Version 19.3.0.0.0
2022-12-29 22:35:52,051 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) 1 records sent during previous 00:07:32.967, last recorded offset of {server=yyx} partition is {transaction_id=null, lcr_position=000000000032b3460000000100000001000000000032b345000000010000000102, snapshot_scn=3086106}

Kafka是否正常

[yinyx@hostyyx bin]$ ./kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092
__consumer_offsets
logminer_schema_history
ora_schema_history
yyx
yyx.YINYX.T1
yyx.YINYX.T_CUST
yyx.YINYX.T_ORDER
yyx.YINYX.T_PROD
[yinyx@hostyyx bin]$ 
[yinyx@hostyyx bin]$ ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic yyx.YINYX.T_CUST
{"before":{"CUST_ID":1,"CUST_NAME":"zhangsan3"},"after":{"CUST_ID":1,"CUST_NAME":"zhangsan4"},"source":{"version":"2.0.0.Final","connector":"oracle","name":"yyx","ts_ms":1672327040000,"snapshot":"false","db":"ORCLPDB1","sequence":null,"schema":"YINYX","table":"T_CUST","txId":"3.20.832","scn":"3333134","commit_scn":null,"lcr_position":"000000000032dc0f0000000100000001000000000032dc0e000000010000000102","rs_id":null,"ssn":0,"redo_thread":null,"user_name":null},"op":"u","ts_ms":1672327045610,"transaction":null}
{"before":{"CUST_ID":1,"CUST_NAME":"zhangsan4"},"after":{"CUST_ID":1,"CUST_NAME":"zhangsan"},"source":{"version":"2.0.0.Final","connector":"oracle","name":"yyx","ts_ms":1672361625000,"snapshot":"false","db":"ORCLPDB1","sequence":null,"schema":"YINYX","table":"T_CUST","txId":"3.9.833","scn":"3345527","commit_scn":null,"lcr_position":"0000000000330c7800000001000000010000000000330c77000000010000000102","rs_id":null,"ssn":0,"redo_thread":null,"user_name":null},"op":"u","ts_ms":1672361629456,"transaction":null}

FLink是否正常

http://192.168.200.143:8081/#/job/running
可以看到正常运行的任务
FLink多表关联实时同步

本文目标效果验证

对Oracle内的T_ORDER表做增删改操作,ID信息需要在T_CUST、T_PROD表内有值,在2、3秒后查看PostgreSQL目标表内t_order_out表会同步更新。

Oracle内T_CUST表

FLink多表关联实时同步

Oracle内T_PROD表

FLink多表关联实时同步

Oracle内T_ORDER表

FLink多表关联实时同步

PostgreSQL表内t_order_out表

FLink多表关联实时同步
10da4a7c-90d5-4804-885c-731805ea2792文章来源地址https://www.toymoban.com/news/detail-424278.html

到了这里,关于FLink多表关联实时同步的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于 Flink CDC 的实时同步系统

    摘要: 本文整理自科杰科技大数据架构师张军,在 FFA 2022 数据集成专场的分享。本篇内容主要分为四个部分: 功能概述 架构设计 技术挑战 生产实践 Tips: 点击 「阅读原文」 查看原文视频演讲 ppt 科杰科技是专门做大数据服务的供应商,目前的客户包括能源、金融、证券等

    2024年02月05日
    浏览(48)
  • 【实战-01】flink cdc 实时数据同步利器

    cdc github源码地址 cdc官方文档 对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。 数据的同步目前对

    2023年04月08日
    浏览(56)
  • flink oracle cdc实时同步(超详细)

    官方文档:https://github.com/ververica/flink-cdc-connectors/blob/release-master/docs/content/connectors/oracle-cdc.md 本文参照官方文档来记录Oracle CDC 的配置。 在本文开始前,需要先安装Oracle,有兴趣的同学可以参考博主之前写的《docker下安装oracle11g(一次安装成功)》。 如果要做oracle的实时同步

    2024年02月12日
    浏览(46)
  • Flink实时同步MySQL与Doris数据

    技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once 精准接入-阿里云开发者社区 1. Flink环境: https://flink.apache.org/zh/ 下载flink-1.15.1 解压,修改配置 修改配置 修改rest.bind-address为 0.0.0.0 下载依赖jar包 至 flink安装目录lib下 启动flink 访问WebUI http://192.168.0.158:8081 2、

    2024年02月13日
    浏览(45)
  • Flink实时数仓同步:拉链表实战详解

    在大数据领域,业务数据通常最初存储在关系型数据库,例如MySQL。然而,为了满足日常分析和报表等需求,大数据平台会采用多种不同的存储方式来容纳这些业务数据。这些存储方式包括离线仓库、实时仓库等,根据不同的业务需求和数据特性进行选择。 举例来说,假设业

    2024年01月20日
    浏览(57)
  • Flink CDC实时同步PG数据库

    JDK:1.8 Flink:1.16.2 Scala:2.11 Hadoop:3.1.3 github地址:https://github.com/rockets0421/FlinkCDC-PG.git  1、更改配置文件postgresql.conf # 更改wal日志方式为logical wal_level = logical # minimal, replica, or logical # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots max_replication_slots = 20 # m

    2024年02月13日
    浏览(69)
  • 基于Canal与Flink实现数据实时增量同步(一)

    vi conf/application.yml server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: kms-1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql:// s p r i n g . d a t a s o u r c e . a d d r e s s / {spring.datasource.address}/ s p r in g . d

    2024年04月13日
    浏览(56)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(58)
  • 基于Flink CDC实时同步数据(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在远程服务器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 准备三个数据库:flink_source、flink_sink、flink_sink_second。 将flink_source.source_test表实时同步到flink_sink和flink_sink_second的sink_test表。 (建库建表过程略) 开发过程

    2024年02月06日
    浏览(104)
  • flink sqlserver cdc实时同步(含sqlserver安装配置等)

    官方文档:https://github.com/ververica/flink-cdc-connectors/blob/master/docs/content/connectors/sqlserver-cdc.md 如果要使用flink cdc做sqlserver的实时同步,需要满足以下条件: 需要安装SQLServer(需要支持CDC的功能,SQLServer 2008之后的版本都支持) ; 需要开启SQL Server代理; 启用CDC功能。 ok,接下来

    2024年02月08日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包