【Flink实战】Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X

这篇具有很好参考价值的文章主要介绍了【Flink实战】Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

🚀 作者 :“大数据小禅”

🚀 文章简介 :Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X

🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


Flink怎么操作Redis

  • Flink怎么操作redis?

    • 方式一:自定义sink
    • 方式二:使用connector
  • Redis Sink 核心是RedisMapper 是一个接口,使用时要编写自己的redis操作类实现这个接口中的三个方法

    • getCommandDescription 选择对应的数据结构和key名称配置
    • getKeyFromData 获取key
    • getValueFromData 获取value
  • 使用

    • 添加依赖
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    
  • 编码

    public class MyRedisSink implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
        }
    
        @Override
        public String getKeyFromData(Tuple2<String, Integer> value) {
            return value.f0;
        }
    
        @Override
        public String getValueFromData(Tuple2<String, Integer> value) {
            return value.f1.toString();
        }
    }
    

Flink 商品销量统计-转换-分组-聚合-存储自定义的Redis Sink实战

  • Redis环境说明 redis6

    • 使用docker部署redis6.x 看个人主页docker相关文章

      docker run -d  -p 6379:6379 redis
      
  • 编码实战

数据源

public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {


    private volatile Boolean flag = true;

    private Random random = new Random();

    private static List<String> list = new ArrayList<>();
    static {
        list.add("spring boot2.x课程");
        list.add("微服务SpringCloud课程");
        list.add("RabbitMQ消息队列");
        list.add("Kafka课程");
        list.add("小滴课堂面试专题第一季");
        list.add("Flink流式技术课程");
        list.add("工业级微服务项目大课训练营");
        list.add("Linux课程");
    }


    /**
     * run 方法调用前 用于初始化连接
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("-----open-----");
    }

    /**
     * 用于清理之前
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        System.out.println("-----close-----");
    }


    /**
     * 产生数据的逻辑
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<VideoOrder> ctx) throws Exception {

        while (flag){
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString();
            int userId = random.nextInt(10);
            int money = random.nextInt(100);
            int videoNum = random.nextInt(list.size());
            String title = list.get(videoNum);
            VideoOrder videoOrder = new VideoOrder(id,title,money,userId,new Date());

            ctx.collect(videoOrder);
        }


    }

    /**
     * 控制任务取消
     */
    @Override
    public void cancel() {

        flag = false;
    }
}

保存的格式与存取的方法

public class VideoOrderCounterSink implements RedisMapper<Tuple2<String, Integer>> {


    /***
     * 选择需要用到的命令,和key名称
     * @return
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
    }

    /**
     * 获取对应的key或者filed
     *
     * @param data
     * @return
     */
    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {

        System.out.println("getKeyFromData=" + data.f0);
        return data.f0;
    }

    /**
     * 获取对应的值
     *
     * @param data
     * @return
     */
    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        System.out.println("getValueFromData=" + data.f1.toString());
        return data.f1.toString();
    }
}

落地

public class Flink07RedisSinkApp {

    /**
     * source
     * transformation
     * sink
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {

        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //数据源 source
//        DataStream<VideoOrder> ds = env.fromElements(
//                new VideoOrder("21312","java",32,5,new Date()),
//                new VideoOrder("314","java",32,5,new Date()),
//                new VideoOrder("542","springboot",32,5,new Date()),
//                new VideoOrder("42","redis",32,5,new Date()),
//                new VideoOrder("4252","java",32,5,new Date()),
//                new VideoOrder("42","springboot",32,5,new Date()),
//                new VideoOrder("554232","flink",32,5,new Date()),
//                new VideoOrder("23323","java",32,5,new Date())
//        );
        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());



        //transformation
       DataStream<Tuple2<String,Integer>> mapDS =  ds.map(new MapFunction<VideoOrder, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
                return new Tuple2<>(value.getTitle(),1);
            }
        });



//        DataStream<Tuple2<String,Integer>> mapDS = ds.flatMap(new FlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {
//            @Override
//            public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {
//                out.collect(new Tuple2<>(value.getTitle(),1));
//            }
//        });


       //分组
        KeyedStream<Tuple2<String,Integer>,String> keyByDS = mapDS.keyBy(new KeySelector<Tuple2<String,Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //统计每组有多少个
        DataStream<Tuple2<String,Integer>> sumDS =  keyByDS.sum(1);

        //控制台打印
        sumDS.print();

        //单机redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();

        sumDS.addSink(new RedisSink<>(conf,new VideoOrderCounterSink()));


        //DataStream需要调用execute,可以取个名称
        env.execute("custom redis sink job");
    }

}

【Flink实战】Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X,Flink,flink,大数据,原力计划文章来源地址https://www.toymoban.com/news/detail-742374.html

到了这里,关于【Flink实战】Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • python数据分析之产品销量时序分析与商品关联分析

    这是我们之前的课后作业,根据自己的想法对这个数据进行分析,只要求写出五个点出来就可以了,因此我就对这些数据进行了分析一番。涉及的python知识点还是挺多的,包括了python连接数据库,SQL提取数据并保存为csv格式,pandas处理数据,matplotlib画图以及购物篮分析与关联

    2024年02月07日
    浏览(47)
  • 淘宝商品API使用示例:如何通过调用外部API来获取淘宝商品价格销量主图详情数据

    淘宝上的商品信息量非常之大,商品的详情信息也很齐全。如何通过调用外部API来实现批量获取商品价格销量主图详情等信息呢?上周刚好完成了一个完整的淘宝商品采集项目,今天特来分享一下。 接口名称:item_get 请求地址:https://api-test.cn/taobao/item_get result_type:[json,jso

    2024年02月10日
    浏览(45)
  • 电商数据平台西域根据ID取商品详情API接口采集产品详情数据、价格 、销量数据操作指南

    公共参数 请求地址: 注册调用key请求接入 名称 类型 必须 描述 key String 是 调用key(必须以GET方式拼接在URL中) secret String 是 调用密钥 api_name String 是 API接口名称(包括在请求地址中)[item_search,item_get,item_search_shop等] cache String 否 [yes,no]默认yes,将调用缓存的数据,速度比较

    2024年02月07日
    浏览(42)
  • 快手商品详情商品价格、销量、库存、sku信息

         商品详情数据API是用来获取快手商品详情页数据的接口,请求参数为商品ID,这是每个商品唯一性的标识。返回参数有商品标题、商品标题、商品简介、价格、掌柜昵称、库存、宝贝链接、宝贝图片、商品SKU等 公共参数 请求地址: 获取key和密钥 名称 类型 必须 描述 k

    2024年01月23日
    浏览(47)
  • 淘宝APP商品详情接口(商品信息,价格销量,优惠券信息,详情图等)

    淘宝APP商品详情接口(商品信息接口,价格销量接口,优惠券信息接口,详情图接口等)代码对接如下: 公共参数 名称 类型 必须 描述 key String 是 调用key(必须以GET方式拼接在URL中),点击获取请key和secret secret String 是 调用密钥 api_name String 是 API接口名称(包括在请求地址

    2024年02月12日
    浏览(47)
  • 淘宝/天猫获取商品销量详情 API 返回值说明

    taobao.item_get_sales 公共参数 名称 类型 必须 描述 key String 是 调用key(必须以GET方式拼接在URL中) secret String 是 调用密钥 api_name String 是 API接口名称(包括在请求地址中)[item_search,item_get,item_search_shop等] cache String 否 [yes,no]默认yes,将调用缓存的数据,速度比较快 result_type St

    2024年02月09日
    浏览(47)
  • item_get_sales-获取商品销量详情

    一、接口参数说明: item_get_sales-获取商品销量详情,点击更多API调试,请移步注册API账号点击获取测试key和secret 公共参数 请求地址: https://api-gw.onebound.cn/taobao/item_get_sales 名称 类型 必须 描述 key String 是 调用key(点击获取测试key和secret) secret String 是 调用密钥 api_name String

    2024年02月13日
    浏览(39)
  • python 爬虫某东网商品信息 | 没想到销量最高的是

    哈喽大家好,我是咸鱼 好久没更新 python 爬虫相关的文章了,今天我们使用 selenium 模块来简单写个爬虫程序——爬取某东网商品信息 网址链接:https://www.jd.com/ 完整源码在文章最后 我们需要找到网页上元素的位置信息(xpth 路径) 我们首先需要知道搜索框和搜索按钮的位置

    2024年02月08日
    浏览(41)
  • 爬虫之牛刀小试(十):爬取某宝手机商品的销量,价格和店铺

    首先淘宝需要登录,这一点如果用selenium如何解决,只能手动登录?如果不用selenium,用cookies登录也可。但是验证码又是一个问题,现在的验证码五花八门,难以处理。 我们回到正题,假设你已经登录上淘宝了,接着我们需要找到输入框和搜索按钮,输入“手机”,点击搜索

    2024年04月10日
    浏览(60)
  • 电商API接口的应用||大数据电商数仓分析项目||电商热门商品统计

    如何定义热门商品? 简单模型:直接通过用户对商品的点击量来衡量商品热度。 复杂模型:依据各类别权重(后续补充) 如何获取区域? 通过用户点击日志,获取访问IP,进而获取区域信息。 通过数据库中的订单关联用户表,获取用户的地域信息 如何去除爬虫水军(商家

    2024年04月28日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包