IT虾米网

hadoop处理Excel通话记录详解

admin 2018年06月04日 大数据 310 0

前面我们所写mr程序的输入都是文本文件,但真正工作中我们难免会碰到需要处理其它格式的情况,下面以处理excel数据为例

1、项目需求

    有刘超与家庭成员之间的通话记录一份,存储在Excel文件中,如下面的数据集所示。我们需要基于这份数据,统计每个月每个家庭成员给自己打电话的次数,并按月份输出到不同文件

    下面是部分数据,数据格式:编号  联系人  电话  时间

    image

2、分析

    统计每个月每个家庭成员给自己打电话的次数这一点很简单,我们之前已经写过几个这样的程序。实现需求的麻烦点在于文件的输入是Excel文件,输出要按月份输出到不同文件。这就要我们实现自定义的InputFormat和OutputFormat

3、实现

    首先,输入文件是Excel格式,我们可以借助poi来解析Excel文件,我们先来实现一个Excel的解析类(ExcelParser)

package com.buaa; 
 
import java.io.IOException; 
import java.io.InputStream; 
import java.util.ArrayList; 
import java.util.Iterator; 
import java.util.List; 
 
import org.apache.commons.logging.Log; 
import org.apache.commons.logging.LogFactory; 
import org.apache.poi.hssf.usermodel.HSSFSheet; 
import org.apache.poi.hssf.usermodel.HSSFWorkbook; 
import org.apache.poi.ss.usermodel.Cell; 
import org.apache.poi.ss.usermodel.Row; 
 
/**  * @ProjectName HandleExcelPhone 
* @PackageName com.buaa 
* @ClassName ExcelParser 
* @Description 解析excel 
* @Author 刘吉超 
* @Date 2016-04-24 16:59:28 
*/ public class ExcelParser { private static final Log logger = LogFactory.getLog(ExcelParser.class); 
 /** 
     * 解析is 
     *  
     * @param is 数据源 
     * @return String[] 
     */ public static String[] parseExcelData(InputStream is) { // 结果集 
        List<String> resultList = new ArrayList<String>(); 
         try { // 获取Workbook 
            HSSFWorkbook workbook = new HSSFWorkbook(is); // 获取sheet 
            HSSFSheet sheet = workbook.getSheetAt(0); 
             
            Iterator<Row> rowIterator = sheet.iterator(); 
             while (rowIterator.hasNext()) { // 行 
                Row row = rowIterator.next(); // 字符串 
                StringBuilder rowString = new StringBuilder(); 
                 
                Iterator<Cell> colIterator = row.cellIterator(); while (colIterator.hasNext()) { 
                    Cell cell = colIterator.next(); 
 switch (cell.getCellType()) { case Cell.CELL_TYPE_BOOLEAN: 
                            rowString.append(cell.getBooleanCellValue() + "\t"); break; case Cell.CELL_TYPE_NUMERIC: 
                            rowString.append(cell.getNumericCellValue() + "\t"); break; case Cell.CELL_TYPE_STRING: 
                            rowString.append(cell.getStringCellValue() + "\t"); break; 
                    } 
                } 
                 
                resultList.add(rowString.toString()); 
            } 
        } catch (IOException e) { 
            logger.error("IO Exception : File not found " + e); 
        } 
         return resultList.toArray(new String[0]); 
    } 
}

    然后,我们需要定义一个从Excel读取数据的InputFormat类,命名为ExcelInputFormat,实现代码如下

package com.buaa; 
 
import java.io.IOException; 
import java.io.InputStream; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
 
/**  * @ProjectName HandleExcelPhone 
* @PackageName com.buaa 
* @ClassName ExcelInputFormat 
* @Description TODO 
* @Author 刘吉超 
* @Date 2016-04-28 17:31:54 
*/ public class ExcelInputFormat extends FileInputFormat<LongWritable,Text>{ 
    @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, 
            TaskAttemptContext context) throws IOException, InterruptedException { 
         return new ExcelRecordReader(); 
    } 
     public class ExcelRecordReader extends RecordReader<LongWritable, Text> { private LongWritable key = new LongWritable(-1); private Text value = new Text(); private InputStream inputStream; private String[] strArrayofLines; 
 
        @Override public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException { // 分片 
            FileSplit split = (FileSplit) genericSplit; // 获取配置 
            Configuration job = context.getConfiguration(); 
             // 分片路径 
            Path filePath = split.getPath(); 
             
            FileSystem fileSystem = filePath.getFileSystem(job); 
             
            inputStream = fileSystem.open(split.getPath()); 
             // 调用解析excel方法             this.strArrayofLines = ExcelParser.parseExcelData(inputStream); 
        } 
 
        @Override public boolean nextKeyValue() throws IOException, InterruptedException { int pos = (int) key.get() + 1; 
             if (pos < strArrayofLines.length){ 
                 if(strArrayofLines[pos] != null){ 
                    key.set(pos); 
                    value.set(strArrayofLines[pos]); 
                     return true; 
                } 
            } 
             return false; 
        } 
 
        @Override public LongWritable getCurrentKey() throws IOException, InterruptedException {         return key; 
        } 
 
        @Override public Text getCurrentValue() throws IOException, InterruptedException {         return value; 
        } 
 
        @Override public float getProgress() throws IOException, InterruptedException { return 0; 
        } 
 
        @Override public void close() throws IOException { if (inputStream != null) { 
                inputStream.close(); 
            } 
        } 
    } 
}

    接下来,我们要定义一个ExcelOutputFormat类,用于实现按月份输出到不同文件中

package com.buaa; 
 
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.util.HashMap; 
import java.util.Iterator; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.compress.CompressionCodec; 
import org.apache.hadoop.io.compress.GzipCodec; 
import org.apache.hadoop.mapreduce.OutputCommitter; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.ReflectionUtils; 
 
/**  * @ProjectName HandleExcelPhone 
* @PackageName com.buaa 
* @ClassName ExcelOutputFormat 
* @Description TODO 
* @Author 刘吉超 
* @Date 2016-04-28 17:24:23 
*/ public class ExcelOutputFormat extends FileOutputFormat<Text,Text> {   // MultiRecordWriter对象 private MultiRecordWriter writer = null;  
     
    @Override public RecordWriter<Text,Text> getRecordWriter(TaskAttemptContext job) throws IOException,   
            InterruptedException {   if (writer == null) {   
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));   
        } 
         return writer;   
    } 
     private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {   
        Path workPath = null; 
         
        OutputCommitter committer = super.getOutputCommitter(conf); 
         if (committer instanceof FileOutputCommitter) {   
            workPath = ((FileOutputCommitter) committer).getWorkPath();   
        } else {   
            Path outputPath = super.getOutputPath(conf);   if (outputPath == null) {   throw new IOException("没有定义输出目录");   
            }   
            workPath = outputPath;   
        } 
         return workPath;   
    } 
     /** 
     * 通过key, value, conf来确定输出文件名(含扩展名) 
     *  
     * @param key 
     * @param value 
     * @param conf 
     * @return String 
     */ protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf){ // name + month 
        String[] records = key.toString().split("\t");  return records[1] + ".txt"; 
    } 
     /**  
    * 定义MultiRecordWriter */ public class MultiRecordWriter extends RecordWriter<Text,Text> {   // RecordWriter的缓存   private HashMap<String, RecordWriter<Text,Text>> recordWriters = null; // TaskAttemptContext private TaskAttemptContext job = null;   // 输出目录   private Path workPath = null; 
         public MultiRecordWriter(TaskAttemptContext job, Path workPath) {   super();   this.job = job;   this.workPath = workPath;   this.recordWriters = new HashMap<String, RecordWriter<Text,Text>>();   
        } 
         
        @Override   public void write(Text key, Text value) throws IOException, InterruptedException {   // 得到输出文件名   
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());   
            RecordWriter<Text,Text> rw = this.recordWriters.get(baseName);   if (rw == null) {   
                rw = getBaseRecordWriter(job, baseName);   this.recordWriters.put(baseName, rw);   
            }   
            rw.write(key, value);   
        }  
         private RecordWriter<Text,Text> getBaseRecordWriter(TaskAttemptContext job, String baseName)   throws IOException, InterruptedException {   
            Configuration conf = job.getConfiguration(); 
             boolean isCompressed = getCompressOutput(job); //key value 分隔符   
            String keyValueSeparator = "\t"; 
             
            RecordWriter<Text,Text> recordWriter = null;   if (isCompressed) {   
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,   
                        GzipCodec.class);   
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf); 
                 
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());   
                 
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);   
                 
                recordWriter = new MailRecordWriter<Text,Text>(new DataOutputStream(codec   
                        .createOutputStream(fileOut)), keyValueSeparator);   
            } else {   
                Path file = new Path(workPath, baseName);  
                 
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);   
                 
                recordWriter = new MailRecordWriter<Text,Text>(fileOut, keyValueSeparator);   
            }   
             return recordWriter;   
        }  
         
        @Override   public void close(TaskAttemptContext context) throws IOException, InterruptedException {   
            Iterator<RecordWriter<Text,Text>> values = this.recordWriters.values().iterator();   while (values.hasNext()) {   
                values.next().close(context);   
            }   this.recordWriters.clear();   
        }   
         
    }   
}
package com.buaa; 
 
import java.io.DataOutputStream;   
import java.io.IOException;   
import java.io.UnsupportedEncodingException;   
 
import org.apache.hadoop.io.NullWritable;   
import org.apache.hadoop.io.Text;   
import org.apache.hadoop.mapreduce.RecordWriter;   
import org.apache.hadoop.mapreduce.TaskAttemptContext;     
   
/**  * @ProjectName HandleExcelPhone 
* @PackageName com.buaa 
* @ClassName MailRecordWriter 
* @Description TODO 
* @Author 刘吉超 
* @Date 2016-04-24 16:59:23 
*/ public class MailRecordWriter< K, V > extends RecordWriter< K, V > { // 编码 private static final String utf8 = "UTF-8"; // 换行 private static final byte[] newline;   static {   try {   
            newline = "\n".getBytes(utf8);//换行符 "/n"不对   
        } catch (UnsupportedEncodingException uee) {   throw new IllegalArgumentException("can't find " + utf8 + " encoding");   
        }   
    } // 输出数据 protected DataOutputStream out; // 分隔符 private final byte[] keyValueSeparator; 
     public MailRecordWriter(DataOutputStream out, String keyValueSeparator) {   this.out = out;   try {   this.keyValueSeparator = keyValueSeparator.getBytes(utf8);   
        } catch (UnsupportedEncodingException uee) {   throw new IllegalArgumentException("can't find " + utf8 + " encoding");   
        }   
    } 
     public MailRecordWriter(DataOutputStream out) {   this(out, "/t");   
    } 
     private void writeObject(Object o) throws IOException {   if (o instanceof Text) {   
            Text to = (Text) o;   
            out.write(to.getBytes(), 0, to.getLength());   
        } else {   
            out.write(o.toString().getBytes(utf8));   
        }   
    } 
     public synchronized void write(K key, V value) throws IOException {   boolean nullKey = key == null || key instanceof NullWritable;   boolean nullValue = value == null || value instanceof NullWritable;   if (nullKey && nullValue) {   return;   
        }   if (!nullKey) {   
            writeObject(key);   
        }   if (!(nullKey || nullValue)) {   
            out.write(keyValueSeparator);   
        }   if (!nullValue) {   
            writeObject(value);   
        }   
        out.write(newline);   
    } 
     public synchronized void close(TaskAttemptContext context) throws IOException { 
        out.close();   
    }   
}

    最后我们来编写Mapper类,实现 map() 函数;编写Reduce类,实现reduce函数;还有一些运行代码

package com.buaa; 
 
import java.io.IOException; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
 
/**  * @ProjectName HandleExcelPhone 
* @PackageName com.buaa 
* @ClassName ExcelContactCount 
* @Description TODO 
* @Author 刘吉超 
* @Date 2016-04-24 16:34:24 
*/ public class ExcelContactCount extends Configured implements Tool { 
 public static class PhoneMapper extends Mapper<LongWritable, Text, Text, Text> { 
         public void map(LongWritable key, Text value, Context context) throws InterruptedException, IOException { 
            Text pkey = new Text(); 
            Text pvalue = new Text(); // 1.0, 老爸, 13999123786, 2014-12-20 
            String line = value.toString(); 
             
            String[] records = line.split("\\s+"); // 获取月份 
            String[] months = records[3].split("-"); // 昵称+月份 
            pkey.set(records[1] + "\t" + months[1]); // 手机号 
            pvalue.set(records[2]); 
             
            context.write(pkey, pvalue); 
        } 
    } 
 public static class PhoneReducer extends Reducer<Text, Text, Text, Text> { 
         protected void reduce(Text Key, Iterable<Text> Values, Context context) throws IOException, InterruptedException { 
            Text phone = Values.iterator().next(); int phoneToal = 0; 
             for(java.util.Iterator<Text> its = Values.iterator();its.hasNext();its.next()){ 
                phoneToal++; 
            } 
             
            Text pvalue = new Text(phone + "\t" + phoneToal); 
             
            context.write(Key, pvalue); 
        } 
    } 
 
    @Override 
    @SuppressWarnings("deprecation") public int run(String[] args) throws Exception { // 读取配置文件 
        Configuration conf = new Configuration(); 
         // 判断输出路径,如果存在,则删除 
        Path mypath = new Path(args[1]); 
        FileSystem hdfs = mypath.getFileSystem(conf); if (hdfs.isDirectory(mypath)) { 
            hdfs.delete(mypath, true); 
        } 
         // 新建任务 
        Job job = new Job(conf,"Call Log"); 
        job.setJarByClass(ExcelContactCount.class); 
         // 输入路径 
        FileInputFormat.addInputPath(job, new Path(args[0])); // 输出路径 
        FileOutputFormat.setOutputPath(job, new Path(args[1])); 
         // Mapper 
        job.setMapperClass(PhoneMapper.class); // Reduce 
        job.setReducerClass(PhoneReducer.class); 
         // 输出key类型 
        job.setOutputKeyClass(Text.class); // 输出value类型 
        job.setOutputValueClass(Text.class); 
         // 自定义输入格式 
        job.setInputFormatClass(ExcelInputFormat.class); // 自定义输出格式 
        job.setOutputFormatClass(ExcelOutputFormat.class); 
         return job.waitForCompletion(true) ? 0:1; 
    } 
 public static void main(String[] args) throws Exception { 
        String[] args0 = {  "hdfs://ljc:9000/buaa/phone/phone.xls", "hdfs://ljc:9000/buaa/phone/out/"  
            }; int ec = ToolRunner.run(new Configuration(), new ExcelContactCount(), args0); 
        System.exit(ec); 
    } 
}

4、结果

    image

   通过这份数据很容易看出,刘超1月份与姐姐通话次数最多,19次

发布评论

分享到:

IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

使用DBOutputFormat把MapReduce产生的结果集导入到mysql中详解
你是第一个吃螃蟹的人
发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。