轻松通关Flink第34讲:Flink 和 Redis 整合以及 Redis Sink 实现

这篇具有很好参考价值的文章主要介绍了轻松通关Flink第34讲:Flink 和 Redis 整合以及 Redis Sink 实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

上一课时我们使用了 3 种方法进行了 PV 和 UV 的计算,分别是全窗口内存统计、使用分组和过期数据剔除、使用 BitMap / 布隆过滤器。到此为止我们已经讲了从数据清洗到水印、窗口设计,PV 和 UV 的计算,接下来需要把结果写入不同的目标库供前端查询使用。

下面我们分别讲解 Flink 和 Redis/MySQL/HBase 是如何整合实现 Flink Sink 的。

Flink Redis Sink

我们在第 27 课时,详细讲解过 Flink 使用 Redis 作为 Sink 的设计和实现,分别使用自定义 Redis Sink、开源的 Redis Connector 实现了写入 Redis。

在这里我们直接使用开源的 Redis 实现,首先新增 Maven 依赖如下:

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-connector-redis_2.11</artifactId> 
    <version>1.1.5</version> 
</dependency> 

可以通过实现 RedisMapper 来自定义 Redis Sink,在这里我们使用 Redis 中的 HASH 作为存储结构,Redis 中的 HASH 相当于 Java 语言里面的 HashMap:

public class MyRedisSink implements RedisMapper<Tuple3<String,String, Integer>>{ 
    /** 
     * 设置redis数据类型 
     */ 
    @Override 
    public RedisCommandDescription getCommandDescription() { 
        return new RedisCommandDescription(RedisCommand.HSET,"flink_pv_uv"); 
    } 
    //指定key 
    @Override 
    public String getKeyFromData(Tuple3<String, String, Integer> data) { 
        return data.f1; 
    } 
    //指定value 
    @Override 
    public String getValueFromData(Tuple3<String, String, Integer> data) { 
        return data.f2.toString(); 
    } 
} 

上面实现了 RedisMapper 并覆写了其中的 getCommandDescription、getKeyFromData、getValueFromData 3 种方法,其中 getCommandDescription 定义了存储到 Redis 中的数据格式。这里我们定义的 RedisCommand 为 HSET,使用 Redis 中的 HASH 作为数据结构;getKeyFromData 定义了 HASH 的 Key;getValueFromData 定义了 HASH 的值。

然后我们直接调用 addSink 函数即可:

... 
userClickSingleOutputStreamOperator 
            .keyBy(new KeySelector<UserClick, String>() { 
                @Override 
                public String getKey(UserClick value) throws Exception { 
                    return value.getUserId(); 
                } 
            }) 
            .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))) 
            .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20))) 
            .evictor(TimeEvictor.of(Time.seconds(0), true)) 
            .process(new MyProcessWindowFunction()) 
            .addSink(new RedisSink<>(conf,new MyRedisSink())); 
... 

接下来讲解 Flink 和 MySQL 是如何整合实现 Flink Sink 的?

Flink MySQL Sink

Flink 在最新版本 1.11 中支持了新的 JDBC Connector,我们可以直接在 Maven 中新增依赖:

<dependency> 
  <groupId>org.apache.flink</groupId> 
  <artifactId>flink-connector-jdbc_2.11</artifactId> 
  <version>1.11.0</version> 
</dependency> 

可以直接使用 JdbcSink 如下:

String driverClass = "com.mysql.jdbc.Driver"; 
String dbUrl = "jdbc:mysql://127.0.0.1:3306/test"; 
String userNmae = "root"; 
String passWord = "123456"; 

userClickSingleOutputStreamOperator
.keyBy(new KeySelector<UserClick, String>() {
@Override
public String getKey(UserClick value) throws Exception {
return value.getUserId();
}
})
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new MyProcessWindowFunction())
.addSink(
JdbcSink.sink(
“replace into pvuv_result (type,value) values (?,?)”,
(ps, value) -> {
ps.setString(1, value.f1);
ps.setInt(2,value.f2);
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(dbUrl)
.withDriverName(driverClass)
.withUsername(userNmae)
.withPassword(passWord)
.build())
);

JDBC Sink 可以保证 "at-least-once" 语义保障,可通过实现“有则更新、无则写入”来实现写入 MySQL 的幂等性来实现 "exactly-once" 语义。

当然我们也可以自定义 MySQL Sink,直接继承 RichSinkFunction :

public class MyMysqlSink extends RichSinkFunction<Person> { 
    private PreparedStatement ps = null; 
    private Connection connection = null; 
    String driver = "com.mysql.jdbc.Driver"; 
    String url = "jdbc:mysql://127.0.0.1:3306/test"; 
    String username = "root"; 
    String password = "123456"; 
    // 初始化方法 
    @Override 
    public void open(Configuration parameters) throws Exception { 
        super.open(parameters); 
        // 获取连接 
        connection = getConn(); 
        connection.setAutoCommit(false); 
    } 
    private Connection getConn() { 
        try { 
            Class.forName(driver); 
            connection = DriverManager.getConnection(url, username, password); 
        } catch (Exception e) { 
            e.printStackTrace(); 
        } 
        return connection; 
    } 
    //每一个元素的插入,都会调用一次 
    @Override 
    public void invoke(Tuple3<String,String,Integer> data, Context context) throws Exception { 
        ps.prepareStatement("replace into pvuv_result (type,value) values (?,?)") 
        ps.setString(1,data.f1); 
        ps.setInt(2,data.f2); 
        ps.execute(); 
        connection.commit(); 
    } 
    @Override 
    public void close() throws Exception { 
        super.close(); 
        if(connection != null){ 
            connection.close(); 
        } 
        if (ps != null){ 
            ps.close(); 
        } 
    } 
} 

我们通过重写 open、invoke、close 方法,数据写入 MySQL 时会首先调用 open 方法新建连接,然后调用 invoke 方法写入 MySQL,最后执行 close 方法关闭当前连接。

最后来讲讲 Flink 和 HBase 是如何整合实现 Flink Sink 的?

Flink HBase Sink

HBase 也是我们经常使用的存储系统之一。

HBase 是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的 Google 论文“Bigtable:一个结构化数据的分布式存储系统”。就像 Bigtable 利用了 Google 文件系统(File System)所提供的分布式数据存储一样,HBase 在 Hadoop 之上提供了类似于 Bigtable 的能力。HBase 是 Apache 的 Hadoop 项目的子项目。HBase 不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库;另一个不同的是 HBase 基于列的而不是基于行的模式。

如果你对 HBase 不了解,可以参考官网给出的 快速入门。

Flink 没有提供直接连接 HBase 的连接器,我们通过继承 RichSinkFunction 来实现 HBase Sink。

首先,我们在 Maven 中新增以下依赖:

<dependency> 
    <groupId>org.apache.hbase</groupId> 
    <artifactId>hbase-client</artifactId> 
    <version>1.2.6.1</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-common</artifactId> 
    <version>2.7.5</version> 
</dependency> 

接下来通过继承 RichSinkFunction 覆写其中的 open、invoke、close 方法。代码如下:

public class MyHbaseSink extends RichSinkFunction<Tuple3<String, String, Integer>> { 
    private transient Connection connection; 
    @Override 
    public void open(Configuration parameters) throws Exception { 
        super.open(parameters); 
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create(); 
        conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:2181"); 
        connection = ConnectionFactory.createConnection(conf); 
    } 
    @Override 
    public void invoke(Tuple3<String, String, Integer> value, Context context) throws Exception { 
        String tableName = "database:pvuv_result"; 
        String family = "f"; 
        Table table = connection.getTable(TableName.valueOf(tableName)); 
        Put put = new Put(value.f0.getBytes()); 
        put.addColumn(Bytes.toBytes(family),Bytes.toBytes(value.f1),Bytes.toBytes(value.f2)); 
        table.put(put); 
        table.close(); 
    } 
    @Override 
    public void close() throws Exception { 
        super.close(); 
        connection.close(); 
    } 
} 

因为我们的程序是每 20 秒计算一次,并且输出,所以在写入 HBase 时没有使用批量方式。在实际的业务中,如果你的输出写入 HBase 频繁,那么推荐使用批量提交的方式。我们只需要稍微修改一下代码实现即可:

public class MyHbaseSink extends RichSinkFunction<Tuple3<String, String, Integer>> { 
    private transient Connection connection; 
    private transient List<Put> puts = new ArrayList<>(100); 
    @Override 
    public void open(Configuration parameters) throws Exception { 
        super.open(parameters); 
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create(); 
        conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:2181"); 
        connection = ConnectionFactory.createConnection(conf); 
    } 
    @Override 
    public void invoke(Tuple3<String, String, Integer> value, Context context) throws Exception { 
        String tableName = "database:pvuv_result"; 
        String family = "f"; 
        Table table = connection.getTable(TableName.valueOf(tableName)); 
        Put put = new Put(value.f0.getBytes()); 
        put.addColumn(Bytes.toBytes(family),Bytes.toBytes(value.f1),Bytes.toBytes(value.f2)); 
        puts.add(put); 
        if(puts.size() == 100){ 
            table.put(puts); 
            puts.clear(); 
        } 
        table.close(); 
    } 
    @Override 
    public void close() throws Exception { 
        super.close(); 
        connection.close(); 
    } 
} 

我们定义了一个容量为 100 的 List,每 100 条数据批量提交一次,可以大大提高写入效率。

总结

这节课我们学习了 Flink 计算 PV、UV后的结果分别写入 Redis、MySQL 和 HBase。我们在实际业务中可以选择使用不同的目标库,你可以在本文中找到对应的实现根据实际情况进行修改来使用。


精选评论

**民:

老师,您好。请问代码中的MyHbaseSink当puts链表中有缓存数据,但应用程序异常退出了,请问对业务是否有影响呢,puts中的数据会自动重算吗(假如设置了CheckPoint)?

    讲师回复:

    有影响。内存中的数据会丢失,需要重置消费位点。这也是实时计算很大的痛点,需要你在下游做好幂等,不要因为再次消费导致数据不一致。文章来源地址https://www.toymoban.com/news/detail-718516.html

到了这里,关于轻松通关Flink第34讲:Flink 和 Redis 整合以及 Redis Sink 实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(2) - jdbc/mysql

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(50)
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(1) - File、Socket、console

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月01日
    浏览(46)
  • Flink Data Sink

    本专栏案例代码和数据集链接:   https://download.csdn.net/download/shangjg03/88477960 在使用 Flink 进行数据处理时,数据经 Data Source 流入,然后通过系列 Transformations 的转化,最终可以通过 Sink 将计算结果进行输出,Flink Data Sinks 就是用于定义数据流最终的输出位置。Flink 提

    2024年02月08日
    浏览(35)
  • Flink - sink算子

    水善利万物而不争,处众人之所恶,故几于道💦   1. Kafka_Sink   2. Kafka_Sink - 自定义序列化器   3. Redis_Sink_String   4. Redis_Sink_list   5. Redis_Sink_set   6. Redis_Sink_hash   7. 有界流数据写入到ES   8. 无界流数据写入到ES   9. 自定义sink - mysql_Sink   10. Jdbc_Sink 官方

    2024年02月14日
    浏览(49)
  • Flink之JDBC Sink

    这里介绍一下Flink Sink中jdbc sink的使用方法,以 mysql 为例,这里代码分为两种,事务和非事务 非事务代码 事务代码 pom依赖 结果 jdbc sink的具体使用方式大概就这些内容,还是比较简单的,具体应用还要结合实际业务场景.

    2024年02月14日
    浏览(37)
  • Flink之Kafka Sink

    代码内容 结果数据

    2024年02月15日
    浏览(44)
  • flink重温笔记(六):Flink 流批一体 API 开发—— 数据输出 sink

    前言:今天是学习 flink 的第七天啦!学习了 flink 中 sink(数据槽) 部分知识点,这一部分只要是解决数据处理之后,数据到哪里去的问题,我觉得 flink 知识点虽然比较难理解,但是代码跑通后,逻辑还是比较有趣的! Tips:毛爷爷说过:“宜将剩勇追穷寇,不可沽名学霸王

    2024年02月21日
    浏览(43)
  • flink 13.5 sink elasticsearch-7

    mysql 数据-- flink sql --es mysql flink elasticsearch 5.7.20-log 13.5 7.12.0 官网可以下载包 flink-sql-connector-elasticsearch7_2.11-1.13.6.jar https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/

    2024年02月14日
    浏览(40)
  • Flink创建Hudi的Sink动态表

    工厂类 HoodieTableFactory 提供的创建动态表接口 createDynamicTableSource 和 createDynamicTableSink,对应的源码文件为:https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java 。 1、检查是否设置了 path 选项(checkArgument),没有的话抛异常“

    2024年02月07日
    浏览(36)
  • Flink Table/Sql自定义Kudu Sink实战(其它Sink可参考)

    使用第三方的org.apache.bahir » flink-connector-kudu,batch模式写入数据到Kudu会有FlushMode相关问题 具体可以参考我的这篇博客通过Flink SQL操作创建Kudu表,并读写Kudu表数据 Flink的Dynamic table能够统一处理batch和streaming 实现自定义Source或Sink有两种方式: 通过对已有的connector进行拓展。比

    2024年02月14日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包