Flink异步io关联Hbase

这篇具有很好参考价值的文章主要介绍了Flink异步io关联Hbase。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

主程序

    public static void main(String[] args) throws Exception {
        //1.获取流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        //设置动态参数
        ParameterTool propertiesargs = ParameterTool.fromArgs(args);
        String fileName = propertiesargs.get("CephConfPath");
        //从hdfs获取动态参数配置文件
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        FileSystem fs = FileSystem.get(URI.create(fileName), conf);
        fs.open(new org.apache.hadoop.fs.Path(fileName));
        ParameterTool propertiesFile = ParameterTool.fromPropertiesFile(fs.open(new org.apache.hadoop.fs.Path(fileName)).getWrappedStream());
        // 注册给环境变量(HBASE使用)
        env.getConfig().setGlobalJobParameters(propertiesFile);
        new CephConfig(propertiesFile);

        //2.设置CK&状态后端
        env.setStateBackend(new FsStateBackend(FSSTATEBACKEND));
        env.enableCheckpointing(10000);// 每 ** ms 开始一次 checkpoint
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置模式为精确一次
        env.getCheckpointConfig().setCheckpointTimeout(100000);// Checkpoint 必须在** ms内完成,否则就会被抛弃
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);// 同一时间只允许一个 checkpoint 进行
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);// 确认 checkpoints 之间的时间会进行 ** ms
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10,TimeUnit.SECONDS)));//重启策略:重启3次,间隔10s

        //3.从kafka中读取日志信息,将将每行数据转换为JavaBean对象 主流
        DataStreamSource<String> dataStream = env.addSource(KafkaUtils.getKafkaSource(KAFKA_SOURCE_TOPIC, KAFKA_SOURCE_GROUP));
        …………
        //8.读取HBase中user表,进行维度关联
        SingleOutputStreamOperator<CephAccessRecord> record = AsyncDataStream.unorderedWait(
                validDS,
                new DimAsyncFunction<CephAccessRecord>() {
                    @Override
                    public String getKey(CephAccessRecord record) {
                        return record.access_key;
                    }
                },
                60, TimeUnit.SECONDS);
        BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));
        StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(
                new Path(HDFS_FILE_PATH),
                new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.DAYS.toMillis(1))//至少包含 20 分钟的数据
                                .withInactivityInterval(TimeUnit.DAYS.toMillis(1 ))//最近 20 分钟没有收到新的数据
                                .withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB
                                .build())
                .withBucketAssigner(assigner)
                .build();

        // 将record-->过滤上传数据-->转换成jsonstring-->写入到hdfs
//        allDataDS.filter(log->log.event_type.equals("upload")).map(line->JSON.toJSONString(line)).addSink(fileSink);
        dataStream.map(line->JSON.toJSONString(line)).addSink(fileSink);

        //10.流环境执行
        env.execute();

异步关联程序

package com.data.ceph.function;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;

import java.util.Collections;
import java.util.Map;

public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimAsyncJoinFunction<T> {

    private org.apache.hadoop.hbase.client.Connection connection = null;
    private ResultScanner rs = null;
    private Table table = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        //不启用安全认证
        System.setProperty("zookeeper.sasl.client", "false");
        Map<String, String> stringStringMap = getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap();
        String hbase = stringStringMap.get("hbase_zookeeper_quorum");
        org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
        hconf.set(HConstants.ZOOKEEPER_QUORUM, "172.16.23.37,172.16.23.38,172.16.23.39");
//        hconf.set(HConstants.ZOOKEEPER_QUORUM, hbase);
        hconf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
        hconf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");

        //指定用户名为hbase的用户去访问hbase服务
        UserGroupInformation userGroupInformation = UserGroupInformation.createRemoteUser("hive");
        connection = ConnectionFactory.createConnection(hconf, User.create(userGroupInformation));
        table = connection.getTable(TableName.valueOf("cloud:user_info"));
    }


    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
        Get get = new Get(Bytes.toBytes(getKey(input)));
        Result rs = table.get(get);
        for (Cell cell : rs.rawCells()) {
            String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
            String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            BeanUtils.setProperty(input, column, value);
        }
        resultFuture.complete(Collections.singletonList(input));
    }
    @Override
    public void close() throws Exception {
        if (rs != null) rs.close();
        if (table != null) table.close();
        if (connection != null) connection.close();
    }
    @Override
    public void timeout(T input, ResultFuture<T> resultFuture) throws Exception {
        System.out.println("TimeOut:" + input);
    }
}

文章来源地址https://www.toymoban.com/news/detail-840649.html

到了这里,关于Flink异步io关联Hbase的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive

    Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive 完全分布式高可用集群搭建 下载 https://archive.apache.org/dist/  Mysql下载地址 Index of /MySQL/Downloads/ 我最终选择 Zookeeper3.7.1 +Hadoop3.3.5 + Spark-3.2.4 + Flink-1.16.1 + Kafka2.12-3.4.0 + HBase2.4.17 + Hive3.1.3  +JDK1.8.0_391  IP规划 IP hostname 192.168.1.5 node1 192.168.1.6 node

    2024年01月23日
    浏览(49)
  • Flink异步IO

    本文讲解 Flink 用于访问外部数据存储的异步 I/O API。对于不熟悉异步或者事件驱动编程的用户,建议先储备一些关于 Future 和事件驱动编程的知识。 本文代码gitee地址: https://gitee.com/ddxygq/BigDataTechnical/blob/main/Flink/src/main/java/operator/AsyncIODemo.java 在与外部系统交互(用数据库中

    2024年02月02日
    浏览(38)
  • Flink异步IO初步了解

            之前使用Flink查询Redis数据的过程中,由于对数据一致性的要求并不是很高,当时是用MapFunction +  State 的方案。先缓存一大堆数据到State中,达到一定数量之后,将批量Key提交到Redis中进行查询。         由于Redis性能极高,所以并没有出现什么问题,后来了解到了

    2024年02月03日
    浏览(42)
  • 基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化

    目标 : 实现FineBI访问MySQL结果数据集的配置 实施 安装FineBI 参考《FineBI Windows版本安装手册.docx》安装FineBI 配置连接 数据准备 小结 实现FineBI访问MySQL结果数据集的配置 目标 : 实现FineBI实时报表构建 路径 step1:实时报表构建 step2:实时报表配置 step3:实时刷新测试 实施 实

    2024年02月04日
    浏览(39)
  • Flink连接Hbase时的kafka报错:java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThreadUtils

    书接上文 【Flink实时数仓】需求一:用户属性维表处理-Flink CDC 连接 MySQL 至 Hbase 实验及报错分析http://t.csdn.cn/bk96r 我隔了一天跑Hbase中的数据,发现kafka报错,但是kafka在这个代码段中并没有使用,原因就是我在今天的其他项目中添加的kafka依赖导致了冲突。 注释掉kafka依赖,

    2024年02月04日
    浏览(49)
  • Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive 完全分布式高可用集群搭建(保姆级超详细含图文)

    说明: 本篇将详细介绍用二进制安装包部署hadoop等组件,注意事项,各组件的使用,常用的一些命令,以及在部署中遇到的问题解决思路等等,都将详细介绍。 ip hostname 192.168.1.11 node1 192.168.1.12 node2 192.168.1.13 node3 1.2.1系统版本 1.2.2内存建议最少4g、2cpu、50G以上的磁盘容量 本次

    2024年02月12日
    浏览(50)
  • Linux多虚拟机集群化配置详解(Zookeeper集群、Kafka集群、Hadoop集群、HBase集群、Spark集群、Flink集群、Zabbix、Grafana部署)

    前面安装的软件,都是以单机模式运行的,学习大数据相关的软件部署,后续安装软件服务,大多数都是以集群化(多台服务器共同工作)模式运行的。所以,需要完成集群化环境的前置准备,包括创建多台虚拟机,配置主机名映射,SSH免密登录等等。 我们可以使用VMware提供

    2024年02月04日
    浏览(50)
  • 【flink番外篇】14、Flink异步I/O访问外部数据示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月16日
    浏览(45)
  • 55、Flink之用于外部数据访问的异步 I/O介绍及示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月17日
    浏览(44)
  • 轻松通关Flink第19讲:Flink 如何做维表关联

    在实际生产中,我们经常会有这样的需求,需要以原始数据流作为基础,然后关联大量的外部表来补充一些属性。例如,我们在订单数据中,希望能得到订单收货人所在省的名称,一般来说订单中会记录一个省的 ID,那么需要根据 ID 去查询外部的维度表补充省名称属性。 在

    2024年02月13日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包