1. Version information
- Flink: 1.16.1
2. Code implementation
- The pom file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wys</groupId>
<artifactId>flink</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.16.1</flink.version>
<flink-cdc.version>2.3.0</flink-cdc.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- mysql-cdc fat jar -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>${flink-cdc.version}</version>
</dependency>
<!-- flink webui -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Logging dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
<scope>provided</scope>
</dependency>
<!--flink-connector-starrocks -->
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>1.2.9_flink-1.16</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
</dependencies>
</project>
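- Note: the StarRocks connector and the logging dependencies above are declared with provided scope, so they are assumed to be available on the runtime classpath (for example in Flink's lib directory) when the job is submitted; the pom as shown also does not include a shade/assembly plugin for building a fat jar.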
- Java code (main class)
package com.wys.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.wys.flink.bean.DataCenterShine;
import com.wys.flink.util.DataStreamUtil;
import com.wys.flink.util.SourceAndSinkInfo;
public class DataStreamMySQLToStarRocks {
public static void main(String[] args) throws Exception {
// Stream execution environment
Configuration conf = new Configuration();
// Port that the local WebUI binds to
conf.setString(RestOptions.BIND_PORT, "8081");
// Create a local environment with the WebUI enabled, using the configuration above
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
// Checkpoint every 3 minutes with exactly-once semantics
env.enableCheckpointing(180000L, CheckpointingMode.EXACTLY_ONCE);
// Source and sink connection info (host, port, credentials)
SourceAndSinkInfo info=SourceAndSinkInfo.builder()
.sourceIp("ip")
.sourcePort(3306)
.sourceUserName("root")
.sourcePassword("****")
.sinkIp("ip")
.sinkPort(9030)
.sinkUserName("root")
.sinkPassword("")
.build();
// Register the source and sink for the table mapped by the DataCenterShine entity
DataStreamUtil.setStarRocksSourceAndSink(env, info, DataCenterShine.class);
// Multiple tables can be synchronized by registering more entities, e.g.:
//DataStreamUtil.setStarRocksSourceAndSink(env, info, Organization.class);
// Job name
env.execute("data_center_shine_job");
}
}
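- When run locally, this job exposes the Flink WebUI on http://localhost:8081 (the port bound above), which is handy for watching the CDC source and sink operators.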
- SourceAndSinkInfo class, which holds the source and sink host, port, username and password
package com.wys.flink.util;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SourceAndSinkInfo {
/**
 * Source host (IP)
 */
private String sourceIp;
/**
 * Source port
 */
private int sourcePort;
/**
 * Source username
 */
private String sourceUserName;
/**
 * Source password
 */
private String sourcePassword;
/**
 * Sink host (IP)
 */
private String sinkIp;
/**
 * Sink port
 */
private int sinkPort;
/**
 * Sink username
 */
private String sinkUserName;
/**
 * Sink password
 */
private String sinkPassword;
}
- DataCenterShine entity class; its fields map one-to-one to the columns of the database table.
package com.wys.flink.bean;
import com.wys.flink.annotation.FieldInfo;
import com.wys.flink.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;
/**
* <p>
* Business type mapping table
* </p>
*
* @author wys
* @since 2023-05-23 11:16:24
*/
@Data
@TableName("wsmg.data_center_shine")
@EqualsAndHashCode(callSuper=false)
public class DataCenterShine extends StarRocksPrimary implements Serializable {
private static final long serialVersionUID = 1L;
/**
 * Primary key
 */
@FieldInfo(order = 1, isPrimaryKey = true, notNull = true)
private Integer id;
/**
 * Business name
 */
@FieldInfo(order = 2)
private String busName;
/**
 * Mapper class name
 */
@FieldInfo(order = 3)
private String mapperClassName;
/**
 * Entity class name
 */
@FieldInfo(order = 4)
private String entityClassName;
}
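- The commented-out call in DataStreamMySQLToStarRocks and the --entity example in section 3 refer to an Organization entity declared the same way; a minimal sketch, assuming a hypothetical wsmg.organization table (the fields are illustrative):
package com.wys.flink.bean;
import com.wys.flink.annotation.FieldInfo;
import com.wys.flink.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;
@Data
@TableName("wsmg.organization") // hypothetical table, shown only to illustrate adding a second synchronized entity
@EqualsAndHashCode(callSuper = false)
public class Organization extends StarRocksPrimary implements Serializable {
    private static final long serialVersionUID = 1L;
    /**
     * Primary key
     */
    @FieldInfo(order = 1, isPrimaryKey = true, notNull = true)
    private Integer id;
    /**
     * Organization name (illustrative field)
     */
    @FieldInfo(order = 2)
    private String orgName;
}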
- StarRocksPrimary base class
package com.wys.flink.bean;
import org.apache.flink.types.RowKind;
import lombok.Data;
@Data
public class StarRocksPrimary {
/**
 * Stores the StarRocks row operation type: insert, delete or update
 */
private RowKind rowKind;
}
- FieldInfo annotation, which marks a field's column order, whether it is the primary key and whether it is NOT NULL; it is used later when generating the TableSchema.
package com.wys.flink.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface FieldInfo {
/**
 * Column order: the position of the field when rows are written.
 * @return
 */
int order();
/**
 * Whether the field is the primary key (needed for the StarRocks primary key model)
 * @methodName isPrimaryKey
 * @return boolean
 * @author wys
 * @date 2023-12-12
 */
boolean isPrimaryKey() default false;
/**
 * Whether the field is NOT NULL
 * @methodName notNull
 * @return boolean
 * @author wys
 * @date 2023-12-12
 */
boolean notNull() default false;
}
- TableName annotation, which records the database and table an entity class maps to
package com.wys.flink.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface TableName {
/**
 * Table name in the form database.table, e.g. sys.user
 * @return
 */
String value();
}
- DataStreamUtil helper class, which wires up the source and sink. It currently defines MySQL-to-MySQL (commented out below) and MySQL-to-StarRocks synchronization.
package com.wys.flink.util;
import java.util.function.Supplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.wys.flink.annotation.TableName;
import com.wys.flink.bean.DataCenterShine;
import com.wys.flink.sink.MysqlAndStarRocksSink;
public class DataStreamUtil {
/**
 * Source and sink setup for MySQL-to-MySQL synchronization
 * @methodName setMySQLSourceAndSink
 * @param env
 * @param info
 * @param cls void
 * @author wys
 * @date 2023-12-12
 */
/*@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> void setMySQLSourceAndSink(StreamExecutionEnvironment env,SourceAndSinkInfo info,Class<T> cls) {
setSourceAndSink(env, info, cls, ()->new MysqlAndStarRocksSink(cls,info.getSinkIp(), info.getSinkPort()));
}*/
/**
 * Source and sink setup for MySQL-to-StarRocks synchronization
 * @methodName setStarRocksSourceAndSink
 * @param env
 * @param info
 * @param cls void
 * @author wys
 * @date 2023-12-12
 */
public static <T> void setStarRocksSourceAndSink(StreamExecutionEnvironment env,SourceAndSinkInfo info,Class<T> cls) {
setSourceAndSink(env, info, cls, ()->StarRocksSinkUtil.getStarRocksSink(cls, info));
}
/**
 * Common source and sink setup
 * @methodName setSourceAndSink
 * @param env
 * @param info
 * @param cls
 * @param sink void
 * @author wys
 * @date 2023-12-12
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
private static <T> void setSourceAndSink(StreamExecutionEnvironment env,SourceAndSinkInfo info,Class<T> cls,Supplier<SinkFunction<T>> sink) {
if (cls.isAnnotationPresent(TableName.class)) {
String table = cls.getAnnotation(TableName.class).value();
String[] tableArr = table.split("\\.");
// source
MySqlSource<T> mySQLSource = MySqlSource.<T>builder()
.hostname(info.getSourceIp())
.port(info.getSourcePort())
.databaseList(tableArr[0]) // database(s) to capture; to sync a whole database, set tableList to ".*"
.tableList(table) // table(s) to capture
.username(info.getSourceUserName())
.password(info.getSourcePassword())
.deserializer(new CustomDebeziumDeserializationSchema(cls)).build();
// register the source with the stream execution environment
DataStreamSource<T> source = env.fromSource(mySQLSource, WatermarkStrategy.noWatermarks(), tableArr[1] + "_source");
// sink
source.addSink(sink.get()).name(tableArr[1] + "_sink");
}
}
}
- StarRocksSinkUtil helper class, which builds the StarRocks sink
package com.wys.flink.util;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.TableSchema.Builder;
import org.apache.flink.table.types.DataType;
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.row.sink.StarRocksSinkRowBuilder;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.wys.flink.annotation.FieldInfo;
import com.wys.flink.annotation.TableName;
import com.wys.flink.bean.StarRocksPrimary;
/**
 * StarRocksSink helper class
 * @className StarRocksSinkUtil
 * @author wys
 * @date 2023-12-12
 */
public class StarRocksSinkUtil {
private static final Pattern TPATTERN = Pattern.compile("[A-Z0-9]");
/**
 * Build the StarRocks sink for the given entity class
 * @methodName getStarRocksSink
 * @param cls
 * @param info
 * @return SinkFunction<T>
 * @author wys
 * @date 2023-12-12
 */
@SuppressWarnings("serial")
public static <T> SinkFunction<T> getStarRocksSink(Class<T> cls, SourceAndSinkInfo info) {
Map<Integer, String> fieldMap = getFieldMap(cls);
return StarRocksSink.sink(getTableSchema(cls), getStarRocksSinkOptions(info, cls),
new StarRocksSinkRowBuilder<T>() {
@Override
public void accept(Object[] objects, T beanDataJava) {
try {
// populate the row array via reflection, in the column order defined by @FieldInfo
for (Entry<Integer, String> entry : fieldMap.entrySet()) {
Field field = cls.getDeclaredField(entry.getValue());
field.setAccessible(true);
Object obj = field.get(beanDataJava);
objects[entry.getKey() - 1] = obj;
}
// write the row operation type (RowKind ordinal) into the last slot of the row array
if(beanDataJava instanceof StarRocksPrimary){
objects[objects.length - 1] = ((StarRocksPrimary) beanDataJava).getRowKind().ordinal();
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
/**
 * Build the map from column order to field name using @FieldInfo
 *
 * @methodName getFieldMap
 * @author wys
 * @date 2023-12-11
 */
private static <T> Map<Integer, String> getFieldMap(Class<T> cls) {
Map<Integer, String> fieldMap = new HashMap<>();
Field[] fields = cls.getDeclaredFields();
for (Field field : fields) {
if (field.isAnnotationPresent(FieldInfo.class)) {
fieldMap.put(field.getAnnotation(FieldInfo.class).order(), field.getName());
}
}
return fieldMap;
}
/**
 * Build the TableSchema for the entity class
 * @methodName getTableSchema
 * @param cls
 * @return TableSchema
 * @author wys
 * @date 2023-12-12
 */
@SuppressWarnings("deprecation")
private static <T> TableSchema getTableSchema(Class<T> cls) {
Builder builder = TableSchema.builder();
Field[] fields = cls.getDeclaredFields();
// build the TableSchema via reflection; fields are added in declaration order, which is assumed to match their @FieldInfo order
for (Field field : fields) {
if (!field.isAnnotationPresent(FieldInfo.class)) {
continue;
}
FieldInfo fi = field.getAnnotation(FieldInfo.class);
if (fi.isPrimaryKey()) {
builder.primaryKey(humpToUnderlined(field.getName()));
}
DataType dataType = getDataType(field.getType());
if (fi.notNull()) {
dataType = dataType.notNull();
}
builder.field(humpToUnderlined(field.getName()), dataType);
}
return builder.build();
}
/**
 * Build the StarRocksSinkOptions
 * @methodName getStarRocksSinkOptions
 * @param info
 * @param cls
 * @return StarRocksSinkOptions
 * @author wys
 * @date 2023-12-12
 */
private static <T> StarRocksSinkOptions getStarRocksSinkOptions(SourceAndSinkInfo info, Class<T> cls) {
String table = cls.getAnnotation(TableName.class).value();
String[] tableArr = table.split("\\.");
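// jdbc-url targets the StarRocks FE MySQL-protocol (query) port, 9030 in this example;
// load-url targets the FE HTTP port used by Stream Load, assumed here to be the default 8030.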
return StarRocksSinkOptions.builder()
.withProperty("jdbc-url",String.format("jdbc:mysql://%s:%s/%s", info.getSinkIp(), info.getSinkPort(), tableArr[0]))
.withProperty("load-url", info.getSinkIp() + ":8030")
.withProperty("username", info.getSinkUserName())
.withProperty("password", info.getSinkPassword())
.withProperty("table-name", tableArr[1])
.withProperty("database-name", tableArr[0])
.withProperty("sink.properties.row_delimiter", "\\x02")
.withProperty("sink.properties.column_separator", "\\x01")
.withProperty("sink.buffer-flush.interval-ms", "5000").build();
}
/**
 * Convert camel case to snake case, e.g. mapperClassName -> mapper_class_name
 *
 * @methodName humpToUnderlined
 * @param str
 * @return String
 * @author wys
 * @date 2023-12-12
 */
private static String humpToUnderlined(String str) {
Matcher matcher = TPATTERN.matcher(str);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase());
}
matcher.appendTail(sb);
return sb.toString();
}
/**
 * Map a Java field type to a Flink DataType
 * @methodName getDataType
 * @param cls
 * @return DataType
 * @author wys
 * @date 2023-12-12
 */
private static DataType getDataType(Class<?> cls) {
if (cls.equals(Integer.class)) {
return DataTypes.INT();
} else if (cls.equals(String.class)) {
return DataTypes.STRING();
} else if (cls.equals(Date.class)) {
return DataTypes.TIMESTAMP();
} else if (cls.equals(BigDecimal.class)) {
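// precision and scale are fixed here; adjust them to match the target StarRocks column if needed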
return DataTypes.DECIMAL(8, 2);
}
throw new RuntimeException("未找到属性相应类型");
}
}
- CustomDebeziumDeserializationSchema class, a custom deserialization schema
package com.wys.flink.util;
import java.util.List;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
/**
 * Custom Debezium deserialization schema
 * @className CustomDebeziumDeserializationSchema
 * @author wys
 * @date 2023-12-12
 */
public class CustomDebeziumDeserializationSchema<T> implements DebeziumDeserializationSchema<T> {
private static final long serialVersionUID = 1L;
private Class<T> cls;
public CustomDebeziumDeserializationSchema(Class<T> cls) {
this.cls=cls;
}
/**
 * If only "after" is present the record is an insert; if only "before" is present it is a delete;
 * if both "before" and "after" are present it is an update.
 * @methodName deserialize
 * @param sourceRecord
 * @param collector void
 * @author wys
 * @date 2023-12-12
 */
@Override
public void deserialize(SourceRecord sourceRecord, Collector<T> collector) {
JSONObject resJson = new JSONObject();
try {
Struct valueStruct = (Struct) sourceRecord.value();
Struct afterStruct = valueStruct.getStruct("after");
Struct beforeStruct = valueStruct.getStruct("before");
// update
if (null!=beforeStruct && null!=afterStruct) {
setDataContent(afterStruct, resJson);
resJson.put("rowKind", RowKind.UPDATE_AFTER);
}
// insert
else if (null!= afterStruct) {
setDataContent(afterStruct, resJson);
resJson.put("rowKind", RowKind.INSERT);
}
// delete
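// (UPDATE_BEFORE rather than DELETE is used here, presumably because its ordinal value 1 matches
// the delete operation code written into the last row slot in StarRocksSinkUtil)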
else if (null!= beforeStruct ) {
setDataContent(beforeStruct, resJson);
resJson.put("rowKind", RowKind.UPDATE_BEFORE);
}
} catch (Exception e) {
throw new RuntimeException("Failed to deserialize the change record", e);
}
T t =resJson.toJavaObject(cls);
collector.collect(t);
}
/**
 * Copy the fields of the Debezium struct into the JSON object
 * @methodName setDataContent
 * @param struct
 * @param resJson void
 * @author wys
 * @date 2023-12-12
 */
private void setDataContent(Struct struct,JSONObject resJson){
List<Field> fields = struct.schema().fields();
for (Field field : fields) {
String name = field.name();
Object value = struct.get(name);
resJson.put(name, value);
}
}
@Override
public TypeInformation<T> getProducedType() {
return BasicTypeInfo.of(cls);
}
}
3. Custom MySQL-to-StarRocks synchronization job
3.1 Feature description
- By uploading the packaged jar to the Apache Flink Dashboard and passing in the tables to synchronize, a job can be generated automatically.
3.2 Code implementation
package com.wys.flink;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.wys.flink.annotation.TableName;
import com.wys.flink.util.DataStreamUtil;
import com.wys.flink.util.SourceAndSinkInfo;
/**
 * Custom job; program arguments example: --entity DataCenterShine,Organization
 * @className StarRocksCustomStreamCDC
 * @author wys
 * @date 2023-12-11
 */
public class StarRocksCustomStreamCDC {
public static void main(String[] args) throws Exception {
List<Class<?>> clsList=new ArrayList<>();
StringBuilder jobName=new StringBuilder();
ParameterTool parameters = ParameterTool.fromArgs(args);
String entitys = parameters.get("entity", null);
if (null == entitys) {
throw new RuntimeException("Pass the entity class names of the tables to synchronize in Program Arguments, e.g.: --entity User,Role");
}
// the argument holds a comma-separated list of entity class simple names
String[] entityArr=entitys.split(",");
for(String className:entityArr){
Class<?> cls=getBeanClass(String.format("com.wys.flink.bean.%s", className));
clsList.add(cls);
jobName.append(cls.getSimpleName()).append("_");
}
jobName.append("job");
// Stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 3 minutes with exactly-once semantics
env.enableCheckpointing(180000L, CheckpointingMode.EXACTLY_ONCE);
SourceAndSinkInfo ssi=SourceAndSinkInfo.builder()
.sourceIp("ip")
.sourcePort(3306)
.sourceUserName("root")
.sourcePassword("****")
.sinkIp("ip")
.sinkPort(9030)
.sinkUserName("root")
.sinkPassword("****")
.build();
// register a source and sink for every entity class
clsList.forEach(item->DataStreamUtil.setStarRocksSourceAndSink(env, ssi, item));
env.execute(jobName.toString().toLowerCase());
}
/**
 * Load the entity class by name
 * @methodName getBeanClass
 * @param className fully qualified class name
 * @return Class<?>
 * @author wys
 * @date 2023-05-18
 */
private static Class<?> getBeanClass(String className) {
try {
Class<?> cls= Class.forName(className);
if (!cls.isAnnotationPresent(TableName.class)) {
throw new RuntimeException("The entity class to synchronize is missing the @TableName annotation");
}
return cls;
} catch (ClassNotFoundException e) {
// failed to load the class
throw new RuntimeException(String.format("Entity class [%s] not found", className));
}
}
}
3.3 Running the job from the Apache Flink Dashboard
- In the Submit New Job menu of the Apache Flink Dashboard, upload the packaged jar, enter the main class to run, and pass the entity classes of the tables to synchronize (comma-separated for multiple tables).
- Click Submit to create the job.
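- Alternatively, the same job can presumably be started from the Flink CLI with something like `flink run -c com.wys.flink.StarRocksCustomStreamCDC flink-1.0.0.jar --entity DataCenterShine,Organization` (the jar name here is assumed from the artifactId and version in the pom).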
This concludes the walkthrough of syncing MySQL data to StarRocks with the Flink DataStream API and CDC.