MapReduce分布式计算(一)

这篇具有很好参考价值的文章主要介绍了MapReduce分布式计算(一)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

MapReduce是Hadoop系统核心组件之一,它是一种可用于大数据并行处理的计算模型、框架和平台,主要解决海量数据的计算,是目前分布式计算模型中应用较为广泛的一种。

练习:计算a.txt文件中每个单词出现的次数

hello world
hello hadoop
hello 51doit
hadoop mapreduce
mapreduce spark


public class WordCount {
    public static void main(String[] args) throws IOException {
        //获取到resource文件夹下a.txt的路径
        URL resource = WordCount.class.getClassLoader().getResource("a.txt");
        String path = resource.getPath();
        //使用FileUtils将文件读取成字符串
        String s = FileUtils.readFileToString(new File(path),"utf-8");
        //将文件使用空格进行切割  \s可以切割 空格 tab键
        String[] arr = s.split("\\s+");

        //创建Map集合
        Map<String,Integer> map = new HashMap<>();

        //遍历数组
        for (String s1 : arr) {
            //判断集合是否包含指定键
            if(!map.containsKey(s1)){
                //如果不包含 添加 单词 1
                map.put(s1,1);
            }else{
                //如果包含  获取当前键的次数 +1 在添加回集合
                Integer count = map.get(s1);
                map.put(s1,count+1);
            }
        }

        System.out.println(map);
    }
}

通过以上的方式 计算出来了a.txt文件中每个单词出现的次数,但是我们想一下 ,如果a.txt文件非常大,怎么办?

比如有一个a.txt文件10个T的大小。这时一台计算机就没有办法计算了,因为我们根本存储不了,计算不了,那么一台计算机无法计算,就使用多台计算机来进行计算!

MapReduce核心思想

​ MapReduce的核心思想是“分而治之”。所谓“分而治之”就是把一个复杂的问题,按照一定的“分解”方法分为等价的规模较小的若干部分,然后逐个解决,分别找出各部分的结果,把各部分的结果组成整个问题的结果,这种思想来源于日常生活与工作时的经验,同样也完全适合技术领域。

为了更好地理解“分而治之”思想,我们光来举一个生活的例子。例如,某大型公司在全国设立了分公司,假设现在要统计公司今年的营收情况制作年报,有两种统计方式,第1种方式是全国分公司将自己的账单数据发送至总部,由总部统一计算公司今年的营收报表:第2种方式是采用分而治之的思想,也就是说,先要求分公司各自统计营收情况,再将统计结果发给总部进行统一汇总计算。这两种方式相比,显然第2种方式的策略更好,工作效率更高效。

MapReduce 作为一种分布式计算模型,它主要用于解决海量数据的计算问题。使用MapReduce操作海量数据时,每个MapReduce程序被初始化为一个工作任务,每个工作任务可以分为Map 和l Reducc两个阶段,具体介绍如下:

Map阶段::负责将任务分解,即把复杂的任务分解成若干个“简单的任务”来行处理,但前提是这些任务没有必然的依赖关系,可以单独执行任务。

Reduce阶段:负责将任务合并,即把Map阶段的结果进行全局汇总。下面通过一个图来描述上述MapReduce 的核心思想。

MapReduce就是“任务的分解与结和的汇总”。即使用户不懂分布式计算框架的内部运行机制,但是只要能用Map和 Reduce思想描述清楚要处理的问题,就能轻松地在Hadoop集群上实现分布式计算功能。

MapReduce编程模型

MapReduce是一种编程模型,用于处理大规模数据集的并行运算。使用MapReduce执行计算任务的时候,每个任务的执行过程都会被分为两个阶段,分别是Map和Reduce,其中Map阶段用于对原始数据进行处理,Reduce阶段用于对Map阶段的结果进行汇总,得到最终结果。

MapReduce编程模型借鉴了函数式程序设计语言的设计思想,其程序实现过程是通过map()和l reduce()函数来完成的。从数据格式上来看,map()函数接收的数据格式是键值对,生的输出结果也是键值对形式,reduce()函数会将map()函数输出的键值对作为输入,把相同key 值的 value进行汇总,输出新的键值对。

(1)将原始数据处理成键值对<K1,V1>形式。

(2)将解析后的键值对<K1,V1>传给map()函数,map()函数会根据映射规则,将键值对<K1,V1>映射为一系列中间结果形式的键值对<K2,V2>。

(3)将中间形式的键值对<K2,V2>形成<K2,{V2,....>形式传给reduce()函数处理,把具有相同key的value合并在一起,产生新的键值对<K3,V3>,此时的键值对<K3,V3>就是最终输出的结果。

词频统计

因为我们的数据都存储在不同的计算机中,那么将对象中的数据从网络中传输,就一定要用到序列化!

/*
	JDK序列化对象的弊端 
	我们进行序列化 其实最主要的目的是为了 序列化对象的属性数据
	比如如果序列化一个Person对象 new Person("柳岩",38); 其实我们想要的是 柳岩 38
	但是如果直接序列化一个对象的话 JDK为了反序列化方便 会在文件中加入其他的数据 这样
	序列化后的文件会变的很大,占用空间
*/
public class Test {
    public static void main(String[] args) throws Exception {
        ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("d:\\person.txt"));

        //JDK序列化对象 
        Person p = new Person();
        p.setName("柳岩");
        p.setAge(38);
        oos.writeObject(p);
        oos.close();
    }
}

本来其实数据就占几个字节,序列化后,多占用了很多字节,这样如果序列化多的话就会浪费很多空间.

/*
	可以通过序列化属性的方式解决问题
	只序列化属性 可以减小序列化后的文件大小
*/
public class Test {
    public static void main(String[] args) throws Exception {
        ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("d:\\person.txt"));

        Person p = new Person();
        p.setName("柳岩");
        p.setAge(38);

        //只序列化属性
        oos.writeUTF(p.getName());
        oos.writeInt(p.getAge());
        oos.close();
    }
}

/*
	需要注意
	反序列化时 需要按照序列化的顺序来反序列化
*/
public class Test {
    public static void main(String[] args) throws Exception {
        ObjectInputStream ois = new ObjectInputStream(new FileInputStream("d:\\person.txt"));
		//先反序列化name 在反序列化age
        String name = ois.readUTF();
        int age = ois.readInt();
        System.out.println(name + " "+age);
        ois.close();
    }
}

Hadoop对java的序列化又进行了优化,对一些类型进行了进一步的封装,方便按照自己的方式序列化

Integer  ----> IntWritable
Long     ----> LongWritable
String   ----> Text
Double   ----> DoubleWritable
Boolean  ----> BooleanWritable

WorldCount代码编写

map函数定义

/*
	KEYIN: K1   
	VALUIN: V1  
	KEYOUT:K2   
	VALUEOUT:V2 
*/
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  
   protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context 		context) throws IOException, InterruptedException {
    		
    }
}

我们只需要继承Mapper类,重写map方法就好

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
     K1 : 行起始位置   数字 Long  ---- > LongWritable
     V1 : 一行数据  字符串 String  -----> Text
     K2 : 单词     字符串 String  ----->  Text
     V2 : 固定数字1 数组  Long -----> LongWritable
 */
public class WordCountMapper  extends Mapper<LongWritable, Text,Text,LongWritable> {


    /**
     *
     * @param key   K1
     * @param value V1
     * @param context  上下文对象 将map的结果 输出给reduce
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //将一行数据 转换成字符串 按照空格切割 
        String[] arr = value.toString().split("\\s+");
        for (String k2 : arr) {
              //将单词输出给reduce
              context.write(new Text(k2),new LongWritable(1));
        }
    }
}

reduce函数定义

/*
	KEYIN:K2 
	VALUEIN:V2
	KEYOUT:K3
	VALUEOUT:V3
*/
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  
    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, 					VALUEOUT>.Context context) throws IOException, InterruptedException {

    }
}

我们只需要继承Reducer类型重写reduce方法就好

/*
    K2:单词        String  ----> Text
    V2:固定数字 1   Long    ----> LongWritable
    K3:单词        String  ----> Text
    V3:相加后的结果 Long    ----> LongWritable
 */
public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable> {

    /**
     *
     * @param key  K2
     * @param values  V2的集合 {1,1,1,1}
     * @param context 上下文对象 输出结果
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

        int count = 0;
        //将次数相加
        for (LongWritable value : values) {
               count+=value.get();
        }
        //写出 k3 v3
        context.write(key,new LongWritable(count));
    }
}

最后编写启动程序

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Test {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //创建配置对象
        Configuration conf = new Configuration();
        //创建工作任务
        Job job = Job.getInstance(conf, "wordCount");

        //设置Map类
        job.setMapperClass(WordCountMapper.class);
        //设置Reduce类
        job.setReducerClass(WordCountReducer.class);

        //设置map的输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //设置reduce的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //设置读取文件位置  可以是文件 也可以是文件夹
        FileInputFormat.setInputPaths(job,new Path("d:\\work\\abc"));
        //设置输出文件位置
        FileOutputFormat.setOutputPath(job,new Path("d:\\work\\abc\\out_put"));

        //提交任务 并等待任务结束
        job.waitForCompletion(true);
    }
}

如果抛这个异常 需要查看windows环境
Exception in thread "main"java.lang .UnsatisfiedLinkError: org.apache .hadoop.io.nativeio.NativeIO$windows.access0(Ljava/lang/string;1) .
 如果已经配置了环境 还不行 在src新建包 org.apache.hadoop.io.nativeio
 然后hadoop02文件夹中的 NativeIO.java添加到这个包下 重新运行尝试

若要显示报错信息在resouces目录下添加log4j.properties
内容如下:文章来源地址https://www.toymoban.com/news/detail-500797.html

log4j.rootCategory=INFO,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

到了这里,关于MapReduce分布式计算(一)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ❤️❤️❤️Mapreduce分布式计算组件和YARN分布式资源调度

    上文我们已经介绍Hadoop中HDFS分布式存储组件 今天我们来学习Hadoop生态中另两大组件Mapreduce和YARN Map阶段 : 将数据拆分到不同的服务器后执行Maptask任务,得到一个中间结果 Reduce阶段 : 将Maptask执行的结果进行汇总,按照Reducetask的计算 规则获得一个唯一的结果 我们在MapReduce计算框

    2024年04月13日
    浏览(41)
  • Linux从零搭建Hadoop集群(CentOS7+hadoop 3.2.0+JDK1.8+Mapreduce完全分布式集群案例)

    和相关配置版本 :Linux CentOS Hadoop Java 版本: CentOS7 Hadoop3.2.0 JDK1.8 虚拟机参数信息内存3.2G、处理器2x2、内存50G ISO:CentOS-7-x86_64-DVD-2009.iso 基本主从思路: 先把基础的设置(SSH、JDK、Hadooop、环境变量、Hadoop和MapReduce配置信息)在一台虚拟机(master)上配好,通过

    2024年02月05日
    浏览(56)
  • hadoop分布式计算组件

    ·计算:对数据进行处理,使用统计分析等手段得到需要的结果 ·分布式计算:多台服务器协同工作,共同完成一个计算任务 分布式计算常见的2种工作模式 分散-汇总(MapReduce就是这种模式) 中心调度-步骤执行(大数据体系的Spark、Flink等是这种模式) MapReduce是“分散-汇总”模

    2024年04月11日
    浏览(31)
  • 分布式计算平台 Hadoop 简介

    Hadoop是一种分析和处理大数据的软件平台,是一个用Java语言实现的Apache的开源软件框架,在大量计算机组成的集群中实现了对海量数据的分布式计算。其主要采用MapReduce分布式计算框架,包括根据GFS原理开发的分布式文件系统HDFS、根据BigTable原理开发的数据存储系统HBase以及

    2024年02月01日
    浏览(39)
  • 大数据中的分布式文件系统MapReduce的选择题

    一. 单选题(共9题,49.5分) (单选题)下列传统并行计算框架,说法错误的是哪一项? A. 刀片服务器、高速网、SAN,价格贵,扩展性差上 B. 共享式(共享内存/共享存储),容错性好 C. 编程难度高 D. 实时、细粒度计算、计算密集型 正确答案: B:共享式(共享内存/共享存储),容错性好; 5.5分

    2024年02月04日
    浏览(29)
  • 分布式计算 第五章 大数据多机计算:Hadoop

    5.2.1 从硬件思考大数据 从硬件角度看,一台或是几台机器似乎难以胜任大数据的存储和计算工作。 • 大量机器的集群构成数据中心 • 使用高速互联网络对大量机器进行连接以确保数据传递 • 综合考量数据中心的散热问题、能耗问题,以及各方面成本 • 集群中硬件发生故

    2024年02月05日
    浏览(45)
  • 【云计算平台】Hadoop全分布式模式环境搭建

    此前搭建了hadoop的单机模式与伪分布式模式: 单机模式部署 伪分布式模式部署 中间拖得有点久了,今天索性做个了结,把hadoop的全分布式模式部署的操作也简单地记录一下,算是一个系统性的学习吧。 伪分布式模式是学习阶段最常用的模式,它可以将进程都运行在同一台机

    2023年04月08日
    浏览(36)
  • 【云计算】Hadoop2.x完全分布式集群(入门)

    【虚拟机】VMware Workstation 16 Pro 【镜像】CentOS-7-x86_64-DVD-1804.iso 【java】jdk-8u281-linux-x64.rpm 【Hadoop】hadoop-2.7.1.tar.gz 【SSH远程】SecureCRTPortable.exe 【上传下载】SecureFXPortable.exe 配网卡ens33 重启网络 私钥、公钥 克隆、改名、改IP 三台机都要做:👇 生成密钥 密钥发送 登录测试 had

    2024年04月12日
    浏览(35)
  • Hadoop分布式文件系统(三)

    目录 一、Hadoop 1、MapReduce 1.1、理解MapReduce思想 1.2、分布式计算概念 1.3、MapReduce介绍 1.4、MapReduce特点 1.5、MapReduce局限性 1.6、MapReduce实例进程 1.7、MapReduce阶段组成 1.8、MapReduce数据类型 1.9、MapReduce官方示例 1.9.1、示例说明--圆周率PI评估 1.9.2、官方示例--WordCount单词统计 1.10、

    2024年01月16日
    浏览(31)
  • Hadoop HDFS(分布式文件系统)

    一、Hadoop HDFS(分布式文件系统) 为什么要分布式存储数据 假设一个文件有100tb,我们就把文件划分为多个部分,放入到多个服务器 靠数量取胜,多台服务器组合,才能Hold住 数据量太大,单机存储能力有上限,需要靠数量来解决问题 数量的提升带来的是网络传输,磁盘读写,

    2024年02月06日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包