数据空间
您当前的位置: 首页 /数据知识

Hadoop 学习记录之基础篇

发布时间:[2018-03-29] 来源:Latte_z博客
点击量:

  中国企业数据治理联盟www.chinaedg.com/

  进入》主数据管理    企业数据治理    信息资源规划     数据安全管理

      经历了一个学期的洗礼之后,最终我还是选择了走大数据处理这条道路,个人觉得自己不是一个愿意扎实看论文潜心研究的人,所以机器学习->深度学习这条路不是特别适合我,还是更加愿意去写一些工程代码锻炼自己的能力。中翰软件专注数据治理11http://www.jobhand.cn/

1MapReduce的理论基础

那么什么是 MapReduce 呢?MapReduce 就是一种分治的思想,把一个大规模的数据操作,分解成一个个小数据集操作,同时分发到一个主节点管理的集群中进行任务工作。然后再把各个节点完成的工作合并在一起,这就得到了最终的结果。总得来说,MapReduce 就是 Map + Reduce,即任务的分解与结果的汇总

​ hadoop中共有两种任务机器角色,一种是 JobTracker,一种是 TaskTracker,顾名思义,JobTracker 是用于工作的调度,而 TaskTracker 是用于执行工作的。其中,一个集群中只有一台 JobTracker

​ MapReduce 框架负责了并行计算中的诸多问题,比如分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等问题,抽象成了以上提到的 Map + Reduce 两个方法函数。

3.jpg

2MapReduce 处理过程 

 

 

整个过程可以简单做成以上的流程,『 原始数据 -> 分割 -> map -> map 端排序 -> Combine -> Reduce 端排序 -> Reduce 输出 』,具体的内容,我们通过一个 WordCount 例子来说明。

3WordCount 解析

​ WordCount是一个非常简单,但是又很能够体现 MapReduce 思想的程序,这个程序被 Hadoop 内置作为了一个测试程序,功能很简单,就是统计一个输入文件内每个单词的个数。

我们暂时利用 maven 导入的 Hadoop 环境作为测试平台,pom 文件如下:

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.4</version>
        </dependency>
    </dependencies>
/* 这里我们使用新版 API 来编写代码,原因如下:
     * 新的API倾向于使用抽象类,而不是接口,因为这更容易扩展。
     * 例如,你可以添加一个方法(用默认的实现)到一个抽象类而不需修改类之前的实现方法。在新的API中,MapperReducer是抽象类。
     *
     * 新的API广泛使用context object(上下文对象),并允许用户代码与MapReduce系统进行通信。
     * 例如,MapContext基本上充当着JobConfOutputCollectorReporter的角色。
     */
public class WordCount{ 

    /*

 

 

     * Hadoop 提供了如下内容数据类型,均实现了 WritableComparable 接口,以便用这些类型定义的数据可以被序列化,用以网络传输和文件存储
     * BooleanWritable: 标准布尔
     * ByteWritable: 单字节
     * DoubleWritable: 双字节
     * FloatWritable: 浮点数
     * IntWritable: 整型数
     * LongWritable: 长整型数
     * Text: UTF8 格式存储的文本
     * NullWritable: <key,value>  key  value 为空的时候使用
     */ 

    // KEYIN, VALUEIN, KEYOUT, VALUEOUT

 

    public static class TokenizerMapperextends Mapper<Object,Text,Text,IntWritable>{

 

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text(); 

        @Override

 

        public void map(Object key, Text value, Context context)throws IOException, InterruptedException {

 

            // map 方法中,value 值存储的是文本文件中的一行(回车符为行结束标记)
            // key 值为该行首字母相对于文本文件首字母偏移量 

            /*

 

 

             * 分割过程,例如:
             * Hello World      ->      <0, "Hello World">
             * Bye World        ->      <12, "Bye World">
             * 其中偏移量包括了回车所占的字符数(注意 Windows  Linux/macOS/*nix 环境下换行符的差别)
             * Windows: \r\n
             * Linux/macOS/*nix: \n
             * 老版本的 Mac OS, OS X: \r
             */ 

            StringTokenizer itr = new StringTokenizer(value.toString()); // StringTokenizer 将每一行拆分成一个个单词

            /*

 

 

             * Map 过程,例如:
             * <0, "Hello World>      ->      <Hello, 1> + <World, 1>
             * <12, "Bye World>       ->      <Bye, 1> + <World, 1>
             */ 

            while (itr.hasMoreTokens()) {

 

                word.set(itr.nextToken());

 

                //  <word, 1> 作为方法结果输出,等于是做了一个词频的统计
                context.write(word, one);
                // 剩余工作交给 MapReduce 框架处理
            } 

            /*

 

 

             * 得到 map 方法输出的 <key,value> 后,Mapper 对这一些键值对按照 key 值进行排序(字典序)
             * 再执行 Combine 过程,将 key 值相同的 value 累加,得到 Mapper 最终输出结果
             * <Bye, 1>         ->      <Bye, 1>
             * <Hello, 1>       ->      <Hello, 1>
             * <World, 1>       ->      <World, 2>
             * <World, 1>            
             */
        }
    } 

    // KEYIN, VALUEIN, KEYOUT, VALUEOUT

 

 

 

    public static class IntSumReducerextends Reducer<Text,IntWritable,Text,IntWritable>{

 

        private IntWritable result = new IntWritable(); 

        @Override

 

        public void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {

 

            // key 为单个单词,values 则是对应单词计数值组成的列表
            // reduce 的过程就是遍历 values 求和的操作,得到某个单词的总词数 

            /*

 

 

             * 首先 Reducer 对从 Mapper 接收的数据进行排序操作,再交给用户重写的 reduce 方法进行处理
             * 得到新的 <key,value> 对,作为 WordCount 的输出结果
             * <Bye, list(1,1)>       ->      <Bye, 2>
             * <Hadoop, list(2)>      ->      <Hadoop, 1>
             * <Hello, list(1,1)>     ->      <Hello, 2>
             * <Word, list(2)>        ->      <Word, 2>
             */ 

            int sum = 0;

 

            for (IntWritable val : values) {

 

                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    } 

    public static void main(String[] args)throws Exception {

 

        //  Configuration 类对 MapReduce Job 进行一个初始化

 

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (2 != otherArgs.length) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        } 

        //  Job 对象负责管理和运行一个计算任务,通过 Job 的一些方法来对任务参数进行相关设置

 

        //  Job 进行命名操作,使得在 JobTracker  TaskTracker 页面进行监视

 

        Job job = Job.getInstance(conf, "word count"); 

        // 设置主类

 

        job.setJarByClass(WordCount.class);

        // 设置 Job  Map(拆分操作)、Combiner(中间结果合并)、Reduce(合并操作)三个相关处理类

 

        // 原始数据 -> 分割 -> map -> map 端排序 -> Combine -> Reduce 端排序 -> Reduce 输出

 

        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class); 

        // 设置 Job 输出结果,为 <key,value>

 

        // 本例子中是 <单词,个数>,故 key  Text 型,Value  IntWritable 类型

 

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class); 

        // 设置输入输出的路径

 

        FileInputFormat.addInputPath(job, new Path(args[0]));

 

        FileOutputFormat.setOutputPath(job, new Path(args[1])); 

        // 等待任务结束后退出

 

        System.exit(job.waitForCompletion(true) ? 0 : 1);

 

    }
}

 


发表评论 共有条评论
用户名: 密码:
匿名发表