怎么使用 Flink 向 Apache Doris 表中写 Bitmap 类型的数据

这篇具有很好参考价值的文章主要介绍了怎么使用 Flink 向 Apache Doris 表中写 Bitmap 类型的数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Bitmap是一种经典的数据结构,用于高效地对大量的二进制数据进行压缩存储和快速查询。Doris支持bitmap数据类型,在Flink计算场景中,可以结合Flink doris Connector对bitmap数据做计算。

社区里很多小伙伴在是Doris Flink Connector的时候,不知道怎么写Bitmap类型的数据,本文将介绍如何使用 Flink Doris Connector 如何将 bitmap 数据写入 Doris 中。

前置准备
Doris2.0.1的环境

Flink1.16,同时将 Doris Flink Connector的Jar包放在<FLINK_HOME>/lib 下面。

创建Doris表

CREATE TABLE `page_view_bitmap` (
`dt` int,
`page` varchar(256),
`user_id` bitmap bitmap_union
)
AGGREGATE KEY(`dt`, page)
DISTRIBUTED BY HASH(`dt`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
)

写入Bitmap数据
这里模拟Flink读取MySQL数据写入Doris,同时将user_id存储到bitmap中。

模拟数据

创建MySQL表

CREATE TABLE `page_view` (
 `id` int NOT NULL,
 `dt` int,
 `page` varchar(256),
 `user_id` int,
 PRIMARY KEY (`id`)
);

#模拟数据
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (1, 20230921, 'home', 1001);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (2, 20230921, 'home', 1002);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (3, 20230921, 'search', 1003);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (4, 20230922, 'mine', 1001);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (5, 20230922, 'home', 1002);
FlinkSQL写入Bitmap
#使用JDBC读取mysql数据
CREATE TABLE page_view (
   `dt` int,
   `page` string,
   `user_id` int
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://127.0.0.1:3306/test',
   'table-name' = 'page_view',
   'username' = 'root',
   'password' = '123456'
);

doris connector写入数据

CREATE TABLE page_view_bitmap (
dt int,
page string,
user_id int
)
WITH (
 'connector' = 'doris',
 'fenodes' = '127.0.0.1:8030',
 'table.identifier' = 'test.page_view_bitmap',
 'username' = 'root',
 'password' = '',
 'sink.label-prefix' = 'doris_label1',
 'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
);

insert into page_view_bitmap select * from page_view
我们知道 Doris Flink Connector Sink 底层是基于 Doris Stream Load 来实现的,同样 Stream load 在 Connector 里也是一样适用,我们将这个参数封装在了 :sink.properties 参数里,
这里我们可以看到上面这个例子里我们在是 With 属性里加入了我们 Columns 参数,这里我们配置了列的转换操作,将 user_id 通过 to_bitmap 函数进行转换,并导入到 Doris 表里。
查询结果

mysql> select dt,page,bitmap_to_string(user_id) from `test`.`page_view_bitmap`;
+----------+--------+---------------------------+
| dt       | page   | bitmap_to_string(user_id) |
+----------+--------+---------------------------+
| 20230921 | home   | 1001,1002                 |
| 20230921 | search | 1003                      |
| 20230922 | home   | 1002                      |
| 20230922 | mine   | 1001                      |
+----------+--------+---------------------------+
4 rows in set (0.00 sec)

Flink DataStream
使用 DataStream API 模拟数据写入刚才的表中。

DataStream API 对 Bitmap 的操作也是和上面 SQL 操作的方式一样。

public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);
       env.setRuntimeMode(RuntimeExecutionMode.BATCH);

       DorisSink.Builder<String> builder = DorisSink.builder();
       final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
       Properties properties = new Properties();
       properties.setProperty("column_separator", ",");
       properties.setProperty("format", "csv");
       properties.setProperty("columns", "dt,page,user_id,user_id=to_bitmap(user_id)");

       DorisOptions.Builder dorisBuilder = DorisOptions.builder();
       dorisBuilder.setFenodes("127.0.0.1:8030")
              .setTableIdentifier("test.page_view_bitmap")
              .setUsername("root")
              .setPassword("");
       DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
       executionBuilder.setLabelPrefix("doris_label")
              .setStreamLoadProp(properties)
              .setDeletable(false);

       builder.setDorisReadOptions(readOptionBuilder.build())
              .setDorisExecutionOptions(executionBuilder.build())
              .setSerializer(new SimpleStringSerializer())
              .setDorisOptions(dorisBuilder.build());

       //mock data
       DataStreamSource<String> stringDataStreamSource = env.fromCollection(
               Arrays.asList("20230921,home,1003", "20230921,search,1001", "20230923,home,1001"));
       stringDataStreamSource.sinkTo(builder.build());
       env.execute("doris bitmap write");
  }

查询结果文章来源地址https://www.toymoban.com/news/detail-723854.html

mysql> select dt,page,bitmap_to_string(user_id) from `test`.`page_view_bitmap`;
+----------+--------+---------------------------+
| dt       | page   | bitmap_to_string(user_id) |
+----------+--------+---------------------------+
| 20230921 | home   | 1001,1002,1003            |
| 20230921 | search | 1001,1003                 |
| 20230922 | home   | 1002                      |
| 20230922 | mine   | 1001                      |
| 20230923 | home   | 1001                      |
+----------+--------+---------------------------+
5 rows in set (0.00 sec)

到了这里,关于怎么使用 Flink 向 Apache Doris 表中写 Bitmap 类型的数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Apache Flink X Apache Doris构建极速易用的实时数仓架构

    大家好,我叫王磊。是SelectDB 大数据研发。今天给大家带来的分享是《Apache Flink X Apache Doris构建极速易用的实时数仓架构》。 下面是我们的个人介绍:我是Apache Doris Contributor 和阿里云 MVP。同时著有《 图解 Spark 大数据快速分析实战》等书籍。 接下来咱们进入本次演讲的正题

    2023年04月24日
    浏览(49)
  • Flink实时写入Apache Doris如何保证高吞吐和低延迟

    随着实时分析需求的不断增加,数据的时效性对于企业的精细化运营越来越重要。借助海量数据,实时数仓在有效挖掘有价值信息、快速获取数据反馈、帮助企业更快决策、更好的产品迭代等方面发挥着不可替代的作用。 在这种情况下,Apache Doris 作为一个实时 MPP 分析数据库脱颖

    2024年01月17日
    浏览(49)
  • Apache Doris 入门教程03:使用Docker或Kubernetes部署Doris

    该文档主要介绍了如何通过 Dockerfile 来制作 Apache Doris 的运行镜像,以便于在容器化编排工具或者快速测试过程中可迅速拉取一个 Apache Doris Image 来完成集群的创建。 概述​ Docker 镜像在制作前要提前准备好制作机器,该机器的平台架构决定了制作以后的 Docker Image 适用的平台

    2024年02月07日
    浏览(47)
  • Apache Doris大规模数据使用指南

    目录 一、发展历史 二、架构介绍 弹性MPP架构-极简架构 逻辑架构 基本访问架构 三、Doris的数据分布

    2024年02月12日
    浏览(48)
  • 大数据Doris(十四):Doris表中的数据基本概念

    文章目录 Doris表中的数据基本概念 一、​​​​​​​Row Column

    2024年02月06日
    浏览(48)
  • 使用Apache Doris自动同步整个 MySQL/Oracle 数据库进行数据分析

    Flink-Doris-Connector 1.4.0 允许用户一步将包含数千个表的整个数据库(MySQL或Oracle )摄取到Apache Doris(一种实时分析数据库)中。 通过内置的Flink CDC,连接器可以直接将上游源的表模式和数据同步到Apache Doris,这意味着用户不再需要编写DataStream程序或在Doris中预先创建映射表。

    2024年02月09日
    浏览(62)
  • Redis之bitmap类型解读

    目录 基本介绍 基本命令  Setbit  Getbit  BITCOUNT  应用场景 统计当日活跃用户 用户签到 bitmap - Redis布隆过滤器 (应对缓存穿透问题) 基本介绍 Redis 的位图(bitmap)是由多个二进制位组成的数组,只有两种状态,0和1, 数组中的每个二进制位都有与之对应的偏移量(从 0 开始)

    2024年02月11日
    浏览(36)
  • 【Apache Flink】Flink DataStream API的基本使用

    Flink DataStream API的基本使用 Flink DataStream API主要用于处理无界和有界数据流 。 无界数据流 是一个持续生成数据的数据源,它没有明确的结束点,例如实时的交易数据或传感器数据。这种类型的数据流需要使用Apache Flink的实时处理功能来连续地处理和分析。 有界数据流 是一个

    2024年02月06日
    浏览(39)
  • 使用 Apache Flink 开发实时 ETL

    Apache Flink 是大数据领域又一新兴框架。它与 Spark 的不同之处在于,它是使用流式处理来模拟批量处理的,因此能够提供亚秒级的、符合 Exactly-once 语义的实时处理能力。Flink 的使用场景之一是构建实时的数据通道,在不同的存储之间搬运和转换数据。本文将介绍如何使用 F

    2024年02月05日
    浏览(41)
  • 怎么使用VBA查找多个工作表中相同和不相同的数据

    您可以使用VBA代码来查找多个工作表中的相同和不相同的数据。以下是一些基本步骤: 打开Microsoft Excel,然后打开您想要查找的工作簿。 按下\\\"Alt + F11\\\"键打开\\\"Visual Basic for Applications\\\" (VBA) 编辑器。 在VBA编辑器中,单击\\\"插入\\\",然后选择\\\"模块\\\"以创建一个新的模块。 在模块窗口

    2024年02月12日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包