Hadoop Programming: Implementing an Inverted Index with MapReduce
If you have worked on search engine development, inverted indexes will be familiar: search engines such as Google and Baidu are built on them. This post will not go into the theory of inverted indexes (interested readers can look it up online); instead it walks through how to implement one with Hadoop MapReduce (MR) programs.
1. Data Preparation
1.1 Input File Data
We prepare three input files, shown below.
a.txt
hello tom hello jerry hello tom
b.txt
hello jerry hello jerry tom jerry
c.txt
hello jerry hello tom
1.2 Final Output Data
The final output file contains one line per word: the word, a tab, then the list of "file-->count" postings:
hello	c.txt-->2 b.txt-->2 a.txt-->3
jerry	c.txt-->1 b.txt-->3 a.txt-->1
tom	c.txt-->1 b.txt-->1 a.txt-->2
2. Inverted Index Process Analysis
Comparing the input files with the final output, we can see that this program needs two MR jobs. The overall flow can be summarized as follows:
------------- Step-one Mapper output format: --------------------
context.write("hello-->a.txt", "1")
context.write("hello-->a.txt", "1")
context.write("hello-->a.txt", "1")
context.write("hello-->b.txt", "1")
context.write("hello-->b.txt", "1")
context.write("hello-->c.txt", "1")
context.write("hello-->c.txt", "1")
------------- Step-one Reducer input format: --------------------
<"hello-->a.txt", {1, 1, 1}>
<"hello-->b.txt", {1, 1}>
<"hello-->c.txt", {1, 1}>
------------- Step-one Reducer output format: -------------------
context.write("hello-->a.txt", "3")
context.write("hello-->b.txt", "2")
context.write("hello-->c.txt", "2")
------------- Step-two Mapper input format: ---------------------
<"hello-->a.txt", "3">
<"hello-->b.txt", "2">
<"hello-->c.txt", "2">
------------- Step-two Mapper output format: --------------------
context.write("hello", "a.txt-->3")
context.write("hello", "b.txt-->2")
context.write("hello", "c.txt-->2")
------------- Step-two Reducer input format: --------------------
<"hello", {"a.txt-->3", "b.txt-->2", "c.txt-->2"}>
------------- Step-two Reducer output format: -------------------
context.write("hello", "a.txt-->3 b.txt-->2 c.txt-->2")
Final result:
hello	a.txt-->3 b.txt-->2 c.txt-->2
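Before writing any Hadoop code, this two-phase flow can be sanity-checked with a minimal plain-Java sketch (not part of the original programs; the class name InvertedIndexSketch and the in-memory map layout are illustrative assumptions). Phase one counts per-file word occurrences and phase two regroups them by word, using the sample data from section 1:

import java.util.LinkedHashMap;
import java.util.Map;

// A plain-Java sketch of the two-phase flow above (no Hadoop involved),
// using the sample data from section 1 to verify the expected output.
public class InvertedIndexSketch {
    public static void main(String[] args) {
        // Simulated input files: file name -> file content
        Map<String, String> files = new LinkedHashMap<>();
        files.put("a.txt", "hello tom hello jerry hello tom");
        files.put("b.txt", "hello jerry hello jerry tom jerry");
        files.put("c.txt", "hello jerry hello tom");

        // Phase 1: count occurrences of each (word, file) pair,
        // like the step-one MR job
        Map<String, Map<String, Integer>> index = new LinkedHashMap<>();
        for (Map.Entry<String, String> file : files.entrySet()) {
            for (String word : file.getValue().split(" ")) {
                index.computeIfAbsent(word, w -> new LinkedHashMap<>())
                     .merge(file.getKey(), 1, Integer::sum);
            }
        }

        // Phase 2: regroup by word and concatenate "file-->count" postings,
        // like the step-two MR job
        for (Map.Entry<String, Map<String, Integer>> entry : index.entrySet()) {
            StringBuilder postings = new StringBuilder();
            for (Map.Entry<String, Integer> posting : entry.getValue().entrySet()) {
                postings.append(posting.getKey()).append("-->")
                        .append(posting.getValue()).append(" ");
            }
            System.out.println(entry.getKey() + "\t" + postings.toString().trim());
        }
    }
}

Running it prints the same three result lines as above, though the word and posting order may differ from the real MR jobs, which sort keys during the shuffle.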
3. Program Development
3.1 Step-One MR Program and Its Input/Output
package com.lyz.hdfs.mr.ii;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Step one of the inverted-index MapReduce programs; the Mapper, Reducer
 * and Runner are all placed in a single class here.
 * @author liuyazhuang
 */
public class InverseIndexStepOne {

    /**
     * Mapper for step one of the inverted index.
     * @author liuyazhuang
     */
    public static class StepOneMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Read one line of input
            String line = value.toString();
            // Split the line into words
            String[] fields = StringUtils.split(line, " ");
            // Get the input split this record came from
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            // Get the file name from the split
            String fileName = fileSplit.getPath().getName();
            for (String field : fields) {
                context.write(new Text(field + "-->" + fileName), new LongWritable(1));
            }
        }
    }

    /**
     * Reducer for step one of the inverted index.
     * Its final output is:
     * hello-->a.txt	3
     * hello-->b.txt	2
     * hello-->c.txt	2
     * jerry-->a.txt	1
     * jerry-->b.txt	3
     * jerry-->c.txt	1
     * tom-->a.txt	2
     * tom-->b.txt	1
     * tom-->c.txt	1
     * @author liuyazhuang
     */
    public static class StepOneReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the counts for this word-->file key
            long counter = 0;
            for (LongWritable value : values) {
                counter += value.get();
            }
            context.write(key, new LongWritable(counter));
        }
    }

    // Runner for the step-one MR job
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(InverseIndexStepOne.class);

        job.setMapperClass(StepOneMapper.class);
        job.setReducerClass(StepOneReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.addInputPath(job, new Path("D:/hadoop_data/ii"));
        FileOutputFormat.setOutputPath(job, new Path("D:/hadoop_data/ii/result"));

        job.waitForCompletion(true);
    }
}
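One optional tweak, not in the original driver: because StepOneReducer simply sums long counts (an associative, commutative operation), it can safely double as a combiner, so partial aggregation happens on the map side and less data crosses the shuffle. A single extra line in main() should be enough:

// Optional optimization (not in the original post): reuse the sum
// reducer as a combiner to shrink shuffle traffic.
job.setCombinerClass(StepOneReducer.class);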
3.1.1 Input Data
a.txt
hello tom hello jerry hello tom
b.txt
hello jerry hello jerry tom jerry
c.txt
hello jerry hello tom
3.1.2 Output Result
hello-->a.txt	3
hello-->b.txt	2
hello-->c.txt	2
jerry-->a.txt	1
jerry-->b.txt	3
jerry-->c.txt	1
tom-->a.txt	2
tom-->b.txt	1
tom-->c.txt	1
3.2 Step-Two MR Program and Its Input/Output
package com.lyz.hdfs.mr.ii;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Step two of the inverted-index MapReduce programs; the Mapper, Reducer
 * and Runner are all placed in a single class here.
 * @author liuyazhuang
 */
public class InverseIndexStepTwo {

    /**
     * Mapper for step two of the inverted index.
     *
     * Its input, produced by the step-one MR job, is:
     * hello-->a.txt	3
     * hello-->b.txt	2
     * hello-->c.txt	2
     * jerry-->a.txt	1
     * jerry-->b.txt	3
     * jerry-->c.txt	1
     * tom-->a.txt	2
     * tom-->b.txt	1
     * tom-->c.txt	1
     * @author liuyazhuang
     */
    public static class StepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Split "word-->file<TAB>count" into its parts
            String[] fields = StringUtils.split(line, "\t");
            String[] wordAndFileName = StringUtils.split(fields[0], "-->");
            String word = wordAndFileName[0];
            String fileName = wordAndFileName[1];
            long counter = Long.parseLong(fields[1]);
            // Re-key by word so all of a word's postings meet in one reducer
            context.write(new Text(word), new Text(fileName + "-->" + counter));
        }
    }

    /**
     * Reducer for step two of the inverted index.
     * Its input format is:
     * <"hello", {"a.txt-->3", "b.txt-->2", "c.txt-->2"}>
     * and its final output is:
     * hello	c.txt-->2 b.txt-->2 a.txt-->3
     * jerry	c.txt-->1 b.txt-->3 a.txt-->1
     * tom	c.txt-->1 b.txt-->1 a.txt-->2
     * @author liuyazhuang
     */
    public static class StepTwoReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate all "file-->count" postings for this word
            String result = "";
            for (Text value : values) {
                result += value + " ";
            }
            context.write(key, new Text(result));
        }
    }

    // Runner for the step-two MR job
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(InverseIndexStepTwo.class);

        job.setMapperClass(StepTwoMapper.class);
        job.setReducerClass(StepTwoReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path("D:/hadoop_data/ii/result/part-r-00000"));
        FileOutputFormat.setOutputPath(job, new Path("D:/hadoop_data/ii/result/final"));

        job.waitForCompletion(true);
    }
}
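Because step two consumes step one's output file, the two runners above must be launched in order. For convenience, the jobs can also be chained in a single driver. Here is a sketch of such a driver (not in the original post; the class name InverseIndexRunner and the job names are illustrative, and the paths are the same hard-coded ones used above):

package com.lyz.hdfs.mr.ii;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// A sketch (not from the original post) chaining both jobs in one driver:
// job two is only submitted after job one completes successfully.
public class InverseIndexRunner {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Step one: count word-->file occurrences
        Job stepOne = Job.getInstance(conf, "inverted-index-step-one");
        stepOne.setJarByClass(InverseIndexRunner.class);
        stepOne.setMapperClass(InverseIndexStepOne.StepOneMapper.class);
        stepOne.setReducerClass(InverseIndexStepOne.StepOneReducer.class);
        stepOne.setOutputKeyClass(Text.class);
        stepOne.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(stepOne, new Path("D:/hadoop_data/ii"));
        FileOutputFormat.setOutputPath(stepOne, new Path("D:/hadoop_data/ii/result"));

        // Abort if step one fails; its output is step two's input
        if (!stepOne.waitForCompletion(true)) {
            System.exit(1);
        }

        // Step two: regroup by word and concatenate file-->count postings
        Job stepTwo = Job.getInstance(conf, "inverted-index-step-two");
        stepTwo.setJarByClass(InverseIndexRunner.class);
        stepTwo.setMapperClass(InverseIndexStepTwo.StepTwoMapper.class);
        stepTwo.setReducerClass(InverseIndexStepTwo.StepTwoReducer.class);
        stepTwo.setOutputKeyClass(Text.class);
        stepTwo.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(stepTwo, new Path("D:/hadoop_data/ii/result/part-r-00000"));
        FileOutputFormat.setOutputPath(stepTwo, new Path("D:/hadoop_data/ii/result/final"));

        System.exit(stepTwo.waitForCompletion(true) ? 0 : 1);
    }
}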
3.2.1 Input Data
hello-->a.txt	3
hello-->b.txt	2
hello-->c.txt	2
jerry-->a.txt	1
jerry-->b.txt	3
jerry-->c.txt	1
tom-->a.txt	2
tom-->b.txt	1
tom-->c.txt	1
3.2.2 Output Result
hello	c.txt-->2 b.txt-->2 a.txt-->3
jerry	c.txt-->1 b.txt-->3 a.txt-->1
tom	c.txt-->1 b.txt-->1 a.txt-->2
That is all for this article on implementing an inverted index with Hadoop MR programs; I hope it helps. Interested readers can also check out other articles on this site, such as those on fast global sorting of text files in Hadoop, the steps for reformatting HDFS, and seven common Hadoop and Spark project patterns. If you have any questions, leave a comment and I will reply promptly. Thanks, everyone, for supporting this site!