基于Flink CDC实时同步PostgreSQL与Tidb【Flink SQL Client模式下亲测可行,详细教程】

这篇具有很好参考价值的文章主要介绍了基于Flink CDC实时同步PostgreSQL与Tidb【Flink SQL Client模式下亲测可行,详细教程】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


操作系统:ubuntu-22.04,运行于wsl 2【 注意,请务必使用wsl 2;wsl 1会出现各种各样的问题】

软件版本:PostgreSQL 14.9,TiDB v7.3.0,flink 1.7.1,flink cdc 2.4.0

一、PostgreSQL作为数据来源(source),由flink读取

1.postgre安装与配置

已有postgre的跳过此步

(1)pg安装

https://zhuanlan.zhihu.com/p/143156636

sudo apt install postgresql
sudo -u postgres psql -c "SELECT version();"
sudo -u postgres psql # 连接进入postgre shell(以管理员用户)

(2)pg配置

# 创建新用户和数据库
sudo su - postgres -c "createuser domeya"
sudo su - postgres -c "createdb domeya_db"

sudo -u postgres psql # 进入psql(管理员用户postgres)
grant all privileges on database domeya_db to domeya; # 授权用户操作数据库
\password # 设置当前用户密码(\password domeya,可设置用户domeya的密码)
\q # 退出psql

# psql postgres://username:password@host:port/dbname  
psql postgres://domeya:123@localhost:5432/domeya_db # 新用户测试连接

可能出现的问题

sudo -u postgres psql报错:

psql: error: connection to server on socket “/var/run/postgresql/.s.PGSQL.5432” failed: No such file or directory
Is the server running locally and accepting connections on that socket?

https://stackoverflow.com/questions/69639250/pgconnectionbad-connection-to-server-on-socket-var-run-postgresql-s-pgsql

https://blog.csdn.net/psiitoy/article/details/7310003

【解决关键】:重启pg服务

# 重启
sudo service postgresql restart # 重要!
ps -ef | grep postgres

(可选)重装pg

# 卸载
dpkg --list | grep postgresql
dpkg --purge postgresql postgresql-14 postgresql-client-14 postgresql-client-common postgresql-common # 根据dpkg --list | grep postgresql中展示的结果进行填写
# rm -rf /var/lib/postgresql/
# 重装
sudo apt install postgresql

2.flink安装与配置

已有flink的跳过此步

flink安装,配置环境变量

# https://flink.apache.org/downloads/
curl -O -L https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
tar zxvf flink-1.17.1-bin-scala_2.12.tgz -C /opt

sudo vim /etc/profile.d/flink.sh
# flink.sh
export FLINK_HOME=/opt/flink-1.17.1
export PATH=$PATH:$FLINK_HOME/bin

source /etc/profile

如果webUI无法外机访问把rest.bind-address: 0.0.0.0这个设置放开权限即可

cd $FLINK_HOME/conf
cp flink-conf.yaml flink-conf.yaml.backup

vim flink-conf.yaml

# 修改以下设置
rest.bind-address: 0.0.0.0

启动flink

cd $FLINK_HOME
./bin/start-cluster.sh # 启动flink
jps # 查看是否启动StandaloneSessionClusterEntrypoint, TaskManagerRunner

# ./bin/stop-cluster.sh # 关闭flink

3.flink cdc postgre配置

3.1 postgre配置(for flink cdc)

https://www.cnblogs.com/xiongmozhou/p/14817641.html

(1)修改配置文件

cd /etc/postgresql/14/main
cp postgresql.conf postgresql.conf.backup
vim postgresql.conf

postgresql.conf修改几个关键配置如下:

# 更改wal日志方式为logical
wal_level = logical # minimal, replica, or logical

# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20 # max number of replication slots

# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20 # max number of walsender processes

# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s)
wal_sender_timeout = 180s # in milliseconds; 0 disable  

修改完之后重启postgresql,service postgresql restart

(2)赋予权限

以管理员进入psql,sudo -u postgres psql

(可选)如果没有测试表,可以新建一个

-- 如果没有测试表,可创建一个
CREATE TABLE test_table1(
   id varchar(8),
   p_dt varchar(8)
);
-- 查看表
\d

insert into test_table1 values('1', '20230820');
select * from test_table1;

赋予普通用户复制流权限、发布表、更改表的复制标识包含更新和删除的值

-- 给用户复制流权限
ALTER ROLE domeya replication;
-- 查看权限
\du


\c domeya_db -- 重要:进入到domeya_db数据库(以管理员账号进入)
-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;
-- 把所有表进行发布(包括以后新建的表);
-- 注意,此处PUBLICATION名字必须为dbz_publication,否则后续flink sql报错must be superuser to create FOR ALL TABLES publication
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- 查询哪些表已经发布
select * from pg_publication_tables;

-- 更改复制标识包含更新和删除之前值
ALTER TABLE test_table1 REPLICA IDENTITY FULL; -- 对应前面创建的测试表
-- 查看复制标识(为f标识说明设置成功)
select relreplident from pg_class where relname='test_table1'; -- 对应前面创建的测试表

-- 退出
\q

wal_level = logical源表的数据修改时,默认的逻辑复制流只包含历史记录的primary key,如果需要输出更新记录的历史记录的所有字段,需要在表级别修改参数:ALTER TABLE tableName REPLICA IDENTITY FULL; 这样才能捕获到源表所有字段更新后的值

发布所有表可能太多,也可以创建publication,添加指定表到publication。

update pg_publication set puballtables=false where pubname is not null; -- 默认发布所有表为false
CREATE PUBLICATION flink_cdc_publication;
alter publication flink_cdc_publication add table test_table1;
select * from pg_publication;
select * from pg_publication_tables;

3.2 flink cdc postgres的jar包下载

下载flink cdc postgres相关jar包,放在$FLINK_HOME/lib

cd $FLINK_HOME/lib

# 以下用于flink cdc postgres连接
# 注意:用于flink sql的jar包是flink-sql-connector-postgres-cdc,不是flink-connector-postgres-cdc
# https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc/2.4.0
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.4.0/flink-sql-connector-postgres-cdc-2.4.0.jar

如果flink在运行状态,需要重启flink,之后再启动flink sql client

cd $FLINK_HOME
./bin/stop-cluster.sh
./bin/start-cluster.sh

4.flink cdc postgre测试

https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html【官方文档demo】

启动flink sql client(之前重启了flink cluster)

cd $FLINK_HOME
./bin/sql-client.sh

在flink sql client创建表,与pg中的表结构对应,表名字可以不同

CREATE TABLE source_table (
    id STRING,
    p_dt STRING
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'domeya',
    'password' = '123',
    'database-name' = 'domeya_db',
    'schema-name' = 'public',
    'table-name' = 'test_table1',
    'slot.name' = 'flink',
    -- experimental feature: incremental snapshot (default off)
    -- 'scan.incremental.snapshot.enabled' = 'true'
    'decoding.plugin.name' = 'pgoutput' -- 必须加,否则报错could not access file "decoderbufs"
);

select * from source_table;

可能出现的问题

运行select * from source_table;时报错

报错1:

[ERROR] Could not execute SQL statement. Reason:
org.postgresql.util.PSQLException: ERROR: could not access file “decoderbufs”: No such file or directory

https://github.com/ververica/flink-cdc-connectors/issues/37

table sql加:WITH('decoding.plugin.name' = 'pgoutput')【flink sql】

dataStream加:PostgreSQLSource.<String>builder().decodingPluginName("pgoutput").build()

报错2:

[ERROR] Could not execute SQL statement. Reason:
org.postgresql.util.PSQLException: ERROR: must be superuser to create FOR ALL TABLES publication

https://gist.github.com/alexhwoods/4c4c90d83db3c47d9303cb734135130d

检查之前的操作(psql):

CREATE PUBLICATION dbz_publication FOR ALL TABLES;
select * from pg_publication_tables;

报错3:

Caused by: org.postgresql.util.PSQLException: ERROR: replication slot “flink” already exists

https://zhuanlan.zhihu.com/p/449066277

当上面debezium.slot.name的值超过20个,就会报错,即使之前的job已经下线,这个slot文件依旧在,此时需要执行下面语句并删除slot即可:

psql:

-- https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION
SELECT slot_name, slot_type, active FROM pg_replication_slots;
SELECT pg_drop_replication_slot('flink'); # 这个和之前flink sql中的'slot.name' = 'flink'对应

注意,flink postgres-cdc只能读(作为source),不能写(作为sink)

Flink SQL> insert into source_table values('3', '20230820');
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Connector 'postgres-cdc' can only be used as a source. It cannot be used as a sink.

二、Tidb作为数据去向(sink),由flink写入

1.tidb安装与配置

已有tidb的跳过此步

https://docs.pingcap.com/zh/tidb/stable/quick-start-with-tidb

su xxx # 切换到你的普通用户
curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh
source /home/xxx/.bashrc # 按上个命令输出的路径来,上面显示的是Shell profile:  /home/xxx/.bashrc

tiup playground # 下载镜像,并启动某个版本的集群
  • 以这种方式执行的 playground,在结束部署测试后 TiUP 会清理掉原集群数据,重新执行该命令后会得到一个全新的集群。
  • 若希望持久化数据,可以执行 TiUP 的 --tag 参数:tiup --tag <your-tag> playground ...

下载完毕,启动成功之后展示信息:

Connect TiDB: mysql --comments --host 127.0.0.1 --port 4000 -u root
TiDB Dashboard: http://127.0.0.1:2379/dashboard
Grafana: http://127.0.0.1:3000

连接tidb

# 使用mysql client连接tidb
sudo apt install mysql-client
mysql --comments --host 127.0.0.1 --port 4000 -u root

# 设置root密码 
# https://docs.pingcap.com/zh/tidb/stable/user-account-management#%E8%AE%BE%E7%BD%AE%E5%AF%86%E7%A0%81
# https://blog.csdn.net/qq_45675449/article/details/106866700
SET PASSWORD FOR 'root'@'%' = '123'; # root的localhost是%,可通过 select user,host from mysql.user; 查看

exit;

# mysql -uroot -p无法连接,必须加上port和host,并且host不能写成localhost
# https://blog.csdn.net/hjf161105/article/details/78850658
mysql -uroot --port 4000 -h 127.0.0.1 -p

2.flink cdc tidb的jar包下载

下载用于jdbc mysql连接的jar包,用于flink cdc tidb连接。

特别注意:Tidb的sink模式得用jdbc+mysql连接,不用官方提供的tidb cdc因为其不能作为sink,只能曲线救国参考这种方法了。

cd $FLINK_HOME/lib
# 以下用于jdbc mysql(用于flink cdc tidb连接)
# https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc/3.1.1-1.17
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
# https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.1.0
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.1.0/mysql-connector-j-8.1.0.jar

如果flink在运行状态,需要重启flink,之后再启动flink sql client

cd $FLINK_HOME
./bin/stop-cluster.sh
./bin/start-cluster.sh

3.flink cdc tidb测试

基于Flink CDC实时同步数据(MySQL到MySQL)

flink cdc tidb 官方文档demo(无法作为sink,只能作为source)

(可选)tidb创建测试表

# mysql -uroot --port 4000 -h 127.0.0.1 -p

# 创建测试表
CREATE TABLE test.test_table1(
   id varchar(8),
   p_dt varchar(8)
);
insert into test.test_table1 values('3', '20230819');

启动flink sql client(之前重启了flink cluster)

cd $FLINK_HOME
./bin/sql-client.sh

flink sql连接tidb,仿照mysql的连接

-- checkpoint every 3000 milliseconds                       
SET 'execution.checkpointing.interval' = '3s';

-- register a TiDB table in Flink SQL
CREATE TABLE sink_table (
    id STRING,
    p_dt STRING,
    PRIMARY KEY(id) NOT ENFORCED 
    -- 必须写PRIMARY KEY,否则报错:[ERROR] Could not execute SQL statement. Reason: java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.
 ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:4000/test',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'root',
    'password' = '123',
    'table-name' = 'test_table1'
);
  
-- read snapshot and binlogs from table
SELECT * FROM sink_table;

三、用Flink SQL Client同步PostgreSQL到Tidb

# 将会提交一个作业,进行source_table->sink_table的单向同步
insert into sink_table select * from source_table;

[INFO] Submitting SQL update statement to the cluster…
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8e47bfa3ea78da4c47b395f7517c2812

在flink web ui上可以看到作业运行状态。

只要这个作业是正常runnning,那么对source_table的任何修改都会同步到sink_table。注意这种是单向同步,source_table的变动(增/删/改)会同步到sink_table,但反过来sink_table的变动不会影响到source_table(不会触发source_table->sink_table的同步)。文章来源地址https://www.toymoban.com/news/detail-668737.html

到了这里,关于基于Flink CDC实时同步PostgreSQL与Tidb【Flink SQL Client模式下亲测可行,详细教程】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步

    基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步

    这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 到 Databend 的实时数据同步。本教程的演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。 假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。 接下来的内容

    2024年02月10日
    浏览(20)
  • Flink CDC 基于mysql binlog 实时同步mysql表(无主键)

    Flink CDC 基于mysql binlog 实时同步mysql表(无主键)

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 具体前提设置,请看这篇,包含 binlog 设置、Maven...... Flink CDC 基于mysql binlog 实时同步mysql表_彩虹豆的博客-CSDN博客 经过不懈努力,终于从阿里help页面找到了支

    2024年02月08日
    浏览(16)
  • Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql

    Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql

    环境说明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地运行 先上官网使用说明和案例:Oracle CDC Connector — Flink CDC documentation 1. Oracle 开启 log archiving (1).启用 log archiving         a:以DBA用户连接数据库    

    2024年02月11日
    浏览(15)
  • Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql(无主键)

    环境说明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地运行 具体环境设置和maven依赖请看上篇:Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql_彩虹豆的博客-CSDN博客 现在操作的是源表和目标表都无主键数

    2024年02月15日
    浏览(14)
  • 基于Flink SQL CDC Mysql to Mysql数据同步

    基于Flink SQL CDC Mysql to Mysql数据同步

    Flink CDC有两种方式同步数据库: 一种是通过FlinkSQL直接输入两表数据库映射进行数据同步,缺点是只能单表进行同步; 一种是通过DataStream开发一个maven项目,打成jar包上传到服务器运行。 本方案使用FlinkSQL方法,同步两表中的数据。 其中Flink应用可以部署在具有公网IP的服务

    2023年04月11日
    浏览(12)
  • 【实战-01】flink cdc 实时数据同步利器

    【实战-01】flink cdc 实时数据同步利器

    cdc github源码地址 cdc官方文档 对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。 数据的同步目前对

    2023年04月08日
    浏览(37)
  • flink oracle cdc实时同步(超详细)

    flink oracle cdc实时同步(超详细)

    官方文档:https://github.com/ververica/flink-cdc-connectors/blob/release-master/docs/content/connectors/oracle-cdc.md 本文参照官方文档来记录Oracle CDC 的配置。 在本文开始前,需要先安装Oracle,有兴趣的同学可以参考博主之前写的《docker下安装oracle11g(一次安装成功)》。 如果要做oracle的实时同步

    2024年02月12日
    浏览(12)
  • Flink CDC实时同步PG数据库

    JDK:1.8 Flink:1.16.2 Scala:2.11 Hadoop:3.1.3 github地址:https://github.com/rockets0421/FlinkCDC-PG.git  1、更改配置文件postgresql.conf # 更改wal日志方式为logical wal_level = logical # minimal, replica, or logical # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots max_replication_slots = 20 # m

    2024年02月13日
    浏览(17)
  • FLINK CDC postgresql (Stream与SQL)

    FLINK CDC postgresql (Stream与SQL)

    Postgres CDC Connector — CDC Connectors for Apache Flink® documentation flink cdc捕获postgresql数据 1)更改配置文件 需要更改 # 更改wal日志方式为logical # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个 # 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样 # 中断

    2023年04月27日
    浏览(6)
  • 用flink cdc sqlserver 将数据实时同步到clickhouse

    flink cdc 终于支持 sqlserver 了。 现在互联网公司用sqlserver的不多,大部分都是一些国企的老旧系统。我们以前同步数据,都是用datax,但是不能实时同步数据。现在有了flinkcdc,可以实现实时同步了。 1、首先sqlserver版本:要求sqlserver版本为14及以上,也就是 SQL Server 2017 版。

    2023年04月08日
    浏览(9)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包