一.前置工作
1.更改配置文件postgresql.conf
# 更改wal日志方式为logical
wal_level = logical # minimal, replica, or logical
# 更改复制槽(slots)最大数量(默认值为10),每个flink-cdc PostgreSQL source占用一个slot(若每张表单独起一个作业,则一张表占用一个slot)
max_replication_slots = 20 # max number of replication slots
# 更改wal发送最大进程数(默认值为10),一般与上面的max_replication_slots设置相同
max_wal_senders = 20 # max number of walsender processes
# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s)
wal_sender_timeout = 180s # in milliseconds; 0 disable
wal_level是必须更改的,其它参数可选择性更改;如果同时使用复制槽的CDC作业(同步表)数量超过10个,建议将它们修改为合适的值
修改postgresql.conf后需要重启PostgreSQL服务才能生效(wal_level、max_replication_slots、max_wal_senders都只能在服务启动时设置),所以一般在业务低峰期操作
2.新建用户并且给用户复制流权限
-- pg新建用户
CREATE USER hadoop WITH PASSWORD '***';
-- 给用户复制流权限
ALTER ROLE hadoop replication;
-- 给用户数据库权限
grant CONNECT ON DATABASE hadoop to hadoop;
-- 把public模式下所有已存在表的查询权限赋给用户(只作用于public模式;若表在其他模式,如下文的jarvis_ticket,需要把public换成对应模式名,并额外授予该模式的USAGE权限)
GRANT SELECT ON ALL TABLES IN SCHEMA public TO hadoop;
3.发布表
-- (可选)把已有的发布改为发布所有表。注意:这是直接修改系统目录,需要超级用户权限且PostgreSQL官方不推荐;新建发布时直接使用下面的FOR ALL TABLES即可
update pg_publication set puballtables=true where pubname is not null;
-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- 查询哪些表已经发布
select * from pg_publication_tables;
二.java代码示例
import com.alibaba.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.yogorobot.gmall.realtime.function.MyDebezium;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.time.Duration;
import java.util.Properties;
public class Flink_CDCWIthProduct {
    /** Heartbeat interval handed to Debezium via {@code heartbeat.interval.ms} (5 minutes). */
    private static final long DEFAULT_HEARTBEAT_MS = Duration.ofMinutes(5).toMillis();

    /**
     * Demo job: reads row changes of {@code jarvis_ticket.t_category} from PostgreSQL through the
     * flink-cdc PostgreSQL source and prints the JSON produced by {@code MyDebezium} to stdout.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Create the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // NOTE(review): flink-cdc appears to confirm the replication slot's LSN only when a
        // checkpoint completes; without env.enableCheckpointing(...) the slot may never advance
        // and the server keeps accumulating WAL — consider enabling checkpointing. Confirm.

        // These properties are passed to Debezium unchanged, so they must use the plain Debezium
        // key names. The "debezium." prefix is only stripped for Flink SQL table options; with the
        // prefix here, the keys would be ignored silently.
        Properties properties = new Properties();
        properties.setProperty("snapshot.mode", "never");
        properties.setProperty("slot.name", "pg_cdc");
        // Drops the slot when the connector stops. Changes made while the job is down are then
        // lost, so this is only suitable for testing.
        properties.setProperty("slot.drop.on.stop", "true");
        // NOTE(review): include.schema.changes is not a documented option of the Debezium
        // PostgreSQL connector and is probably ignored — confirm before relying on it.
        properties.setProperty("include.schema.changes", "true");
        // Enable periodic heartbeat record generation via the connector configuration.
        properties.setProperty("heartbeat.interval.ms", String.valueOf(DEFAULT_HEARTBEAT_MS));

        // Build the flink-cdc PostgreSQL source that reads the production database.
        SourceFunction<String> pgsqlSource = PostgreSQLSource.<String>builder()
                .hostname("pgr-***.pg.rds.aliyuncs.com")
                .port(1921)
                .database("jarvis_ticket") // database to monitor
                .schemaList("jarvis_ticket") // schema(s) to monitor
                .tableList("jarvis_ticket.t_category") // tables to monitor, as "schema.table"
                .username("***")
                .password("***")
                // Converts each Debezium SourceRecord into a JSON string.
                .deserializer(new MyDebezium())
                // Standard logical decoding output plugin built into PostgreSQL 10+.
                .decodingPluginName("pgoutput")
                .debeziumProperties(properties)
                .build();

        // Read the change stream from PostgreSQL.
        DataStreamSource<String> pgsqlDS = env.addSource(pgsqlSource);
        // Send the data to Kafka:
        //pgsqlDS.addSink(MyKafkaUtil.getKafkaSink("***"));
        // Print to the console.
        pgsqlDS.print();
        // Run the job.
        env.execute();
    }
}
三.MyDebezium(自定义反序列化器)代码示例
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
public class MyDebezium implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//1.创建一个JSONObject用来存放最终封装好的数据
JSONObject result = new JSONObject();
//2.获取数据库以及表名
String topic = sourceRecord.topic();
String[] split = topic.split("\\.");
//数据库名
String schema = split[1];
//表名
String tableName = split[2];
//4.获取数据
Struct value = (Struct) sourceRecord.value();
//5.获取before数据
Struct structBefore = value.getStruct("before");
JSONObject beforeJson = new JSONObject();
if (structBefore != null) {
Schema schemas = structBefore.schema();
List<Field> fields = schemas.fields();
for (Field field : fields) {
beforeJson.put(field.name(), structBefore.get(field));
}
}
//6.获取after数据
Struct structAfter = value.getStruct("after");
JSONObject afterJson = new JSONObject();
if (structAfter != null) {
Schema schemas = structAfter.schema();
List<Field> fields = schemas.fields();
for (Field field : fields) {
afterJson.put(field.name(), structAfter.get(field));
}
}
String type="update";
if(structBefore==null){
type="insert";
}
if(structAfter==null){
type="delete";
}
//将数据封装到JSONObject中
result.put("schema", schema);
result.put("tableName", tableName);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("type", type);
//将数据发送至下游
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
到了这里,关于FlinkCDC实时读PostgreSQL数据库的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!