我有一个将文件作为输入并给出输出文件的外部程序
//for example
input file: IN_FILE
output file: OUT_FILE
//Run External program
./vx < ${IN_FILE} > ${OUT_FILE}
我想要HDFS中的输入和输出文件
我有 8 个节点的集群。我有 8 个输入文件,每个文件有 1 行
//1 input file : 1.txt
1:0,0,0
//2 input file : 2.txt
2:0,0,128
//3 input file : 3.txt
3:0,128,0
//5 input file : 4.txt
4:0,128,128
//5 input file : 5.txt
5:128,0,0
//6 input file : 6.txt
6:128,0,128
//7 input file : 7.txt
7:128,128,0
//8 input file : 8.txt
8:128,128,128
我正在使用 KeyValueTextInputFormat
key :file name
value: initial coordinates
例如第5个文件
key :5
value:128,0,0
每个 map task 根据其初始坐标生成大量数据。
现在我想在每个 map task 中运行外部程序并生成输出文件。
但我对如何处理 HDFS 中的文件感到困惑。
I can use zero reducer and create file in HDFS
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path outFile;
outFile = new Path(INPUT_FILE_NAME);
FSDataOutputStream out = fs.create(outFile);
//generating data ........ and writing to HDFS
out.writeUTF(lon + ";" + lat + ";" + depth + ";");
我很困惑如何在不将文件放入本地目录的情况下使用 HDFS 文件运行外部程序。
with dfs -get
在不使用 MR 的情况下,我使用 shell 脚本得到如下结果
#!/bin/bash
if [ $# -lt 2 ]; then
printf "Usage: %s: <infile> <outfile> \n" $(basename $0) >&2
exit 1
fi
IN_FILE=/Users/x34/data/$1
OUT_FILE=/Users/x34/data/$2
cd "/Users/x34/Projects/externalprogram/model/"
./vx < ${IN_FILE} > ${OUT_FILE}
paste ${IN_FILE} ${OUT_FILE} | awk '{print $1,"\t",$2,"\t",$3,"\t",$4,"\t",$5,"\t",$22,"\t",$23,"\t",$24}' > /Users/x34/data/combined
if [ $? -ne 0 ]; then
exit 1
fi
exit 0
然后我运行它
ProcessBuilder pb = new ProcessBuilder("SHELL_SCRIPT","in", "out");
Process p = pb.start();
如果知道如何使用 hadoop 流或任何其他方式运行外部程序,我将不胜感激。我希望 HDFS 中的 INPUT 和 OUTPUT 文件进行进一步处理。
请帮忙
请您参考如下方法:
因此假设您的外部程序不知道如何识别或读取 hdfs,那么您要做的就是从 java 加载文件并将其作为输入直接传递给程序
Path path = new Path("hdfs/path/to/input/file");
FileSystem fs = FileSystem.get(configuration);
FSDataInputStream fin = fs.open(path);
ProcessBuilder pb = new ProcessBuilder("SHELL_SCRIPT");
Process p = pb.start();
OutputStream os = p.getOutputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(fin));
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(os));
String line = null;
while ((line = br.readLine())!=null){
writer.write(line);
}
可以反向进行输出。从进程中获取 InputStream,并制作一个 FSDataOutputStream 以写入 hdfs。
从本质上讲,您的程序与这两个东西一起成为一个适配器,将 HDFS 转换为输入并输出回 HDFS。