14_基于Flink将pulsar数据写入到HBase

这篇具有很好参考价值的文章主要介绍了14_基于Flink将pulsar数据写入到HBase。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

3.7.基于Flink将数据写入到HBase

3.7.1.编写Flink完成数据写入到Hbase操作, 完成数据备份, 便于后续进行即席查询和离线分析

3.7.1.1.HBase基本介绍

hbase是基于Google发布bigTable论文产生一款软件, 是一款noSQL型数据, 不支持SQL. 不支持join的操作, 没有表关系, 不支持事务(多行事务),hbase是基于 HDFS的采用java 语言编写

查询hbase数据一般有三种方案(主键(row key)查询, 主键的范围检索,查询全部数据)

都是以字节类型存储,存储结构化和半结构化数据。

hbase表的特点: 大 面向列的存储方案 稀疏性

2.7.1.2.应用场景

1)需要进行随机读写的操作。
2)数据量比较大。
3)数据比较稀疏。

2.7.1.3.HBase安装操作

本次安装的HBase为2.2.7,详细的安装手册大家可以参考资料, 还需要大家注意,HBase的启动需要依赖于zookeeper
和HDFS的, 顾需要先安装 HADOOP与zookeeper
14_基于Flink将pulsar数据写入到HBase,# Apache Pulsar,pulsar

  • 1-在Hbase中创建目标表
create 'itcast_h_ems, {NAME=>'f1',COMPRESSION=>'GZ'},{NUMREGIONS=>6, SPLITALGO=>'HexStringSplit'}
  • 2- 编写Flink代码完成写入Hbase操作
import com.itheima.pojo.PulsarTopicPojo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Properties;

// 基于Flink消费Pulsar数据, 然后将数据灌入到HBase中, 完成数据备份, 以及后续即席查询和离线分析
public class ItcastFlinkToHBase {

    public static void main(String[] args) throws Exception {

        //1. 创建Flinnk流式处理核心环境类对象 和 Table API 核心环境类对象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2. 添加Source组件, 从Pulsar中读取消息数据
        Properties props = new Properties();
        props.setProperty("topic","persistent://public/default/itcast_ems_tab");
        props.setProperty("partition.discovery.interval-millis","5000");
        FlinkPulsarSource<PulsarTopicPojo> pulsarSource = new FlinkPulsarSource<PulsarTopicPojo>(
                "pulsar://node1:6650,node2:6650,node3:6650","http://node1:8080,node2:8080,node3:8080",
                JsonDeser.of(PulsarTopicPojo.class),props);
        //2.1 设置pulsarSource组件在消费数据的时候, 默认从什么位置开始消费
        pulsarSource.setStartFromLatest();

        DataStreamSource<PulsarTopicPojo> dataStreamSource = env.addSource(pulsarSource);


        //2.2 转换为Flink Table

        Schema schema = Schema.newBuilder()
                .column("id", DataTypes.INT())
                .column("sid", DataTypes.STRING())
                .column("ip", DataTypes.STRING())
                .column("session_id", DataTypes.STRING())
                .column("create_time", DataTypes.STRING())
                .column("yearInfo", DataTypes.STRING())
                .column("monthInfo", DataTypes.STRING())
                .column("dayInfo", DataTypes.STRING())
                .column("hourInfo", DataTypes.STRING())
                .column("seo_source", DataTypes.STRING())
                .column("area", DataTypes.STRING())
                .column("origin_channel", DataTypes.STRING())
                .column("msg_count", DataTypes.INT())
                .column("from_url", DataTypes.STRING())
                .build();


        tableEnv.createTemporaryView("itcast_ems",dataStreamSource,schema);


        //2.3: 定义HBase的目标表
        String hTable = "create table itcast_h_ems("+
                "rowkey int,"+
                "f1 ROW<sid STRING,ip STRING,session_id STRING,create_time STRING,yearInfo STRING,monthInfo STRING,dayInfo STRING,hourInfo STRING,seo_source STRING,area STRING,origin_channel STRING,msg_count INT,from_url STRING>,"+
                "primary key(rowkey) NOT ENFORCED" +
                ") WITH ("+
                "'connector'='hbase-2.2',"+
                "'table-name'='itcast_h_ems',"+
                "'zookeeper.quorum'='node1:2181,node2:2181,node3:2181'"+
                ")";
        //4. 执行操作
        tableEnv.executeSql(hTable);

        tableEnv.executeSql("insert into itcast_h_ems select id,ROW(sid,ip,session_id,create_time,yearInfo,monthInfo,dayInfo,hourInfo,seo_source,area,origin_channel,msg_count,from_url) from itcast_ems");

    }

}

PulsarTopicPojo文章来源地址https://www.toymoban.com/news/detail-635199.html

public class PulsarTopicPojo {
    private Integer id;
    private String sid;
    private String ip;
    private String session_id;
    private String create_time;
    private String yearInfo;
    private String monthInfo;
    private String dayInfo;
    private String hourInfo;
    private String seo_source;
    private String area;
    private String origin_channel;
    private Integer msg_count;
    private  String from_url;

    public PulsarTopicPojo() {
    }

    public PulsarTopicPojo(Integer id, String sid, String ip, String session_id, String create_time, String yearInfo, String monthInfo, String dayInfo, String hourInfo, String seo_source, String area, String origin_channel, Integer msg_count, String from_url) {
        this.id = id;
        this.sid = sid;
        this.ip = ip;
        this.session_id = session_id;
        this.create_time = create_time;
        this.yearInfo = yearInfo;
        this.monthInfo = monthInfo;
        this.dayInfo = dayInfo;
        this.hourInfo = hourInfo;
        this.seo_source = seo_source;
        this.area = area;
        this.origin_channel = origin_channel;
        this.msg_count = msg_count;
        this.from_url = from_url;
    }

    public void setData(Integer id, String sid, String ip, String session_id, String create_time, String yearInfo, String monthInfo, String dayInfo, String hourInfo, String seo_source, String area, String origin_channel, Integer msg_count, String from_url) {
        this.id = id;
        this.sid = sid;
        this.ip = ip;
        this.session_id = session_id;
        this.create_time = create_time;
        this.yearInfo = yearInfo;
        this.monthInfo = monthInfo;
        this.dayInfo = dayInfo;
        this.hourInfo = hourInfo;
        this.seo_source = seo_source;
        this.area = area;
        this.origin_channel = origin_channel;
        this.msg_count = msg_count;
        this.from_url = from_url;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getSid() {
        return sid;
    }

    public void setSid(String sid) {
        this.sid = sid;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getSession_id() {
        return session_id;
    }

    public void setSession_id(String session_id) {
        this.session_id = session_id;
    }
    public String getCreate_time() {
        return create_time;
    }

    public void setCreate_time(String create_time) {
        this.create_time = create_time;
    }
    public String getYearInfo() {
        return yearInfo;
    }

    public void setYearInfo(String yearInfo) {
        this.yearInfo = yearInfo;
    }

    public String getMonthInfo() {
        return monthInfo;
    }

    public void setMonthInfo(String monthInfo) {
        this.monthInfo = monthInfo;
    }

    public String getDayInfo() {
        return dayInfo;
    }

    public void setDayInfo(String dayInfo) {
        this.dayInfo = dayInfo;
    }

    public String getHourInfo() {
        return hourInfo;
    }

    public void setHourInfo(String hourInfo) {
        this.hourInfo = hourInfo;
    }

    public String getSeo_source() {
        return seo_source;
    }

    public void setSeo_source(String seo_source) {
        this.seo_source = seo_source;
    }

    public String getArea() {
        return area;
    }

    public void setArea(String area) {
        this.area = area;
    }

    public String getOrigin_channel() {
        return origin_channel;
    }

    public void setOrigin_channel(String origin_channel) {
        this.origin_channel = origin_channel;
    }

    public Integer getMsg_count() {
        return msg_count;
    }

    public void setMsg_count(Integer msg_count) {
        this.msg_count = msg_count;
    }

    public String getFrom_url() {
        return from_url;
    }

    public void setFrom_url(String from_url) {
        this.from_url = from_url;
    }

    @Override
    public String toString() {
        return "PulsarTopicPojo{" +
                "id=" + id +
                ", sid='" + sid + '\'' +
                ", ip='" + ip + '\'' +
                ", session_id='" + session_id + '\'' +
                ", create_time='" + create_time + '\'' +
                ", yearInfo='" + yearInfo + '\'' +
                ", monthInfo='" + monthInfo + '\'' +
                ", dayInfo='" + dayInfo + '\'' +
                ", hourInfo='" + hourInfo + '\'' +
                ", seo_source='" + seo_source + '\'' +
                ", area='" + area + '\'' +
                ", origin_channel='" + origin_channel + '\'' +
                ", msg_count=" + msg_count +
                ", from_url='" + from_url + '\'' +
                '}';
    }
}

到了这里,关于14_基于Flink将pulsar数据写入到HBase的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Apache-Pulsar安装操作说明

    Pulsar 是一种用于服务器到服务器消息传递的多租户高性能解决方案。 Pulsar 的主要特性如下: 对 Pulsar 实例中的多个集群的本机支持,并跨集群无缝地复制消息。 极低的发布和端到端延迟。 无缝可扩展至超过一百万个主题。 一个简单的客户端 API,具有Java、Go、Python和C++的绑

    2024年04月14日
    浏览(49)
  • Pulsar's Integration with Apache Samza for Stateful Stream Processing

    随着数据的增长和复杂性,流处理技术变得越来越重要。流处理系统允许实时分析大规模的、高速变化的数据流。Apache Pulsar 是一个高性能的分布式消息系统,适用于流处理和批处理。Apache Samza 是一个用于有状态流处理的系统,它可以与 Pulsar 集成,以实现高效的状态流处理

    2024年04月14日
    浏览(46)
  • Apache Pulsar 技术系列 - GEO replication 中订阅状态的同步原理

    Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制(GEO Replication)、快速扩容、灵活容错等特性,GEO Replication 可以原生支持数据和订阅状态在多个集群之间进行复制,GEO 目前在 Apache InLong 内部已经有长期稳定的实践,

    2024年02月16日
    浏览(40)
  • 消息队列之六脉神剑:RabbitMQ、Kafka、ActiveMQ 、Redis、 ZeroMQ、Apache Pulsar对比和如何使用

    消息队列(Message Queue)是一种异步通信机制,它将消息发送者和接收者解耦,从而提高了应用程序的性能、可扩展性和可靠性。在分布式系统中,消息队列经常被用于处理高并发、异步处理、应用解耦等场景。 本篇回答将分析比较常见的六种消息队列:RabbitMQ、Kafka、Active

    2024年02月14日
    浏览(46)
  • Pulsar的消费模式

    Pulsar 提供了三种消费模式:独立消费者模式、共享订阅模式和发布订阅模式。 1: 独立消费者模式:每个消费者实例都会独立地消费消息,并且每个消息只会被一个消费者消费。这种模式适合于需要完全独立处理消息的场景,例如数据采集和日志处理。 在这个示例中,我们创

    2024年02月11日
    浏览(38)
  • Pulsar-架构与设计

    随着云原生的兴起,对消息中间件的伸缩性和多租户隔离有了更高的要求。现有的消息中间件不支持多租户的隔离,但是有一定伸缩性,需要一定的迁移工具支持和手工操作。 Pulsar是下一代云原生分布式消息平台,采用存储和计算分离架构设计,支持弹性伸缩,支持多租户、

    2024年02月22日
    浏览(35)
  • 与ChatGPT浅聊Pulsar

    我 : 艾米丽,谈一谈你对Pulsar的理解? ChatGPT : 当然可以!Apache Pulsar是一款分布式消息中间件,它支持多种消息模式,包括发布/订阅模式、队列模式和流模式。在发布/订阅模式下,消息发布者将消息发布到一个主题中,订阅者可以订阅该主题,并接收到所有发布到该主题

    2023年04月15日
    浏览(39)
  • Pulsar消息发送、消费架构概述

    大家好,我是威哥,《RocketMQ技术内幕》、《RocketMQ实战》作者、RocketMQ社区首席布道师、极客时间《中间件核心技术与实战》专栏作者、中通快递基础架构资深架构师, 越努力越幸运,唯有坚持不懈 ,与大家共勉。 Pulsar基于发布-订阅模式,消息发送者向主题发送消息,而消

    2024年02月09日
    浏览(41)
  • pulsar集群搭建_亲测成功

    pulsar集群搭建_亲测成功 单机运行请看: Linux MacBook单机部署Pulsar并开启认证功能 集群组成 搭建 Pulsar 集群至少需要 3 个组件:ZooKeeper 集群、BookKeeper 集群和 broker 集群(Broker 是 Pulsar 的自身实例)。这三个集群组件如下: ZooKeeper 集群(3(或多) 个 ZooKeeper 节点组成) bookie 集群(

    2024年02月09日
    浏览(31)
  • HBase Shell操作&Flink写入HBase

    1)进入HBase客户端命令行 2)查看帮助命令 3)查看当前数据库中有哪些表 1)创建表 2)插入数据到表 3)扫描查看表数据 4)查看表结构 5)更新指定字段的数据 6)查看“指定行”或“指定列族:列”的数据 7)统计表数据行数 8)删除数据 9)清空表数据 10)删除表 11)变更

    2024年02月04日
    浏览(57)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包