目录
思路
代码
1.创建通用基础模块
2.数据生产模块
3.上传至Linux生成数据
思路
操作
思路
代码
执行
-
通话记录数据分析
- 项目背景
通信运营商每时每刻会产生大量的通信数据,例如通话记录,短信记录,彩信记录,第三方服务资费等等繁多信息。数据量如此巨大,除了要满足用户的实时查询和展示之外,还需要定时定期的对已有数据进行离线的分析处理。例如,当日话单,月度话单,季度话单,年度话单,通话详情,通话记录等等+。我们以此为背景,寻找一个切入点,学习其中的方法论。当前我们的需求是:统计每天、每月以及每年的每个人的通话次数及时长。
-
- 项目架构
-
-
- 整体架构
-
文章来源:https://www.toymoban.com/news/detail-700874.html
-
-
- 数据分析流程
-
-
-
- 数据展示流程
-
-
-
- 消费模型
-
-
- 项目实现
系统环境:
表1
系统 |
版本 |
windows |
10 专业版 |
linux |
CentOS 6.8 |
开发工具:
表2
工具 |
版本 |
idea |
2017.2.5旗舰版 |
maven |
3.3.9 |
JDK |
1.8+ |
提示:idea2017.2.5必须使用maven3.3.9,不要使用maven3.5,有部分兼容性问题
集群环境:
表3
框架 |
版本 |
hadoop |
2.7.2 |
zookeeper |
3.4.10 |
hbase |
1.3.1 |
flume |
1.7.0 |
kafka |
2.11-0.11.0.0 |
硬件环境:
表4
hadoop102 |
hadoop103 |
hadoop104 |
|
内存 |
4G |
2G |
2G |
CPU |
2核 |
1核 |
1核 |
硬盘 |
50G |
50G |
50G |
-
-
- 数据生产
-
此情此景,对于该模块的业务,即数据生产过程,一般并不会让你来进行操作,数据生产是一套完整且严密的体系,这样可以保证数据的鲁棒性。但是如果涉及到项目的一体化方案的设计(数据的产生、存储、分析、展示),则必须清楚每一个环节是如何处理的,包括其中每个环境可能隐藏的问题;数据结构,数据内容可能出现的问题。
-
-
-
- 数据结构
-
-
我们将在HBase中存储两个电话号码,以及通话建立的时间和通话持续时间,最后再加上一个flag作为判断第一个电话号码是否为主叫。姓名字段的存储我们可以放置于另外一张表做关联查询,当然也可以插入到当前表中。
表5
列名 |
解释 |
举例 |
call1 |
第一个手机号码 |
15369468720 |
call1_name |
第一个手机号码人姓名(非必须) |
李雁 |
call2 |
第二个手机号码 |
19920860202 |
call2_name |
第二个手机号码人姓名(非必须) |
卫艺 |
date_time |
建立通话的时间 |
20171017081520 |
date_time_ts |
建立通话的时间(时间戳形式) |
|
duration |
通话持续时间(秒) |
0600 |
-
-
-
- 编写代码
-
-
思路
1.创建Java集合类存放模拟的电话号码和联系人;
2.随机选取两个手机号码当作“主叫”与“被叫”(注意判断两个手机号不能重复),产出call1与call2字段数据;
3.创建随机生成通话建立时间的方法,可指定随机范围,最后生成通话建立时间,产出date_time字段数据;
4.随机一个通话时长,单位:秒,产出duration字段数据;
5.将产出的一条数据拼接封装到一个字符串中;
6.使用IO操作将产出的一条通话数据写入到本地文件中;
代码
首先新建一个maven项目 qingtaishuju-project-ct 作为整个工程的父项目,因为所有的业务不可能通过一个项目来实现,会有很多模块,比如有生产、消费、统计、展示等等。
1.创建通用基础模块
新建ct-common,创建好之后导入项目所需要的依赖,如下所示。
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
</dependencies>
依赖导入成功之后,接下来创建相应的包、接口和类,搭建基础的环境,模块结构如下
将对应的包复制到对应的
2.数据生产模块
1. 通用模块创建好之后,新建数据生产模块:ct-producer
模块包结构如下:
2. 打开Bootstrap类,修改代码如下
3. 将cantact.log复制到Datas目录下(需要新建Datas目录)
4. 运行Bootstrap的main方法
5. 此时在Datas目录下自动生成call.log文件,并持续写入数据(点击可停止运行)
3.上传至Linux生成数据
1. 将主函数的输入输出路径写成动态的
2.打jar包(注意将程序打包成运行包,不是依赖包)
3.将数据包和jar包上传至Linux
运行jar包
-
-
- 数据采集/消费(存储)
-
欢迎来到数据采集模块(消费),在企业中你要清楚流式数据采集框架flume和kafka的定位是什么。我们在此需要将实时数据通过flume采集到kafka然后供给给hbase消费。
flume:cloudera公司研发
适合下游数据消费者不多的情况;
适合数据安全性要求不高的操作;
适合与Hadoop生态圈对接的操作。
kafka:linkedin公司研发
适合数据下游消费众多的情况;
适合数据安全性要求较高的操作(支持replication);
因此我们常用的一种模型是:
线上数据 --> flume --> kafka --> flume(根据情景增删该流程) --> HDFS
-
-
-
- 数据采集
-
-
思路
1. 配置kafka,启动zookeeper和kafka集群;
2. 创建kafka主题;
3. 启动kafka控制台消费者(此消费者只用于测试使用);
4. 配置flume,监控日志文件;
5. 启动flume监控任务;
6. 观察测试。
操作
启动zookeeper,kafka集群
zkServer.sh start kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties |
创建kafka主题
kafka-topics.sh --zookeeper master:2181 --topic calllog --create --replication-factor 1 --partitions 3 启动zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties bin/zookeeper-server-start.sh -daemon config/zookeeper.properties #建议使用这种方式,不需要启动多个窗口 启动kafka bin/kafka-server-start.sh config/server.properties bin/kafka-server-start.sh -daemon config/server.properties #建议使用这种方式,不需要启动多个窗口 |
检查一下是否创建主题成功:
kafka-topics.sh --zookeeper master:2181 --list
|
启动kafka控制台消费者,等待flume信息的输入
kafka-console-consumer.sh --bootstrap-server master:9092 -topic calllog --from-beginning |
配置flume-env.sh
export JAVA_HOME=/home/software/jdk1.8 export HADOOP_HOME=/home/software/hadoop |
解决版本冲突 flume-ng
配置flume(flume-kafka.conf)
# define a1.sources = r1 a1.sinks = k1 a1.channels = c1 # source a1.sources.r1.type = exec a1.sources.r1.command = tail -F -c +0 /opt/open/call.log # sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave01:9092,slave02:9092 a1.sinks.k1.kafka.topic = calllog a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 # channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 |
启动flume
flume-ng agent --conf $FLUME_HOME/conf --name a1 --conf-file $FLUME_HOME/conf/flume-kafka.conf -Dflume.root.logger=INFO,console 在flume所在的文件夹即安装目录
|
观察kafka控制台消费者是否成功显示产生的数据
-
-
-
- 数据消费
-
-
如果以上操作均成功,则开始编写操作HBase的代码,用于消费数据,将产生的数据实时存储在HBase中。
思路
- 编写kafka消费者,读取kafka集群中缓存的消息,并打印到控制台以观察是否成功;
- 既然能够读取到kafka中的数据了,就可以将读取出来的数据写入到HBase中,所以编写调用HBaseAPI相关方法,将从Kafka中读取出来的数据写入到HBase;
- 以上两步已经足够完成消费数据,存储数据的任务,但是涉及到解耦,所以过程中需要将一些属性文件外部化,HBase通用性方法封装到某一个类中。
代码
创建新的module项目:ct_consumer
模块目录如下
执行
1.虚拟机执行jar包生产数据
java -jar ct-producer.jar contact.log call.log
2.flume采集
bin/flume-ng agent -c conf/ -n a1 -f /opt/open-bigdata-project/flume-2-kafka.conf
3.启动Hadoop(否则无法启动hbase)
4.启动虚拟机HBASE
./start-hbase.sh
4.idea消费者消费
消费完之后查看HBASE数据库
-
-
-
- 数据查询方式一
-
-
使用scan查看HBase中是否正确存储了数据,同时尝试使用过滤器查询扫描指定通话时长的数据。进行该单元测试前,需要先运行数据采集任务,确保HBase中已有数据存在。
新建工具过滤器工具类:HBaseFilterUtil
新建单元测试类:HBaseScanTest1
运行单元测试
-
-
-
- 数据消费测试
-
-
项目成功后,则将项目打包后在linux中运行测试。
1) 打包HBase消费者代码
-
-
- 数据分析
-
我们的数据已经完整的采集到了HBase集群中,这次我们需要对采集到的数据进行分析,统计出我们想要的结果。注意,在分析的过程中,我们不一定会采取一个业务指标对应一个mapreduce-job的方式,如果情景允许,我们会采取一个mapreduce分析多个业务指标的方式来进行任务。具体何时采用哪种方式,我们后续会详细探讨。
分析模块流程如图所示:
业务指标:
a) 用户每天主叫通话个数统计,通话时间统计。
b) 用户每月通话记录统计,通话时间统计。
c) 用户之间亲密关系统计。(通话次数与通话时间体现用户亲密关系)
-
-
-
- 需求分析
-
-
根据需求目标,设计出下述(3.2.2)表结构。我们需要按照时间范围(年月日),结合MapReduce统计出所属时间范围内所有手机号码的通话次数总和以及通话时长总和。
思路:
a) 维度,即某个角度,某个视角,按照时间维度来统计通话,比如我想统计2017年所有月份所有日子的通话记录,那这个维度我们大概可以表述为2017年*月*日
b) 通过Mapper将数据按照不同维度聚合给Reducer
c) 通过Reducer拿到按照各个维度聚合过来的数据,进行汇总,输出
d) 根据业务需求,将Reducer的输出通过Outputformat把数据
数据输入:HBase
数据输出:Mysql
HBase中数据源结构:
标签 |
举例&说明 |
rowkey |
hashregion_call1_datetime_call2_flag_duration 01_15837312345_20170527081033_13766889900_1_0180 |
family |
f1列族:存放主叫信息 f2列族:存放被叫信息 |
call1 |
第一个手机号码 |
call2 |
第二个手机号码 |
date_time |
通话建立的时间,例如:20171017081520 |
date_time_ts |
date_time对应的时间戳形式 |
duration |
通话时长(单位:秒) |
flag |
标记call1是主叫还是被叫(call1的身份与call2的身份互斥) |
a) 已知目标,那么需要结合目标思考已有数据是否能够支撑目标实现;
b) 根据目标数据结构,构建Mysql表结构,建表;
c) 思考代码需要涉及到哪些功能模块,建立不同功能模块对应的包结构。
d) 描述数据,一定是基于某个维度(视角)的,所以构建维度类。比如按照“年”与“手机号码”的组合作为key聚合所有的数据,便可以统计这个手机号码,这一年的相关结果。
e) 自定义OutputFormat用于对接Mysql,使数据输出。
f) 创建相关工具类。
-
-
-
- MySQL数据表结构设计
-
-
我们将分析的结果数据保存到Mysql中,以方便Web端进行查询展示。
1) :db_telecom.tb_contacts
用于存放用户手机号码与联系人姓名。
列 |
备注 |
类型 |
id |
自增主键 |
int(11) NOT NULL |
telephone |
手机号码 |
varchar(255) NOT NULL |
name |
联系人姓名 |
varchar(255) NOT NULL |
2) :db_telecom.tb_call
用于存放某个时间维度下通话次数与通话时长的总和。
列 |
备注 |
类型 |
id_date_contact |
复合主键(联系人维度id,时间维度id) |
varchar(255) NOT NULL |
id_date_dimension |
时间维度id |
int(11) NOT NULL |
id_contact |
查询人的电话号码 |
int(11) NOT NULL |
call_sum |
通话次数总和 |
int(11) NOT NULL DEFAULT 0 |
call_duration_sum |
通话时长总和 |
int(11) NOT NULL DEFAULT 0 |
3) :db_telecom.tb_dimension_date
用于存放时间维度的相关数据
列 |
备注 |
类型 |
id |
自增主键 |
int(11) NOT NULL |
year |
年,当前通话信息所在年 |
int(11) NOT NULL |
month |
月,当前通话信息所在月,如果按照年来统计信息,则month为-1。 |
int(11) NOT NULL |
day |
日,当前通话信息所在日,如果是按照月来统计信息,则day为-1。 |
int(11) NOT NULL |
4):db_telecom.tb_intimacy
用于存放所有用户用户关系的结果数据。
列 |
备注 |
类型 |
id |
自增主键 |
int(11) NOT NULL |
intimacy_rank |
好友亲密度排名 |
int(11) NOT NULL |
id_contact1 |
联系人1,当前所查询人 |
int(11) NOT NULL |
id_contact2 |
联系人2,与联系人为好友 |
int(11) NOT NULL |
call_count |
两联系人通话次数 |
int(11) NOT NULL DEFAULT 0 |
call_duration_count |
两联系人通话持续时间 |
int(11) NOT NULL DEFAULT 0 |
-
-
-
- 环境准备
-
-
1) 新建module:ct_analysis
2) 创建包结构,如下图
3) 类表
类名 |
备注 |
CountDurationMapper |
数据分析的Mapper类,继承自TableMapper |
CountDurationReducer |
数据分析的Reducer类,继承自Reduccer |
CountDurationRunner |
数据分析的驱动类,组装Job |
MySQLOutputFormat |
自定义Outputformat,对接Mysql |
BaseDimension |
维度(key)基类 |
BaseValue |
值(value)基类 |
ComDimension |
时间维度+联系人维度的组合维度 |
ContactDimension |
联系人维度 |
DateDimension |
时间维度 |
CountDurationValue |
通话次数与通话时长的封装 |
JDBCUtil |
连接Mysql的工具类 |
JDBCCacheBean |
单例JDBCConnection |
IConverter |
转化接口,用于根据传入的维度对象,得到该维度对象对应的数据库主键id |
DimensionConverter |
IConverter实现类,负责实际的维度转id功能 |
LRUCache |
用于缓存已知的维度id,减少对mysql的操作次数,提高效率 |
Constants |
常量类 |
-
-
-
- 需求实现
-
-
将代码导入对应位置
-
-
-
- 运行测试
-
-
- 将mysql驱动包放入到hadoop根目录的$HADOOP_HOME/share/hadoop/common/目录下
- 将打包好的程序上传到master中
3) 提交任务
cd 程序包目录 $ HADOOP_HOME/yarn jar ct_analysis-1.0-SNAPSHOT.jar |
观察Mysql中的结果。
-
-
- 数据展示
-
-
-
-
- 环境准备
-
-
- 新建module或项目:ct_web
项目结构如下:
pom.xml配置文件:
2) 创建包结构
bean |
||
contants |
||
controller |
||
dao |
||
entries |
3) 类表
类名 |
备注 |
CallLog |
用于封装数据分析结果的JavaBean |
Contact |
用于封装联系人的JavaBean |
Contants |
常量类 |
CallLogHandler |
用于处理请求的Controller |
CallLogDAO |
查询某人某个维度通话记录的DAO |
ContactDAO |
查询联系人的DAO |
QueryInfo |
用于封装向服务器发来的请求参数 |
4) web目录结构,web部分的根目录:webapp
文件夹名 |
备注 |
css |
存放css静态资源的文件夹 |
html |
存放html静态资源的文件夹 |
images |
存放图片静态资源文件夹 |
js |
存放js静态资源的文件夹 |
jsp |
存放jsp页面的文件夹 |
WEB-INF |
存放web相关配置的文件夹 |
5) resources目录下创建spring相关配置文件
6) WEB-INF目录下创建web相关配置
7) 拷贝js框架到js目录下
-
-
-
- 编写代码
-
-
思路:
a) 首先测试数据通顺以及完整性,写一个联系人的测试用例。
b) 测试通过后,通过输入手机号码以及时间参数,查询指定维度的数据,并以图表展示。
代码:将代码复制到对应包中
-
-
-
- 最终预览
-
-
查询人通话时长与通话次数统计大概如下所示:
折线图如图所示:
柱状图如图所示:
统一展示如图所示:
-
- 项目总结
重新总结梳理整个项目流程和方法论。文章来源地址https://www.toymoban.com/news/detail-700874.html
到了这里,关于开源大数据案例(第1章 通话记录数据分析)思路,操作,及执行ct-common的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!