本文将讨论如何使用Hadoop MapReduce来统计汽车销售信息。
汽车销售数据文件
汽车销售的记录文件名叫Cars.csv,里面记录了汽车的销售信息,数据内容如下:
山西省,3,朔州市,朔城区,2013,LZW6450PF,上汽通用五菱汽车股份有限公司,五菱,小型普通客车,个人,非营运,1,L3C,8424,79,汽油,4490,1615,1900,,,,2,3050,1386,175/70R14LT,4,2110,1275,,7,,,,,客车,1913,男性
山西省,3,晋城市,城区,2013,EQ6450PF1,东风小康汽车有限公司,东风,小型普通客车,个人,非营运,1,DK13-06,1587,74,汽油,4500,1680,1960,,,,2,3050,1435,185R14LT 6PR,4,1970,1290,,7,,东风小康汽车有限公司,,EQ6440KMF,客车,1929,男性
山西省,12,长治市,长治城区,2013,BJ6440BKV1A,北汽银翔汽车有限公司,北京,小型普通客车,个人,非营运,1,BJ415A,1500,75,,4440,,,,,,,,,,,,,,,,北汽银翔汽车有限公司,北京,BJ6440BKV1A,,1938,男性
山西省,12,长治市,长治城区,2013,DXK6440AF2F,东风小康汽车有限公司,东风,小型普通客车,个人,非营运,1,DK15,1499,85,汽油,4365,1720,1770,,,,2,2725,1425,185/65R14,4,1835,1235,,5,,东风小康汽车有限公司,东风,DXK6440AF2F,多用途乘用车,1926,女性
...
格式为:
第1列:销售的省份
第3列:销售的城市
第7列:汽车生产商
第8列:汽车品牌名
第12列:汽车销售数量
已经将Cars.csv上传到HDFS文件系统的/input
目录下。
统计各城市销售汽车的数量
思路
要统计城市销售汽车的数量,由于只涉及到了城市、数量,所以可以可以采取用城市作为Key,用销售数量作为Value,当成Map的输出,再由Reduce对Map的结果按照城市将销售数据进行汇总即可。
代码
package com.wux.labs.hadoop.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class CarSalesCount1 {
public static class CarSalesCountMapper
extends Mapper<Object, Text, Text, IntWritable> {
private Text city = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// CSV文件,所以这里用逗号分割列
String values[] = value.toString().split(",");
// 第3列是城市,这里以城市作为Key
city.set(values[2]);
// 第12列是销售数量,直接取销售数量,而不是取1
context.write(city, new IntWritable(Integer.parseInt(values[11])));
}
}
public static class CarSalesCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
// 对销售数量累加
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
// 根据城市得到累加的结果
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "hadoop");
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "SalesCount1");
job.setJarByClass(CarSalesCount1.class);
job.setMapperClass(CarSalesCountMapper.class);
job.setReducerClass(CarSalesCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("/input/Cars.csv"));
FileOutputFormat.setOutputPath(job, new Path("/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
打包后直接运行:
hadoop jar Hadoop-1.0-SNAPSHOT.jar com.wux.labs.hadoop.mr.CarSalesCount1
运行完成后可查看结果:
统计各城市销售品牌的数量
思路
与销售数量不同,销售数量的话直接将销售记录中的数量相加就可以了,但是销售品牌数,则只统计到汽车品牌的数量,不管一个品牌销售了多少辆汽车,都只算一个品牌,因此,不能按销售数量累加统计。
方案1
在Map阶段,可以按:城市 + 品牌 作为输出的Key,Value任意,在Map之后增加Combiner阶段,因为Map阶段的Key可以保证输出的记录中相同城市的不同记录中品牌都是不重复的,所以Combiner阶段可以按城市作为Key,1作为Value输出,最后由Reduce阶段按城市Key,对Combiner的Value进行汇总即可。
方案1代码
package com.wux.labs.hadoop.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class CarSalesCount2 {
public static class CarSalesCountMapper
extends Mapper<Object, Text, Text, IntWritable> {
private Text city = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// CSV文件,所以这里用逗号分割列
String values[] = value.toString().split(",");
// 第3列是城市,第8列是品牌,这里以城市+品牌作为Key
city.set(values[2] + " " + values[7]);
// Map阶段的Value不会使用,所以Value可以任意
context.write(city, new IntWritable(1));
}
}
public static class CarSalesCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
Text cs = new Text();
// Combiner阶段按照Map阶段的Key进行拆分,城市作为Key
cs.set(key.toString().split(" ")[0]);
// 因为每行记录中同一个城市的品牌不会重复,所以Value取1
context.write(cs, new IntWritable(1));
}
}
public static class CarSalesCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
// 对品牌数量累加
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
// 根据城市得到累加的结果
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "hadoop");
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "SalesCount");
job.setJarByClass(CarSalesCount2.class);
job.setMapperClass(CarSalesCountMapper.class);
job.setCombinerClass(CarSalesCountCombiner.class);
job.setReducerClass(CarSalesCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("/input/Cars.csv"));
FileOutputFormat.setOutputPath(job, new Path("/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
打包后直接运行:
hadoop jar Hadoop-1.0-SNAPSHOT.jar com.wux.labs.hadoop.mr.CarSalesCount2
运行完成后可查看结果:
方案2
除了可以重组Key,增加Combiner阶段来进行计数,还可以直接在Map阶段用城市作为Key,用品牌作为Value进行输出,Map阶段后不需要Combiner阶段直接到Reduce阶段,Reduce阶段由于接收到的Map的输出不是数字,而是汽车品牌,是字符串,所以可以用Set进行数据保存,最后统计Set中元素的个数即可。
方案2代码
package com.wux.labs.hadoop.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
public class CarSalesCount3 {
public static class CarSalesCountMapper
extends Mapper<Object, Text, Text, Text> {
private Text city = new Text();
private Text brand = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// CSV文件,所以这里用逗号分割列
String values[] = value.toString().split(",");
// 第3列是城市,这里以城市作为Key
city.set(values[2]);
// 第8列是品牌,直接取品牌作为Value
brand.set(values[7]);
context.write(city, brand);
}
}
public static class CarSalesCountReducer extends Reducer<Text, Text, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Set set = new HashSet();
// 将销售品牌放入Set集合
for (Text val : values) {
String brand = val.toString();
if (!set.contains(brand)) {
set.add(brand);
}
}
result.set(set.size());
// 根据城市得到累加的结果
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "hadoop");
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "SalesCount1");
job.setJarByClass(CarSalesCount3.class);
job.setMapperClass(CarSalesCountMapper.class);
job.setReducerClass(CarSalesCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("/input/Cars.csv"));
FileOutputFormat.setOutputPath(job, new Path("/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
打包后直接运行:
hadoop jar Hadoop-1.0-SNAPSHOT.jar com.wux.labs.hadoop.mr.CarSalesCount3
运行完成后可查看结果:文章来源:https://www.toymoban.com/news/detail-775881.html
文章来源地址https://www.toymoban.com/news/detail-775881.html
到了这里,关于Hadoop MapReduce 统计汽车销售信息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!