使用clickhouse kafka表引擎消费kafka写入clickhouse

这篇具有很好参考价值的文章主要介绍了使用clickhouse kafka表引擎消费kafka写入clickhouse。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

系列文章目录

1:seatunnel 消费kafka数据写入clickhouse


文章目录

系列文章目录

文章目录

前言

1.创建kafka 引擎表

 2.创建clickhouse MergeTree表

3.创建kafka物化视图写入结构表

三、问题

1、修改物化视图

总结


前言

本文使用seatunnel 消费kafka数据写入clickhouse文章的kafka topic以及格式,用另一种方式写入clickhouse,也是练习下clickhouse kafka引擎。

本文默认已安装了kafka与clickhouse,这方面的安装文档很多,这里不做详述;

前提准备 kafka :2.7.0 ; topic: filebeat_**** ;通过filebeat 写入kafka

               clickhouse: 22.6.3.35 。


一、kafka数据格式

topic:filebeat_****

使用kafka 命令查看数据格式:

/bin/kafka-console-consumer.sh --bootstrap-server kafka001:9092,kafka002:9092,kafka003:9092 --topic filebeat_****

{
    "@timestamp": "2022-12-15T09:04:54.870Z", 
    "@metadata": {
        "beat": "filebeat", 
        "type": "_doc", 
        "version": "7.13.1"
    }, 
    "input": {
        "type": "log"
    }, 
    "fields": {
        "fields_under_root": true, 
        "filetype": "heart"
    }, 
    "agent": {
        "type": "filebeat", 
        "version": "7.13.1", 
        "hostname": "hostname", 
        "ephemeral_id": "e2b48c2b-4459-4310-ae6d-396a9494c536", 
        "id": "72eafd44-b71d-452f-bdb5-b986d2a12c15", 
        "name": "hd.n12"
    }, 
    "ecs": {
        "version": "1.8.0"
    }, 
    "host": {
        "name": "hostname"
    }, 
    "log": {
        "offset": 62571676, 
        "file": {
            "path": "/opt/servers/tomcatServers/tomcat-*/t2/webapps/recordlogs/****.log"
        }
    }, 
    "message": "[2022-12-15 17:04:54] [DataLog(10)] [ConsumeMessageThread_3] [INFO]-20000	人名	桌面	四川省	德阳市	旌阳区	0000936861276195024	7380936861276195024	738	104.432322	31.157213	null	中国四川省某某市某某区某某路	110.189.206.*	1	1	1671095093998	1671095094013	\"B2Q15-301\"	00:a5:0a:00:3e:42	-49	中国标准时间	1671095095916	V2.0.2_202109021711"
}

主要是message区域,[2022-12-15 17:04:54] 为日志上传时间,[INFO]- 之后区域 以\t分割,结构按固定的顺序依次

    `userid` String,
    `username` String,
    `app_name` String,
    `province_name` String,
    `city_name` String,
    `district_name` String,
    `code` String,
    `real_code` String,
    `product_name` String,
    `longitude` String,
    `latitude` String,
    `rd` String,
    `address` String,
    `ip` String,
    `screenlight` String,
    `screenlock` String,
    `hearttime` String,
    `time` String,
    `ssid` String,
    `mac_address` String,
    `rssi` String,
    `timezone` String,
    `current_time` String,
    `program_version` String

二、clickhouse

1.创建kafka 引擎表

CREATE TABLE default.kafka_filebeat_hearts
(
    `message` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka001:9092,kafka002:9092,kafka003:9092', 
         kafka_topic_list = 'filebeat_****', 
         kafka_group_name = 'consumer_group_clickhouse_filebeat_***', 
         kafka_format = 'JSONEachRow', 
         kafka_skip_broken_messages = 1, 
         kafka_num_consumers = 2 ;

 2.创建clickhouse MergeTree表

CREATE TABLE default.tab_mt_results
(
    `upload_time` DateTime64(3),
    `userid` String,
    `username` String,
    `app_name` String,
    `province_name` String,
    `city_name` String,
    `district_name` String,
    `machine_no` String,
    `real_machineno` String,
    `product_name` String,
    `longitude` String,
    `latitude` String,
    `rd` String,
    `address` String,
    `ip` String,
    `screenlight` String,
    `screenlock` String,
    `hearttime` DateTime64(3),
    `time` DateTime64(3),
    `ssid` String,
    `mac_address` String,
    `rssi` String,
    `timezone` String,
    `machine_current_time` String,
    `program_version` String
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(upload_time)
PRIMARY KEY (userid, machine_no)
ORDER BY (userid, machine_no)
TTL toDate(upload_time) + toIntervalMonth(1)
SETTINGS index_granularity = 8192 ;

 以upload_time设置TTL 一个月过期时间。

3.创建kafka物化视图写入结构表

CREATE MATERIALIZED VIEW default.view_consumer_kafka2ck TO default.tab_mt_results
(
    `upload_time` DateTime64(3),
    `userid` String,
    `username` String,
    `app_name` String,
    `province_name` String,
    `city_name` String,
    `district_name` String,
    `machine_no` String,
    `real_machineno` String,
    `product_name` String,
    `longitude` String,
    `latitude` String,
    `rd` String,
    `address` String,
    `ip` String,
    `screenlight` String,
    `screenlock` String,
    `hearttime` DateTime64(3),
    `time` DateTime64(3),
    `ssid` String,
    `mac_address` String,
    `rssi` String,
    `timezone` String,
    `machine_current_time` String,
    `program_version` String
) AS
SELECT
    replaceAll(splitByString(']', splitByString('[INFO]-', message)[1])[1], '[', '') AS upload_time,
    splitByString('\t', splitByString('[INFO]-', message)[2])[1] AS userid,
    splitByString('\t', splitByString('[INFO]-', message)[2])[2] AS username,
    splitByString('\t', splitByString('[INFO]-', message)[2])[3] AS app_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[4] AS province_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[5] AS city_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[6] AS district_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[7] AS machine_no,
    splitByString('\t', splitByString('[INFO]-', message)[2])[8] AS real_machineno,
    splitByString('\t', splitByString('[INFO]-', message)[2])[9] AS product_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[10] AS longitude,
    splitByString('\t', splitByString('[INFO]-', message)[2])[11] AS latitude,
    splitByString('\t', splitByString('[INFO]-', message)[2])[12] AS rd,
    splitByString('\t', splitByString('[INFO]-', message)[2])[13] AS address,
    splitByString('\t', splitByString('[INFO]-', message)[2])[14] AS ip,
    splitByString('\t', splitByString('[INFO]-', message)[2])[15] AS screenlight,
    splitByString('\t', splitByString('[INFO]-', message)[2])[16] AS screenlock,
    CAST(splitByString('\t', splitByString('[INFO]-', message)[2])[17], 'DateTime64') AS hearttime,
    CAST(splitByString('\t', splitByString('[INFO]-', message)[2])[18], 'DateTime64') AS time,
    splitByString('\t', splitByString('[INFO]-', message)[2])[19] AS ssid,
    splitByString('\t', splitByString('[INFO]-', message)[2])[20] AS mac_address,
    splitByString('\t', splitByString('[INFO]-', message)[2])[21] AS rssi,
    splitByString('\t', splitByString('[INFO]-', message)[2])[22] AS timezone,
    splitByString('\t', splitByString('[INFO]-', message)[2])[23] AS machine_current_time,
    splitByString('\t', splitByString('[INFO]-', message)[2])[24] AS program_version
FROM hive.kafka_filebeat_hearts;

1、message 过滤掉不符合格式的数据,避免出错;

2、hearttime,time转换为dateTime64格式

三、问题

1、修改物化视图

  第一次创建物化视图后,没有写入结果表,查看clickhouse日志,发现是解析失败导致,解决方式,修改物化视图,并且过滤掉不符合格式的数据;

 DETACH TABLE default.view_consumer_kafka2ck;

 

ATTACH MATERIALIZED VIEW default.view_consumer_kafka2ck TO default.tab_mt_results
(
    `upload_time` DateTime64(3),
    `userid` String,
    `username` String,
    `app_name` String,
    `province_name` String,
    `city_name` String,
    `district_name` String,
    `machine_no` String,
    `real_machineno` String,
    `product_name` String,
    `longitude` String,
    `latitude` String,
    `rd` String,
    `address` String,
    `ip` String,
    `screenlight` String,
    `screenlock` String,
    `hearttime` DateTime64(3),
    `time` DateTime64(3),
    `ssid` String,
    `mac_address` String,
    `rssi` String,
    `timezone` String,
    `machine_current_time` String,
    `program_version` String
) AS
SELECT
    replaceAll(splitByString(']', splitByString('[INFO]-', message)[1])[1], '[', '') AS upload_time,
    splitByString('\t', splitByString('[INFO]-', message)[2])[1] AS userid,
    splitByString('\t', splitByString('[INFO]-', message)[2])[2] AS username,
    splitByString('\t', splitByString('[INFO]-', message)[2])[3] AS app_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[4] AS province_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[5] AS city_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[6] AS district_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[7] AS machine_no,
    splitByString('\t', splitByString('[INFO]-', message)[2])[8] AS real_machineno,
    splitByString('\t', splitByString('[INFO]-', message)[2])[9] AS product_name,
    splitByString('\t', splitByString('[INFO]-', message)[2])[10] AS longitude,
    splitByString('\t', splitByString('[INFO]-', message)[2])[11] AS latitude,
    splitByString('\t', splitByString('[INFO]-', message)[2])[12] AS rd,
    splitByString('\t', splitByString('[INFO]-', message)[2])[13] AS address,
    splitByString('\t', splitByString('[INFO]-', message)[2])[14] AS ip,
    splitByString('\t', splitByString('[INFO]-', message)[2])[15] AS screenlight,
    splitByString('\t', splitByString('[INFO]-', message)[2])[16] AS screenlock,
    CAST(splitByString('\t', splitByString('[INFO]-', message)[2])[17], 'DateTime64') AS hearttime,
    CAST(splitByString('\t', splitByString('[INFO]-', message)[2])[18], 'DateTime64') AS time,
    splitByString('\t', splitByString('[INFO]-', message)[2])[19] AS ssid,
    splitByString('\t', splitByString('[INFO]-', message)[2])[20] AS mac_address,
    splitByString('\t', splitByString('[INFO]-', message)[2])[21] AS rssi,
    splitByString('\t', splitByString('[INFO]-', message)[2])[22] AS timezone,
    splitByString('\t', splitByString('[INFO]-', message)[2])[23] AS machine_current_time,
    splitByString('\t', splitByString('[INFO]-', message)[2])[24] AS program_version
FROM hive.kafka_filebeat_hearts
WHERE message LIKE '%[INFO]-%' ;

查看结果表,可以看到结果表已经有数据了。


总结

kafka表引擎还是比较好用,主要是在解析message部分,将message拆分成固定的结构,其中常用的splitByString ,visitParamExtractRaw,可以查看函数 | ClickHouse Docs

总之,clickhouse强大的第三方引擎,多看官方文档,必然可以熟练使用。

本人有多年的大数据经验,欢迎各位大咖随时交流学习。文章来源地址https://www.toymoban.com/news/detail-585258.html

到了这里,关于使用clickhouse kafka表引擎消费kafka写入clickhouse的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Clickhouse分布式表引擎(Distributed)写入核心原理解析

    Clickhouse分布式表引擎(Distributed)写入核心原理解析 Clickhouse分布式表引擎(Distributed)查询核心原理解析 Distributed表引擎是分布式表的代名词,它自身不存储任何数据,而是作为数据分片的透明代理,能够自动路由数据至集群中的各个节点 ,所以Distributed表引擎需要和其他数

    2023年04月27日
    浏览(48)
  • pylink消费kafka写入ES

    # -*- coding: utf-8 -*- from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.functions import MapFunction, RuntimeContext, KeyedProcessFunction from abc import ABC, abstractmethod from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.functions import MapFunction, RuntimeContext, KeyedProcessFunction fr

    2024年02月08日
    浏览(39)
  • ClickHouse10-ClickHouse中Kafka表引擎

    Kafka表引擎也是一种常见的表引擎,在很多大数据量的场景下,会从源通过Kafka将数据输送到ClickHouse,Kafka作为输送的方式,ClickHouse作为存储引擎与查询引擎,大数据量的数据可以得到快速的、高压缩的存储。 Kafka大家肯定不陌生: 它可以用于发布和订阅数据流,是常见的队

    2024年04月25日
    浏览(47)
  • ClickHouse(21)ClickHouse集成Kafka表引擎详细解析

    目录 Kafka表集成引擎 配置 Kerberos 支持 虚拟列 资料分享 系列文章 clickhouse系列文章 此引擎与Apache Kafka结合使用。 Kafka 特性: 发布或者订阅数据流。 容错存储机制。 处理流数据。 老版Kafka集成表引擎参数格式: 新版Kafka集成表引擎参数格式: 必要参数: kafka_broker_list – 以

    2024年02月02日
    浏览(41)
  • Flink SQL和Table API实现消费kafka写入mysql

    1、构建 table环境 2、构建source kafka 方式一:API 方式二:Flink SQL 3、构建sink mysql  4、写入将source表写入sink表 方式一:API 方式二:Flink SQL 5、手动执行 6、测试 (1)连接kafka生产者 (2)造数据 (3)mysql查看入库情况

    2024年01月16日
    浏览(44)
  • 【Flink】【ClickHouse】写入流式数据到ClickHouse

    Flink 安装的教程就不在这里赘叙了,可以看一下以前的文章,这篇文章主要是把流式数据写入的OLAP(ClickHouse)中作查询分析 Flink 1.13.2, ClickHouse 22.1.3.7 这里直接使用docker安装,没有安装的同学可以使用homebreak来安装,执行下面的命令即可( 已经安装了docker的可以忽略 ) 四指

    2024年02月03日
    浏览(44)
  • 探索ClickHouse——使用MaterializedView存储kafka传递的数据

    在 《探索ClickHouse——连接Kafka和Clickhouse》中,我们讲解了如何使用kafka engin连接kafka,并读取topic中的数据。但是遇到了一个问题,就是数据只能读取一次,即使后面还有新数据发送到该topic,该表也读不出来。 为了解决这个问题,我们引入MaterializedView。 该表结构直接借用了

    2024年02月07日
    浏览(46)
  • Flink写入数据到ClickHouse

    1.ClickHouse建表 ClickHouse中建表 2.ClickHouse依赖 Flink开发相关依赖 3.Bean实体类 User.java 4.ClickHouse业务写入逻辑 ClickHouseSinkFunction.java open():在SinkFunction实例化后调用,用于初始化连接或资源。这在处理每个并行任务的子任务之前只被调用一次。 invoke():定义了在每个元素到达Sink操

    2024年02月12日
    浏览(54)
  • 【Flink-Kafka-To-Hive】使用 Flink 实现 Kafka 数据写入 Hive

    需求描述: 1、数据从 Kafka 写入 Hive。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、Flink 集成 Kafka 写入 Hive 需要进行 checkpoint 才能落盘至 HDFS。 5、先在 Hive 中创建表然后动态获取 Hive 的表

    2024年02月03日
    浏览(57)
  • Flink使用 KafkaSource消费 Kafka中的数据

    目前,很多 flink相关的书籍和网上的文章讲解如何对接 kafka时都是使用的 FlinkKafkaConsumer,如下: 新版的 flink,比如 1.14.3已经将 FlinkKafkaConsumer标记为 deprecated(不推荐),如下: 新版本的 flink应该使用 KafkaSource来消费 kafka中的数据,详细代码如下: 开发者在工作中应该尽量避

    2024年02月15日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包