第1关:统计共享单车每天的平均使用时间
- 任务描述
-
相关知识
- 如何配置Hbase的MapReduce类
- 如何使用Hbase的MapReduce进行数据分析
- 编程要求
- 测试说明
任务描述
本关任务:使用Hbase
的MapReduce
对已经存在 Hbase 的共享单车运行数据进行分析,统计共享单车每天的平均使用时间,其中共享单车运行数据在Hbase
的t_shared_bicycle
表中(表结构可在编程要求中进行查看)。
相关知识
为了完成本关任务,你需要掌握:
- 如何配置
Hbase
的MapReduce
类; - 如何使用
Hbase
的MapReduce
进行数据分析。
如何配置Hbase
的MapReduce
类
MapReduce
是运行在Job
上的一个并行计算框架,分为Map
节点和Reduce
节点。
Hbase
提供了org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
的initTableMapperJob
和initTableReducerJob
两个方法来完成MapReduce
的配置。
initTableMapperJob 方法:
/**
*在提交TableMap作业之前使用它。 它会适当地设置
* 工作。
*
* @param table要读取的表名。
* @param scan具有列,时间范围等的扫描实例。
* @param mapper要使用的mapper类。
* @param outputKeyClass输出键的类。
* @param outputValueClass输出值的类。
* @param job当前要调整的工作。 确保传递的作业是
*携带所有必要的HBase配置。
* @throws IOException设置细节失败。
*/
public static void initTableMapperJob(String table, Scan scan,
Class<? extends TableMapper> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, Job job)
throws IOException
/ **
initTableReducerJob 方法:
/**
*在提交TableReduce作业之前使用它。 它会
*适当设置JobConf。
*
* @param table输出表。
* @param reducer要使用的reducer类。
* @param job当前要调整的工作。
* @throws IOException确定区域计数失败时。
*/
public static void initTableReducerJob(String table,
Class<? extends TableReducer> reducer, Job job)
throws IOException
如何使用Hbase
的MapReduce
进行数据分析
下面我们以统计每个城市的酒店个数的例子来介绍MapReduce
的Map
节点和Reduce
节点:
Map
节点执行类需要继承抽象类TableMapper
,实现其map
方法,结构如下:
public static class MyMapper extends TableMapper<Text, DoubleWritable> {
@Override
protected void map(ImmutableBytesWritable rowKey, Result result, Context context) {
}
}
在**map
方法中可从输入表(原数据表)得到行数据,最后向Reduce
节点输出键值对(key/value)
**。
String cityId = Bytes.toString(result.getValue("cityInfo".getBytes(), "cityId".getBytes()));
DoubleWritable i = new DoubleWritable(1);
context.write(new Text(cityId),i);
下面介绍Reduce
节点,Reduce
节点执行类需要继承抽象类TableReducer
,实现其reduce
方法:
public static class MyTableReducer extends TableReducer<Text, DoubleWritable, ImmutableBytesWritable> {
@Override
public void reduce(Text key, Iterable<DoubleWritable> values, Context context) {
}
}
在reduce
方法里会接收map
方法里相同key
的集合,最后把结果存到输出到表里。
double sum = 0;
for (DoubleWritable num:values){
sum += num.get();
}
Put put = new Put(Bytes.toBytes(key.toString()));
put.addColumn("total_infos".getBytes(),"total".getBytes(),Bytes.toBytes(String.valueOf(sum)));
context.write(null,put);//initTableReducerJob 设置了表名所以在这里无需设置了
编程要求
在右侧代码窗口完成代码编写:
-
MapReduce
类已经配置好,只需完成MapReduce
的数据分析; -
在
map
方法中,获取输入表t_shared_bicycle
的相关信息,计算出使用时间
=结束时间
-开始时间
,并把使用时间
和开始时间的日期
传给reduce
-
在
reduce
方法中通过使用时间
和开始时间的日期
计算共享单车每天平均使用时间,并把每天平均使用时间,四舍五入保留两位有效数字,存入到列族为info
,字段为avgTime
,ROWKEY 为avgTime
的表里。
t_shared_bicycle
表结构如下:
列族名称 | 字段 | 对应的文件的描述 | ROWKEY (格式为:骑行id ) |
---|---|---|---|
info | beginTime | 开始时间 | trip_id |
info | endTime | 结束时间 | trip_id |
info | bicycleId | 车辆id
|
trip_id |
info | departure | 出发地 | trip_id |
info | destination | 目的地 | trip_id |
info | city | 所在城市 | trip_id |
info | start_longitude | 开始经度 | trip_id |
info | stop_longitude | 结束经度 | trip_id |
info | start_latitude | 开始纬度 | trip_id |
info | stop_latitude | 结束纬度 | trip_id |
测试说明
平台会对你编写的代码进行测试,若是与预期输出相同,则算通关。文章来源:https://www.toymoban.com/news/detail-779875.html
开始你的任务吧,祝你成功!文章来源地址https://www.toymoban.com/news/detail-779875.html
package com.educoder.bigData.sharedbicycle; import java.io.IOException; import java.text.ParseException; import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Scanner; import java.math.RoundingMode; import java.math.BigDecimal; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.commons.lang3.time.FastDateFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import com.educoder.bigData.util.HBaseUtil; /** * 统计共享单车每天的平均使用时间 */ public class AveragetTimeMapReduce extends Configured implements Tool { public static final byte[] family = "info".getBytes(); public static class MyMapper extends TableMapper<Text, BytesWritable> { protected void map(ImmutableBytesWritable rowKey, Result result, Context context) throws IOException, InterruptedException { /********** Begin *********/ long beginTime = Long.parseLong(Bytes.toString(result.getValue(family, "beginTime".getBytes()))); long endTime = Long.parseLong(Bytes.toString(result.getValue(family, "endTime".getBytes()))); String format = DateFormatUtils.format(beginTime, "yyyy-MM-dd", Locale.CHINA); long useTime = endTime - beginTime; BytesWritable bytesWritable = new BytesWritable(Bytes.toBytes(format + "_" + useTime)); context.write(new Text("avgTime"), bytesWritable); /********** End *********/ } } public static class MyTableReducer extends TableReducer<Text, BytesWritable, ImmutableBytesWritable> { @Override public void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException { /********** Begin *********/ double sum = 0; int length = 0; Map<String, Long> map = new HashMap<String, Long>(); for (BytesWritable price : values) { byte[] copyBytes = price.copyBytes(); String string = Bytes.toString(copyBytes); String[] split = string.split("_"); if (map.containsKey(split[0])) { Long integer = map.get(split[0]) + Long.parseLong(split[1]); map.put(split[0], integer); } else { map.put(split[0], Long.parseLong(split[1])); } } Collection<Long> values2 = map.values(); for (Long i : values2) { length++; sum += i; } BigDecimal decimal = new BigDecimal(sum / length /1000); BigDecimal setScale = decimal.setScale(2, RoundingMode.HALF_DOWN); Put put = new Put(Bytes.toBytes(key.toString())); put.addColumn(family, "avgTime".getBytes(), Bytes.toBytes(setScale.toString())); context.write(null, put); /********** End *********/ } } public int run(String[] args) throws Exception { // 配置Job Configuration conf = HBaseUtil.conf; // Scanner sc = new Scanner(System.in); // String arg1 = sc.next(); // String arg2 = sc.next(); String arg1 = "t_shared_bicycle"; String arg2 = "t_bicycle_avgtime"; try { HBaseUtil.createTable(arg2, new String[] { "info" }); } catch (Exception e) { // 创建表失败 e.printStackTrace(); } Job job = configureJob(conf, new String[] { arg1, arg2 }); return job.waitForCompletion(true) ? 0 : 1; } private Job configureJob(Configuration conf, String[] args) throws IOException { String tablename = args[0]; String targetTable = args[1]; Job job = new Job(conf, tablename); Scan scan = new Scan(); scan.setCaching(300); scan.setCacheBlocks(false);// 在mapreduce程序中千万不要设置允许缓存 // 初始化Mapreduce程序 TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class, Text.class, BytesWritable.class, job); // 初始化Reduce TableMapReduceUtil.initTableReducerJob(targetTable, // output table MyTableReducer.class, // reducer class job); job.setNumReduceTasks(1); return job; } }
到了这里,关于共享单车之数据分析-统计共享单车每天的平均使用时间的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!