I. MapJoin with DistributedCache

     1. Introduction to MapReduce joins

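In real-world business scenarios it is very common to join two datasets on some key. If both datasets are small, the join can be done directly in memory. But what if the data is large? Then an in-memory join will clearly run out of memory (OOM). MapReduce can be used to join large datasets.
MapReduce joins fall into two categories: MapJoin and ReduceJoin.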

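First, ReduceJoin:
(1) Map phase: the two datasets, data1 and data2, are both read by the map tasks and parsed into key-value pairs, with the join field as the key and the queried fields as the value. Each pair is tagged with its source, data1 or data2.
(2) Reduce phase: each reduce task receives all records from data1 and data2 that share the same key and joins them on the reduce side by taking their cross product. The most direct consequence is heavy memory consumption, which can lead to OOM.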
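To make the memory issue concrete, here is a small in-memory simulation of a reduce-side join in plain Java, with no Hadoop APIs. The names ReduceJoinSketch, Tagged, shuffle and reduce are made up for illustration, and a TreeMap stands in for the framework's sort-and-group step. For each key the reducer has to buffer all data1 and data2 records before it can form their cross product, and that buffer is what grows with the data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReduceJoinSketch {

    // One map output record: join key, source tag ("data1" or "data2"), payload.
    static class Tagged {
        final String key;
        final String source;
        final String payload;

        Tagged(String key, String source, String payload) {
            this.key = key;
            this.source = source;
            this.payload = payload;
        }
    }

    // Shuffle: group records by join key; TreeMap also sorts keys, as MapReduce does.
    static Map<String, List<Tagged>> shuffle(List<Tagged> mapped) {
        Map<String, List<Tagged>> groups = new TreeMap<>();
        for (Tagged t : mapped) {
            groups.computeIfAbsent(t.key, k -> new ArrayList<>()).add(t);
        }
        return groups;
    }

    // Reduce for one key: buffer both sides in memory, then emit their cross product.
    // These buffers are what can exhaust memory when one key has many records.
    static List<String> reduce(String key, List<Tagged> values) {
        List<String> left = new ArrayList<>();
        List<String> right = new ArrayList<>();
        for (Tagged t : values) {
            if ("data1".equals(t.source)) {
                left.add(t.payload);
            } else {
                right.add(t.payload);
            }
        }
        List<String> joined = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                joined.add(key + "\t" + l + "\t" + r);
            }
        }
        return joined;
    }

    static List<String> join(List<Tagged> mapped) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Tagged>> e : shuffle(mapped).entrySet()) {
            out.addAll(reduce(e.getKey(), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Tagged> mapped = List.of(
                new Tagged("k1", "data1", "a"),
                new Tagged("k1", "data2", "x"),
                new Tagged("k1", "data2", "y"),
                new Tagged("k2", "data1", "b"));
        // k1 yields a x and a y; k2 has no data2 record, so it yields nothing (inner join)
        join(mapped).forEach(System.out::println);
    }
}
```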

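Now MapJoin:
MapJoin is suited to joins where one of the datasets is small. The small dataset is loaded entirely into memory and indexed by the join key. The large dataset then serves as the MapTask input, and each record passed to map() is matched directly against the in-memory index; the joined results are then written out by key. This approach uses Hadoop's DistributedCache to distribute the small dataset to every compute node: every node that runs a map task must load that data into memory and index it by the join key.
(The map tasks read the big table. Before the big table is read, the small table is loaded into memory in the setup() method.)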

    2. Requirement

There are two data files, movies.dat and ratings.dat, with the following formats:

Movies.dat:
     1::Toy Story (1995)::Animation|Children's|Comedy
     2::Jumanji (1995)::Adventure|Children's|Fantasy
     3::Grumpier Old Men (1995)::Comedy|Romance
     Fields: movieid, moviename, movietype

Ratings.dat:
    1::1193::5::978300760
    1::661::3::978302109
    1::914::3::978301968

Fields: userid, movieid, rate, timestamp

Join the two tables so that the final output contains the following six fields:
movieid, userid, rate, moviename, movietype, timestamp
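A minimal sketch of the map-side join logic for this requirement, in plain Java without Hadoop so that it can run on its own: indexMovies() plays the role of loading the small table in setup(), and joinRating() the role of map() probing that index with one ratings line. Both method names are hypothetical. The three sample rating lines above reference movies 1193, 661 and 914, none of which appear in the three sample movie lines, so the example uses a made-up rating line for movie 1 to show a match.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapJoinSketch {

    // Small table -> in-memory index (the setup() step).
    // movies line: movieid::moviename::movietype
    static Map<String, String[]> indexMovies(List<String> movieLines) {
        Map<String, String[]> index = new HashMap<>();
        for (String line : movieLines) {
            String[] f = line.split("::");
            index.put(f[0], new String[] {f[1], f[2]});
        }
        return index;
    }

    // One big-table line probed against the index (the map() step).
    // ratings line: userid::movieid::rate::timestamp
    // Returns null when the movie is not in the small table (inner join).
    static String joinRating(Map<String, String[]> movies, String ratingLine) {
        String[] f = ratingLine.split("::");
        String[] movie = movies.get(f[1]);
        if (movie == null) {
            return null;
        }
        // Required order: movieid, userid, rate, moviename, movietype, timestamp
        return String.join("\t", f[1], f[0], f[2], movie[0], movie[1], f[3]);
    }

    public static void main(String[] args) {
        Map<String, String[]> movies = indexMovies(List.of(
                "1::Toy Story (1995)::Animation|Children's|Comedy",
                "2::Jumanji (1995)::Adventure|Children's|Fantasy"));
        // Hypothetical ratings: the first matches movie 1, the second has no movie row
        List<String> ratings = List.of("1::1::5::978300760", "1::1193::5::978300760");
        List<String> out = new ArrayList<>();
        for (String r : ratings) {
            String joined = joinRating(movies, r);
            if (joined != null) {
                out.add(joined);
            }
        }
        out.forEach(System.out::println);
    }
}
```

Unmatched ratings are simply dropped, so this is an inner join, and each big-table record costs one HashMap lookup.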

 

     3. Implementation

Step 1: wrap the record in a MovieRate class to make sorting and serialization convenient
    

package com.ghgj.mr.mymapjoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// One joined record: movieid, userid, rate, moviename, movietype, timestamp
public class MovieRate implements WritableComparable<MovieRate> {

    private String movieid;
    private String userid;
    private int rate;
    private String movieName;
    private String movieType;
    private long ts;

    public String getMovieid() {
        return movieid;
    }

    public void setMovieid(String movieid) {
        this.movieid = movieid;
    }

    public String getUserid() {
        return userid;
    }

    public void setUserid(String userid) {
        this.userid = userid;
    }

    public int getRate() {
        return rate;
    }

    public void setRate(int rate) {
        this.rate = rate;
    }

    public String getMovieName() {
        return movieName;
    }

    public void setMovieName(String movieName) {
        this.movieName = movieName;
    }

    public String getMovieType() {
        return movieType;
    }

    public void setMovieType(String movieType) {
        this.movieType = movieType;
    }

    public long getTs() {
        return ts;
    }

    public void setTs(long ts) {
        this.ts = ts;
    }

    // Hadoop needs a no-arg constructor to create the object before calling readFields()
    public MovieRate() {
    }

    public MovieRate(String movieid, String userid, int rate, String movieName,
            String movieType, long ts) {
        this.movieid = movieid;
        this.userid = userid;
        this.rate = rate;
        this.movieName = movieName;
        this.movieType = movieType;
        this.ts = ts;
    }

    // TextOutputFormat calls toString(), so this defines the output line format
    @Override
    public String toString() {
        return movieid + "\t" + userid + "\t" + rate + "\t" + movieName
                + "\t" + movieType + "\t" + ts;
    }

    // Serialization; writeUTF() rejects null, so every String field must be set
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(movieid);
        out.writeUTF(userid);
        out.writeInt(rate);
        out.writeUTF(movieName);
        out.writeUTF(movieType);
        out.writeLong(ts);
    }

    // Deserialization: fields must be read in exactly the order write() wrote them
    @Override
    public void readFields(DataInput in) throws IOException {
        this.movieid = in.readUTF();
        this.userid = in.readUTF();
        this.rate = in.readInt();
        this.movieName = in.readUTF();
        this.movieType = in.readUTF();
        this.ts = in.readLong();
    }

    // The argument is compared against this object, so records sort in descending
    // order of movieid (compared as a String), then userid
    @Override
    public int compareTo(MovieRate mr) {
        int it = mr.getMovieid().compareTo(this.movieid);
        if (it == 0) {
            return mr.getUserid().compareTo(this.userid);
        } else {
            return it;
        }
    }
}
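Because write() and readFields() must agree on field order, a quick round-trip check is useful. The sketch below uses a Hadoop-free stand-in class (Rec, a hypothetical name) with MovieRate's fields, serialization order and compareTo() logic, and pushes an instance through DataOutputStream and DataInputStream, roughly what happens to a key during the shuffle. It also shows a consequence of the compareTo() as written: the argument is compared against this and movieid is a String, so sorting is descending and lexicographic, and ids 1, 2, 10 come out as 2, 10, 1.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MovieRateRoundTrip {

    // Stand-in with MovieRate's fields, field order and comparison logic (no Hadoop needed)
    static class Rec implements Comparable<Rec> {
        String movieid, userid, movieName, movieType;
        int rate;
        long ts;

        Rec() {}

        Rec(String movieid, String userid, int rate, String movieName, String movieType, long ts) {
            this.movieid = movieid;
            this.userid = userid;
            this.rate = rate;
            this.movieName = movieName;
            this.movieType = movieType;
            this.ts = ts;
        }

        void write(DataOutput out) throws IOException {
            out.writeUTF(movieid);
            out.writeUTF(userid);
            out.writeInt(rate);
            out.writeUTF(movieName);
            out.writeUTF(movieType);
            out.writeLong(ts);
        }

        void readFields(DataInput in) throws IOException {
            movieid = in.readUTF();
            userid = in.readUTF();
            rate = in.readInt();
            movieName = in.readUTF();
            movieType = in.readUTF();
            ts = in.readLong();
        }

        // Same as MovieRate.compareTo(): argument vs. this, hence descending order
        @Override
        public int compareTo(Rec mr) {
            int it = mr.movieid.compareTo(this.movieid);
            return it != 0 ? it : mr.userid.compareTo(this.userid);
        }

        @Override
        public String toString() {
            return movieid + "\t" + userid + "\t" + rate + "\t" + movieName
                    + "\t" + movieType + "\t" + ts;
        }
    }

    // Serialize to bytes, then deserialize into a fresh object
    static Rec roundTrip(Rec r) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            r.write(new DataOutputStream(bytes));
            Rec copy = new Rec();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Rec r = new Rec("1", "1", 5, "Toy Story (1995)", "Animation|Children's|Comedy", 978300760L);
        System.out.println(roundTrip(r));

        List<Rec> list = new ArrayList<>();
        for (String id : new String[] {"1", "2", "10"}) {
            list.add(new Rec(id, "1", 5, "m", "t", 0L));
        }
        Collections.sort(list);
        list.forEach(x -> System.out.println(x.movieid)); // prints 2, 10, 1
    }
}
```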

Step 2: write the MapReduce program

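package com.ghgj.mr.mymapjoin;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MovieRatingMapJoinMR {

    // args[0]: small table (movies.dat), args[1]: big table (ratings.dat), args[2]: output dir
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        Job job = Job.getInstance(conf);
        // job.setJarByClass(MovieRatingMapJoinMR.class);
        job.setJar("/home/hadoop/mrmr.jar");
        job.setMapperClass(MovieRatingMapJoinMRMapper.class);
        job.setMapOutputKeyClass(MovieRate.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(MovieRate.class);
        job.setOutputValueClass(NullWritable.class);
        // Map-only job: the join is finished in the map phase, no reducer needed
        job.setNumReduceTasks(0);

        String minInput = args[0];
        String maxInput = args[1];
        String output = args[2];

        // Only the big table is the job input
        FileInputFormat.setInputPaths(job, new Path(maxInput));
        Path outputPath = new Path(output);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        // Ship the small table to every node through the distributed cache
        URI uri = new Path(minInput).toUri();
        job.addCacheFile(uri);

        boolean status = job.waitForCompletion(true);
        System.exit(status ? 0 : 1);
    }

    // The Mapper is missing from the original listing. This is a sketch that assumes
    // the "::" formats above and that the cached file is symlinked into the task's
    // working directory under its own file name (YARN's default for addCacheFile()).
    static class MovieRatingMapJoinMRMapper
            extends Mapper<LongWritable, Text, MovieRate, NullWritable> {

        // movieid -> {moviename, movietype}
        private final Map<String, String[]> movieMap = new HashMap<>();

        // Load the small table into memory once per map task, before any map() call
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            URI[] cacheFiles = context.getCacheFiles();
            String localName = new Path(cacheFiles[0].getPath()).getName();
            try (BufferedReader br = new BufferedReader(new FileReader(localName))) {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] f = line.split("::");
                    if (f.length < 3) {
                        continue; // skip blank or malformed lines
                    }
                    movieMap.put(f[0], new String[] {f[1], f[2]});
                }
            }
        }

        // One ratings line per call: userid::movieid::rate::timestamp
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] f = value.toString().split("::");
            if (f.length < 4) {
                return;
            }
            String[] movie = movieMap.get(f[1]);
            if (movie != null) {
                context.write(new MovieRate(f[1], f[0], Integer.parseInt(f[2]),
                        movie[0], movie[1], Long.parseLong(f[3])), NullWritable.get());
            }
        }
    }
}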

  

II. Custom OutputFormat: writing data to separate outputs by category

Implementation: define a custom OutputFormat and override its RecordWriter, in particular the write() method that actually outputs each record.

package com.ghgj.mr.score_outputformat; 
 
import java.io.IOException; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IOUtils; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
 
public class MyScoreOutputFormat extends TextOutputFormat<Text, NullWritable>{ 
 
	@Override 
	public RecordWriter<Text, NullWritable> getRecordWriter( 
			TaskAttemptContext job) throws IOException, InterruptedException { 
		Configuration configuration = job.getConfiguration(); 
		 
		FileSystem fs = FileSystem.get(configuration); 
		Path p1 = new Path("/score1/outpu1"); 
		Path p2 = new Path("/score2/outpu2"); 
		 
		if(fs.exists(p1)){ 
			fs.delete(p1, true); 
		} 
		if(fs.exists(p2)){ 
			fs.delete(p2, true); 
		} 
		 
		FSDataOutputStream fsdout1 = fs.create(p1); 
		FSDataOutputStream fsdout2 = fs.create(p2); 
		return new MyRecordWriter(fsdout1, fsdout2); 
	} 
	 
	static class MyRecordWriter extends RecordWriter<Text, NullWritable>{ 
 
		FSDataOutputStream dout1 = null; 
		FSDataOutputStream dout2 = null; 
		 
		public MyRecordWriter(FSDataOutputStream dout1, FSDataOutputStream dout2) { 
			super(); 
			this.dout1 = dout1; 
			this.dout2 = dout2; 
		} 
 
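		@Override 
		public void write(Text key, NullWritable value) throws IOException, 
				InterruptedException { 
			// The key looks like "<tag>::<record>"; limit 2 keeps any "::" inside the record 
			String[] strs = key.toString().split("::", 2); 
			// Encode as UTF-8: writeBytes() keeps only the low byte of each char and garbles non-ASCII text 
			byte[] line = (strs[1] + "\n").getBytes("UTF-8"); 
			if(strs[0].equals("1")){ 
				dout1.write(line); 
			}else{ 
				dout2.write(line); 
			} 
		} 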
 
		@Override 
		public void close(TaskAttemptContext context) throws IOException, 
				InterruptedException { 
			IOUtils.closeStream(dout2); 
			IOUtils.closeStream(dout1);  
		} 
	} 
} 
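The routing logic of MyRecordWriter can be tried out without a cluster. The sketch below (RoutingWriterSketch is a hypothetical name) applies the same rule to two in-memory streams instead of HDFS files. It splits with a limit of 2 so that a "::" inside the record survives, and it encodes each line as UTF-8 rather than using DataOutputStream.writeBytes(), which keeps only the low eight bits of each char and therefore garbles non-ASCII text such as Chinese names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class RoutingWriterSketch {
    private final OutputStream out1;
    private final OutputStream out2;

    RoutingWriterSketch(OutputStream out1, OutputStream out2) {
        this.out1 = out1;
        this.out2 = out2;
    }

    // Tag "1" goes to out1, every other tag to out2; the tag itself is not written
    void write(String key) throws IOException {
        String[] parts = key.split("::", 2);
        OutputStream target = parts[0].equals("1") ? out1 : out2;
        target.write((parts[1] + "\n").getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream b1 = new ByteArrayOutputStream();
        ByteArrayOutputStream b2 = new ByteArrayOutputStream();
        RoutingWriterSketch w = new RoutingWriterSketch(b1, b2);
        w.write("1::\u5f20\u4e09\t90"); // hypothetical record with a Chinese name
        w.write("2::a::b");             // payload containing "::"
        System.out.print(new String(b1.toByteArray(), StandardCharsets.UTF_8));
        System.out.print(new String(b2.toByteArray(), StandardCharsets.UTF_8));
    }
}
```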

  

package com.ghgj.mr.score_outputformat; 
 
import java.io.IOException; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
 
public class ScoreOutputFormatMR extends Configured implements Tool{ 
 
	// This run() method plays the role of the Driver 
	@Override 
	public int run(String[] args) throws Exception { 
		 
		// Use the Configuration ToolRunner prepared, so generic -D options take effect 
		Configuration conf = getConf(); 
		conf.set("fs.defaultFS", "hdfs://hadoop02:9000"); 
		System.setProperty("HADOOP_USER_NAME", "hadoop"); 
		Job job = Job.getInstance(conf); 
		 
		job.setMapperClass(ScoreOutputFormatMRMapper.class); 
		job.setMapOutputKeyClass(Text.class); 
		job.setMapOutputValueClass(NullWritable.class); 
		 
		job.setNumReduceTasks(0); 
		 
		// TextInputFormat is the default input component 
		job.setInputFormatClass(TextInputFormat.class); 
		// TextOutputFormat is the default output component; here it is replaced by the custom one 
//		job.setOutputFormatClass(TextOutputFormat.class); 
		job.setOutputFormatClass(MyScoreOutputFormat.class); 
		 
		FileInputFormat.setInputPaths(job, new Path("/scorefmt")); 
		Path output = new Path("/scorefmt/output"); 
		FileSystem fs = FileSystem.get(conf); 
		if(fs.exists(output)){ 
			fs.delete(output, true); 
		} 
		FileOutputFormat.setOutputPath(job, output); 
		 
		boolean status = job.waitForCompletion(true); 
		return status?0:1; 
	} 
 
	public static void main(String[] args) throws Exception { 
		 
		int run = ToolRunner.run(new ScoreOutputFormatMR(), args); 
		System.exit(run); 
	} 
	 
	static class ScoreOutputFormatMRMapper extends Mapper<LongWritable,  Text, Text, NullWritable>{ 
		@Override 
		protected void map(LongWritable key, Text value, 
				Mapper<LongWritable, Text, Text, NullWritable>.Context context) 
				throws IOException, InterruptedException { 
			 
			// Lines with at least 8 tab-separated fields (split.length - 2 >= 6) are tagged "1", the rest "2" 
			String[] split = value.toString().split("\t"); 
			if(split.length-2 >= 6){ 
				context.write(new Text("1::"+value.toString()), NullWritable.get()); 
			}else{ 
				context.write(new Text("2::"+value.toString()), NullWritable.get()); 
			} 
		} 
	} 
} 
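The mapper's classification rule depends only on how many tab-separated fields a line has. The layout of the score file is not shown, so the sketch below (ScoreTagSketch, a hypothetical name) just reproduces the rule on made-up lines. It also highlights a Java detail: String.split() drops trailing empty strings, so a line ending in a tab counts one field fewer, while split("\t", -1) keeps them. Which behavior is wanted depends on the data.

```java
public class ScoreTagSketch {

    // Same rule as the mapper: tag "1::" when split.length - 2 >= 6, otherwise "2::"
    static String tag(String line) {
        String[] split = line.split("\t");
        return (split.length - 2 >= 6 ? "1::" : "2::") + line;
    }

    // Variant that keeps trailing empty fields (limit -1), for comparison
    static String tagKeepingEmpty(String line) {
        String[] split = line.split("\t", -1);
        return (split.length - 2 >= 6 ? "1::" : "2::") + line;
    }

    public static void main(String[] args) {
        String full = "a\tb\t1\t2\t3\t4\t5\t6";    // 8 non-empty fields
        String trailing = "a\tb\t1\t2\t3\t4\t5\t"; // 8th field is empty
        System.out.println(tag(full).substring(0, 3));                 // 1::
        System.out.println(tag(trailing).substring(0, 3));             // 2::
        System.out.println(tagKeepingEmpty(trailing).substring(0, 3)); // 1::
    }
}
```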

III. Custom InputFormat: merging small files

      Step 1: define a custom InputFormat

package com.ghgj.mr.format.input; 
 
import java.io.IOException; 
 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
 
public class WholeFileInputFormat extends FileInputFormat<NullWritable, Text> { 
	// Mark each small file as non-splittable so that one file yields exactly one key-value pair 
	@Override 
	protected boolean isSplitable(JobContext context, Path file) { 
		return false; 
	} 
 
	@Override 
	public RecordReader<NullWritable, Text> createRecordReader( 
			InputSplit split, TaskAttemptContext context) throws IOException, 
			InterruptedException { 
		WholeFileRecordReader reader = new WholeFileRecordReader(); 
		reader.initialize(split, context); 
		return reader; 
	} 
} 
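Returning false from isSplitable() means each file becomes exactly one input split, however large it is. The sketch below counts the splits for a single file, following our understanding of the loop in FileInputFormat.getSplits(): a split is cut while the remaining bytes exceed 1.1 times the split size, and the remainder becomes the last split. It is an illustration, not Hadoop's code; countSplits is a hypothetical name, and splitSize stands in for the value Hadoop derives from the block size and the min/max split settings.

```java
public class SplitCountSketch {

    // Slack factor: the last split may be up to 10% larger than splitSize
    private static final double SPLIT_SLOP = 1.1;

    static int countSplits(long fileLength, long splitSize, boolean splitable) {
        if (fileLength == 0 || !splitable) {
            return 1; // empty or non-splittable file: one split covering the whole file
        }
        int splits = 0;
        long bytesRemaining = fileLength;
        while ((double) bytesRemaining / splitSize > SPLIT_SLOP) {
            splits++;
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits++;
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        System.out.println(countSplits(300 * mb, 128 * mb, true));  // 3
        System.out.println(countSplits(130 * mb, 128 * mb, true));  // 1
        System.out.println(countSplits(300 * mb, 128 * mb, false)); // 1
    }
}
```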

  Step 2: write the custom RecordReader

package com.ghgj.mr.format.input; 
 
import java.io.IOException; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IOUtils; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
 
class WholeFileRecordReader extends RecordReader<NullWritable, Text> { 
	private FileSplit fileSplit; 
	private Configuration conf; 
	private Text value = new Text(); 
	private boolean processed = false; 
 
	@Override 
	public void initialize(InputSplit split, TaskAttemptContext context) 
			throws IOException, InterruptedException { 
		this.fileSplit = (FileSplit) split; 
		this.conf = context.getConfiguration(); 
	} 
 
	@Override 
	public boolean nextKeyValue() throws IOException, InterruptedException { 
		if (!processed) { 
			// Allocate a byte array as large as the input split (the whole file, since it is not split) 
			byte[] contents = new byte[(int) fileSplit.getLength()]; 
			// Get the file's path in the file system from the FileSplit 
			Path file = fileSplit.getPath(); 
			FileSystem fs = file.getFileSystem(conf); 
			FSDataInputStream in = null; 
			try { 
				// Open an input stream on the file through the FileSystem object fs 
				in = fs.open(file); 
				/** 
				 *  in: the input stream 
				 *  contents: the byte array that receives the bytes read from the stream 
				 */ 
				IOUtils.readFully(in, contents, 0, contents.length); 
				 
				value.set(contents, 0, contents.length); 
				 
			} finally { 
				IOUtils.closeStream(in); 
			} 
			processed = true; 
			return true; 
		} 
		return false; 
	} 
 
	@Override 
	public NullWritable getCurrentKey() throws IOException, InterruptedException { 
		return NullWritable.get(); 
	} 
 
	@Override 
	public Text getCurrentValue() throws IOException, InterruptedException { 
		return value; 
	} 
 
	@Override 
	public float getProgress() throws IOException { 
		return processed ? 1.0f : 0.0f; 
	} 
 
	@Override 
	public void close() throws IOException { 
		// do nothing 
	} 
} 
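The RecordReader contract here is simple: nextKeyValue() returns true exactly once, with the whole file as the value, and false afterwards. The sketch below reproduces that contract with java.nio.file.Files on a local temporary file instead of HDFS; WholeFileReaderSketch is a hypothetical name. Since the listing above casts the split length to int, each file must stay under 2 GB, which is not a concern for small files.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class WholeFileReaderSketch {
    private final Path file;
    private byte[] value;
    private boolean processed = false;

    WholeFileReaderSketch(Path file) {
        this.file = file;
    }

    // True exactly once, with the entire file loaded as the value; false afterwards
    boolean nextKeyValue() throws IOException {
        if (processed) {
            return false;
        }
        value = Files.readAllBytes(file);
        processed = true;
        return true;
    }

    byte[] getCurrentValue() {
        return value;
    }

    float getProgress() {
        return processed ? 1.0f : 0.0f;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("small", ".txt");
        Files.write(tmp, "line1\nline2\n".getBytes(StandardCharsets.UTF_8));
        WholeFileReaderSketch reader = new WholeFileReaderSketch(tmp);
        int records = 0;
        while (reader.nextKeyValue()) {
            records++;
        }
        System.out.println(records + " record(s), " + reader.getCurrentValue().length + " bytes");
        Files.delete(tmp);
    }
}
```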

   Step 3: write the MapReduce program

package com.ghgj.mr.format.input; 
 
import java.io.IOException; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
 
public class SmallFilesConvertToBigMR extends Configured implements Tool { 
	 
	public static void main(String[] args) throws Exception { 
		int exitCode = ToolRunner.run(new SmallFilesConvertToBigMR(), args); 
		System.exit(exitCode); 
	} 
 
	static class SmallFilesConvertToBigMRMapper extends 
			Mapper<NullWritable, Text, Text, Text> { 
		 
		private Text filenameKey; 
		@Override 
		protected void setup(Context context) throws IOException, 
				InterruptedException { 
			InputSplit split = context.getInputSplit(); 
			Path path = ((FileSplit) split).getPath(); 
			filenameKey = new Text(path.toString()); 
		} 
 
		@Override 
		protected void map(NullWritable key, Text value, Context context) 
				throws IOException, InterruptedException { 
			context.write(filenameKey, value); 
		} 
	} 
 
	static class SmallFilesConvertToBigMRReducer extends 
			Reducer<Text, Text, NullWritable, Text> { 
		@Override 
		protected void reduce(Text filename, Iterable<Text> bytes, 
				Context context) throws IOException, InterruptedException { 
			context.write(NullWritable.get(), bytes.iterator().next()); 
		} 
	} 
 
	@Override 
	public int run(String[] args) throws Exception { 
		Configuration conf = new Configuration(); 
		conf.set("fs.defaultFS", "hdfs://hadoop02:9000"); 
		System.setProperty("HADOOP_USER_NAME", "hadoop"); 
		Job job = Job.getInstance(conf, "combine small files to bigfile"); 
		 
		job.setJarByClass(SmallFilesConvertToBigMR.class); 
		 
		job.setMapOutputKeyClass(Text.class); 
		job.setMapOutputValueClass(Text.class); 
		job.setMapperClass(SmallFilesConvertToBigMRMapper.class); 
 
		job.setOutputKeyClass(NullWritable.class); 
		job.setOutputValueClass(Text.class); 
		job.setReducerClass(SmallFilesConvertToBigMRReducer.class); 
 
		// TextInputFormat is the default input component 
//		job.setInputFormatClass(TextInputFormat.class); 
		// Instead of the default input format, use the custom WholeFileInputFormat 
		job.setInputFormatClass(WholeFileInputFormat.class); 
		 
		 
		Path input = new Path("/smallfiles"); 
		Path output = new Path("/bigfile"); 
		FileInputFormat.setInputPaths(job, input); 
		FileSystem fs = FileSystem.get(conf); 
		if (fs.exists(output)) { 
			fs.delete(output, true); 
		} 
		FileOutputFormat.setOutputPath(job, output); 
 
		int status = job.waitForCompletion(true) ? 0 : 1; 
		return status; 
	} 
} 
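What this job writes can be simulated in a few lines. In the sketch below (SmallFilesMergeSketch, a hypothetical name), a TreeMap stands in for the shuffle, which sorts the map output by file name, and the single reducer's output is modeled on TextOutputFormat, which for a NullWritable key writes only the value followed by a newline. The file names are made up, and ASCII names are assumed so that String ordering matches the byte ordering of Text keys.

```java
import java.util.Map;
import java.util.TreeMap;

public class SmallFilesMergeSketch {

    // map:     one (filename, whole contents) pair per file
    // shuffle: pairs sorted by filename
    // reduce:  each contents value written as one record, i.e. value + "\n"
    static String merge(Map<String, String> files) {
        StringBuilder out = new StringBuilder();
        for (String contents : new TreeMap<>(files).values()) {
            out.append(contents).append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> files = new TreeMap<>();
        files.put("/smallfiles/b.txt", "b1\n");
        files.put("/smallfiles/a.txt", "a1\na2\n");
        System.out.print(merge(files));
    }
}
```

One consequence: if a small file already ends with a newline, the merged output contains an extra blank line after it.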

  


 

 
