环境
flink 1.15.3(此时最新版本为1.16.1)
mysql 5.7+
starrocks 2.5.2
mysql同步表结构
MySQL 中的 timestamp 字段可以正常同步,但同步后的时间比源库多了 8 小时,设置 MySQL 连接属性(如 connectionTimeZone、serverTimeZone)也没有效果。
-- Source MySQL table used for the sync example; it contains one DATETIME
-- column and one TIMESTAMP column.
CREATE TABLE `temp_flink` (
`id` int(11) NOT NULL,
`name` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL,
`remark` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL,
-- DATETIME column.
`create_date` datetime DEFAULT NULL,
-- TIMESTAMP column; the article reports this type arriving 8 hours off after sync.
`create_time` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
参考文末“参考资料”中的链接,有两种处理方式。
文章来源:https://www.toymoban.com/news/detail-596301.html
这里使用单独的自定义转换器,代码如下:
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
/**
 * Debezium custom converter that emits MySQL {@code DATE}, {@code TIME},
 * {@code DATETIME} and {@code TIMESTAMP} columns as plain strings, shifting
 * {@code TIMESTAMP} values into a target time zone.
 *
 * <p>Each handled column is registered with an optional string schema. Values
 * of an unexpected Java type are passed through {@link String#valueOf(Object)};
 * {@code null} stays {@code null}.
 *
 * <p>Optional converter property (given to {@link #configure(Properties)}):
 * <ul>
 *   <li>{@code format.timestamp.zone} - zone id used for {@code TIMESTAMP}
 *       values, e.g. {@code Asia/Shanghai}. Defaults to the JVM's zone.</li>
 * </ul>
 *
 * @author JGMa
 */
public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ISO_DATE;
    private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ISO_TIME;
    private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ISO_DATE_TIME;
    private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ISO_DATE_TIME;

    /** Name of the optional property that overrides the TIMESTAMP target zone. */
    private static final String TIMESTAMP_ZONE_PROPERTY = "format.timestamp.zone";

    private static final long SECONDS_PER_MINUTE = 60L;
    private static final long SECONDS_PER_HOUR = 60L * SECONDS_PER_MINUTE;
    private static final long SECONDS_PER_DAY = 24L * SECONDS_PER_HOUR;

    /** Zone that TIMESTAMP values are shifted into before formatting. */
    private ZoneId timestampZoneId = ZoneId.systemDefault();

    /**
     * Reads the optional {@code format.timestamp.zone} property. When it is
     * absent or blank the JVM default zone stays in effect.
     *
     * @param props converter properties; may be {@code null}
     * @throws IllegalArgumentException if the configured zone id is not valid
     */
    @Override
    public void configure(Properties props) {
        if (props == null) {
            return;
        }
        String zone = props.getProperty(TIMESTAMP_ZONE_PROPERTY);
        if (zone == null || zone.trim().isEmpty()) {
            return;
        }
        try {
            timestampZoneId = ZoneId.of(zone.trim());
        } catch (DateTimeException e) {
            throw new IllegalArgumentException(
                    "Invalid time zone for '" + TIMESTAMP_ZONE_PROPERTY + "': " + zone, e);
        }
    }

    /**
     * Registers a string converter for DATE, TIME, DATETIME and TIMESTAMP
     * columns; any other column type is left untouched.
     *
     * @param column       the column being inspected
     * @param registration callback used to register the schema and converter
     */
    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        String sqlType = column.typeName();
        if (sqlType == null) {
            return;
        }
        // equalsIgnoreCase is locale-independent, unlike toUpperCase() with the
        // default locale (which breaks on the letter 'i' under a Turkish locale).
        if ("DATE".equalsIgnoreCase(sqlType)) {
            registration.register(stringSchema("com.darcytech.debezium.date.string"), this::convertDate);
        } else if ("TIME".equalsIgnoreCase(sqlType)) {
            registration.register(stringSchema("com.darcytech.debezium.time.string"), this::convertTime);
        } else if ("DATETIME".equalsIgnoreCase(sqlType)) {
            registration.register(stringSchema("com.darcytech.debezium.datetime.string"), this::convertDateTime);
        } else if ("TIMESTAMP".equalsIgnoreCase(sqlType)) {
            registration.register(stringSchema("com.darcytech.debezium.timestamp.string"), this::convertTimestamp);
        }
    }

    /** Builds the optional string schema carrying the given logical name. */
    private static SchemaBuilder stringSchema(String name) {
        return SchemaBuilder.string().optional().name(name);
    }

    /**
     * Formats a DATE value as an ISO date. Accepts a {@link LocalDate} or an
     * {@link Integer} interpreted as days since the epoch.
     */
    private String convertDate(Object input) {
        if (input == null) {
            return null;
        }
        if (input instanceof LocalDate) {
            return DATE_FORMATTER.format((LocalDate) input);
        }
        if (input instanceof Integer) {
            return DATE_FORMATTER.format(LocalDate.ofEpochDay((Integer) input));
        }
        return String.valueOf(input);
    }

    /**
     * Formats a TIME value delivered as a {@link Duration}.
     *
     * <p>Durations within a single day are printed as an ISO time of day.
     * Negative durations and durations of a day or more cannot be represented
     * by {@link LocalTime} (it would throw), so they are printed as a signed
     * {@code [-]HH:MM:SS[.fraction]} elapsed time instead.
     */
    private String convertTime(Object input) {
        if (input == null) {
            return null;
        }
        if (!(input instanceof Duration)) {
            return String.valueOf(input);
        }
        Duration duration = (Duration) input;
        long seconds = duration.getSeconds();
        if (seconds >= 0 && seconds < SECONDS_PER_DAY) {
            LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(duration.getNano());
            return TIME_FORMATTER.format(time);
        }
        return formatElapsedTime(duration);
    }

    /** Renders a duration as {@code [-]HH:MM:SS[.fraction]}; hours may exceed two digits. */
    private static String formatElapsedTime(Duration duration) {
        boolean negative = duration.isNegative();
        Duration magnitude = negative ? duration.negated() : duration;
        long totalSeconds = magnitude.getSeconds();

        StringBuilder out = new StringBuilder();
        if (negative) {
            out.append('-');
        }
        appendTwoDigits(out, totalSeconds / SECONDS_PER_HOUR).append(':');
        appendTwoDigits(out, (totalSeconds % SECONDS_PER_HOUR) / SECONDS_PER_MINUTE).append(':');
        appendTwoDigits(out, totalSeconds % SECONDS_PER_MINUTE);

        int nanos = magnitude.getNano();
        if (nanos != 0) {
            // Zero-pad to nine digits, then drop trailing zeros.
            String fraction = String.valueOf(1_000_000_000L + nanos).substring(1);
            int end = fraction.length();
            while (end > 0 && fraction.charAt(end - 1) == '0') {
                end--;
            }
            out.append('.').append(fraction, 0, end);
        }
        return out.toString();
    }

    /** Appends a non-negative value, left-padded with a zero to at least two digits. */
    private static StringBuilder appendTwoDigits(StringBuilder out, long value) {
        if (value < 10) {
            out.append('0');
        }
        return out.append(value);
    }

    /**
     * Formats a DATETIME value delivered as a {@link LocalDateTime}, using a
     * space instead of the ISO {@code 'T'} separator. No zone shift is applied.
     */
    private String convertDateTime(Object input) {
        if (input == null) {
            return null;
        }
        if (input instanceof LocalDateTime) {
            return DATETIME_FORMATTER.format((LocalDateTime) input).replace('T', ' ');
        }
        return String.valueOf(input);
    }

    /**
     * Formats a TIMESTAMP value delivered as a {@link ZonedDateTime}: the
     * instant is shifted into the configured zone and printed as a local
     * date-time with a space instead of the ISO {@code 'T'} separator.
     */
    private String convertTimestamp(Object input) {
        if (input == null) {
            return null;
        }
        if (input instanceof ZonedDateTime) {
            // MySQL stores TIMESTAMP values in UTC, so the ZonedDateTime
            // received here is in UTC and must be moved to the target zone.
            ZonedDateTime zonedDateTime = (ZonedDateTime) input;
            LocalDateTime localDateTime =
                    zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
            return TIMESTAMP_FORMATTER.format(localDateTime).replace('T', ' ');
        }
        return String.valueOf(input);
    }
}
使用
{
/**
 * Builds and runs a streaming Flink job that reads change events for one
 * MySQL table through the MySQL CDC source and writes them to StarRocks.
 *
 * <p>The custom Debezium converter is registered through the source's
 * Debezium properties so that date/time columns are emitted as strings.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    String tableName = "temp_flink";
    String srcHost = "192.168.10.14";
    String srcDatabase = "xcode";
    String srcUsername = "root";
    String srcPassword = "123456";

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

    Properties mysqlProperties = new Properties();
    // Connection-level settings that were tried and did not fix the offset:
    // mysqlProperties.setProperty("characterEncoding","UTF-8");
    // mysqlProperties.setProperty("connectionTimeZone","Asia/Shanghai");
    // Register the custom date/time converter.
    mysqlProperties.setProperty("converters", "dateConverters");
    mysqlProperties.setProperty("dateConverters.type", "com.txlc.flink.core.MySqlDateTimeConverter");

    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname(srcHost)
            // .jdbcProperties(mysqlProperties)
            .port(3306)
            .databaseList(srcDatabase)
            .tableList(srcDatabase + "." + tableName)
            .username(srcUsername)
            .password(srcPassword)
            // .serverTimeZone("Asia/Shanghai")
            // This is the key part: the converter settings are passed as Debezium properties.
            .debeziumProperties(mysqlProperties)
            .deserializer(new JsonStringDebeziumDeserializationSchema())
            .build();

    DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.forMonotonousTimestamps(), "[temp_flink-source]")
            .setParallelism(1);

    streamSource.addSink(StarRocksSink.sink(
            // the sink options
            StarRocksSinkOptions.builder()
                    .withProperty("jdbc-url", "jdbc:mysql://192.168.10.245:9030?characterEncoding=utf-8")
                    .withProperty("load-url", "192.168.10.245:8030")
                    .withProperty("database-name", "xcode")
                    .withProperty("username", "root")
                    .withProperty("password", "123456")
                    .withProperty("table-name", tableName)
                    // Since version 2.4, partial-column updates on primary key tables are
                    // supported; the two properties below specify the columns to update.
                    // .withProperty("sink.properties.partial_update", "true")
                    // .withProperty("sink.properties.columns", "k1,k2,k3")
                    .withProperty("sink.properties.format", "json")
                    .withProperty("sink.properties.strip_outer_array", "true")
                    // Sink parallelism; with a parallelism above 1, consider how to keep records ordered.
                    .withProperty("sink.parallelism", "1")
                    .build())
    ).name(">>>StarRocks temp_flink Sink<<<");

    try {
        env.execute("temp_flink stream sync");
    } catch (Exception e) {
        // Pass the message for the placeholder and the exception itself as the
        // trailing argument so the logger records the full stack trace once.
        log.error("[sync error] info : {}", e.getMessage(), e);
    }
}
}
参考资料
https://blog.csdn.net/cloudbigdata/article/details/122935333
https://blog.csdn.net/WuBoooo/article/details/127387144
文章来源地址:https://www.toymoban.com/news/detail-596301.html
到了这里,关于Flink cdc同步mysql到starrocks(日期时间格式/时区处理)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!