IT虾米网

java之"Pivot"使用 Hadoop 的表

dflying 2023年09月08日 编程语言 129 0

(免责声明:我是 Hadoop 和 Java 的新手)

作为输入,有一个具有简单键值结构的表:

key1  value1 
key2  value2 
key3  value3 
key2  value4 
key1  value5 
key1  value6 

作为输出,我想为每个键收集属于特定键的所有值,所以像这样:

key1, value1 value5 value6 
key2, value2 value4 
key3, value3 

这是我的映射器:

public class WordMapper extends Mapper<Object, Text, Text, Text> { 
 
 @Override 
 public void map(Object key, Text value, 
   Context context) throws IOException, InterruptedException { 
 
    String[] fields = value.toString().split("\\t", -1);  
    for (int i = 0; i < fields.length; ++i) { 
        if ("".equals(fields[i])) fields[i] = null; 
    } 
    List<String> fields_list = Arrays.asList(fields); 
    Text textKey = new Text(fields_list.get(0)); 
    Text textValue = new Text(fields_list.get(1)); 
    context.write(textKey,textValue); 
    } 
 } 

这是 reducer :

public class SumReducer extends Reducer<Text, TextArrayWritable, Text, TextArrayWritable> { 
    private TextArrayWritable valuesTotal = new TextArrayWritable(); 
 
    public void reduce(Text key, Iterable<Text> values, Context context) 
                throws IOException, InterruptedException { 
        ArrayList<Text> values_list = new ArrayList<Text>(); 
 
        for (Text value : values) { 
             values_list.add(value); 
    } 
        Text[] values_arr = new Text[values_list.size()]; 
        values_arr = values_list.toArray(values_arr); 
 
         valuesTotal.setFields(values_arr); 
         context.write(key, valuesTotal); 
} 
} 

出于某种原因,我无法从我的程序中获得任何输出。它只是终止,输出文件夹中没有任何内容。我的问题是什么?

(我使用 Hadoop 2.2.0 和 Eclipse + hadoop 插件。WordCount 示例运行没有问题。)

请您参考如下方法:

问题解决了。启用日志记录后,很明显我的数据包含第 4 列中缺少值的行,因此我添加了空检查 if (fields[4] != null) 并且它起作用了。此外,我摆脱了数组以列出 TextArrayWritable 自定义类的转换和用法

映射器:

@Override 
 public void map(Object key, Text value, 
   Context context) throws IOException, InterruptedException { 
 
    String[] fields = value.toString().split("\\t", -1);  
    for (int i = 0; i < fields.length; ++i) { 
        if ("".equals(fields[i])) fields[i] = null; 
    } 
    if (fields[4] != null) { 
    System.out.println(fields[0]); 
    System.out.println(fields[4]); 
    context.write(new Text(fields[0]),new Text(fields[4])); 
    } 
    } 
} 

reducer :

public class SongsReducer extends Reducer<Text, Text, Text, Text> {  
    public void reduce(Text key, Iterable<Text> values, Context context) 
                throws IOException, InterruptedException { 
        boolean first = true; 
        StringBuilder songs = new StringBuilder();; 
        for (Text val : values){ 
              if (!first) 
                songs.append(","); 
              first=false; 
              songs.append(val.toString()); 
            } 
 
        context.write(key, new Text(songs.toString())); 
} 
} 


评论关闭
IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!