15_基于Flink将pulsar数据写入到ClickHouse

这篇具有很好参考价值的文章主要介绍了15_基于Flink将pulsar数据写入到ClickHouse。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

3.8.基于Flink将数据写入到ClickHouse

编写Flink完成数据写入到ClickHouse操作, 后续基于CK完成指标统计操作

3.8.1.ClickHouse基本介绍

ClickHouse 是俄罗斯的Yandex于2016年开源的列式存储数据库(DBMS),使用C++语言编写,主要用于在线分析处理查询(OLAP),能够使用SQL查询实时生成分析数据报告。
15_基于Flink将pulsar数据写入到ClickHouse,# Apache Pulsar,pulsar
结论: ClickHouse像很多OLAP数据库一样,单表查询速度由于关联查询,而且ClickHouse的两者差距更为明显。

3.8.2.ClickHouse安装步骤

本项目中,我们仅需要安装单机测试版本即可使用(node2安装), 在实际生产中, 大家可以直接将分布式集群版本

  • 1-设置yum源
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
  • 2- 直接基于yum安装即可
sudo yum install clickhouse-server clickhouse-client
  • 3-修改配置文件
vim /etc/clickhouse-server/config.xml 
修改178行: 打开这一行的注释 
<listen_host>::</listen_host>

15_基于Flink将pulsar数据写入到ClickHouse,# Apache Pulsar,pulsar

  • 4-启动clickhouse的server
systemctl start clickhouse-server 
停止:
systemctl stop clickhouse-server 
重启
systemctl restart clickhouse-server
  • 5-进入客户端
    15_基于Flink将pulsar数据写入到ClickHouse,# Apache Pulsar,pulsar

3.8.3.在ClickHouse中创建目标表

create database itcast_ck; 
use itcast_ck; 
create table itcast_ck.itcast_ck_ems( 
id int, 
sid varchar(128), 
ip varchar(128), 
create_time varchar(128), 
session_id varchar(128), 
yearInfo varchar(128), 
monthInfo varchar(128), 
dayInfo varchar(128), 
hourInfo varchar(128), 
seo_source varchar(128), 
area varchar(128), 
origin_channel varchar(128), 
msg_count int(128), 
from_url varchar(128), 
PRIMARY KEY (`id`) 
) ENGINE=ReplacingMergeTree();

3.8.4.编写Flink代码完成写入到CK操作

import com.itheima.pojo.PulsarTopicPojo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.types.Row;

import java.sql.Types;
import java.util.Properties;

// 基于Flink完成读取Pulsar中数据将消息数据写入到clickhouse中
public class ItcastFlinkToClickHouse {

    public static void main(String[] args) throws Exception {
        //1. 创建Flinnk流式处理核心环境类对象 和 Table API 核心环境类对象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2. 添加Source组件, 从Pulsar中读取消息数据
        Properties props = new Properties();
        props.setProperty("topic","persistent://public/default/itcast_ems_tab");
        props.setProperty("partition.discovery.interval-millis","5000");
        FlinkPulsarSource<PulsarTopicPojo> pulsarSource = new FlinkPulsarSource<PulsarTopicPojo>(
                "pulsar://node1:6650,node2:6650,node3:6650","http://node1:8080,node2:8080,node3:8080",
                JsonDeser.of(PulsarTopicPojo.class),props);
        //2.1 设置pulsarSource组件在消费数据的时候, 默认从什么位置开始消费
        pulsarSource.setStartFromLatest();

        DataStreamSource<PulsarTopicPojo> dataStreamSource = env.addSource(pulsarSource);


        //2.2  转换数据操作: 将 PulsarTopicPojo 转换为ROW对象
        SingleOutputStreamOperator<Row> rowDataSteam = dataStreamSource.map(new MapFunction<PulsarTopicPojo, Row>() {
            @Override
            public Row map(PulsarTopicPojo pulsarTopicPojo) throws Exception {

                return Row.of(pulsarTopicPojo.getId(), pulsarTopicPojo.getSid(), pulsarTopicPojo.getIp(), pulsarTopicPojo.getCreate_time(),
                        pulsarTopicPojo.getSession_id(), pulsarTopicPojo.getYearInfo(), pulsarTopicPojo.getMonthInfo(), pulsarTopicPojo.getDayInfo(),
                        pulsarTopicPojo.getHourInfo(), pulsarTopicPojo.getSeo_source(), pulsarTopicPojo.getArea(), pulsarTopicPojo.getOrigin_channel(),
                        pulsarTopicPojo.getMsg_count(), pulsarTopicPojo.getFrom_url());
            }
        });


        //2.3: 设置sink操作写入到CK操作
        String insertSql = "insert into itcast_ck.itcast_ck_ems (id,sid,ip,create_time,session_id,yearInfo,monthInfo,dayInfo,hourInfo,seo_source,area,origin_channel,msg_count,from_url) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

        JDBCAppendTableSink tableSink = JDBCAppendTableSink.builder()
                .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
                .setDBUrl("jdbc:clickhouse://node2:8123/itcast_ck")
                .setQuery(insertSql)
                .setBatchSize(1)
                .setParameterTypes(Types.INTEGER,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.INTEGER,Types.VARCHAR)
                .build();

        tableSink.emitDataStream(rowDataSteam);


        //3. 提交执行
        env.execute("itcast_to_ck");

    }
}

3.9.HBase对接Phoenix实现即席查询

3.9.1.Phoenix安装操作

Phoenix是属于apache旗下的一款基于hbase的工具, 此工具提供一种全新的方式来操作hbase中数据(SQL),
同时Phoenix对hbase进行大量的优化工作, 能够让我们更加有效的操作hbase

整个安装操作, 大家可以参考资料中安装手册, 进行安装即可

3.9.2.在Phoenix中创建表

create view "itcast_h_ems" ( 
"id" integer primary key, 
"f1"."sid" varchar, 
"f1"."ip" varchar, 
"f1"."create_time" varchar, 
"f1"."session_id" varchar, 
"f1"."yearInfo" varchar, 
"f1"."monthInfo" varchar, 
"f1"."dayInfo" varchar, 
"f1"."hourInfo" varchar, 
"f1"."seo_source" varchar, 
"f1"."area" varchar, 
"f1"."origin_channel" varchar, 
"f1"."msg_count" integer, 
"f1"."from_url" varchar 
);

3.9.3.在Phoenix中类型说明

15_基于Flink将pulsar数据写入到ClickHouse,# Apache Pulsar,pulsar文章来源地址https://www.toymoban.com/news/detail-634454.html

到了这里,关于15_基于Flink将pulsar数据写入到ClickHouse的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Apache-Pulsar安装操作说明

    Pulsar 是一种用于服务器到服务器消息传递的多租户高性能解决方案。 Pulsar 的主要特性如下: 对 Pulsar 实例中的多个集群的本机支持,并跨集群无缝地复制消息。 极低的发布和端到端延迟。 无缝可扩展至超过一百万个主题。 一个简单的客户端 API,具有Java、Go、Python和C++的绑

    2024年04月14日
    浏览(49)
  • Pulsar's Integration with Apache Samza for Stateful Stream Processing

    随着数据的增长和复杂性,流处理技术变得越来越重要。流处理系统允许实时分析大规模的、高速变化的数据流。Apache Pulsar 是一个高性能的分布式消息系统,适用于流处理和批处理。Apache Samza 是一个用于有状态流处理的系统,它可以与 Pulsar 集成,以实现高效的状态流处理

    2024年04月14日
    浏览(46)
  • Apache Pulsar 技术系列 - GEO replication 中订阅状态的同步原理

    Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制(GEO Replication)、快速扩容、灵活容错等特性,GEO Replication 可以原生支持数据和订阅状态在多个集群之间进行复制,GEO 目前在 Apache InLong 内部已经有长期稳定的实践,

    2024年02月16日
    浏览(41)
  • 结合云计算的最新技术和现状,介绍云计算基础知识、开源分布式数据库Clickhouse、可视化数据分析工具、分布式链路跟踪系统Pinpoint、数据湖存储系统Pulsar等

    作者:禅与计算机程序设计艺术 2019年,“云计算”将成为“经济全球化”的热门词汇之一,2020年全球云计算市场规模预计达到1万亿美元。中国是继美国、英国之后,成为全球第四大云服务提供商。华为、腾讯、阿里巴巴等互联网巨头纷纷布局云计算领域,各家公司纷纷推出

    2024年02月08日
    浏览(56)
  • 消息队列之六脉神剑:RabbitMQ、Kafka、ActiveMQ 、Redis、 ZeroMQ、Apache Pulsar对比和如何使用

    消息队列(Message Queue)是一种异步通信机制,它将消息发送者和接收者解耦,从而提高了应用程序的性能、可扩展性和可靠性。在分布式系统中,消息队列经常被用于处理高并发、异步处理、应用解耦等场景。 本篇回答将分析比较常见的六种消息队列:RabbitMQ、Kafka、Active

    2024年02月14日
    浏览(46)
  • 【Flink】【ClickHouse】写入流式数据到ClickHouse

    Flink 安装的教程就不在这里赘叙了,可以看一下以前的文章,这篇文章主要是把流式数据写入的OLAP(ClickHouse)中作查询分析 Flink 1.13.2, ClickHouse 22.1.3.7 这里直接使用docker安装,没有安装的同学可以使用homebreak来安装,执行下面的命令即可( 已经安装了docker的可以忽略 ) 四指

    2024年02月03日
    浏览(44)
  • Flink写入数据到ClickHouse

    1.ClickHouse建表 ClickHouse中建表 2.ClickHouse依赖 Flink开发相关依赖 3.Bean实体类 User.java 4.ClickHouse业务写入逻辑 ClickHouseSinkFunction.java open():在SinkFunction实例化后调用,用于初始化连接或资源。这在处理每个并行任务的子任务之前只被调用一次。 invoke():定义了在每个元素到达Sink操

    2024年02月12日
    浏览(55)
  • 【Flink-Kafka-To-ClickHouse】使用 Flink 实现 Kafka 数据写入 ClickHouse

    需求描述: 1、数据从 Kafka 写入 ClickHouse。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、先在 ClickHouse 中创建表然后动态获取 ClickHouse 的表结构。 5、Kafka 数据为 Json 格式,通过 FlatMap 扁平

    2024年02月03日
    浏览(47)
  • Pulsar的消费模式

    Pulsar 提供了三种消费模式:独立消费者模式、共享订阅模式和发布订阅模式。 1: 独立消费者模式:每个消费者实例都会独立地消费消息,并且每个消息只会被一个消费者消费。这种模式适合于需要完全独立处理消息的场景,例如数据采集和日志处理。 在这个示例中,我们创

    2024年02月11日
    浏览(38)
  • Pulsar-架构与设计

    随着云原生的兴起,对消息中间件的伸缩性和多租户隔离有了更高的要求。现有的消息中间件不支持多租户的隔离,但是有一定伸缩性,需要一定的迁移工具支持和手工操作。 Pulsar是下一代云原生分布式消息平台,采用存储和计算分离架构设计,支持弹性伸缩,支持多租户、

    2024年02月22日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包