【Flink】【ClickHouse】写入流式数据到ClickHouse

这篇具有很好参考价值的文章主要介绍了【Flink】【ClickHouse】写入流式数据到ClickHouse。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


Flink 安装的教程就不在这里赘叙了,可以看一下以前的文章,这篇文章主要是把流式数据写入的OLAP(ClickHouse)中作查询分析

Flink 1.13.2, ClickHouse 22.1.3.7

1、安装ClickHouse(MacOS)

这里直接使用docker安装,没有安装的同学可以使用homebreak来安装,执行下面的命令即可(已经安装了docker的可以忽略

brew install --cask --appdir=/Applications docker

【Flink】【ClickHouse】写入流式数据到ClickHouse

1.1 启动docker 

四指进入Application ,双击Docker 图表安装,一直点击下一步即可

【Flink】【ClickHouse】写入流式数据到ClickHouse

1.2 拉取ClickHouse 镜像

--客户端
docker pull yandex/clickhouse-client

【Flink】【ClickHouse】写入流式数据到ClickHouse

--服务端
docker pull yandex/clickhouse-server

【Flink】【ClickHouse】写入流式数据到ClickHouse

 

1.3 启动ClickHouse Server

docker run -d --name ch-server --ulimit nofile=262144:262144 -p 8123:8123 -p 9000:9000 -p 9009:9009 yandex/clickhouse-server

1.4 查询是否启动成功

docker ps

【Flink】【ClickHouse】写入流式数据到ClickHouse

1.4.1 打开web-ui 页面

http://127.0.0.1:8123/play

【Flink】【ClickHouse】写入流式数据到ClickHouse

 1.5 成功安装好了ClickHouse

 想看更多的安装方式可以查看官网安装 | ClickHouse Docs

1.6 拓展:Client登陆方式

有些同学想等过clickhouse-client登陆,也是可以的,不过就是稍微有点麻烦,建议只是测试使用的时候用web-ui就能满足,想通过clickhouse-client登陆呢,就必须进去到镜像

1.6.1 获取镜像id

-- 获取镜像ID
docker ps

【Flink】【ClickHouse】写入流式数据到ClickHouse 

1.6.2 进入镜像

docker exec -it a7d127a4f91b /bin/bash

【Flink】【ClickHouse】写入流式数据到ClickHouse

 1.6.3 使用client 登陆(默认没有密码)

bin/clickhouse-client

【Flink】【ClickHouse】写入流式数据到ClickHouse

 【Flink】【ClickHouse】写入流式数据到ClickHouse

2.  编码

2.1 在clickhouse中创建表

CREATE TABLE IF NOT EXISTS default.t_user(id UInt16,
name String,
age UInt16 ) ENGINE = TinyLog();

【Flink】【ClickHouse】写入流式数据到ClickHouse

【Flink】【ClickHouse】写入流式数据到ClickHouse  

2.2 引入flink-sink-clickhouse jar包

        <!-- clickhouse -->
		<dependency>
			<groupId>ru.ivi.opensource</groupId>
			<artifactId>flink-clickhouse-sink</artifactId>
			<version>1.3.3</version>
        <dependency>

2.3 读取流式数据

DataStream<String> inputStream = env.socketTextStream("localhost", 18888);

2.4 Transform 部分

//transform
        SingleOutputStreamOperator<String> userStream = sourceStream
                .map(v -> new UserTest(v))
                .setParallelism(1)
                .name("convert_user_map")
                .map(v -> UserTest.convertToCsv(v))
                .setParallelism(1)
                .name("convert_csv_map");
public class UserTest {
    public int id;
    public String name;
    public int age;

    public UserTest(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public UserTest(String v) {
        String[] split = v.split(",");
        this.id = Integer.valueOf(split[0]);
        this.name = split[1];
        this.age = Integer.valueOf(split[2]);
    }

    public UserTest of(int id, String name, int age) {
        return new UserTest(id, name, age);
    }
 
    // 构造Value 部分(1,'李四',20)
    public static String convertToCsv(UserTest user) {
        StringBuilder sb = new StringBuilder("(");

        // add user.id
        sb.append(user.id);
        sb.append(", ");
 
        // add user.name
        sb.append("'");
        sb.append(String.valueOf(user.name));
        sb.append("', ");
 
        // add user.age
        sb.append(user.age);

        sb.append(" )");
        return sb.toString();
    }
}

2.5 sink 

// create props for sink
        Properties props = new Properties();
        props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "default.t_user");
        props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000");
        ClickHouseSink sink = new ClickHouseSink(props);
        dataStream.addSink(sink);
        dataStream.print();

2.6 测试

2.6.1 启动18888端口

nc -l 18888

【Flink】【ClickHouse】写入流式数据到ClickHouse 

2.6.2 启动程序

【Flink】【ClickHouse】写入流式数据到ClickHouse

2.6.3 输入测试数据

1,lisi,20
5,wangwu,30
6,zz,100

【Flink】【ClickHouse】写入流式数据到ClickHouse

 【Flink】【ClickHouse】写入流式数据到ClickHouse

 2.6.4 查看数据是否成功写入

select * from default.t_user;

【Flink】【ClickHouse】写入流式数据到ClickHouse 【Flink】【ClickHouse】写入流式数据到ClickHouse

 发现所有数据都成功写入到ClickHouse中了,WellDone!!!

3.拓展

3.1 完整的代码(包括SQL,测试数据)

都上传到github上了

(如果可以,请点一下star⭐️,谢谢支持):
Github:https://github.com/BiGsuw/flink-learning/blob/main/src/main/java/com/flink/demo/sink/ClickHouseTest.java文章来源地址https://www.toymoban.com/news/detail-436676.html

到了这里,关于【Flink】【ClickHouse】写入流式数据到ClickHouse的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink-cdc,clickhouse写入,多路输出

    kafka日志数据从kafka读取 1、关联字典表:完善日志数据 2、判断日志内容级别:多路输出 低级:入clickhouse 高级:入clickhouse的同时推送到kafka供2次数据流程处理。

    2024年02月09日
    浏览(27)
  • Spark写入kafka(批数据和流式)

    写入kafka基础 kafka写入策略 写入kafka应答响应级别

    2024年01月25日
    浏览(42)
  • 大数据Flink(五十):流式计算简介

    文章目录 流式计算简介 一、数据的时效性 二、流式计算和批量计算

    2024年02月15日
    浏览(32)
  • Flink + MySQL 流式计算数据分析

    作者:禅与计算机程序设计艺术 大数据时代,海量的数据源源不断涌入到互联网、移动应用、企业数据库等各个领域,同时这些数据也逐渐成为各种业务场景中的主要输入数据。如何在短时间内对海量数据进行处理、分析并得出有价值的信息,已经成为当今社会越来越关注的

    2024年02月06日
    浏览(35)
  • Flink的流式数据处理与时间序列分析

    Apache Flink 是一个流处理框架,用于实时数据处理和分析。它支持大规模数据流处理,具有高吞吐量和低延迟。Flink 可以处理各种数据源和数据接收器,如 Kafka、HDFS、TCP 流等。 时间序列分析是一种用于分析时间序列数据的方法,用于发现数据中的趋势、季节性和随机性。时间

    2024年02月21日
    浏览(33)
  • 【Apache-Flink零基础入门】「入门到精通系列」手把手+零基础带你玩转大数据流式处理引擎Flink(基础概念解析+有状态的流式处理)

    Apache Flink 是业界公认的最佳流计算引擎之一,它不仅仅局限于流处理,而是一套兼具流、批、机器学习等多种计算功能的大数据引擎。Flink 的用户只需根据业务逻辑开发一套代码,就能够处理全量数据、增量数据和实时数据,无需针对不同的数据类型开发不同的方案。这使得

    2024年02月03日
    浏览(57)
  • node笔记_写文件(异步写入,同步写入,追加写入,流式写入)

    大家好,我是yma16,本期分享node写文件。

    2024年02月06日
    浏览(30)
  • 【天衍系列 05】Flink集成KafkaSink组件:实现流式数据的可靠传输 & 高效协同

    Flink版本: 本文主要是基于Flink1.14.4 版本 导言: Apache Flink 作为流式处理领域的先锋,为实时数据处理提供了强大而灵活的解决方案。其中,KafkaSink 是 Flink 生态系统中的关键组件之一,扮演着将 Flink 处理的数据可靠地发送到 Kafka 主题的角色。本文将深入探讨 KafkaSink 的工作

    2024年02月20日
    浏览(51)
  • Flink将数据写入MySQL(JDBC)

    在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。 2.1 版本说明 2.2 导入相关依赖 2.3 连接数据库,创建表 2.4 创建POJO类 2.5 自定义map函数 2.5 Flink2MySQL 2.6 启动necat、Flink,观察数据库写

    2024年02月07日
    浏览(30)
  • 6.2、Flink数据写入到Kafka

    目录 1、添加POM依赖 2、API使用说明 3、序列化器 3.1 使用预定义的序列化器 3.2 使用自定义的序列化器 4、容错保证级别 4.1 至少一次 的配置 4.2 精确一次 的配置 5、这是一个完整的入门案例 Apache Flink 集成了通用的 Kafka 连接器,使用时需要根据生产环境的版本引入相应的依赖

    2024年02月09日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包