A Detailed Introduction to Multiple Outputs in Hadoop MapReduce
Hadoop MapReduce Multiple Outputs
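Files produced by FileOutputFormat and its subclasses are placed in the output directory. There is one file per reducer, and files are named by partition number: part-r-00000, part-r-00001, and so on. Sometimes you need control over the output file names, or need each reducer to write more than one file. MapReduce provides the MultipleOutputs class for this purpose.
MultipleOutputs can write data to multiple files whose names are derived from the output keys and values, or from an arbitrary string. This lets each reducer (or each mapper in a map-only job) create more than one file. File names take the form name-m-nnnnn for map outputs and name-r-nnnnn for reduce outputs, where name is an arbitrary name set by the program and nnnnn is an integer part number (starting from 0). The part number ensures that outputs written by different partitions (mappers or reducers) do not collide when they share the same name.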
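The naming scheme above can be sketched in plain Java. This is only an illustration of the name-m-nnnnn / name-r-nnnnn pattern, not Hadoop's own code; the class OutputFileNames and its fileName method are hypothetical names introduced for this example, and the five-digit zero padding is taken from the file names shown in this article.

```java
import java.util.Locale;

/** Illustrative only: mimics the name-m-nnnnn / name-r-nnnnn pattern described above. */
class OutputFileNames {

    /**
     * Builds a file name from a base name, the task type ('m' for map,
     * 'r' for reduce) and the part number, zero-padded to five digits.
     */
    static String fileName(String name, char taskType, int partNumber) {
        if (taskType != 'm' && taskType != 'r') {
            throw new IllegalArgumentException("taskType must be 'm' or 'r'");
        }
        if (partNumber < 0) {
            throw new IllegalArgumentException("partNumber must be non-negative");
        }
        return String.format(Locale.ROOT, "%s-%c-%05d", name, taskType, partNumber);
    }

    public static void main(String[] args) {
        System.out.println(fileName("f", 'r', 5));  // f-r-00005
        System.out.println(fileName("f", 'r', 12)); // f-r-00012
        System.out.println(fileName("m", 'm', 0));  // m-m-00000
    }
}
```

Two reducers that both write under the name f produce different part numbers and therefore different file names, which is why their outputs do not collide.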
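1. Customizing output file names
We can control the names of the output files. Consider this requirement: split vacation order data by sex. This calls for a job whose output is one file for male customers and one for female customers, each containing all the records for that sex.
This requirement can be implemented with MultipleOutputs: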
    package com.sjf.open.test;

    import java.io.IOException;

    import org.apache.commons.lang3.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    /**
     * Created by xiaosi on 16-11-7.
     */
    public class VacationOrderBySex extends Configured implements Tool {

        public static void main(String[] args) throws Exception {
            int status = ToolRunner.run(new VacationOrderBySex(), args);
            System.exit(status);
        }

        public static class VacationOrderBySexMapper extends Mapper<LongWritable, Text, Text, Text> {

            public String fInputPath = "";

            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                super.setup(context);
                // Remember which input file this split belongs to.
                fInputPath = ((FileSplit) context.getInputSplit()).getPath().toString();
            }

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                if (fInputPath.contains("vacation_hot_country_order")) {
                    String[] params = line.split("\t");
                    // Skip malformed lines that have no third (sex) field.
                    if (params.length < 3) {
                        return;
                    }
                    String sex = params[2];
                    if (StringUtils.isBlank(sex)) {
                        return;
                    }
                    // Key each record by its lower-cased sex value.
                    context.write(new Text(sex.toLowerCase()), value);
                }
            }
        }

        public static class VacationOrderBySexReducer extends Reducer<Text, Text, NullWritable, Text> {

            private MultipleOutputs<NullWritable, Text> multipleOutputs;

            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
            }

            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                for (Text value : values) {
                    // The third argument is the base name of the output file.
                    multipleOutputs.write(NullWritable.get(), value, key.toString());
                }
            }

            @Override
            protected void cleanup(Context context) throws IOException, InterruptedException {
                // Must be closed, otherwise buffered output may be lost.
                multipleOutputs.close();
            }
        }

        @Override
        public int run(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.println("./run <input> <output>");
                return 1;
            }
            String inputPath = args[0];
            String outputPath = args[1];
            int numReduceTasks = 16;

            Configuration conf = this.getConf();
            // Legacy property names for compressing the job output with gzip.
            conf.setBoolean("mapred.output.compress", true);
            conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);

            Job job = Job.getInstance(conf);
            job.setJobName("vacation_order_by_jifeng.si");
            job.setJarByClass(VacationOrderBySex.class);
            job.setMapperClass(VacationOrderBySexMapper.class);
            job.setReducerClass(VacationOrderBySexReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, new Path(outputPath));
            job.setNumReduceTasks(numReduceTasks);

            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        }
    }
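In the reducer, which generates the output, we construct a MultipleOutputs instance in setup() and assign it to an instance variable. In reduce() we write output through the MultipleOutputs instance instead of the context. The write() method takes the key, the value, and a name. Here the sex is used as the name, so the resulting output names have the form sex-r-nnnnn (that is, f-r-nnnnn or m-r-nnnnn):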
    -rw-r--r--   3 wirelessdev wirelessdev          0 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS
    -rw-r--r--   3 wirelessdev wirelessdev      88574 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/f-r-00005.gz
    -rw-r--r--   3 wirelessdev wirelessdev      60965 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/m-r-00012.gz
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00000.gz
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00001.gz
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00002.gz
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00003.gz
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00004.gz
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00005.gz
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00006.gz
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00007.gz
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00008.gz
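As the listing shows, the output contains not only the files we wanted but also files of the form part-r-nnnnn. These are the job's default output files, and they contain no records. For this reason, when choosing the name in name-r-nnnnn, do not use part, because it is already taken as the default.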
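The routing that the job in this section performs can be checked locally before submitting it to a cluster. The following plain-Java sketch is an in-memory stand-in for the map and reduce steps, not Hadoop code: it groups tab-separated lines by the lower-cased third field, which is the value the reducer passes to write() as the output name. The class SexRoutingSketch, its route method, and the sample records are hypothetical, and skipping lines with fewer than three fields is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: an in-memory stand-in for the map and reduce steps above. */
class SexRoutingSketch {

    /**
     * Groups tab-separated lines by the lower-cased value of the third field.
     * Lines with fewer than three fields or a blank third field are skipped
     * (the first condition is an assumption of this sketch).
     */
    static Map<String, List<String>> route(List<String> lines) {
        Map<String, List<String>> outputs = new TreeMap<>();
        for (String line : lines) {
            String[] params = line.split("\t");
            if (params.length < 3 || params[2].trim().isEmpty()) {
                continue;
            }
            // The map key plays the role of the output name (f, m, ...).
            String name = params[2].toLowerCase(Locale.ROOT);
            outputs.computeIfAbsent(name, k -> new ArrayList<>()).add(line);
        }
        return outputs;
    }

    public static void main(String[] args) {
        // Made-up sample records: id, destination, sex.
        List<String> lines = Arrays.asList(
                "1001\tParis\tF",
                "1002\tTokyo\tm",
                "1003\tRome\t",
                "1004\tOslo\tf");
        Map<String, List<String>> outputs = route(lines);
        System.out.println(outputs.keySet());       // [f, m]
        System.out.println(outputs.get("f").size()); // 2
    }
}
```

Nothing is ever routed to a name called part, which matches the empty part-r-nnnnn files in the listing: every record goes to a named output, and the default output receives none.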
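2. Output to multiple directories
The base path passed to the write() method of MultipleOutputs is interpreted relative to the output path. Because it may contain file path separator characters (/), it can create subdirectories of arbitrary depth. For example, we change the requirement above: split vacation order data by sex, with the data for each sex placed in its own subdirectory (for example: sex=f/part-r-00000).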
    public static class VacationOrderBySexReducer extends Reducer<Text, Text, NullWritable, Text> {

        private MultipleOutputs<NullWritable, Text> multipleOutputs;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                String basePath = String.format("sex=%s/part", key.toString());
                multipleOutputs.write(NullWritable.get(), value, basePath);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            multipleOutputs.close();
        }
    }
The resulting output names have the form sex=f/part-r-nnnnn or sex=m/part-r-nnnnn:
    -rw-r--r--   3 wirelessdev wirelessdev          0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00000.gz
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00001.gz
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00002.gz
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00003.gz
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00004.gz
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00005.gz
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00006.gz
    -rw-r--r--   3 wirelessdev wirelessdev         20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00007.gz
    drwxr-xr-x   - wirelessdev wirelessdev          0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/sex=f
    drwxr-xr-x   - wirelessdev wirelessdev          0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/sex=m
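How a base path containing / ends up as a subdirectory of the output path can be sketched with the standard java.nio.file API. This is only an illustration of the path arithmetic and is not how Hadoop resolves paths internally; note that java.nio.file.Path is a different class from Hadoop's org.apache.hadoop.fs.Path. The class BasePathSketch, its resolve method, and the output directory name are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Locale;

/** Illustrative only: shows a base path being resolved against an output directory. */
class BasePathSketch {

    /**
     * Appends the reduce suffix (-r-nnnnn) to the base path and resolves the
     * result relative to the output directory.
     */
    static Path resolve(Path outputDir, String basePath, int partNumber) {
        String relative = String.format(Locale.ROOT, "%s-r-%05d", basePath, partNumber);
        return outputDir.resolve(relative);
    }

    public static void main(String[] args) {
        Path outputDir = Paths.get("output"); // hypothetical output directory
        Path file = resolve(outputDir, "sex=f/part", 0);
        System.out.println(file.getParent().getFileName()); // sex=f
        System.out.println(file.getFileName());             // part-r-00000
    }
}
```

Using part as the file name inside a subdirectory is safe here, because the job's default part-r-nnnnn files are written directly into the output directory and not into sex=f or sex=m.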
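3. Lazy output
Subclasses of FileOutputFormat create output files (part-r-nnnnn) even when they are empty. When these empty files are unwanted, LazyOutputFormat can be used. It is a wrapper output format that creates the output file for a partition only when the first record is written to it. To use it, call its setOutputFormatClass() method with the Job and the underlying output format: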
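    // Requires LazyOutputFormat and TextOutputFormat from org.apache.hadoop.mapreduce.lib.output.
    Configuration conf = this.getConf();
    Job job = Job.getInstance(conf);
    // Wrap TextOutputFormat so that a file is created only when the first record is written.
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);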
Checking the output files again (for the first example):
    sudo -u wirelessdev hadoop fs -ls tmp/data_group/order/vacation_hot_country_order_by_sex/
    Found 3 items
    -rw-r--r--   3 wirelessdev wirelessdev          0 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS
    -rw-r--r--   3 wirelessdev wirelessdev      88574 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/f-r-00005.gz
    -rw-r--r--   3 wirelessdev wirelessdev      60965 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/m-r-00012.gz
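The idea behind lazy output, creating a file only when the first record arrives, can be sketched in plain Java. This is not how LazyOutputFormat is implemented; it only demonstrates the same deferral on the local file system. The class LazyLineWriter and the file names used in main are hypothetical.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Illustrative only: opens its file on the first write, so unused outputs leave no file. */
class LazyLineWriter implements AutoCloseable {

    private final Path path;
    private BufferedWriter writer; // stays null until the first record arrives

    LazyLineWriter(Path path) {
        this.path = path;
    }

    void write(String line) throws IOException {
        if (writer == null) {
            // The file is created here, not in the constructor.
            writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8);
        }
        writer.write(line);
        writer.newLine();
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("lazy-output");
        Path unused = dir.resolve("part-r-00000");
        Path used = dir.resolve("f-r-00005");
        try (LazyLineWriter a = new LazyLineWriter(unused);
             LazyLineWriter b = new LazyLineWriter(used)) {
            b.write("record"); // only this writer ever receives a record
        }
        System.out.println(Files.exists(unused)); // false
        System.out.println(Files.exists(used));   // true
    }
}
```

An eager writer that opened its file in the constructor would leave an empty part-r-00000 behind, which corresponds to the empty default files seen before LazyOutputFormat was enabled.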