大数据-玩转数据-Flink SQL编程实战 (热门商品TOP N)

这篇具有很好参考价值的文章主要介绍了大数据-玩转数据-Flink SQL编程实战 (热门商品TOP N)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、需求描述

每隔30min 统计最近 1hour的热门商品 top3, 并把统计的结果写入到mysql中。

二、需求分析

  • 1.统计每个商品的点击量, 开窗
  • 2.分组窗口分组
  • 3.over窗口

三、需求实现

3.1、创建数据源示例

input/UserBehavior.csv

543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000
578814,176722,982926,pv,1511658000
873335,1256540,1451783,pv,1511658000
429984,4625350,2355072,pv,1511658000
866796,534083,4203730,pv,1511658000
937166,321683,2355072,pv,1511658000
156905,2901727,3001296,pv,1511658000
758810,5109495,1575622,pv,1511658000
107304,111477,4173315,pv,1511658000
452437,3255022,5099474,pv,1511658000
813974,1332724,2520771,buy,1511658000
524395,3887779,2366905,pv,1511658000

3.2、创建目标表

CREATE DATABASE flink_sql; //创建flink_sql库
USE flink_sql;
DROP TABLE IF EXISTS `hot_item`;
CREATE TABLE `hot_item` (
  `w_end` timestamp NOT NULL,
  `item_id` bigint(20) NOT NULL,
  `item_count` bigint(20) NOT NULL,
  `rk` bigint(20) NOT NULL,
  PRIMARY KEY (`w_end`,`rk`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.3、导入JDBC Connector依赖

<!-- 导入JDBC Connector依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

3.4、代码实现

package com.atguigu.flink.java.chapter_12;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/1/31 9:11
 */
public class Flink01_HotItem_TopN {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);


        // 使用sql从文件读取数据
        tenv.executeSql(
            "create table user_behavior(" +
                "   user_id bigint, " +
                "   item_id bigint, " +
                "   category_id int, " +
                "   behavior string, " +
                "   ts bigint, " +
                "   event_time as to_timestamp(from_unixtime(ts, 'yyyy-MM-dd HH:mm:ss')), " +
                "   watermark for event_time as  event_time - interval '5' second " +
                ")with(" +
                "   'connector'='filesystem', " +
                "   'path'='input/UserBehavior.csv', " +
                "   'format'='csv')"
        );

        // 每隔 10m 统计一次最近 1h 的热门商品 top

        // 1. 计算每每个窗口内每个商品的点击量
        Table t1 = tenv
            .sqlQuery(
                "select " +
                    "   item_id, " +
                    "   hop_end(event_time, interval '10' minute, interval '1' hour) w_end," +
                    "   count(*) item_count " +
                    "from user_behavior " +
                    "where behavior='pv' " +
                    "group by hop(event_time, interval '10' minute, interval '1' hour), item_id"
            );
        tenv.createTemporaryView("t1", t1);
        // 2. 按照窗口开窗, 对商品点击量进行排名
        Table t2 = tenv.sqlQuery(
            "select " +
                "   *," +
                "   row_number() over(partition by w_end order by item_count desc) rk " +
                "from t1"
        );
        tenv.createTemporaryView("t2", t2);

        // 3. 取 top3
        Table t3 = tenv.sqlQuery(
            "select " +
                "   item_id, w_end, item_count, rk " +
                "from t2 " +
                "where rk<=3"
        );

        // 4. 数据写入到mysql
        // 4.1 创建输出表
        tenv.executeSql("create table hot_item(" +
                            "   item_id bigint, " +
                            "   w_end timestamp(3), " +
                            "   item_count bigint, " +
                            "   rk bigint, " +
                            "   PRIMARY KEY (w_end, rk) NOT ENFORCED)" +
                            "with(" +
                            "   'connector' = 'jdbc', " +
                            "   'url' = 'jdbc:mysql://hadoop162:3306/flink_sql?useSSL=false', " +
                            "   'table-name' = 'hot_item', " +
                            "   'username' = 'root', " +
                            "   'password' = 'aaaaaa' " +
                            ")");
        // 4.2 写入到输出表
        t3.executeInsert("hot_item");
    }
}

执行结果:
大数据-玩转数据-Flink SQL编程实战 (热门商品TOP N),大数据-玩转数据-FLINK,大数据,flink,sql

四、总结

Flink 使用 OVER 窗口条件和过滤条件相结合以进行 Top-N 查询。利用 OVER 窗口的 PARTITION BY 子句的功能,Flink 还支持逐组 Top-N 。 例如,每个类别中实时销量最高的前五种产品。批处理表和流处理表都支持基于SQL的 Top-N 查询。
流处理模式需注意: TopN 查询的结果会带有更新。 Flink SQL 会根据排序键对输入的流进行排序;若 top N 的记录发生了变化,变化的部分会以撤销、更新记录的形式发送到下游。 推荐使用一个支持更新的存储作为 Top-N 查询的 sink 。另外,若 top N 记录需要存储到外部存储,则结果表需要拥有与 Top-N 查询相同的唯一键。文章来源地址https://www.toymoban.com/news/detail-728667.html

到了这里,关于大数据-玩转数据-Flink SQL编程实战 (热门商品TOP N)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据开发之Spark(累加器、广播变量、Top10热门品类实战)

    累加器:分布式共享只写变量。(executor和executor之间不能读数据) 累加器用来把executor端变量信息聚合到driver端。在driver中定义的一个变量,在executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回driver端进行合并计算。 1、累加器使用 1)

    2024年01月24日
    浏览(31)
  • 实战Java springboot 采用Flink CDC操作SQL Server数据库获取增量变更数据

    目录 前言: 1、springboot引入依赖: 2、yml配置文件 3、创建SQL server CDC变更数据监听器 4、反序列化数据,转为变更JSON对象 5、CDC 数据实体类 6、自定义ApplicationContextUtil 7、自定义sink 交由spring管理,处理变更数据         我的场景是从SQL Server数据库获取指定表的增量数据,查

    2024年02月10日
    浏览(79)
  • flink sql 实战实例 及延伸问题:聚合/数据倾斜/DAU/Hive流批一体 等

    ⭐ 需求:上游是一个 kafka 数据源,数据内容是用户 QQ 等级变化明细数据(time,uid,level)。需要你求出当前每个等级的用户数。 ⭐ 需求:数据源:用户心跳日志(uid,time,type)。计算分 Android,iOS 的 DAU,最晚一分钟输出一次当日零点累计到当前的结果。 经过测试 在fl

    2024年02月22日
    浏览(39)
  • 大数据-玩转数据-Flink窗口函数

    前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种. ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对

    2024年02月11日
    浏览(32)
  • 大数据-玩转数据-Flink 容错机制

    在分布式架构中,当某个节点出现故障,其他节点基本不受影响。在 Flink 中,有一套完整的容错机制,最重要就是检查点(checkpoint)。 在流处理中,我们可以用存档读档的思路,把之前的计算结果做个保存,这样重启之后就可以继续处理新数据、而不需要重新计算了。所以

    2024年02月07日
    浏览(41)
  • 大数据-玩转数据-Flink RedisSink

    具体版本根据实际情况确定 参见大数据-玩转数据-Redis 安装与使用 可以根据要写入的redis的不同数据类型进行调整

    2024年02月13日
    浏览(28)
  • 大数据-玩转数据-Flink营销对账

    在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用

    2024年02月11日
    浏览(27)
  • 大数据-玩转数据-Flink定时器

    基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行. Context和OnTimerContext所持有的TimerService对象拥有以下方法: currentProcessingTime(): Long 返回当前处理时间 currentWatermark(): Long 返回当前watermark的时间戳 registerProcessingTimeTimer(timestamp: Long): Unit 会注

    2024年02月10日
    浏览(27)
  • 大数据-玩转数据-Flink恶意登录监控

    对于网站而言,用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败,就有可能是出现了程序的恶意攻击,比如密码暴力破解。 因此我们考虑,应该对用户的登录失败动作进行统计,具体来说,如果同一用户(可以是不同IP)在2秒之内连续两次登录失败,就

    2024年02月07日
    浏览(33)
  • 大数据-玩转数据-Flink状态后端(下)

    每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务(子任务)都会在本地维护其状态,以确保快速的状态访问。 状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(

    2024年02月09日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包