IT虾米网

hadoop小文件合并

mate10pro 2018年06月04日 大数据 1584 0

1、背景

  在实际项目中,输入数据往往是由许多小文件组成,这里的小文件是指小于HDFS系统Block大小的文件(默认128M), 然而每一个存储在HDFS中的文件、目录和块都映射为一个对象,存储在NameNode服务器内存中,通常占用150个字节。 如果有1千万个文件,就需要消耗大约3G的内存空间。如果是10亿个文件呢,简直不可想象。所以在项目开始前, 我们选择一种适合的方案来解决本项目的小文件问题

2、介绍

  本地 D:\data目录下有 2012-09-17 至 2012-09-23 一共7天的数据集,我们需要将这7天的数据集按日期合并为7个大文件上传至 HDFS

3、数据

  本地 D:\data目录下的所有数据,如下图所示

  

4、分析

  基于项目的需求,我们通过下面几个步骤完成

  1、获取 D:\data目录下的所有日期路径,循环所有日期路径,通过globStatus()方法获取所有txt格式文件路径。

  2、最后通过IOUtils.copyBytes(in, out, 4096, false)方法将数据集合并为大文件,并上传至 HDFS

5、实现

  自定义RegexAcceptPathFilter类实现 PathFilter,比如只接受D:\data\2012-09-17日期目录下txt格式的文件

 1 /**   2 * @ProjectName FileMerge 
 3 * @PackageName com.buaa 
 4 * @ClassName RegexAcceptPathFilter 
 5 * @Description 接受 regex 格式的文件 
 6 * @Author 刘吉超 
 7 * @Date 2016-04-18 21:58:07 
 8 */  9 public static class RegexAcceptPathFilter implements PathFilter { 
10     private final String regex; 
11  12     public RegexAcceptPathFilter(String regex) { 
13         this.regex = regex; 
14     } 
15  16     @Override 
17     public boolean accept(Path path) { 
18         boolean flag = path.toString().matches(regex); 
19         return flag; 
20     } 
21 }

  实现主程序 merge 方法,完成数据集的合并,并上传至 HDFS

 1 /**  2  * 合并 
 3  *  
 4  * @param srcPath 源目录 
 5  * @param destPath 目标目录 
 6  */  7 public static void merge(String srcPath,String destPath) { 
 8     try{ 
 9         // 读取hadoop文件系统的配置 10         Configuration conf = new Configuration(); 
11          12         // 获取远端文件系统 13         URI uri = new URI(HDFSUri); 
14         FileSystem remote = FileSystem.get(uri, conf); 
15              16         // 获得本地文件系统 17         FileSystem local = FileSystem.getLocal(conf); 
18              19         // 获取data目录下的所有文件路径 20         Path[] dirs = FileUtil.stat2Paths(local.globStatus(new Path(srcPath))); 
21              22         FSDataOutputStream out = null; 
23         FSDataInputStream in = null; 
24          25         for (Path dir : dirs) { 
26             // 文件名称 27             String fileName = dir.getName().replace("-", ""); 
28             // 只接受目录下的.txt文件 29             FileStatus[] localStatus = local.globStatus(new Path(dir + "/*"), new RegexAcceptPathFilter("^.*.txt$")); 
30             // 获得目录下的所有文件 31             Path[] listedPaths = FileUtil.stat2Paths(localStatus); 
32             // 输出路径 33             Path block = new Path(destPath + "/" + fileName + ".txt"); 
34             // 打开输出流 35             out = remote.create(block);             
36             for (Path p : listedPaths) { 
37                 // 打开输入流 38                 in = local.open(p); 
39                 // 复制数据 40                 IOUtils.copyBytes(in, out, 4096, false); 
41                 // 关闭输入流 42                 in.close(); 
43             } 
44             if (out != null) { 
45                 // 关闭输出流 46                 out.close(); 
47             } 
48         } 
49     }catch(Exception e){ 
50         logger.error("", e); 
51     } 
52 }

6、一些运行代码

1 /** 2  * main方法 
3  *  
4  * @param args 
5  */ 6 public static void main(String[] args) { 
7     merge("D:\\data\\*","/buaa/tv"); 
8 }

7、结果

    

评论关闭
IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!