flink 1.13 streamload写入doris
前言
官方教程详细介绍了基于 Flink 1.16 的各种写入方式;本文主要介绍基于 Flink 1.13 的 RowData 数据流(RowDataSerializer)写入方式。
文章来源地址:https://www.toymoban.com/news/detail-758109.html
<!-- Build properties: Java 8 source/target level and the Flink version shared by the Flink dependencies below. -->
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.13.1</flink.version>
</properties>
<!-- Doris connector; the artifact id indicates a build for Flink 1.13 / Scala 2.12. -->
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.13_2.12</artifactId>
<version>1.0.3</version>
</dependency>
<!-- Blink table planner (Scala 2.12), versioned via ${flink.version}. -->
<!-- Scope "provided": needed at compile time but expected to be supplied by the runtime environment. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
一、目标表
-- Doris sink table.
-- Column names and their order match the field list written by the Flink sink:
-- user_id, receive_date, event_type, province, city, receive_time.
CREATE TABLE IF NOT EXISTS events (
    `user_id` INT NOT NULL COMMENT '用户id',
    -- Must be named receive_date: DUPLICATE KEY and PARTITION BY RANGE below reference this column.
    `receive_date` DATE COMMENT '接收日期',
    `event_type` varchar(256) COMMENT '事件类型',
    `province` varchar(128) COMMENT '省份',
    `city` varchar(128) COMMENT '城市',
    `receive_time` DATETIME
)
DUPLICATE KEY(`user_id`, `receive_date`)
COMMENT '事件表'
-- One partition per day for the historical range; later partitions are created
-- by the dynamic_partition.* properties below.
PARTITION BY RANGE (`receive_date`)
(
    FROM
    ("2023-01-01") TO ("2023-09-14") INTERVAL 1 DAY
)
DISTRIBUTED BY HASH (`user_id`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "compression"="LZ4",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "1"
);
二、map
/**
 * Maps a parsed {@link Event} to a {@link RowData} with six positional fields:
 * user id, receive date, event type, province, city and receive time.
 *
 * <p>The field positions must line up with the field-name and field-type arrays
 * handed to the Doris sink.
 */
public class EventMapFunction extends RichMapFunction<Event, RowData> {
    private static final Logger log = LoggerFactory.getLogger(EventMapFunction.class);

    /** Number of fields in every produced row. */
    private static final int FIELD_COUNT = 6;

    /**
     * Converts one event into a row.
     *
     * <p>Mapping is best-effort: if any getter or conversion throws, the error is
     * logged and the row is returned with whatever fields were set before the
     * failure (the remaining positions stay {@code null}).
     *
     * @param event the event to convert
     * @return a row with {@value #FIELD_COUNT} positions, possibly partially filled on error
     */
    @Override
    public RowData map(Event event) {
        GenericRowData genericRowData = new GenericRowData(FIELD_COUNT);
        try {
            // Positional field mapping.
            genericRowData.setField(0, event.getUserId());
            // String values must be wrapped as StringData (Flink's internal string
            // representation); per the original author, raw Strings cause an error.
            genericRowData.setField(1, StringData.fromString(event.getReceiveDate()));
            genericRowData.setField(2, StringData.fromString(event.getEventType()));
            genericRowData.setField(3, StringData.fromString(event.getProvince()));
            genericRowData.setField(4, StringData.fromString(event.getCity()));
            genericRowData.setField(5, StringData.fromString(event.getReceiveTime()));
        } catch (Exception e) {
            // Pass the exception as the trailing throwable argument so the stack trace
            // is logged; concatenating it into the message would drop the trace.
            log.error("Event data map error : {}", e.toString(), e);
        }
        return genericRowData;
    }
}
三、DorisSink
/**
 * Static helpers that assemble the Doris sink for event rows, together with the
 * field names and logical types the sink is configured with.
 */
public class DorisSinkUtil {

    /**
     * Builds a Doris sink for {@link RowData} records.
     *
     * <p>Connection settings (FE nodes, username, password) are looked up through
     * {@code readValue}; the stream load request is configured for JSON.
     *
     * @param table       table identifier handed to the Doris options
     * @param labelPrefix accepted for callers but not used by this method
     * @return the configured sink function
     */
    public static SinkFunction<RowData> getDorisSink(String table, String labelPrefix) {
        // Stream load properties describing the write format.
        Properties streamLoadProps = new Properties();
        streamLoadProps.setProperty("read_json_by_line", "true");
        streamLoadProps.setProperty("format", "json");
        streamLoadProps.setProperty("strip_outer_array", "true");

        String[] fieldNames = getEventFields();
        LogicalType[] fieldTypes = getEventDataType();

        DorisReadOptions readOptions = DorisReadOptions.builder().build();

        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
                .setBatchSize(3)
                .setBatchIntervalMs(0L)
                .setMaxRetries(3)
                .setStreamLoadProp(streamLoadProps)
                .build();

        DorisOptions connectionOptions = DorisOptions.builder()
                .setFenodes(readValue("doris.fenodes"))
                .setTableIdentifier(table)
                .setUsername(readValue("doris.username"))
                .setPassword(readValue("doris.password"))
                .build();

        return DorisSink.sink(fieldNames, fieldTypes, readOptions, executionOptions, connectionOptions);
    }

    /**
     * Returns the sink field names. Entry {@code i} here corresponds to entry
     * {@code i} of {@link #getEventDataType()}.
     *
     * @return a new array of field names
     */
    public static String[] getEventFields() {
        String[] fieldNames = {
            "user_id",
            "receive_date",
            "event_type",
            "province",
            "city",
            "receive_time"
        };
        return fieldNames;
    }

    /**
     * Returns the logical type of each sink field, in the same order as
     * {@link #getEventFields()}.
     *
     * @return a new array of logical types
     */
    public static LogicalType[] getEventDataType() {
        LogicalType[] fieldTypes = {
            new IntType(),
            new VarCharType(256),
            new VarCharType(256),
            new VarCharType(128),
            new VarCharType(128),
            new VarCharType(256)
        };
        return fieldTypes;
    }
}
四、job主类
/**
 * Job entry point: reads strings from Kafka, parses them into events, converts
 * the events to {@link RowData} and writes them to Doris, then executes the
 * pipeline under the job name "EventJob".
 *
 * @param args command-line arguments (not read by this method)
 */
public static void main(String[] args) {
    // Parallelism applied to every operator in the pipeline.
    final int parallelism = 12;
    try {
        // 1. Create the streaming execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setAutoWatermarkInterval(0);

        // 2. Build the Kafka source from configured broker, topic and group id.
        KafkaSource<String> kafkaSource = getKafkaSource(
                readValue("kafka.broker.id"),
                readValue("kafka.event.topic"),
                readValue("kafka.event.group.id"));

        // 3-5. Consume the source, parse each record into Event beans,
        //      then convert the beans into RowData.
        SingleOutputStreamOperator<RowData> rowStream = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource_event")
                .setParallelism(parallelism)
                .flatMap(new EventParseFlatMap())
                .setParallelism(parallelism)
                .map(new EventMapFunction())
                .setParallelism(parallelism);

        // 6. Build the Doris sink.
        SinkFunction<RowData> dorisSink = DorisSinkUtil.getDorisSink(
                readValue("doris.table.event"), Constants.EVENT_LABEL_PREFIX);

        // 7. Write to Doris and run the job.
        rowStream.addSink(dorisSink).setParallelism(parallelism);
        env.execute("EventJob");
    } catch (Exception e) {
        // NOTE(review): the failure is only logged, so main returns normally even
        // when the job could not be built or executed — confirm this is intended.
        LOG.error("EventJob failed to activate", e);
    }
}
文章来源:https://www.toymoban.com/news/detail-758109.html
到了这里,关于flink streamload写入doris的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!