java 中自定义OutputFormat的实例详解
java中自定义OutputFormat的实例详解
实例代码:
packagecom.ccse.hadoop.outputformat; importjava.io.IOException; importjava.net.URI; importjava.net.URISyntaxException; importjava.util.StringTokenizer; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.fs.FSDataOutputStream; importorg.apache.hadoop.fs.FileSystem; importorg.apache.hadoop.fs.Path; importorg.apache.hadoop.io.LongWritable; importorg.apache.hadoop.io.Text; importorg.apache.hadoop.mapreduce.Job; importorg.apache.hadoop.mapreduce.JobContext; importorg.apache.hadoop.mapreduce.Mapper; importorg.apache.hadoop.mapreduce.OutputCommitter; importorg.apache.hadoop.mapreduce.OutputFormat; importorg.apache.hadoop.mapreduce.RecordWriter; importorg.apache.hadoop.mapreduce.Reducer; importorg.apache.hadoop.mapreduce.TaskAttemptContext; importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat; importorg.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; publicclassMySelfOutputFormatApp{ publicfinalstaticStringINPUT_PATH="hdfs://chaoren1:9000/mapinput"; publicfinalstaticStringOUTPUT_PATH="hdfs://chaoren1:9000/mapoutput"; publicfinalstaticStringOUTPUT_FILENAME="/abc"; publicstaticvoidmain(String[]args)throwsIOException,URISyntaxException, ClassNotFoundException,InterruptedException{ Configurationconf=newConfiguration(); FileSystemfileSystem=FileSystem.get(newURI(OUTPUT_PATH),conf); fileSystem.delete(newPath(OUTPUT_PATH),true); Jobjob=newJob(conf,MySelfOutputFormatApp.class.getSimpleName()); job.setJarByClass(MySelfOutputFormatApp.class); FileInputFormat.setInputPaths(job,newPath(INPUT_PATH)); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setOutputFormatClass(MyselfOutputFormat.class); job.waitForCompletion(true); } publicstaticclassMyMapperextendsMapper{ privateTextword=newText(); privateLongWritablewritable=newLongWritable(1); @Override 
protectedvoidmap(LongWritablekey,Textvalue, Mapper .Contextcontext) throwsIOException,InterruptedException{ if(value!=null){ Stringline=value.toString(); StringTokenizertokenizer=newStringTokenizer(line); while(tokenizer.hasMoreTokens()){ word.set(tokenizer.nextToken()); context.write(word,writable); } } } } publicstaticclassMyReducerextendsReducer { @Override protectedvoidreduce(Textkey,Iterable values, Reducer .Contextcontext) throwsIOException,InterruptedException{ longsum=0; for(LongWritablevalue:values){ sum+=value.get(); } context.write(key,newLongWritable(sum)); } } publicstaticclassMyselfOutputFormatextendsOutputFormat { privateFSDataOutputStreamoutputStream=null; @Override publicRecordWriter getRecordWriter( TaskAttemptContextcontext)throwsIOException, InterruptedException{ try{ FileSystemfileSystem=FileSystem.get(newURI(MySelfOutputFormatApp.OUTPUT_PATH),context.getConfiguration()); //指定文件的输出路径 finalPathpath=newPath(MySelfOutputFormatApp.OUTPUT_PATH +MySelfOutputFormatApp.OUTPUT_FILENAME); this.outputStream=fileSystem.create(path,false); }catch(URISyntaxExceptione){ e.printStackTrace(); } returnnewMySelfRecordWriter(outputStream); } @Override publicvoidcheckOutputSpecs(JobContextcontext)throwsIOException, InterruptedException{ } @Override publicOutputCommittergetOutputCommitter(TaskAttemptContextcontext) throwsIOException,InterruptedException{ returnnewFileOutputCommitter(newPath(MySelfOutputFormatApp.OUTPUT_PATH),context); } } publicstaticclassMySelfRecordWriterextendsRecordWriter { privateFSDataOutputStreamoutputStream=null; publicMySelfRecordWriter(FSDataOutputStreamoutputStream){ this.outputStream=outputStream; } @Override publicvoidwrite(Textkey,LongWritablevalue)throwsIOException, InterruptedException{ this.outputStream.writeBytes(key.toString()); this.outputStream.writeBytes("\t"); this.outputStream.writeLong(value.get()); } @Override publicvoidclose(TaskAttemptContextcontext)throwsIOException, InterruptedException{ this.outputStream.close(); } } }
2.OutputFormat是用于处理各种输出目的地的。
2.1 OutputFormat需要写出去的键值对来自于Reducer类,是通过RecordWriter写出去的。
2.2 RecordWriter中的write(...)方法只有k和v两个参数,那么写到哪里去呢?这要通过单独传入OutputStream来处理,write(...)就是把k和v写入到这个OutputStream中。
2.3 RecordWriter是由OutputFormat的getRecordWriter(...)方法提供的。因此,我们自定义的OutputFormat必须继承OutputFormat类,而流对象必须在getRecordWriter(...)方法中创建,再传给RecordWriter。
以上就是java中自定义OutputFormat的实例,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!