一、创建用户和数据库
- 创建用户
- CREATE USER ‘test’ IDENTIFIED BY ‘123456’;
- 后续登录就可以直接使用命令登录
- mysql -h 192.168.1.101 -P9030 -utest -p12345
- 创建数据库并赋予权限
- 初始可以通过 root 或 admin 用户创建数据库
- create database test_db;
- 查看数据库
- show databases;
- 授权
- grant all on test_db to test;
- 注意
- 可以使用 help command 查看语法帮助,不清楚命令全名的话可以使用 ‘help 某一字段’ 进行模糊查询
- information_schema 数据库是为了兼容 MySQL 协议而存在的
- 初始可以通过 root 或 admin 用户创建数据库
二、建表
1. 基本概念
- Doris 数据都以表 Table 的形式进行逻辑上的描述,一张表包括 行 Row 和 列 Column,Row 就是用户一行数据
- 从表的角度来看,一张 Table 会拆成多个 Tablet,Tablet 会存成多副本,存储在不同的 BE 中,BE 节点上物理数据的可靠性通过多副本来实现,默认 3 副本
- Tablet 和 Partition
- Doris 存储引擎中,用户数据被水平划分为若干数据分片 Tablet,也称为数据分桶,每个 Tablet 包含若干行数据,和其他 Tablet 没有交集,物理上独立存储
- 多个 Tablet 在逻辑上归属于不同的分区 Partition,一个 Tablet 只属于一个 Partition,一个 Partition 包含若干个 Tablet,若干个 Partition 组成一个 Table
- Tablet 是数据移动、复制等操作的最小物理存储单元,Partition 可以视为逻辑上最小的管理单元,数据导入和删除,都可以针对一个 Partition 进行
-
Doris 存储引擎规则
- 用户数据首先被划分成若干个分区 Partition,划分对规则通常是按照用户指定的分区进行范围划分,比如按时间划分
- 在每个分区内,数据被进一步按照 Hash 的方式分桶,分桶的规则是要找用户指定的分桶列的值进行 Hash 后分桶,每个分桶就是一个 Tablet,也是最小数据划分逻辑单元
- Partition 可以视为逻辑上最小的管理单元,数据的导入与删除,都可以针对一个 Partition 进行
- Tablet 直接的数据是没有交集的,独立存储,Tablet 也是数据移动、复制操作的最小物理存储单元
2. 创建表
- 首先需要切换数据库(use test_db)
- 使用帮助命令可以查看很多案例 help create table
- 数据类型
- tinyint、smallint、int、bigint、largeint、float、double、decimal(precision,scale)、date、datetime、char(length)、varchar(length)、hll、bitmap、agg_type
- 建表方式
- 单分区
- 即数据不分区,只做 HASH 分布,也就是分桶
- 复合分区
- 第一级为 Partition,即分区,用户可以指定某一纬度列作为分区列(整型、时间类型),并指定分区取值范围
- 第二级为 Distribution,即分桶,用户指定一个或多个纬度列以及桶数对数据进行 HASH 分布
- 使用复合分区的场景
- 有时间纬度或类似带有有序值的纬度:可以以这类纬度列作为分区列,分区粒度可以根据导入频次、分区数量等进行评估
- 历史数据删除需求:可以通过删除历史分区来达到目的,也可以通过在指定分区内发送 DELETE 语句进行数据删除
- 解决数据倾斜问题:每个分区可以单独指定分桶数量,如按天分区,当天的数据量差异很大时,可以通过指定分区的分桶数,合理划分不同分区的数据,分桶列建议选择区分度大的列
- 单分区
3. 数据导入
-
相关文档:https://doris.apache.org/zh-CN/docs/data-operate/import/load-manual
-
所有导入方式都支持 CSV 数据格式,Broker load 还支持 parquet 和 orc 数据格式
-
Stream load
- 用户通过 HTTP 协议提交请求并携带原始数据创建导入,主要用于快速将本地文件或数据流中的数据导入到 Doris,导入命令同步返回导入结果
- curl --location-trusted -u root:123456 -H “label:table1_20221121” -H “column_separator:,” -T table1_data http://hybrid01:8030/api/test_db/table1/_stream_load
-
Insert
- 类似 MySQL 中的 Insert 语句,Doris 提供 inssert into tbl SELECT …; 的方式从 Doris 表中读取数据并导入到另一张表或 insert into tbl values(…) 插入
- insert into table1 values(1,1,‘user1’,10);
-
Broker load
- 通过 Broker 进程访问并读取外部数据源导入到 Doris,用户通过 MySQL 协议提交导入任务后,异步执行,show load 命令查看导入结果
- 具体语法案例见代码
-
Multi load
- 用户通过 HTTP 协议提交多个导入作业,Multi Load 可以保证多个导入作业的原子生效
-
Routine load
- 通过 MySQL 协议提交例行导入作业,生成一个常驻线程,不间断从数据源中读取数据并导入到 Doris 中
- 目前仅支持 Kafka 进行导入,支持无认证、SSL 认证的 Kafka 集群,支持的格式为 csv 文本格式,每个 message 为一行,行尾不包括换行符
- 原理
- FE 通过 JobScheduler 将一个导入任务拆分成若干个 Task,每个 Task 负责导入指定的一部分数据,Task 被 TaskScheduler 分配到指定的 BE 上执行
- 在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入,导入完成后向 FE 汇报
- FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试
- 整个例行导入作业通过不断的产生新的 Task 来完成数据不间断的导入
- 具体语法案例见代码
-
通过 S3 协议直接导入文章来源:https://www.toymoban.com/news/detail-491660.html
- 用法和 Broker Load 类似
三、代码案例
-- 建表语句案例
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [database.]table_name
(column_definition1[, column_definition2, ...]
[, index_definition1[, index_definition2, ...]])
[ENGINE = [olap|mysql|broker|hive|iceberg]]
[key_desc]
[COMMENT "table comment"];
[partition_desc]
[distribution_desc]
[rollup_index]
[PROPERTIES ("key"="value", ...)]
[BROKER PROPERTIES ("key"="value", ...)]
-- 创建表-单分区表
CREATE TABLE test_db.table1
(
siteid INT DEFAULT '10',
citycode SMALLINT,
username VARCHAR(32) DEFAULT '',
pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");
-- insert 导入
insert into table1 values(1,1,'user1',10);
insert into table1 values(1,1,'user1',10);
insert into table1 values(1,2,'user1',10);
-- 创建表-复合分区表
-- event_day 作为分区列,建立三个分区,每个分区使用 siteid 进行哈希分桶,桶数为10
-- p202209: 范围[最小值,2022-10-01]
-- p202210: 范围[2022-10-01, 2022-11-01]
-- p202211: 范围[2022-11-01, 2022-12-01]
-- 左闭右开
CREATE TABLE test_db.table2
(
event_day DATE,
siteid INT DEFAULT '10',
citycode SMALLINT,
username VARCHAR(32) DEFAULT '',
pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(event_day,siteid,citycode,username)
PARTITION BY RANGE(event_day)
(
PARTITION p202209 VALUES LESS THAN ('2022-10-01'),
PARTITION p202210 VALUES LESS THAN ('2022-11-01'),
PARTITION p202211 VALUES LESS THAN ('2022-12-01')
)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "3");
-- Broker 导入,broker_id 就是 broker 的名字,最后一个是能容忍的错误数据量
LOAD LABEL test_db.table2
(
DATA INFILE("hdfs://hybrid01:8020/data/table2_data.csv")
INTO TABLE `table2`
COLUMNS TERMINATED BY ","
FORMAT AS "csv"
(
event_day,siteid,citycode,username,pv
)
)
WITH BROKER broker_id
(
"dfs.nameservices" = "my_cluster",
"dfs.ha.namenodes.my_cluster" = "nn1,nn2,nn3",
"dfs.namenode.rpc-address.my_cluster.nn1" = "hybrid01:8020",
"dfs.namenode.rpc-address.my_cluster.nn2" = "hybrid02:8020",
"dfs.namenode.rpc-address.my_cluster.nn3" = "hybrid03:8020",
"dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
PROPERTIES
(
"max_filter_ratio" = "0.00002"
);
-- Routine load 案例
CREATE TABLE student_kafka
(
id int,
name varchar(50),
age int
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10;
-- 创建导入任务,任务名字 kafka_job1
CREATE ROUTINE LOAD test_db.kafka_job1 on student_kafka
PROPERTIES
(
"desired_concurrent_number"="1",
"strict_mode"="false",
"format"="json"
)
FROM KAFKA
(
"kafka_broker_list"="hybrid01:9092,hybrid02:9092,hybrid03:9092",
"kafka_topic"="test",
"property.group.id"="test_group_1",
"property.kafka_default_offsets"="OFFSET_BEGINNING",
"property.enable.auto.commit"="false"
);
文章来源地址https://www.toymoban.com/news/detail-491660.html
到了这里,关于Doris 入门:基本操作(三)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!