【大数据学习篇11】广告点击流实时统计

这篇具有很好参考价值的文章主要介绍了【大数据学习篇11】广告点击流实时统计。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

学习目标/Target

掌握广告点击流实时统计实现思路

掌握利用Kafka生产用户广告点击流数据

了解数据库设计

掌握如何创建Spark Streaming连接

掌握利用Spark Streaming读取业务数据

掌握利用Spark读取黑名单用户

掌握利用Spark Streaming过滤黑名单用户

掌握利用Spark Streaming统计每个城市不同广告的点击次数

掌握利用Spark Streaming添加黑名单用户

掌握将数据持久化到HBase数据库

熟悉如何利用HBase Shell命令向HBase数据库的表中添加数据

概述

        电商网站通常会存在一些广告位,当用户浏览网站时投放的广告内容会在对应广告位显示。此时,有些用户可能会点击广告跳转到对应界面去查看详情,从而提升用户在网站的浏览深度和购买概率,针对这种用户广告点击行为的实时数据进行实时计算和统计,可以帮助公司实时地掌握各种广告的投放效果,以便于后续能够及时地对广告投放相关的策略进行调整和优化,以期望通过广告的投放获取更高的收益。

目录

学习目标/Target

概述

1.数据集分析

2.实现思路分析

3.数据库设计

4. 实现广告点击流实时统计

5.运行程序


1.数据集分析

        本需求采用Java程序模拟生成用户广告点击数据,通过Kafka的生产者发布用户广告点击数据形成实时数据流,数据流中的每一条数据代表一个用户点击广告的行为,当Kafka生产者程序运行时会产出源源不断的用户广告点击流数据。

1596006895171,16,6,tianjin

        单条用户广告点击数据包含四个字段内容,依次分别是时间戳(time)、用户ID(userid)、广告ID(adid)和城市(city)。

2.实现思路分析

        通过Kafka实时生产用户广告点击流数据,SparkStreaming作为消费者实时读取Kafka生产的数据,与HBase数据库中黑名单用户表的数据进行合并,并过滤包含黑名单用户的数据。对过滤后的数据进行两次聚合操作,第一次聚合统计每个广告在不用城市的点击次数。第二次聚合统计用户出现的次数,用于将广告点击次数超过100的用户添加到黑名单用户中。

【大数据学习篇11】广告点击流实时统计

读取:读取Kafka实时生产用户广告点击流数据。

转换:将数据格式转换为以userid为Key,adid和city作为一个整体为Value的数据形式。

合并/过滤:将转换后的数据与读取的黑名单用户数据进行合并,并过滤包含黑名单用户的数据。

转换/聚合:将数据格式转换为以adid和city作为一个整体为Key,数值1作为Value的数据形式。

转换/聚合:将数据格式转换为以userid作为Key,值1作为Value的数据形式,然后进行聚合操作统计每个用户出现的次数。

读取:读取HBase数据库中黑名单用户

添加:将用户出现次数超过100的用户添加到HBase数据库中的黑名单用户中。

3.数据库设计

        读取HBase数据库中黑名单用户 将转换后的数据与读取的黑名单用户数据进行合并。

【大数据学习篇11】广告点击流实时统计

 数据表adstream:存储用户广告点击流实时统计结果。

 数据表blacklist:存储黑名单用户。

STEP  01

打开HBase命令行工具:

        打开虚拟机启动大数据集群环境(此时可以不启动使用远程连接工具SecureCRT连接虚拟机Spark01,执行“hbase shell”命令进入HBase的命令行工具。

STEP  02

创建表blacklist:

        通过HBase的命令行工具创建表blacklist并指定列族为black_user,用于存储黑名单用户数据。

 create 'blacklist','black_user'

STEP  03

插入黑名单用户:

        通过HBase的命令行工具在表blacklist的列族black_user下插入黑名单用户,指定uerid为33、44和55的用户为黑名单用户。

STEP  04

创建表adstream:

        通过HBase的命令行工具创建表adstream并指定列族为area_ads_count,用于存储用户广告点击流实时统计结果。

create 'adstream','area_ads_count'

4. 实现广告点击流实时统计

4.1  修改pom.xml文件

【大数据学习篇11】广告点击流实时统计在项目SparkProject的pom.xml文件中添加Spark Streaming、Hadoop和Spark Streaming整合Kafka依赖。

<dependency>

    <groupId>org.apache.hadoop</groupId>

    <artifactId>hadoop-common</artifactId>

    <version>2.7.4</version>

</dependency>

<dependency>

    <groupId>org.apache.spark</groupId>

    <artifactId>spark-streaming_2.11</artifactId>

    <version>2.3.2</version>

</dependency>

<dependency>

    <groupId>org.apache.spark</groupId>

    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>

    <version>2.3.2</version>

</dependency>

 4.2  生产用户广告点击流数据

STEP  01

实现Kafka生产者:

        在项目SparkProject的java目录新建Package包“cn.itcast.streaming”,用于存放广告点击流实时统计的Java文件,并在该包中创建Java类文件MockRealTime,用于实现Kafka生产者,生产用户广告点击流数据。

STEP  02

启动Kafka消费者:

        打开虚拟机启动大数据集群环境(包括Kafka),使用远程连接工具SecureCRT连接虚拟机Spark01,进入Kafka安装目录(/export/servers/kafka_2.11-2.0.0)启动Kafka消费者。

bin/kafka-console-consumer.sh \

--bootstrap-server spark01:9092,spark02:9092,spark03:9092 \

--topic ad

STEP  03

启动Kafka生产者:
 
       在项目SparkProject的包“cn.itcast.streaming”中选中文件MockRealTime.java并单击右键,在弹出的菜单栏选择“Run. MockRealTime.main()”运行Kafka生产者程序,生产用户广告点击流数据。
STEP  04

查看Kafka消费者:

        在虚拟机Spark01的Kafka消费者窗口查看数据是否被成功接收。

【大数据学习篇11】广告点击流实时统计

STEP  05 

关闭Kafka消费者:

        在虚拟机Spark01的Kafka消费者窗口通过组合键“Ctrl+C”关闭当前消费者。

STEP  06

关闭Kafka生产者:

        在IntelliJ IDEA控制台中单击红色方框的按钮关闭Kafka生产者程序,关闭Kafka生产者程序。

【大数据学习篇11】广告点击流实时统计

 4.3  创建Spark Streaming连接

        在项目SparkProject的包“cn.itcast.streaming”中创建Java类文件AdsRealTime.java,用于实现广告点击流实时统计。

public class AdsRealTime {

    public static void main(String[] arg) throws IOException,

            InterruptedException {

         //实现Spark Streaming程序

    }

}

        在类AdsRealTime的main()方法中,创建JavaStreamingContext对象和SparkConf对象,JavaStreamingContext对象用于实现Spark Streaming程序,SparkConf对象用于配置Spark Streaming程序各种参数。

System.setProperty("HADOOP_USER_NAME","root");

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("stream_ad"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5)); jsc.checkpoint("hdfs://192.168.121.133:9000/checkpoint");

4.4  读取用户广告点击流数据

在类AdsRealTime的main()方法中,指定Kafka消费者的相关配置信息。

final Collection<String> topics = Arrays.asList("ad"); Map<String, Object> kafkaParams = new HashMap<>(); kafkaParams.put("bootstrap.servers","spark01:9092,spark02:9092,spark03:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "adstream"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", true);

        在类AdsRealTime的main()方法中,使用类KafkaUtils的createDirectStream()方法从Kafka生产者读取用户广告点击流数据,并加载到userAdStream。

JavaInputDStream<ConsumerRecord<String, String>> userAdStream = KafkaUtils.createDirectStream(

         jsc,LocationStrategies.PreferConsistent(),

         ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)

    );

4.5  获取业务数据

        在类AdsRealTime的main()方法中,使用mapToPair()算子转换userAdStream中每一行数据生,获取用户广告点击流数据中的userid(用户ID)、adid(广告ID)和city(城市),将转化结果加载到userClickAdsStream。

JavaPairDStream<String,Tuple2<String,String>> userClickAdsStream =         userAdStream.mapToPair((PairFunction<ConsumerRecord<String, String>,                         String,Tuple2<String, String>>) record -> {

    String[] value = record.value().split(",");

    String userid =value[1];

    String adid =value[2];

    String city =value[3];

    return new Tuple2<>(userid,new Tuple2<>(adid,city));

});

4.6  读取黑名单用户数据

        在类AdsRealTime的main()方法中,使用mapToPair()算子转换userAdStream中每一行数据生,获取用户广告点击流数据中的userid(用户ID)、adid(广告ID)和city(城市),将转化结果加载到userClickAdsStream。

JavaPairDStream<String,Tuple2<String,String>> userClickAdsStream =         userAdStream.mapToPair((PairFunction<ConsumerRecord<String, String>,                         String,Tuple2<String, String>>) record -> {

    String[] value = record.value().split(",");

    String userid =value[1];

    String adid =value[2];

    String city =value[3];

    return new Tuple2<>(userid,new Tuple2<>(adid,city));

});

        在HBase数据库操作工具类HbaseUtils中添加方法scan(),用于获取HBase数据库中指定表的全部数据。

public static ResultScanner scan(String tableName)

        throws IOException {

    Table table = HbaseConnect.getConnection().getTable(TableName.valueOf(tableName));

    Scan scan = new Scan();

    return table.getScanner(scan);

}

        在类AdsRealTime中添加方法getBlackUser(),用于获取HBase数据库中黑名单用户表的数据。

public static ArrayList getBlackUser() throws IOException {

    ResultScanner blcakResult = HbaseUtils.scan("blacklist");

    Iterator<Result> blackIterator = blcakResult.iterator();

    ArrayList<Tuple2<String,String>> blackList = new ArrayList<>();

    while (blackIterator.hasNext()){

        String blackUserId = new String(blackIterator.next().value());

        blackList.add(new Tuple2<>(blackUserId,"black"));

    }

    return blackList;

}

        在类AdsRealTime的main()方法中,使用parallelizePairs()算子将存放黑名单用户的集合转换JavaPairRDD,将转换结果加载到blackUserRDD。

JavaPairRDD<String,String> blackUserRDD =                                 jsc.sparkContext().parallelizePairs(getBlackUser());

4.7  过滤黑名单用户

        在类AdsRealTime的main()方法中,使用transformToPair()算子转换userClickAdsStream的每一行数据,在转换的过程中过滤黑名单用户,将转换结果加载到checkUserClickAdsStream。

JavaPairDStream<String,Tuple2<String,String>> checkUserClickAdsStream = userClickAdsStream.transformToPair( ……

);

        在类AdsRealTime的main()方法中,使用mapToPair()算子转换checkUserClickAdsStream的每一行数据,将用户ID的值替换为1,通过areaAdsStream加载转换结果。

JavaPairDStream<Tuple2<String,String>, Integer> areaAdsStream    

= checkUserClickAdsStream.mapToPair(

            (PairFunction<

                    Tuple2<String, Tuple2<String, String>>,

                    Tuple2<String, String>,

                    Integer>) checkUserClickAdsTuple2 -> {

        String adid = checkUserClickAdsTuple2._2._1;

        String city = checkUserClickAdsTuple2._2._2;

        return new Tuple2<>(new Tuple2<>(city,adid),new Integer(1));

    });

        在类AdsRealTime的main()方法中,使用updateStateByKey()算子维护areaAdsStream的状态,用于统计每个城市不同广告的点击次数,将统计结果加载到countAreaAdsStream。

JavaPairDStream<Tuple2<String,String>, Integer> countAreaAdsStream

        = areaAdsStream.updateStateByKey(                 (Function2<List<Integer>,Optional<Integer>, Optional<Integer>>)

                        (valueList, oldState) -> {

    Integer newState = 0;

    if (oldState.isPresent()){

        newState = oldState.get();

    }

    for (Integer value : valueList){

        newState += value;

    }

    return Optional.of(newState);

});

        在类AdsRealTime的main()方法中,使用mapToPair()算子转换checkUserClickAdsStream的每一行数据,便于后续聚合统计每个用户点击广告的次数,将转换结果加载到userStream。

JavaPairDStream<String,Integer> userStream = checkUserClickAdsStream

    .mapToPair(

        (PairFunction<Tuple2<String, Tuple2<String, String>>,

            String, Integer>) checkUserClickAdsTuple2 ->

                new Tuple2<>(

                    checkUserClickAdsTuple2._1,

                    new Integer(1)));

        在类AdsRealTime的main()方法中,使用updateStateByKey()算子维护userStream的状态,用于统计每个用户点击广告的次数,将统计结果加载到countUserStream。

JavaPairDStream<String, Integer> countUserStream           =userStream.updateStateByKey((Function2<List<Integer>,Optional<Integer>,Optional<Integer>>)

            (valueList, oldState) -> {

    Integer newState = 0;

    if (oldState.isPresent()){

        newState = oldState.get();

    }

    for (Integer value : valueList){

        newState += value;

    }

    return Optional.of(newState);

});

4.9  添加黑名单用户

        在类AdsRealTime的main()方法中,使用foreachRDD()算子遍历countUserStream中的RDD,将广告点击次数超过100的用户添加到HBase数据库的黑名单表blacklist中。

countUserStream.foreachRDD((VoidFunction<JavaPairRDD<String, Integer>>)         countUserRDD -> countUserRDD.foreach((VoidFunction<Tuple2<String, Integer>>)                         countUserTuple2 -> {

    if (countUserTuple2._2>100){

        HbaseUtils.putsOneToHBase(

                "blacklist",

                "user"+countUserTuple2._1,

                "black_user",

                "userid",

                countUserTuple2._1);

    }

 }));

4.10  持久化数据

        在类AdsRealTime的main()方法中,使用foreachRDD()算子遍历countAreaAdsStream,将每个城市中不同广告的点击次数持久化到HBase数据库的adstream表。

countAreaAdsStream.foreachRDD((

        VoidFunction<JavaPairRDD<Tuple2<String, String>,Integer>>

        ) countAreaAdsRDD ->countAreaAdsRDD.foreach((VoidFunction<              Tuple2<Tuple2<String, String>, Integer>>)

                 countAreaAdsTuple2 -> {

    String adid = countAreaAdsTuple2._1._2;

    String city = countAreaAdsTuple2._1._1;

    int count = countAreaAdsTuple2._2;     HbaseUtils.putsOneToHBase("adstream",city+"_"+adid,"area_ads_count","area",city);     HbaseUtils.putsOneToHBase("adstream",city+"_"+adid,"area_ads_count","ad",adid);     HbaseUtils.putsOneToHBase("adstream",city+"_"+adid,"area_ads_count","count",String.valueOf(count)); }));

        在类AdsRealTime的main()方法中,添加启动与关闭Spark Streaming连接等方法。

jsc.start();

jsc.awaitTermination();

HbaseConnect.closeConnection();

jsc.close();

5.运行程序

【大数据学习篇11】广告点击流实时统计【大数据学习篇11】广告点击流实时统计

【大数据学习篇11】广告点击流实时统计【大数据学习篇11】广告点击流实时统计

【大数据学习篇11】广告点击流实时统计【大数据学习篇11】广告点击流实时统计

         Kafka生产者程序和用户广告点击流实时统计程序启动成功后,可在IDEA的控制台查看程序运行状态。

【大数据学习篇11】广告点击流实时统计

         使用远程连接工具SecureCRT连接虚拟机Spark01,执行“hbase shell”命令,进入HBase命令行工具,在HBase命令行工具中执行“scan 'adstream'”命令,查看HBase数据库中表adstream的统计结果。

【大数据学习篇11】广告点击流实时统计

小结 

        本文主要讲解了如何通过用户广告点击流数据实现广告点击流实时统计,首先我们对数据集进行分析,使读者了解广告点击流的数据结构。接着通过实现思路分析,使读者了解广告点击流实时统计的实现流程。然后通过IntelliJ IDEA开发工具实现广告点击流实时统计程序并将统计结果实时存储到HBase数据库,使读者掌握运用Java语言编写Spark Streaming、HBase和Kafka生产者程序的能力。最后在IntelliJ IDEA开发工具运行用户广告点击流实时统计程序,使读者了解IntelliJ IDEA开发工具运行程序的方法。

点赞一建三连!文章来源地址https://www.toymoban.com/news/detail-473615.html

到了这里,关于【大数据学习篇11】广告点击流实时统计的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【实时数仓】介绍、需求分析、统计架构分析和ods层日志行为数据采集

    普通的实时计算 优先考虑时效性,所以从数据源采集经过实时计算直接得到结果。如此做时效性更好,但是弊端是由于计算过程中的中间结果没有沉淀下来,所以当面对大量实时需求的时候,计算的复用性较差(如B想要使用A的结果),开发成本随着需求增加直线上升。 实时

    2023年04月23日
    浏览(50)
  • 苍穹外卖day11——数据统计图形报表(Apache ECharts)

    常见图表     快速上手 - Handbook - Apache ECharts   VO设计 对应的映射文件     对应的映射文件     对应的映射文件         对应的映射文件    

    2024年02月14日
    浏览(41)
  • Java项目-苍穹外卖-Day11-Apache ECharts数据统计

    主要是以下四项的统计,以不同形式的图形进行展示 自己去网站上看一哈,我不太懂前端 com.sky.controller.admin.ReportController com.sky.service.impl.ReportServiceImpl.java orderMapper orderMapper.xml Reportcontroller ReportServiceImpl orderMapper.xml reportController ReportServiceImpl orderMapper.xml

    2024年02月09日
    浏览(43)
  • 《尚贤达猎头网站流量统计模块》,通过HTTP自定义模块实时获取asp.net网站访问流量,并保存到数据库

    开发了个网站流量统计模块,实时获取asp.net网站访问流量,并保存到数据库。 一、功能: 通过HTTP自定义模块实时获取网站流量 二、支持平台:windows+IIS 三、安装方法: 1、将文件www.sunsharer.cn.dll复制到网站bin目录下; 2、将配置好的sqlstr.txt复制到网站bin目录下; 3、将数据

    2024年01月16日
    浏览(44)
  • 无涯教程-机器学习 - 数据统计

    在进行机器学习项目时,通常无涯教程会忽略两个最重要的部分,分别是 数学 和 数据 。这是因为知道ML是一种数据驱动的方法,并且ML模型只会产生与提供给它的数据一样好的或坏的输出。 在上一章中,讨论了如何将CSV数据上传到ML项目中,但是最好在上传之前了解数据。

    2024年02月10日
    浏览(42)
  • Python 机器学习入门:数据集、数据类型和统计学

    机器学习是通过研究数据和统计信息使计算机学习的过程。机器学习是迈向人工智能(AI)的一步。机器学习是一个分析数据并学会预测结果的程序。 在计算机的思维中,数据集是任何数据的集合。它可以是从数组到完整数据库的任何东西。 数组的示例: [99,86,87,88,111,86,10

    2024年02月05日
    浏览(46)
  • 【大数据学习篇6】 Spark操作统计分析数据操作

    通过前面的文章安装好环境下面我们就可以开始来操作 使用MySQL的root用户对数据库进行修改以下设置

    2024年02月05日
    浏览(43)
  • Python学习——数据分组统计、分组运算及透视

    分割 split : 按照键值(key)或者分组变量将数据分组 应用 apply : 对每个组应用函数, 通常是累计,转换或过滤函数 组合 combine : 将每一组的结果合并成一个输出组 常用功能 新增加一列 年龄ew 将填充后的年龄补充上去 数据聚合(agg):一般指的是能够从数组产生的标量值的数

    2024年02月10日
    浏览(45)
  • [机器学习、Spark]Spark MLlib实现数据基本统计

    👨‍🎓👨‍🎓博主:发量不足 📑📑本期更新内容: Spark MLlib基本统计 📑📑下篇文章预告:Spark MLlib的分类🔥🔥 简介:耐心,自信来源于你强大的思想和知识基础!!   目录 Spark MLlib基本统计 一.摘要统计 二.相关统计 三.分层抽样   MLlib提供了很多统计方法,包含

    2024年02月02日
    浏览(46)
  • 数理统计的深度学习:探索大数据的潜在能量

    随着数据的不断增长,人工智能技术也随之发展迅速。深度学习技术在处理大规模数据方面表现出色,成为人工智能领域的重要技术之一。数理统计学则是研究数据的概率分布和统计规律的学科。在深度学习中,数理统计学的理论和方法有着重要的应用价值。本文将从深度学

    2024年02月20日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包