Apache Doris 系列: 基础篇-Flink SQL写入Doris

这篇具有很好参考价值的文章主要介绍了Apache Doris 系列: 基础篇-Flink SQL写入Doris。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

简介

本文介绍 Flink SQL如何流式写入 Apache Doris,分为一下几个部分:

  • Flink Doris connector
  • Doris FE 节点配置
  • Flink SQL 写 Doris

Flink Doris connector

Flink Doris connector 本质是通过Stream Load来时实现数据的查询和写入功能。
支持二阶段提交,可实现Exatly Once的写入。

Doris FE 节点配置

1)需在 apache-doris/fe/fe.conf 配置文件添加如下配置:

enable_http_server_v2 = true
  1. 重启 FE 节点
apache-doris/fe/bin/stop_fe.sh
apache-doris/fe/bin/start_fe.sh --daemon

Doris BE 节点配置

1)需在 apache-doris/be/be.conf 配置文件添加如下配置:

enable_stream_load_record = true
  1. 重启 BE 节点
apache-doris/be/bin/stop_be.sh
apache-doris/be/bin/start_be.sh --daemon

Doris 建表语句

CREATE TABLE order_info (
  order_date date NOT NULL COMMENT '下单日期',
  order_id int(11) NOT NULL COMMENT '订单id',
  buy_num tinyint(4) NULL COMMENT '购买件数',
  user_id int(11) NULL COMMENT '[-9223372036854775808, 9223372036854775807]',
  create_time datetime NULL COMMENT '创建时间',
  update_time datetime NULL COMMENT '更新时间'
) ENGINE=OLAP
DUPLICATE KEY(order_date, order_id)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(order_id) BUCKETS 2
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

本例使用的明细模型,仅insert操作,如需update/delete,则需选择Unique模型

生成测试数据

通过Flink SQL自带的datagen生成测试数据:

CREATE TABLE order_info_source (
    order_date DATE,
    order_id     INT,
    buy_num      INT,
    user_id      INT,
    create_time  TIMESTAMP(3),
    update_time   TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' =  '10',
  'fields.order_id.min' = '30001',
  'fields.order_id.max' = '30500',
  'fields.user_id.min' = '10001',
  'fields.user_id.max' = '20001',
  'fields.buy_num.min' = '10',
  'fields.buy_num.max' = '20',
  'number-of-rows' = '100'
)

datagen参数:
'rows-per-second' = '10' : 每秒发送10条数据
'fields.order_id.min' = '30001': order_id最小值为30001
'fields.order_id.max' = '30500': order_id最大值为30500
'fields.user_id.min' = '10001': user_id最小值为10001
'fields.user_id.max' = '20001': user_id最大值为20001
'fields.buy_num.min' = '10': buy_num最小值为10
'fields.buy_num.max' = '20': buy_num最大值为20
'number-of-rows' = '100': 共发送100条数据, 不设置的话会无限量发送数据

更多细节,请前往官网DataGen SQL Connector

注册Doris Sink表

CREATE TABLE order_info_sink (  
order_date DATE,  
order_id INT,  
buy_num INT,
user_id INT,
create_time TIMESTAMP(3),
update_time TIMESTAMP(3)
)  
WITH (
'connector' = 'doris',   
'fenodes' = '192.168.56.104:8030',   
'table.identifier' = 'test.order_info_example',   
'username' = 'test',   
'password' = 'password123',   
'sink.label-prefix' = 'sink_doris_label_8'
)

写入Doris Sink表

insert into order_info_sink select * from order_info_source

通过Mysql客户端查看Doris表的数据

mysql> select * from  test.order_info_example limit 10;
+------------+----------+---------+---------+---------------------+---------------------+
| order_date | order_id | buy_num | user_id | create_time         | update_time         |
+------------+----------+---------+---------+---------------------+---------------------+
| 2022-09-22 |    30007 |      10 |   10560 | 2022-09-22 07:42:21 | 2022-09-22 07:42:21 |
| 2022-09-22 |    30125 |      16 |   17591 | 2022-09-22 07:42:26 | 2022-09-22 07:42:26 |
| 2022-09-22 |    30176 |      17 |   10871 | 2022-09-22 07:42:24 | 2022-09-22 07:42:24 |
| 2022-09-22 |    30479 |      16 |   19847 | 2022-09-22 07:42:25 | 2022-09-22 07:42:25 |
| 2022-09-22 |    30128 |      16 |   19807 | 2022-09-22 07:42:24 | 2022-09-22 07:42:24 |
| 2022-09-22 |    30039 |      13 |   18237 | 2022-09-22 07:42:28 | 2022-09-22 07:42:28 |
| 2022-09-22 |    30060 |      10 |   18309 | 2022-09-22 07:42:24 | 2022-09-22 07:42:24 |
| 2022-09-22 |    30246 |      18 |   10855 | 2022-09-22 07:42:24 | 2022-09-22 07:42:24 |
| 2022-09-22 |    30288 |      19 |   12347 | 2022-09-22 07:42:26 | 2022-09-22 07:42:26 |
| 2022-09-22 |    30449 |      17 |   11488 | 2022-09-22 07:42:23 | 2022-09-22 07:42:23 |
+------------+----------+---------+---------+---------------------+---------------------+
10 rows in set (0.05 sec)

完整代码

src/main/java/FlinkSQLSinkExample.java文章来源地址https://www.toymoban.com/news/detail-403945.html

到了这里,关于Apache Doris 系列: 基础篇-Flink SQL写入Doris的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink系列之:Apache Kafka SQL 连接器

    Scan Source: Unbounded Sink: Streaming Append Mode Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。 以下示例展示了如何创建 Kafka 表: 以下的连接器元数据可以在表定义中通过元数据列的形式获取。 R/W 列定义了一个元数据是可读的(R)还是可写的(W)。 只读列必须声明为 VI

    2024年02月01日
    浏览(37)
  • Apache Doris (六十四): Flink Doris Connector - (1)-源码编译

     🏡 个人主页:IT贫道-CSDN博客   🚩 私聊博主:私聊博主加WX好友,获取更多资料哦~  🔔 博主个人B栈地址:豹哥教你学编程的个人空间-豹哥教你学编程个人主页-哔哩哔哩视频 目录 1. Flink与Doris版本兼容

    2024年01月18日
    浏览(39)
  • 如何基于 Apache Doris 与 Apache Flink 快速构建极速易用的实时数仓

    随着大数据应用的不断深入,企业不再满足离线数据加工计算的时效,实时数据需求已成为数据应用新常态。伴随着实时分析需求的不断膨胀,传统的数据架构面临的成本高、实时性无法保证、组件繁冗、运维难度高等问题日益凸显。为了适应业务快速迭代的特点,帮助企业

    2024年02月12日
    浏览(35)
  • Apache Flink X Apache Doris构建极速易用的实时数仓架构

    大家好,我叫王磊。是SelectDB 大数据研发。今天给大家带来的分享是《Apache Flink X Apache Doris构建极速易用的实时数仓架构》。 下面是我们的个人介绍:我是Apache Doris Contributor 和阿里云 MVP。同时著有《 图解 Spark 大数据快速分析实战》等书籍。 接下来咱们进入本次演讲的正题

    2023年04月24日
    浏览(31)
  • 怎么使用 Flink 向 Apache Doris 表中写 Bitmap 类型的数据

    Bitmap是一种经典的数据结构,用于高效地对大量的二进制数据进行压缩存储和快速查询。Doris支持bitmap数据类型,在Flink计算场景中,可以结合Flink doris Connector对bitmap数据做计算。 社区里很多小伙伴在是Doris Flink Connector的时候,不知道怎么写Bitmap类型的数据,本文将介绍如何

    2024年02月07日
    浏览(47)
  • 使用 Flink CDC 实现 MySQL 数据,表结构实时入 Apache Doris

    现有数据库:mysql 数据:库表较多,每个企业用户一个分库,每个企业下的表均不同,无法做到聚合,且表可以被用户随意改动,增删改列等,增加表 分析:用户自定义分析,通过拖拽定义图卡,要求实时,点击确认即出现相应结果,其中有无法预判的过滤 问题:随业务增长

    2023年04月08日
    浏览(48)
  • 【Apache-Flink零基础入门】「入门到精通系列」手把手+零基础带你玩转大数据流式处理引擎Flink(基础概念解析+有状态的流式处理)

    Apache Flink 是业界公认的最佳流计算引擎之一,它不仅仅局限于流处理,而是一套兼具流、批、机器学习等多种计算功能的大数据引擎。Flink 的用户只需根据业务逻辑开发一套代码,就能够处理全量数据、增量数据和实时数据,无需针对不同的数据类型开发不同的方案。这使得

    2024年02月03日
    浏览(62)
  • Apache Doris 系列:Bucket(分桶)数量设置建议

    问题描述: 上线运行一段时间后,随着越来越多的数据增长,集群每次重启后一周左右,读写就会开始变得越来越慢,直到无法正常进行读写。 问题处理: 对数仓表的 Schema 的分析,发现有些表数据并不大,但是 Bucket 却设置的非常大 通过 show data from table 命令列出所有表Bu

    2024年02月16日
    浏览(25)
  • Apache Doris 系列: 入门篇-数据导入及查询

    本文档主要介绍 Doris 的数据导入及数据查询。 Doris 为了满足不同业务场景的数据接入需求,提供不丰富的数据导入方式,去支持不同的数据源:外部存储(HDFS,对象存储)、本地文件、消息队列(Kafka)及其他外部业务系统数据库(MySQL、Oracle、SQLServer、PostgreSQL等),支持

    2023年04月09日
    浏览(40)
  • 【Apache-Flink零基础入门】「入门到精通系列」手把手+零基础带你玩转大数据流式处理引擎Flink(特点和优势分析+事件与时间维度分析)

    本文介绍了Apache Flink的定义、架构、基本原理,并辨析了大数据流计算相关的基本概念。同时回顾了大数据处理方式的历史演进以及有状态的流式数据处理的原理。最后,分析了Apache Flink作为业界公认为最好的流计算引擎之一所具备的天然优势,旨在帮助读者更好地理解大数

    2024年02月03日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包