Introduction to Reading and Writing Doris with Flink

The Flink Doris Connector lets Flink operate on data stored in Doris (read, insert, update, delete). A Doris table can be mapped to a DataStream or to a Table.

  • Updates and deletes through Flink are supported only on the Unique Key model; a sketch of enabling deletes on the sink follows below.
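
As a minimal sketch, deletes are switched on through the sink's DorisExecutionOptions (the same builder methods used in the write examples later in this article); the flag only takes effect when the target table uses the Unique Key model:

import org.apache.doris.flink.cfg.DorisExecutionOptions;

public class EnableDeleteSketch {
    public static void main(String[] args) {
        // setEnableDelete(true) lets the sink apply upstream delete events;
        // Doris honors this only for tables on the Unique Key model.
        DorisExecutionOptions execOptions = new DorisExecutionOptions.Builder()
                .setBatchIntervalMs(2000L)
                .setEnableDelete(true)
                .setMaxRetries(3)
                .build();
        System.out.println("enable-delete configured: " + execOptions);
    }
}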

1. Preparing the development environment

  • Add the dependency to pom.xml (the artifact name flink-doris-connector-1.13_2.12 encodes the Flink 1.13 and Scala 2.12 versions)
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.13_2.12</artifactId>
    <version>1.0.3</version>
</dependency>
  • Create a test database and test table
-- switch to the test database
use test_db;

-- create the test table flinktest
CREATE TABLE flinktest
(
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

-- insert sample data
insert into flinktest values
(1,1,'jim',2),
(2,1,'grace',2),
(3,2,'tom',2),
(4,3,'bush',3),
(5,3,'helen',3);

-- inspect the table data
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      1 |        1 | jim      |    2 |
|      5 |        3 | helen    |    3 |
|      4 |        3 | bush     |    3 |
|      3 |        2 | tom      |    2 |
|      2 |        1 | grace    |    2 |
+--------+----------+----------+------+
  • Doris-to-Flink column type mapping
Doris Type    Flink Type
----------    --------------------
NULL_TYPE     NULL
BOOLEAN       BOOLEAN
TINYINT       TINYINT
SMALLINT      SMALLINT
INT           INT
BIGINT        BIGINT
FLOAT         FLOAT
DOUBLE        DOUBLE
DATE          DATE
DATETIME      TIMESTAMP
DECIMAL       DECIMAL
CHAR          STRING
LARGEINT      STRING
VARCHAR       STRING
DECIMALV2     DECIMAL
TIME          DOUBLE
HLL           Unsupported datatype
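
As a quick illustration of the mapping, the columns of the flinktest table above correspond to the following Flink LogicalType declarations (a minimal sketch; section 3.2 uses the same types when writing RowData):

import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.VarCharType;

public class TypeMappingSketch {
    public static void main(String[] args) {
        // siteid INT -> IntType, citycode SMALLINT -> SmallIntType,
        // username VARCHAR(32) -> VarCharType, pv BIGINT -> BigIntType
        LogicalType[] types = {
                new IntType(), new SmallIntType(), new VarCharType(32), new BigIntType()
        };
        String[] fields = {"siteid", "citycode", "username", "pv"};
        for (int i = 0; i < fields.length; i++) {
            System.out.println(fields[i] + " -> " + types[i]);
        }
    }
}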

2. Reading Doris with the Flink DataStream API

Code example:

package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class Flink_stream_read_doris {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);


        Properties props = new Properties();
        props.setProperty("fenodes", "hdt-dmcp-ops01:8130");         // Doris FE host and HTTP port
        props.setProperty("username", "root");
        props.setProperty("password", "123456");
        props.setProperty("table.identifier", "test_db.flinktest");  // database.table to read

        env
                .addSource(new DorisSourceFunction(new DorisStreamOptions(props), new SimpleListDeserializationSchema()))
                .print();


        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

/*
  Console output:
[4, 3, bush, 3]
[2, 1, grace, 2]
[1, 1, jim, 2]
[5, 3, helen, 3]
[3, 2, tom, 2]
 */
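
The source can also push work down to Doris through connector read options. A sketch, assuming the connector's documented doris.read.field (column pruning) and doris.filter.query (predicate pushdown) options; the Properties object is then passed to new DorisStreamOptions(props) exactly as above:

        Properties props = new Properties();
        props.setProperty("fenodes", "hdt-dmcp-ops01:8130");
        props.setProperty("username", "root");
        props.setProperty("password", "123456");
        props.setProperty("table.identifier", "test_db.flinktest");
        // Read only the listed columns.
        props.setProperty("doris.read.field", "siteid,username");
        // Evaluate the filter on the Doris side before rows reach Flink.
        props.setProperty("doris.filter.query", "citycode = 1");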

3. Writing to Doris with Flink

There are two main ways for Flink to read and write Doris data:

  • DataStream
  • SQL

3.1 Writing JSON data to Doris with the DataStream API

Code example:

package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Use Flink to write JSON data to a Doris table.
 */
public class Flink_stream_write_doris_json {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        Properties pro = new Properties();
        pro.setProperty("format", "json");            // stream-load payload format
        pro.setProperty("strip_outer_array", "true"); // unwrap an outer JSON array if present
        env
                .fromElements("{\"siteid\":\"10\", \"citycode\": \"1001\",\"username\": \"ww\",\"pv\":\"100\"}")
                .addSink(DorisSink.sink(
                        new DorisExecutionOptions.Builder()
                                .setBatchIntervalMs(2000L)
                                .setEnableDelete(false)
                                .setMaxRetries(3)
                                .setStreamLoadProp(pro)
                                .build(),
                        new DorisOptions.Builder()
                                .setFenodes("hdt-dmcp-ops01:8130")
                                .setUsername("root")
                                .setPassword("123456")
                                .setTableIdentifier("test_db.flinktest")
                                .build())
                );

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

/*
    Before execution: 5 rows
 select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      1 |        1 | jim      |    2 |
|      5 |        3 | helen    |    3 |
|      4 |        3 | bush     |    3 |
|      3 |        2 | tom      |    2 |
|      2 |        1 | grace    |    2 |
+--------+----------+----------+------+

    After execution: 6 rows
 select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      2 |        1 | grace    |    2 |
|      3 |        2 | tom      |    2 |
|      5 |        3 | helen    |    3 |
|      1 |        1 | jim      |    2 |
|     10 |     1001 | ww       |  100 |
|      4 |        3 | bush     |    3 |
+--------+----------+----------+------+
 */
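
Because strip_outer_array is set to "true", a single element can also carry several rows as one JSON array, which Doris unwraps during stream load. A hedged sketch of the source side (the row values are made up for illustration; env, pro, and the sink configuration are exactly the ones defined in the program above):

        env
                .fromElements("[{\"siteid\":\"11\",\"citycode\":\"1001\",\"username\":\"u1\",\"pv\":\"1\"}," +
                        "{\"siteid\":\"12\",\"citycode\":\"1001\",\"username\":\"u2\",\"pv\":\"2\"}]")
                // One element = one JSON array = multiple Doris rows.
                .addSink(DorisSink.sink(
                        new DorisExecutionOptions.Builder()
                                .setBatchIntervalMs(2000L)
                                .setEnableDelete(false)
                                .setMaxRetries(3)
                                .setStreamLoadProp(pro)   // same props: format=json, strip_outer_array=true
                                .build(),
                        new DorisOptions.Builder()
                                .setFenodes("hdt-dmcp-ops01:8130")
                                .setUsername("root")
                                .setPassword("123456")
                                .setTableIdentifier("test_db.flinktest")
                                .build()));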

3.2 Writing RowData to Doris with the DataStream API

Code example:

package com.zenitera.bigdata.doris;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.*;


public class Flink_stream_write_doris_rowdata {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        LogicalType[] types = {new IntType(), new SmallIntType(), new VarCharType(), new BigIntType()};
        String[] fields = {"siteid", "citycode", "username", "pv"};

        env
                .fromElements("{\"siteid\":\"100\", \"citycode\": \"1002\",\"username\": \"wang\",\"pv\":\"100\"}")

                .map(json -> {
                    JSONObject obj = JSON.parseObject(json);
                    GenericRowData rowData = new GenericRowData(4);
                    rowData.setField(0, obj.getIntValue("siteid"));
                    rowData.setField(1, obj.getShortValue("citycode"));
                    rowData.setField(2, StringData.fromString(obj.getString("username")));
                    rowData.setField(3, obj.getLongValue("pv"));
                    return rowData;

                })


                .addSink(DorisSink.sink(
                        fields,
                        types,
                        new DorisExecutionOptions.Builder()
                                .setBatchIntervalMs(2000L)
                                .setEnableDelete(false)
                                .setMaxRetries(3)
                                .build(),
                        new DorisOptions.Builder()
                                .setFenodes("hdt-dmcp-ops01:8130")
                                .setUsername("root")
                                .setPassword("123456")
                                .setTableIdentifier("test_db.flinktest")
                                .build())
                );

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

/*
    Before execution: 6 rows
 select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      2 |        1 | grace    |    2 |
|      3 |        2 | tom      |    2 |
|      5 |        3 | helen    |    3 |
|      1 |        1 | jim      |    2 |
|     10 |     1001 | ww       |  100 |
|      4 |        3 | bush     |    3 |
+--------+----------+----------+------+

    After execution: 7 rows
 select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      1 |        1 | jim      |    2 |
|      2 |        1 | grace    |    2 |
|      3 |        2 | tom      |    2 |
|      5 |        3 | helen    |    3 |
|     10 |     1001 | ww       |  100 |
|    100 |     1002 | wang     |  100 |
|      4 |        3 | bush     |    3 |
+--------+----------+----------+------+
 */

3.3 Writing to Doris with Flink SQL

Doris test table:

use test_db;

truncate table flinktest;

insert into flinktest values
(1,1,'aaa',1),
(2,2,'bbb',2),
(3,3,'ccc',3);

select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      2 |        2 | bbb      |    2 |
|      1 |        1 | aaa      |    1 |
|      3 |        3 | ccc      |    3 |
+--------+----------+----------+------+
3 rows in set (0.01 sec)

Flink SQL code example:

package com.zenitera.bigdata.doris;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink_SQL_doris {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql("create table flink_0518(" +
                " siteid int, " +
                " citycode int, " +
                " username string, " +
                " pv bigint " +
                ")with(" +
                "  'connector' = 'doris', " +
                "  'fenodes' = 'hdt-dmcp-ops01:8130', " +
                "  'table.identifier' = 'test_db.flinktest', " +
                "  'username' = 'root', " +
                "  'password' = '123456' " +
                ")");

        tEnv.executeSql("insert into flink_0518(siteid, citycode, username, pv) values(4, 4, 'wangting', 4) ");

    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Flink_0518 {
        private Integer siteid;
        private Integer citycode;
        private String username;
        private Long pv;
    }
}
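
Note that executeSql submits the INSERT job asynchronously and returns a TableResult immediately. If a short standalone program should block until the write completes, TableResult.await() can be used (a minimal sketch; await() throws checked exceptions, so declare them on main or wrap them in try/catch):

        // Block until the insert job finishes instead of returning right away.
        tEnv.executeSql("insert into flink_0518(siteid, citycode, username, pv) " +
                "values(4, 4, 'wangting', 4)").await();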

Run the code; when it finishes, query the corresponding Doris table to verify the write:

select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      3 |        3 | ccc      |    3 |
|      2 |        2 | bbb      |    2 |
|      1 |        1 | aaa      |    1 |
|      4 |        4 | wangting |    4 |
+--------+----------+----------+------+
4 rows in set (0.01 sec)

3.4 Reading Doris with Flink SQL

package com.zenitera.bigdata.doris;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink_SQL_doris_read {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql("create table flink_0520(" +
                " siteid int, " +
                " citycode SMALLINT, " +
                " username string, " +
                " pv bigint " +
                ")with(" +
                "  'connector' = 'doris', " +
                "  'fenodes' = 'hdt-dmcp-ops01:8130', " +
                "  'table.identifier' = 'test_db.flinktest', " +
                "  'username' = 'root', " +
                "  'password' = '123456' " +
                ")");

        tEnv.sqlQuery("select * from flink_0520").execute().print();

    }
}

/*
   Console output:
+----+-------------+----------+---------------+---------+
| op |      siteid | citycode |      username |      pv |
+----+-------------+----------+---------------+---------+
| +I |           1 |        1 |           aaa |       1 |
| +I |           3 |        3 |           ccc |       3 |
| +I |           2 |        2 |           bbb |       2 |
| +I |           4 |        4 |      wangting |       4 |
+----+-------------+----------+---------------+---------+
4 rows in set
*/
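
If the query result is needed back in the DataStream API, the Table can be converted into a stream with StreamTableEnvironment.toDataStream, available in Flink 1.13. A minimal sketch (env, tEnv, and flink_0520 are the ones defined in the program above, and org.apache.flink.table.api.Table must be imported; env.execute throws a checked Exception, handled as in the earlier examples):

        // Convert the insert-only Table result into a DataStream of Row and print it.
        Table result = tEnv.sqlQuery("select * from flink_0520");
        tEnv.toDataStream(result).print();
        env.execute();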
