Hadoop streaming详细介绍
Hadoopstreaming
Hadoop为MapReduce提供了不同的API,可以方便我们使用不同的编程语言来使用MapReduce框架,而不是只局限于Java。这里要介绍的就是HadoopstreamingAPI。Hadoopstreaming使用Unix的standardstreams作为我们mapreduce程序和MapReduce框架之间的接口。所以你可以用任何语言来编写MapReduce程序,只要该语言可以往standardinput/output上进行读写。
streamming是天然适用于文字处理的(textprocessing),当然,也仅适用纯文本的处理,对于需要对象和序列化的场景,hadoopstreaming无能为力。它力图使我们能够快捷的通过各种脚本语言,快速的处理大量的文本文件。以下是steaming的一些特点:
- Map函数的输入是通过standinput一行一行的接收数据的。(不像JavaAPI,通过InputFormat类做预处理,使得Map函数的输入是有Key和value的)
- Map函数的output则必须限定为key-valuepair,key和value之间用\t分开。(MapReduce框架在处理intermediate的Map输出时,必须做sort和partition,即shuffle)
- Reduce函数的input是Map函数的output也是key-valuepair,key和value之间用\t分开。
常用的Streaming编程语言:
- bashshell
- ruby
- python
Ruby
下面是一个Ruby编写的MapReduce程序的示例:
map
max_temperature_map.rb:
ruby #!/usr/bin/envruby STDIN.each_linedo|line| val=line year,temp,q=val[15,4],val[87,5],val[92,1] puts"#{year}\t#{temp}"if(temp!="+9999"&&q=~/[01459]/) end
- 从标准输入读入一行data。
- 处理数据之后,生成一个键值对,用\t分隔,输出到标准输出
reduce
max_temperature_reduce.rb:
ruby #!/usr/bin/envruby last_key,max_val=nil,-1000000 STDIN.each_linedo|line| key,val=line.split("\t") iflast_key&&last_key!=key puts"#{last_key}\t#{max_val}" last_key,max_val=key,val.to_i else last_key,max_val=key,[max_val,val.to_i].max end end puts"#{last_key}\t#{max_val}"iflast_key
- 从标准输入读入一行数据
- 数据是用\t分隔的键值对
- 数据是被MapReduce根据key排序之后顺序一行一行读入
- reduce函数对数据进行处理,并输出,输出仍是用\t分隔的键值对
运行
%hadoopjar$HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar\ -inputinput/ncdc/sample.txt\ -outputoutput\ -mapperch02/src/main/ruby/max_temperature_map.rb\ -reducerch02/src/main/ruby/max_temperature_reduce.rb
- hadoopjar$HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar指明了使用hadoopstreaming
- hadoop-*-streaming.jar会将input里的文件,一行一行的输出到标准输出。
- 用-mapper指定Map函数。类似于通过管道将数据传给rb文件:data|ch02/src/main/ruby/max_temperature_map.rb
- -reducer指定Reduce函数。
Python
Map
#!/usr/bin/envpython importre importsys forlineinsys.stdin: val=line.strip() (year,temp,q)=(val[15:19],val[87:92],val[92:93]) if(temp!="+9999"andre.match("[01459]",q)): print"%s\t%s"%(year,temp)
Reduce
#!/usr/bin/envpython importsys (last_key,max_val)=(None,-sys.maxint) forlineinsys.stdin: (key,val)=line.strip().split("\t") iflast_keyandlast_key!=key: print"%s\t%s"%(last_key,max_val) (last_key,max_val)=(key,int(val)) else: (last_key,max_val)=(key,max(max_val,int(val))) iflast_key: print"%s\t%s"%(last_key,max_val)
运行
%hadoopjar$HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar\ -inputinput/ncdc/sample.txt\ -outputoutput\ -mapperch02/src/main/ruby/max_temperature_map.py\ -reducerch02/src/main/ruby/max_temperature_reduce.py
Bashshell
Map
#!/usr/bin/envbash #NLineInputFormatgivesasingleline:keyisoffset,valueisS3URI readoffsets3file #RetrievefilefromS3tolocaldisk echo"reporter:status:Retrieving$s3file">&2 $HADOOP_INSTALL/bin/hadoopfs-get$s3file. #Un-bzipandun-tarthelocalfile target=`basename$s3file.tar.bz2` mkdir-p$target echo"reporter:status:Un-tarring$s3fileto$target">&2 tarjxf`basename$s3file`-C$target #Un-gzipeachstationfileandconcatintoonefile echo"reporter:status:Un-gzipping$target">&2 forfilein$target/*/* do gunzip-c$file>>$target.all echo"reporter:status:Processed$file">&2 done #PutgzippedversionintoHDFS echo"reporter:status:Gzipping$targetandputtinginHDFS">&2 gzip-c$target.all|$HADOOP_INSTALL/bin/hadoopfs-put-gz/$target.gz
运行
%hadoopjar$HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar\ -Dmapred.reduce.tasks=0\ -Dmapred.map.tasks.speculative.execution=false\ -Dmapred.task.timeout=12000000\ -inputncdc_files.txt\ -inputformatorg.apache.hadoop.mapred.lib.NLineInputFormat\ -outputoutput\ -mapperload_ncdc_map.sh\ -fileload_ncdc_map.sh
- 这里的-Dmapred.reduce.tasks=0将reducetask观掉,因此也不需要设置-reducer
- 只使用Mapper,可以通过MapReduce帮助我们并行的完成一些平时只能串行的shell脚本
- 注意这里的-file,在集群模式下,需要并行运行时,需要-file把文件传输到其他节点
Combiner
在streaming模式下,仍然可以运行Combiner,两种方法:
- 通过Java编写一个combiner的函数,并使用-combineroption
- 以命令行的管道模式完成combiner的任务
这里具体解释第二种方法:
%hadoopjar$HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar\ -inputinput/ncdc/all\ -outputoutput\ -mapper"ch02/src/main/ruby/max_temperature_map.rb|sort| ch02/src/main/ruby/max_temperature_reduce.rb"\ -reducerch02/src/main/ruby/max_temperature_reduce.rb\ -filech02/src/main/ruby/max_temperature_map.rb\ -filech02/src/main/ruby/max_temperature_reduce.rb
注意看-mapper这一行,通关管道的方式,把mapper的临时输出文件(intermediatefile,Map完成后的临时文件)作为输入,送到sort进行排序,然后送到reduce脚本,来完成类似于combiner的工作。这时候的输出才真正的作为shuffle的输入,被分组并在网络上发送到Reduce
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!