Doris系列之导入Kafka数据操作

这篇具有很好参考价值的文章主要介绍了Doris系列之导入Kafka数据操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Doris系列

注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天和大家分享一下Doris系列之导入Kafka数据操作
#博学谷IT学习技术支持#



前言

Doris系列之导入Kafka数据操作

接着上次的Doris系列继续和大家分享,上次讲了Doris 建表操作,和从Broker Load导入hdfs数据操作,今天和大家分享从Routine Load导入kafka数据操作。
Doris系列之导入Kafka数据操作
如上图,Client 向 FE 提交一个例行导入作业。
FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。
在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。
FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。
整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入。


一、Kafka集群使用步骤

Doris系列之导入Kafka数据操作
Kafka也是Doris一个非常重要的数据来源。

1.启动kafka集群环境

这里根据自己的路径启动kafka集群环境

cd /export/servers/kafka_2.12-2.4.1
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

2.创建kafka的topic主题

这里创建一个topic名字是test的kafka消息队列,设置1个partitions ,并且只备份1份数据。

bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 1 \
--partitions 1 \
--topic test

如果Topic已经存在,则可以删除

bin/kafka-topics.sh  --delete --zookeeper node01:2181  --topic test 

3.往kafka中插入一批测试数据

这里简单做个小案例,插入2条数据。

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
{"id":1,"name":"zhangsan","age":20}
{"id":2,"name":"lisi","age":30}

二、Doris使用步骤

1.创建对应表

这里根据自己kafka生成的数据创建对应字段和格式的表格

create table student_kafka2
(
id int,
name varchar(50),
age int
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10;

2.创建导入作业

  • student_kafka2为第一步创建的表格名称
  • desired_concurrent_number是并行度相关的参数
  • strict_mode是否采用严格模式
  • format为导入的格式,这里是json
CREATE ROUTINE LOAD test_db.kafka_job_new on student_kafka2
PROPERTIES
(
    "desired_concurrent_number"="1",
	"strict_mode"="false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list"= "node01:9092,node02:9092,node03:9092",
    "kafka_topic" = "test",
    "property.group.id" = "test_group_1",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.enable.auto.commit" = "false"
);
select * from student_kafka2;

Doris系列之导入Kafka数据操作

三、Doris常用的参数

设置删除时是否允许不分区直接删除

  • SET delete_without_partition = true;

设置最大内存限制

  • SET exec_mem_limit = 8589934592;
  • SHOW VARIABLES LIKE “%mem_limit%”;

设置最长查询时间限制

  • SET query_timeout = 600;
  • SHOW VARIABLES LIKE “%query_timeout%”;

添加新的含预聚合的列

  • ALTER TABLE table1 ADD COLUMN uv BIGINT SUM DEFAULT ‘0’ after pv;

Broadcast/Shuffle Join 操作,默认为Broadcast

  • select sum(table1.pv) from table1 join [broadcast] table2 where
    table1.siteid = 12;
  • select sum(table1.pv) from table1 join [shuffle] table2 where
    table1.siteid = 12;

总结

今天主要和大家分享了Doris系列之导入Kafka数据操作,如果大家实际工作中需要用到Kafka结合Doris操作,可以参考一下使用步骤。文章来源地址https://www.toymoban.com/news/detail-475446.html

到了这里,关于Doris系列之导入Kafka数据操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Apache Doris (三十三):Doris 数据导入(十一)Routine Load 2- 导入案例及注意事项

    目录 1. 导入Kafka数据到Doris 2. 严格模式导入Kafka数据到Doris 3. kafka 简单json格式数据导入到Doris

    2024年02月16日
    浏览(46)
  • Apache Doris (二十三) :Doris 数据导入(一)Insert Into

    目录 1. 语法及参数 2. 案例 ​​​​3. 注意事项 3.1. 关于插入数据量

    2024年02月13日
    浏览(40)
  • Doris系列之建表操作

    注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。 今天和大家分享一下Doris系列之

    2023年04月09日
    浏览(24)
  • Apache Doris (二十八):Doris 数据导入(六)Spark Load 1- 原理及配置

    目录 1. 基本原理  2. Spark集群搭建 2.1 Spark Standalone 集群搭建 2.2 Spark On Yarn 配置

    2024年02月16日
    浏览(33)
  • Doris(7):数据导入(Load)之Routine Load

    例行导入功能为用户提供了义中自动从指定数据源进行数据导入的功能 1 适用场景 当前仅支持kafka系统进行例行导入。 2 使用限制 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。 支持的消息格式为 csv 文本格式。每一个 message 为一行,且行尾不包含换行符。 仅

    2023年04月24日
    浏览(34)
  • Doris(6):数据导入(Load)之Stream Load

    Broker load是一个同步的导入方式,用户通过发送HTTP协议将本地文件或者数据流导入到Doris中,Stream Load同步执行导入并返回结果,用户可以通过返回判断导入是否成功。 1 适用场景 Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。 2 基本原理 下图展示了

    2023年04月19日
    浏览(37)
  • Doris1.1.1多种异构数据源数据导入方案

            Apache Doris 是一个基于 MPP 架构的高性能、实时的分析型数据库,以极速易用的特点被人们所熟知,仅需亚秒级响应时间即可返回海量数据下的查询结果,不仅可以支持高并发的点查询场景,也能支持高吞吐的复杂分析场景。基于此,Apache Doris 能够较好的满足报表

    2024年02月03日
    浏览(37)
  • Apache Doris 数据导入:Insert Into语句;Binlog Load;Broker Load;HDFS Load;Spark Load;例行导入(Routine Load)

    Doris 提供多种数据导入方案,可以针对不同的数据源进行选择不同的数据导入方式。Doris支持各种各样的数据导入方式:Insert Into、json格式数据导入、Binlog Load、Broker Load、Routine Load、Spark Load、Stream Load、S3 Load,下面分别进行介绍。 注意: Doris 中的所有导入操作都有原子性保

    2024年02月21日
    浏览(44)
  • Doris的数据模型和增删改查操作

    了解Doris的朋友都知道,Doris是一个MPP的分析型数据库。可以支持大数据下的实时分析。 说到数据分析,不得不提的是Doris的数据模型。 目前Doris支持三种数据模型,分别是: Aggregate Model(聚合模型) Uniq Model(唯一模型) Duplicate Model(冗余模型) Aggregate Model(聚合模型)

    2024年02月04日
    浏览(22)
  • spark导入doris的几种方式

    本文主要介绍通过spark导入doris的3种方式。 jdbc 方式需要引入mysql-connector-java的依赖 代码demo 注意: 一定要添加?rewriteBatchedStatements=true参数,不然导入速度会很慢。 Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。 代码库地址:h

    2024年02月11日
    浏览(67)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包