直接使用Flink CDC SQL的写法,一个Job只能同步一个表的数据,至于原因,在此不再赘述。
直接上代码吧
第一步,自定义 DebeziumDeserializationSchema
将SourceRecord类转化为自定义的JsonRecord类型
public class JsonStringDebeziumDeserializationSchema
implements DebeziumDeserializationSchema<JsonRecord> {
@Override
public void deserialize(SourceRecord record, Collector<JsonRecord> out) throws Exception {
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
String tableName = record.topic();
//out.collect("source table name is :" + tableName);
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
String insertMapString = extractAfterRow(value, valueSchema);
JsonRecord jsonRecord = new JsonRecord(tableName, "i", insertMapString);
out.collect(jsonRecord);
} else if (op == Envelope.Operation.DELETE) {
String deleteString = extractBeforeRow(value, valueSchema);
JsonRecord jsonRecord = new JsonRecord(tableName, "d", deleteString);
out.collect(jsonRecord);
} else if (op == Envelope.Operation.UPDATE) {
String updateString = extractAfterRow(value, valueSchema);
JsonRecord jsonRecord = new JsonRecord(tableName, "u", updateString);
out.collect(jsonRecord);
}
}
@Override
public TypeInformation<JsonRecord> getProducedType() {
return TypeInformation.of(new TypeHint<JsonRecord>(){});
}
private String extractAfterRow(Struct value, Schema valueSchema) throws Exception {
Struct after = value.getStruct(Envelope.FieldName.AFTER);
Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
Map<String, Object> map = getRowMap(after, afterSchema);
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(map);
}
private String extractBeforeRow(Struct value, Schema valueSchema)
throws Exception {
Struct beforeValue = value.getStruct(Envelope.FieldName.BEFORE);
Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
Map<String, Object> map = getRowMap(beforeValue, beforeSchema);
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(map);
}
private Map<String, Object> getRowMap(Struct value, Schema valueSchema) {
Map<String, Object> map = new HashMap<>();
for (Field field : valueSchema.fields()) {
map.put(field.name(), value.get(field.name()));
}
return map;
}
JsonRecord类定义如下:
/**
 * A single change-data-capture event: the source table name, the operation type
 * ("i" = insert/snapshot read, "u" = update, "d" = delete) and the row payload as a
 * JSON-serialized field map.
 *
 * <p>Written as a plain POJO: the original used only Lombok's {@code @Data}, which does
 * NOT generate the three-argument constructor that the deserialization schema calls
 * ({@code new JsonRecord(tableName, op, fieldValue)}). A public no-arg constructor is
 * also provided so Flink can serialize the type as a POJO.
 */
public class JsonRecord {
    private String tableName;
    private String op;
    private String fieldValue;

    public JsonRecord() {
    }

    public JsonRecord(String tableName, String op, String fieldValue) {
        this.tableName = tableName;
        this.op = op;
        this.fieldValue = fieldValue;
    }

    public String getTableName() {
        return tableName;
    }

    public void setTableName(String tableName) {
        this.tableName = tableName;
    }

    public String getOp() {
        return op;
    }

    public void setOp(String op) {
        this.op = op;
    }

    public String getFieldValue() {
        return fieldValue;
    }

    public void setFieldValue(String fieldValue) {
        this.fieldValue = fieldValue;
    }

    @Override
    public String toString() {
        return "JsonRecord{tableName='" + tableName + "', op='" + op
                + "', fieldValue='" + fieldValue + "'}";
    }
}
其中fieldValue为字段map序列化后的字符串。
第二步,构建DebeziumSourceFunction
/**
 * Builds an Oracle CDC {@code DebeziumSourceFunction} from {@link OracleConnectionOption},
 * supporting both the LogMiner and XStream adapters and subscribing to multiple tables
 * with a single source.
 */
public class OracleDebeziumFunctionBuilder {

    /**
     * Creates the source function for the given connection options.
     *
     * @param option host/credentials, schema name, unqualified table names, adapter choice
     * @return a Debezium source emitting {@link JsonRecord}s for every captured table
     */
    public DebeziumSourceFunction build(OracleConnectionOption option) {
        OracleSource.Builder builder = OracleSource.builder();
        builder.hostname(option.getHostName());
        builder.port(option.getPort());
        builder.username(option.getUserName());
        builder.password(option.getPassword());
        builder.database(option.getDatabaseName());
        // Debezium expects fully qualified "SCHEMA.TABLE" entries in tableList.
        String[] tableArray = new String[option.getTableNames().size()];
        int count = 0;
        for (String tableName : option.getTableNames()) {
            tableArray[count++] = option.getSchemaName() + "." + tableName;
        }
        builder.tableList(tableArray);
        builder.schemaList(new String[] {option.getSchemaName()});

        Properties dbzProperties = new Properties();
        dbzProperties.setProperty("database.tablename.case.insensitive", "false");
        if (option.isUseLogmine()) {
            // LogMiner adapter: read schema from the online catalog and mine continuously.
            dbzProperties.setProperty("log.mining.strategy", "online_catalog");
            dbzProperties.setProperty("log.mining.continuous.mine", "true");
        } else {
            // XStream adapter. FIX: the original listing misspelled the key as
            // "database.connection.adpter", which Debezium silently ignores, so the
            // adapter selection never took effect.
            dbzProperties.setProperty("database.connection.adapter", "xstream");
            dbzProperties.setProperty("database.out.server.name", option.getOutServerName());
        }
        builder.debeziumProperties(dbzProperties);
        builder.deserializer(new JsonStringDebeziumDeserializationSchema());
        builder.startupOptions(option.getStartupOption());
        return builder.build();
    }
}
OracleConnectionOption类定义如下:
/**
 * Connection and capture options used to build the Oracle Debezium source.
 *
 * <p>Accessors are spelled out explicitly: the original listing declared bare fields
 * with no Lombok annotation, yet the builder and the main method call these
 * getters/setters — without them the code does not compile.
 */
public class OracleConnectionOption {
    private String hostName;
    private int port;
    private String databaseName;
    private String userName;
    private String password;
    // Whether to use the LogMiner adapter; when false the XStream adapter is used.
    private boolean useLogmine;
    // XStream outbound server name; only relevant when useLogmine is false.
    private String outServerName;
    // Unqualified table names; the builder prefixes each with schemaName.
    private List<String> tableNames;
    private String schemaName;
    private StartupOptions startupOption;

    public String getHostName() {
        return hostName;
    }

    public void setHostName(String hostName) {
        this.hostName = hostName;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getDatabaseName() {
        return databaseName;
    }

    public void setDatabaseName(String databaseName) {
        this.databaseName = databaseName;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public boolean isUseLogmine() {
        return useLogmine;
    }

    public void setUseLogmine(boolean useLogmine) {
        this.useLogmine = useLogmine;
    }

    public String getOutServerName() {
        return outServerName;
    }

    public void setOutServerName(String outServerName) {
        this.outServerName = outServerName;
    }

    public List<String> getTableNames() {
        return tableNames;
    }

    public void setTableNames(List<String> tableNames) {
        this.tableNames = tableNames;
    }

    public String getSchemaName() {
        return schemaName;
    }

    public void setSchemaName(String schemaName) {
        this.schemaName = schemaName;
    }

    public StartupOptions getStartupOption() {
        return startupOption;
    }

    public void setStartupOption(StartupOptions startupOption) {
        this.startupOption = startupOption;
    }
}
第三步,编写main函数
通过OutputTag实现分流。
/**
 * Entry point: one Flink job that captures several Oracle tables through a single
 * Debezium source and fans the stream out per table via side outputs
 * ({@code OutputTag}), each table getting its own sink.
 */
public class CdcStartup {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();

        OracleConnectionOption connectionOption = new OracleConnectionOption();
        connectionOption.setHostName(...);
        connectionOption.setPort(...);
        connectionOption.setDatabaseName(...);
        connectionOption.setUserName(...);
        connectionOption.setPassword(...);
        connectionOption.setUseLogmine(...);
        connectionOption.setOutServerName(...);
        connectionOption.setSchemaName(...);
        connectionOption.setStartupOption(StartupOptions.initial());

        List<String> tableNames = new ArrayList<>();
        // Register every table to synchronize here.
        tableNames.add("");
        // FIX: the original never passed the table list into the options, so the
        // builder's option.getTableNames() would throw a NullPointerException.
        connectionOption.setTableNames(tableNames);

        OracleDebeziumFunctionBuilder functionBuilder = new OracleDebeziumFunctionBuilder();
        DebeziumSourceFunction sourceFunction = functionBuilder.build(connectionOption);
        DataStreamSource<JsonRecord> dataStreamSource = env.addSource(sourceFunction);

        // One side-output tag per table; the process function routes records by table name.
        Map<String, OutputTag<JsonRecord>> outputTagMap = new HashMap<>();
        for (String tableName : tableNames) {
            outputTagMap.put(tableName, new OutputTag<>(tableName, TypeInformation.of(JsonRecord.class)));
        }
        SingleOutputStreamOperator<Object> mainStream =
                dataStreamSource.process(new ProcessFunction<JsonRecord, Object>() {
                    @Override
                    public void processElement(JsonRecord value, Context ctx, Collector<Object> out)
                            throws Exception {
                        // Topic is "server.SCHEMA.TABLE"; keep only the bare table name.
                        int index = value.getTableName().lastIndexOf('.');
                        String originalName = value.getTableName().substring(index + 1);
                        OutputTag<JsonRecord> tag = outputTagMap.get(originalName);
                        // FIX: guard against tables not registered in the map; the original
                        // would pass a null tag to ctx.output and fail at runtime.
                        if (tag != null) {
                            ctx.output(tag, value);
                        }
                    }
                });
        for (String tableName : tableNames) {
            DataStream<JsonRecord> outputStream = mainStream.getSideOutput(outputTagMap.get(tableName));
            CustomSinkFunction sinkFunction = new CustomSinkFunction(); // custom sink, one per table
            outputStream.addSink(sinkFunction).name(tableName);
        }
        env.execute();
    }
}
到了这里,关于Flink CDC实现一个Job同步多个表的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!