我们继续通过项目强化掌握Combiner和Partitioner优化Hadoop性能

1、项目介绍

  本项目我们使用明星搜索指数数据,分别统计出搜索指数最高的男明星和女明星。

2、数据集

  image

3、分析

  基于项目的需求,我们通过以下几步完成:

  1、编写Mapper类,按需求将数据集解析为key=gender,value=name+hotIndex,然后输出。

  2、编写Combiner类,合并Mapper输出结果,然后输出给Reducer。

  3、编写Partitioner类,按性别,将结果指定给不同的Reduce执行。

  4、编写Reducer类,分别统计出男、女明星的最高搜索指数。

  5、编写run方法执行MapReduce任务

4、实现

package com.buaa; 
 
import java.io.IOException; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Partitioner; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
 
/**  * @ProjectName CountStarSearchIndex 
* @PackageName com.buaa 
* @ClassName SearchStarIndex 
* @Description 统计分别统计出男女明星最大搜索指数 
* @Author 刘吉超 
* @Date 2016-05-12 16:30:23 
*/ public class SearchStarIndex extends Configured implements Tool { // 分隔符\t private static String TAB_SEPARATOR = "\t"; // 男 private static String MALE = "male"; // 女 private static String FEMALE = "female"; 
     /* 
     * 解析明星数据 
     */ public static class IndexMapper extends Mapper<Object, Text, Text, Text> { /* 
         * 每次调用map(LongWritable key, Text value, Context context)解析一行数据。 
         * 每行数据存储在value参数值中。然后根据'\t'分隔符,解析出明星姓名,性别和搜索指数 
         */ public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 将数据解析为数组 
            String[] tokens = value.toString().split(TAB_SEPARATOR); 
             if(tokens != null && tokens.length >= 3){ // 性别 
                String gender = tokens[1].trim(); // 名称、关注指数 
                String nameHotIndex = tokens[0].trim() + TAB_SEPARATOR + tokens[2].trim(); 
                 // 输出key=gender value=name+hotIndex 
                context.write(new Text(gender), new Text(nameHotIndex)); 
            } 
        } 
    } 
     /* 
     * 根据性别对数据进行分区,将 Mapper的输出结果均匀分布在 reduce上 
     */ public static class IndexPartitioner extends Partitioner<Text, Text> {          
        @Override public int getPartition(Text key, Text value, int numReduceTasks) {  // 按性别分区 
            String sex = key.toString(); 
             // 默认指定分区 0 if(numReduceTasks == 0) return 0; 
             // 性别为男,选择分区0 if(MALE.equals(sex)){            return 0; 
            }else if(FEMALE.equals(sex)){ // 性别为女,选择分区1 return 1 % numReduceTasks; 
            }else // 性别未知,选择分区2 return 2 % numReduceTasks; 
            
        } 
    } 
     /* 
     * 定义Combiner,对 map端的输出结果,先进行一次合并,减少数据的网络输出 
     */ public static class IndexCombiner extends Reducer<Text, Text, Text, Text> { 
         
        @Override public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException { int maxHotIndex = Integer.MIN_VALUE; 
            String name= ""; 
             for (Text val : values) { 
                String[] valTokens = val.toString().split(TAB_SEPARATOR); 
                 int hotIndex = Integer.parseInt(valTokens[1]); 
                 if(hotIndex > maxHotIndex){ 
                    name = valTokens[0]; 
                    maxHotIndex = hotIndex; 
                } 
            } 
             
            context.write(key, new Text(name + TAB_SEPARATOR + maxHotIndex)); 
        } 
    } 
     /* 
     * 统计男、女明星最高搜索指数 
     */ public static class IndexReducer extends Reducer<Text, Text, Text, Text> { /* 
         * 调用reduce(key, Iterable< Text> values, context)方法来处理每个key和values的集合。 
         * 我们在values集合中,计算出明星的最大搜索指数 
         */ 
        @Override public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException { int maxHotIndex = Integer.MIN_VALUE; 
            String name = " "; 
             // 根据key,迭代 values集合,求出最高搜索指数 for (Text val : values) { 
                String[] valTokens = val.toString().split(TAB_SEPARATOR); 
                 int hotIndex = Integer.parseInt(valTokens[1]); 
                 if (hotIndex > maxHotIndex) { 
                    name = valTokens[0]; 
                    maxHotIndex = hotIndex; 
                } 
            } 
             
            context.write(new Text(name), new Text(key + TAB_SEPARATOR + maxHotIndex)); 
        } 
    } 
     
    @SuppressWarnings("deprecation") 
    @Override public int run(String[] args) throws Exception { // 读取配置文件 
        Configuration conf = new Configuration(); 
         // 如果目标文件夹存在,则删除 
        Path mypath = new Path(args[1]); 
        FileSystem hdfs = mypath.getFileSystem(conf); if (hdfs.isDirectory(mypath)) { 
            hdfs.delete(mypath, true); 
        } 
 // 新建一个任务 
        Job job = new Job(conf, "searchStarIndex"); // 主类 
        job.setJarByClass(SearchStarIndex.class); 
         // reduce的个数设置为2 
        job.setNumReduceTasks(2); // 设置Partitioner类 
        job.setPartitionerClass(IndexPartitioner.class); 
         // Mapper 
        job.setMapperClass(IndexMapper.class); // Reducer 
        job.setReducerClass(IndexReducer.class); 
         // map 输出key类型 
        job.setMapOutputKeyClass(Text.class); // map 输出value类型 
        job.setMapOutputValueClass(Text.class); 
         // 设置Combiner类 
        job.setCombinerClass(IndexCombiner.class); 
         // 输出结果 key类型 
        job.setOutputKeyClass(Text.class); // 输出结果 value类型 
        job.setOutputValueClass(Text.class); 
         // 输入路径 
        FileInputFormat.addInputPath(job, new Path(args[0])); // 输出路径 
        FileOutputFormat.setOutputPath(job, new Path(args[1])); 
         // 提交任务 return job.waitForCompletion(true) ? 0 : 1; 
    } 
     public static void main(String[] args) throws Exception { 
        String[] args0 = {  "hdfs://ljc:9000/buaa/index/index.txt", "hdfs://ljc:9000/buaa/index/out/" 
        }; int ec = ToolRunner.run(new Configuration(), new SearchStarIndex(), args0); 
        System.exit(ec); 
    } 
}

5、运行效果

  image

发布评论

分享到:

IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

MapReduce计数器详解
你是第一个吃螃蟹的人
发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。