hadoop的wordcount实例代码
可以通过一个简单的例子来说明MapReduce到底是什么:
我们要统计一个大文件中的各个单词出现的次数。由于文件太大。我们把这个文件切分成如果小文件,然后安排多个人去统计。这个过程就是”Map”。然后把每个人统计的数字合并起来,这个就是“Reduce"。
上面的例子如果在MapReduce去做呢,就需要创建一个任务job,由job把文件切分成若干独立的数据块,并分布在不同的机器节点中。然后通过分散在不同节点中的Map任务以完全并行的方式进行处理。MapReduce会对Map的输出地行收集,再将结果输出送给Reduce进行下一步的处理。
对于一个任务的具体执行过程,会有一个名为"JobTracker"的进程负责协调MapReduce执行过程中的所有任务。若干条TaskTracker进程用来运行单独的Map任务,并随时将任务的执行情况汇报给JobTracker。如果一个TaskTracker汇报任务失败或者长时间未对本身任务进行汇报,JobTracker会启动另外一个TaskTracker重新执行单独的Map任务。
下面的具体的代码实现:
1.编写wordcount的相关job
(1)eclipse下创建相关maven项目,依赖jar包如下(也可参照hadoop源码包下的hadoop-mapreduce-examples项目的pom配置)
注意:要配置一个maven插件maven-jar-plugin,并指定mainClass
junit junit 4.11 org.apache.hadoop hadoop-mapreduce-client-core 2.5.2 org.apache.hadoop hadoop-common 2.5.2 org.apache.maven.plugins maven-jar-plugin com.xxx.demo.hadoop.wordcount.WordCount
(2)根据MapReduce的运行机制,一个job至少要编写三个类分别用来完成Map逻辑、Reduce逻辑、作业调度这三件事。
Map的代码可继承org.apache.hadoop.mapreduce.Mapper类
publicstaticclassTokenizerMapper extendsMapper
Reduce的代码可继承org.apache.hadoop.mapreduce.Reducer类
publicclassIntSumReducer extendsReducer{ privateIntWritableresult=newIntWritable(); publicvoidreduce(Textkey,Iterable values, Contextcontext )throwsIOException,InterruptedException{ intsum=0; for(IntWritableval:values){ sum+=val.get(); } result.set(sum); context.write(key,result); } }
编写main方法进行作业调度
publicstaticvoidmain(String[]args)throwsException{ Configurationconf=newConfiguration(); Jobjob=Job.getInstance(conf,"wordcount"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job,newPath(args[0])); FileOutputFormat.setOutputPath(job,newPath(args[1])); job.waitForCompletion(true); //System.exit(job.waitForCompletion(true)?0:1); }
2.上传数据文件到hadoop集群环境
执行mvninstall把项目打成jar文件然后上传到linux集群环境,使用hdfsdfs-mkdir命令在hdfs文件系统中创建相应的命令,使用hdfsdfs-put把需要处理的数据文件上传到hdfs系统中,示例:hdfsdfs-put${linux_path/数据文件}${hdfs_path}
3.执行job
在集群环境中执行命令:hadoopjar${linux_path}/wordcount.jar${hdfs_input_path}${hdfs_output_path}
4.查看统计结果
hdfsdfs-cat${hdfs_output_path}/输出文件名
以上的方式在未启动hadoop集群环境时,是以Local模式运行,此时HDFS和YARN都不起作用。下面是在伪分布式模式下执行mapreducejob时需要做的工作,先把官网上列的步骤摘录出来:
配置主机名
#vi/etc/sysconfig/network
例如:
NETWORKING=yes HOSTNAME=master vi/etc/hosts
填入以下内容
127.0.0.1localhost
配置ssh免密码互通
ssh-keygen-trsa
#cat?~/.ssh/id_rsa.pub?>>?~/.ssh/authorized_keys
配置core-site.xml文件(位于${HADOOP_HOME}/etc/hadoop/
fs.defaultFS hdfs://localhost:9000
配置hdfs-site.xml文件
dfs.replication 1
下面的命令可以在单机伪分布模式下运行mapreduce的job
1.Formatthefilesystem:
$bin/hdfsnamenode-format
2.StartNameNodedaemonandDataNodedaemon:
$sbin/start-dfs.sh
3.Thehadoopdaemonlogoutputiswrittentothe$HADOOP_LOG_DIRdirectory(defaultsto$HADOOP_HOME/logs).4.BrowsethewebinterfacefortheNameNode;bydefaultitisavailableat:
NameNode-http://localhost:50070/
MaketheHDFSdirectoriesrequiredtoexecuteMapReducejobs:
$bin/hdfsdfs-mkdir/user
$bin/hdfsdfs-mkdir/user/
5.Copytheinputfilesintothedistributedfilesystem:
$bin/hdfsdfs-putetc/hadoopinput
6.Runsomeoftheexamplesprovided:
$bin/hadoopjarshare/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.2.jargrepinputoutput'dfs[a-z.]+'
7.Examinetheoutputfiles:
Copytheoutputfilesfromthedistributedfilesystemtothelocalfilesystemandexaminethem:$bin/hdfsdfs-getoutputoutput
$catoutput/*
orViewtheoutputfilesonthedistributedfilesystem:
$bin/hdfsdfs-catoutput/*
8.Whenyou'redone,stopthedaemonswith:
$sbin/stop-dfs.sh
总结
以上就是本文关于hadoop的wordcount实例代码的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!