FLINK CDC postgresql (Stream与SQL)

这篇具有很好参考价值的文章主要介绍了FLINK CDC postgresql (Stream与SQL)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Postgres CDC Connector — CDC Connectors for Apache Flink® documentation

flink cdc捕获postgresql数据

1)更改配置文件

需要更改

linux>vi postgresql.conf

# 更改wal日志方式为logical

wal_level = logical # minimal, replica, or logical

# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个

slotsmax_replication_slots = 20 # max number of replication slots

# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样

max_wal_senders = 20 # max number of walsender processes

# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s)

wal_sender_timeout = 180s # in milliseconds; 0 disable

2)注意

注意:wal_level = logical源表的数据修改时,默认的逻辑复制流只包含历史记录的primary key,如果需要输出更新记录的历史记录的所有字段,需要在表级别修改参数:ALTER TABLE tableName REPLICA IDENTITY FULL; 这样才能捕获到源表所有字段更新后的值

3) 将jar包导入flink lib目录

flink-sql-connector-postgres-cdc-2.2.0.jar 到 flink/lib下

4)新建用户并且给用户复制流权限
-- pg新建用户

CREATE USER user WITH PASSWORD 'pwd';

5) 给用户复制流权限

ALTER ROLE user replication;

6) 给用户登录数据库权限

grant CONNECT ON DATABASE test to user;

7)把当前库public下所有表查询权限赋给用户

GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;

8) 发布表

-- 设置发布为true

update pg_publication set puballtables=true where pubname is not null;

-- 把所有表进行发布

CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- 查询哪些表已经发布

select * from pg_publication_tables;

9) 更改表的复制标识包含更新和删除的值

-- 更改复制标识包含更新和删除之前值

ALTER TABLE test0425 REPLICA IDENTITY FULL;

-- 查看复制标识(为f标识说明设置成功)

select relreplident from pg_class where relname='testname';

到这一步,设置已经完全可以啦,上面步骤都是必须的

flink sql 端 创建postgresql 连接器

linux>bin/sql-client.sh     //进入flink sql客户端

CREATE TABLE flink_cdc_source (
id INT,
name STRING
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'pg数据库IP地址',
'port' = '5432',
'database-name' = 'postgres',
'schema-name' = 'public',
'username' = 'postgres',
'password' = '123456',
'table-name' = 'pg_cdc_source',
'decoding.plugin.name' = 'pgoutput'
);

错误: 复制槽名 "flink" 已经存在

FLINK CDC postgresql (Stream与SQL)

( 解决复制槽名 "flink" 已经存在)

1.切换用户

# su - postgres

2.登陆用户

-bash-4.2$ psql -U postgres

3. 查看复制槽

postgres=# select * from pg_replication_slots; 查看复制槽

FLINK CDC postgresql (Stream与SQL)

 4. 删除复制槽

SELECT * FROM pg_drop_replication_slot('flink'); 删除复制槽

FLINK CDC postgresql (Stream与SQL)

5.验证

postgres=# select * from pg_replication_slots; 查看复制槽

Flink CDC Stream Postgres变更捕获 (java)

package pg;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Properties;


public class FlinkCdcPg {
public static void main(String[]args) throws Exception {
Properties properties = new Properties();
properties.setProperty("snapshot.mode", "initial");
properties.setProperty("decimal.handling.mode", "double"); 
properties.setProperty("database.serverTimezone", "GMT+8"); //设置时区


SourceFunction<String>sourceFunction = PostgreSQLSource.<String>builder()
.hostname("Pg数据库IP地址")
.port(5432)
.database("postgres") // monitor postgresdatabase
.schemaList("public") // monitor inventory schema
.tableList("public.sink2") // monitor productstable
.username("postgres")
.password("123456")
.decodingPluginName("pgoutput") // pg解码插件
.slotName("t_table_slot") // 复制槽名称 不能重复
.deserializer(new JsonDebeziumDeserializationSchema())// converts SourceRecord to JSON String
.debeziumProperties(properties)
.build();

StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();

env
.addSource(sourceFunction)
.print().setParallelism(1); // use parallelism1 for sink to keep message ordering


env.execute();

}
}

Flink CDC  SQL TABLE pg读取(java)文章来源地址https://www.toymoban.com/news/detail-427006.html

package pg;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCdcOracleExample {
public static void main(String[]args) throws Exception {

StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.disableOperatorChaining();
StreamTableEnvironment tableEnv =StreamTableEnvironment.create(env);


String sourceDDL ="CREATE TABLEpg_source (\n" +
" ID INT, \n" +
" NAME STRING, \n" +
" PRIMARY KEY (ID) NOT ENFORCED \n" +
" ) WITH (\n" +
" 'connector' = 'postgres-cdc',\n" +
" 'hostname' = 'Pg数据库IP地址',\n" +
" 'port' = '5432',\n" +
" 'username' = 'postgres',\n" +
" 'password' = '123456',\n" +
" 'database-name' = 'postgres',\n" +
" 'schema-name' = 'public',\n" + // 注意这里要大写
" 'table-name' = 'sink2',\n" +
" 'debezium.log.mining.strategy'='online_catalog'\n" +
)";



//执行source表ddl
tableEnv.executeSql(sourceDDL);
TableResult tableResult =tableEnv.executeSql("select * from pg_source");
tableResult.print();
env.execute();

}
}

到了这里,关于FLINK CDC postgresql (Stream与SQL)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink Oracle CDC Connector源码解读

    flink cdc是在flink的基础上对oracle的数据进行实时采集,底层使用的是debezium框架来实现,debezium使用oracle自带的logminer技术来实现。logminer的采集需要对数据库和采集表添加补充日志,由于oracle18c不支持对数据添加补充日志,所以目前支持的oracle11、12、19三个版本。 flink oracle

    2024年02月02日
    浏览(44)
  • 【大数据】基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL

    这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL。本教程的演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java / Scala 代码,也无需安装 IDE。 假设我们正在经营电子商务业务,商品和订单的数据存储在 MySQL 中,订单对应的物流信息存储在 Postgres 中。

    2024年02月03日
    浏览(43)
  • 第3.4章:StarRocks数据导入--Flink Connector与CDC秒级数据同步

    Flink作为当前流行的流式计算框架,在对接StarRocks时,若直接使用JDBC的方式“流式”写入数据,对StarRocks是不友好的,StarRocks作为一款MVCC的数据库,其导入的核心思想还是“攒微批+降频率”。为此,StarRocks单独开发了flink-connector-starrocks,其内部实现仍是通过对数据缓存攒批

    2023年04月15日
    浏览(78)
  • 60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月19日
    浏览(51)
  • flink postgresql cdc实时同步(含pg安装配置等)

    类型 版本/描述 docker 20.10.9 Postgresql 10.6 初始化账号密码:postgres/postgres 普通用户:test1/test123 数据库:test_db flink 1.13.6 step1 : 拉取 PostgreSQL 10.6 版本的镜像: step2 :创建并启动 PostgreSQL 容器,在这里,我们将把容器的端口 5432 映射到主机的端口 30028,账号密码设置为 postgre

    2024年02月11日
    浏览(48)
  • Flink SQL Hive Connector使用场景

    目录 1.介绍 2.使用 2.1注册HiveCatalog 2.2Hive Read 2.2.1流读关键配置 2.2.2示例

    2024年02月06日
    浏览(44)
  • Flink Upsert Kafka SQL Connector 介绍

    在某些场景中,比方GROUP BY聚合之后的后果,须要去更新之前的结果值。这个时候,须要将 Kafka 记录的 key 当成主键解决,用来确定一条数据是应该作为插入、删除还是更新记录来解决。在 Flink1.11 中,能够通过 flink-cdc-connectors 项目提供的 changelog-json format 来实现该性能。 在

    2024年02月20日
    浏览(45)
  • SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka

    最近做的一个项目,使用的是pg数据库,公司没有成熟的DCD组件,为了实现数据变更消息发布的功能,我使用SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka。 监听数据变化,进行异步通知,做系统内异步任务。 架构方案(懒得写了,看图吧): -- 创建pg 高线数据同步用

    2024年02月02日
    浏览(46)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

           今天一天争取搞完最后这一部分,学完赶紧把 Kafka 和 Flume 学完,就要开始做实时数仓了。据说是应届生得把实时数仓搞个 80%~90% 才能差不多找个工作,太牛马了。         之前我们已经用过了一些简单的内置连接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官网:

    2024年01月24日
    浏览(56)
  • 关于flink-sql-connector-phoenix的重写逻辑

    目录 重写意义 代码结构  调用链路 POM文件配置 代码解析 一、PhoenixJdbcD

    2024年02月12日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包