flink streamload写入doris

这篇具有很好参考价值的文章主要介绍了flink streamload写入doris。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

flink 1.13 streamload写入doris

前言

官方教程详细介绍了基于flink 1.16的各种写入方式,本文主要介绍的是基于flink 1.13的RowData 数据流(RowDataSerializer)写入文章来源地址https://www.toymoban.com/news/detail-758109.html

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.1</flink.version>
    </properties>


        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.13_2.12</artifactId>
            <version>1.0.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

一、目标表

-- doris sink table 
CREATE TABLE IF NOT EXISTS events (
    `user_id` INT NOT NULL COMMENT '用户id',
    `date` DATE COMMENT '接收日期',
    `event_type` varchar(256)  COMMENT '事件类型',
    `province` varchar(128)  COMMENT '省份',
    `city` varchar(128)  COMMENT '城市'
    `receive_time` DATETIME
)
DUPLICATE KEY(`user_id`, `receive_date`)
COMMENT '事件表'
PARTITION BY RANGE (`receive_date`)
(
FROM
    ("2023-01-01") TO ("2023-09-14") INTERVAL 1 DAY
)
DISTRIBUTED BY HASH (`user_id`) BUCKETS 1
PROPERTIES (
   "replication_allocation" = "tag.location.default: 1",
   "compression"="LZ4",
   "dynamic_partition.enable" = "true",
   "dynamic_partition.time_unit" = "DAY",
   "dynamic_partition.end" = "3",
   "dynamic_partition.prefix" = "p",
   "dynamic_partition.buckets" = "1"
);

二、map

public class EventMapFunction extends RichMapFunction<Event, RowData> {
    private static final Logger log = LoggerFactory.getLogger(EventMapFunction.class);

    @Override
    public RowData map(Event event) {
        GenericRowData genericRowData = new GenericRowData(6);
        try {
        //map 字段映射
            genericRowData.setField(0, event.getUserId());
            // 字符型需要转化,否则会报错
            genericRowData.setField(1, StringData.fromString(event.getReceiveDate()));
            genericRowData.setField(2, StringData.fromString(event.getEventType()));
            genericRowData.setField(3, StringData.fromString(event.getProvince()));
            genericRowData.setField(4, StringData.fromString(event.getCity()));
            genericRowData.setField(5, StringData.fromString(event.getReceiveTime()));

        } catch (Exception e) {
            log.error("Event data map error : " + e);
        }
        return genericRowData;
    }
}

三、DorisSink

public class DorisSinkUtil {
    public static SinkFunction<RowData> getDorisSink(String table, String labelPrefix) {
 		//写入格式   
        Properties properties = new Properties();
        properties.setProperty("read_json_by_line", "true");
        properties.setProperty("format", "json");
        properties.setProperty("strip_outer_array", "true");

        SinkFunction<RowData> dorisSink = DorisSink.sink(getEventFields(),
                getEventDataType(),
                DorisReadOptions.builder().build(),
                DorisExecutionOptions.builder()
                        .setBatchSize(3)
                        .setBatchIntervalMs(0L)
                        .setMaxRetries(3)
                        .setStreamLoadProp(properties)
                        .build(),
                DorisOptions.builder()
                        .setFenodes(readValue("doris.fenodes"))
                        .setTableIdentifier(table)
                        .setUsername(readValue("doris.username"))
                        .setPassword(readValue("doris.password"))
                        .build());
          
        return dorisSink;
    }
	//字段及类型一一对应
    public static String[] getEventFields() {
        return new String[]{
                "user_id",
                "receive_date",
                "event_type",
                "province",
                "city",
                "receive_time"
        };
    }

    public static LogicalType[] getEventDataType() {
        return new LogicalType[]{
                new IntType(),
                new VarCharType(256),
                new VarCharType(256),
                new VarCharType(128),
                new VarCharType(128),
                new VarCharType(256)
        };
    }

}

四、job主类

    public static void main(String[] args) {

        try {
            /** 一、 创建flink流式执行环境 */
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setAutoWatermarkInterval(0);

            /** 二、 获取Source */
            KafkaSource<String> eventSource = getKafkaSource(
                    readValue("kafka.broker.id"),
                    readValue("kafka.event.topic"),
                    readValue("kafka.event.group.id"));
            /** 三、消费 Source */
            SingleOutputStreamOperator<String> eventSourceStream = env.fromSource(eventSource, WatermarkStrategy.noWatermarks(), "kafkaSource_event").setParallelism(12);

            /** 四、解析数据 */
            SingleOutputStreamOperator<Event> eventSingleOutputStreamOperator = eventSourceStream.flatMap(new EventParseFlatMap()).setParallelism(12);


			/** 五、将java bean类型转化为rowdata类型 */
            SingleOutputStreamOperator<RowData> eventStream =   eventSingleOutputStreamOperator   .map(new EventMapFunction()).setParallelism(12);
            

            /** 六、构建doris sink */
            SinkFunction<RowData> eventSink = DorisSinkUtil.getDorisSink(readValue("doris.table.event"), Constants.EVENT_LABEL_PREFIX);


            /** 七、输出至doris */
            eventStream.addSink(eventSink).setParallelism(12);


            env.execute("EventJob");
        } catch (Exception e) {
            LOG.error("EventJob failed to activate", e);
        }
    }

到了这里,关于flink streamload写入doris的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink实时同步MySQL与Doris数据

    技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once 精准接入-阿里云开发者社区 1. Flink环境: https://flink.apache.org/zh/ 下载flink-1.15.1 解压,修改配置 修改配置 修改rest.bind-address为 0.0.0.0 下载依赖jar包 至 flink安装目录lib下 启动flink 访问WebUI http://192.168.0.158:8081 2、

    2024年02月13日
    浏览(34)
  • Flink写入数据到ClickHouse

    1.ClickHouse建表 ClickHouse中建表 2.ClickHouse依赖 Flink开发相关依赖 3.Bean实体类 User.java 4.ClickHouse业务写入逻辑 ClickHouseSinkFunction.java open():在SinkFunction实例化后调用,用于初始化连接或资源。这在处理每个并行任务的子任务之前只被调用一次。 invoke():定义了在每个元素到达Sink操

    2024年02月12日
    浏览(43)
  • Flink将数据写入MySQL(JDBC)

    在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。 2.1 版本说明 2.2 导入相关依赖 2.3 连接数据库,创建表 2.4 创建POJO类 2.5 自定义map函数 2.5 Flink2MySQL 2.6 启动necat、Flink,观察数据库写

    2024年02月07日
    浏览(33)
  • 6.2、Flink数据写入到Kafka

    目录 1、添加POM依赖 2、API使用说明 3、序列化器 3.1 使用预定义的序列化器 3.2 使用自定义的序列化器 4、容错保证级别 4.1 至少一次 的配置 4.2 精确一次 的配置 5、这是一个完整的入门案例 Apache Flink 集成了通用的 Kafka 连接器,使用时需要根据生产环境的版本引入相应的依赖

    2024年02月09日
    浏览(35)
  • 怎么使用 Flink 向 Apache Doris 表中写 Bitmap 类型的数据

    Bitmap是一种经典的数据结构,用于高效地对大量的二进制数据进行压缩存储和快速查询。Doris支持bitmap数据类型,在Flink计算场景中,可以结合Flink doris Connector对bitmap数据做计算。 社区里很多小伙伴在是Doris Flink Connector的时候,不知道怎么写Bitmap类型的数据,本文将介绍如何

    2024年02月07日
    浏览(48)
  • 【Flink】【ClickHouse】写入流式数据到ClickHouse

    Flink 安装的教程就不在这里赘叙了,可以看一下以前的文章,这篇文章主要是把流式数据写入的OLAP(ClickHouse)中作查询分析 Flink 1.13.2, ClickHouse 22.1.3.7 这里直接使用docker安装,没有安装的同学可以使用homebreak来安装,执行下面的命令即可( 已经安装了docker的可以忽略 ) 四指

    2024年02月03日
    浏览(34)
  • 使用 Flink CDC 实现 MySQL 数据,表结构实时入 Apache Doris

    现有数据库:mysql 数据:库表较多,每个企业用户一个分库,每个企业下的表均不同,无法做到聚合,且表可以被用户随意改动,增删改列等,增加表 分析:用户自定义分析,通过拖拽定义图卡,要求实时,点击确认即出现相应结果,其中有无法预判的过滤 问题:随业务增长

    2023年04月08日
    浏览(48)
  • flink cdc同步Oracle数据库资料到Doris问题集锦

    java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder at com.ververica.cdc.debezium.DebeziumSourceFunction.open(DebeziumSourceFunction.java:218) ~[flink-connector-debezium-2.2.0.jar:2.2.0] at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-co

    2024年02月16日
    浏览(31)
  • Flink将数据写入CSV文件后文件中没有数据

    Flink中有一个过时的 sink 方法: writeAsCsv ,这个方法是将数据写入 CSV 文件中,有时候我们会发现程序启动后,打开文件查看没有任何数据,日志信息中也没有任何报错,这里我们结合源码分析一下这个原因. 这里先看一下数据处理的代码 代码中我是使用的自定义数据源生产数据的方式

    2024年02月16日
    浏览(29)
  • Flink之FileSink将数据写入parquet文件

    在使用FileSink将数据写入列式存储文件中时必须使用 forBulkFormat ,列式存储文件如 ORCFile 、 ParquetFile ,这里就以 ParquetFile 为例结合代码进行说明. 在Flink 1.15.3 中是通过构造 ParquetWriterFactory 然后调用 forBulkFormat 方法将构造好的 ParquetWriterFactory 传入,这里先讲一下构造 ParquetWriterF

    2024年02月03日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包