WordCount in Hadoop with Various Languages (1)

Permanent link to this article: https://www.askmaclean.com/archives/hadoop-wordcount-1.html


 

I was taking a look at Apache Crunch and, while I was at it, tried running wordcount on Hadoop in several different languages. To start with, I ran wordcount with MapReduce, Hadoop Streaming, Hive, and Pig.

(Update) The code has been put on GitHub: https://github.com/kawamon/wordcount.git

An aside on WordCount

 

WordCount is routinely used as the very first example when explaining Hadoop MapReduce; it plays the role of a "Hello World".

 

Why WordCount, of all things, is the sample used to teach Hadoop MapReduce is rarely spelled out, and you may well wonder what makes word counting a good example.

 

Processing genuinely large volumes of data runs into two problems:

  1. Moving the data from storage to the CPUs that read and process it takes a long time.
  2. A single machine takes far too long (the data is too big to fit in memory, or one machine's CPU simply cannot get through it).

MapReduce tackles both by shipping the computation to where the data already sits and by splitting the work across many machines, and WordCount is about the simplest job that still exercises the full map, shuffle and reduce flow.

So let's try running it on the Cloudera QuickStart VM installed earlier.

 

Preparation

First, copy the test data into HDFS. This time I used the Crunch sample data: two very simple files (file01, file02), which makes it easy to compare the results.
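The post does not show how the two files were placed in HDFS. Assuming they are first created in the local working directory (a minimal sketch of one way to stage them, not taken from the original post):

$ echo "Hello World Bye World" > file01
$ echo "Hello Hadoop Goodbye Hadoop" > file02
$ hadoop fs -mkdir -p input
$ hadoop fs -put file01 file02 input/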

 

$ hadoop fs -cat input/file01
Hello World Bye World

$ hadoop fs -cat input/file02
Hello Hadoop Goodbye Hadoop

 

 

 

MapReduce (Java)

 

 

First up is MapReduce in Java: a WordCount written against the new API. I worked from the tutorial below, but since it targets the old API a few changes were needed; note that this version does not use StringTokenizer.

http://www.cloudera.com/content/cloudera/en/documentation/hadoop-tutorial/CDH5/Hadoop-Tutorial/ht_wordcount1_source.html

WordCount.java

 

package org.myorg;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {

    if (args.length != 2) {
      System.out.printf(
          "Usage: %s [generic options] <input dir> <output dir>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }

    Job job = new Job(getConf());
    job.setJarByClass(WordCount.class);
    job.setJobName(this.getClass().getName());

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(WordMapper.class);
    job.setReducerClass(WordReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    if (job.waitForCompletion(true)) {
      return 0;
    }
    return 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new WordCount(), args);
    System.exit(exitCode);
  }
}

 

WordMapper.java

 

package org.myorg;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String s = value.toString();
    // Split on runs of non-word characters instead of using StringTokenizer.
    for (String w : s.split("\\W+")) {
      if (w.length() > 0) {
        word.set(w);
        context.write(word, one);
      }
    }
  }
}

 

WordReducer.java

 

package org.myorg;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int wordCount = 0;
    for (IntWritable value : values) {
      wordCount += value.get();
    }
    context.write(key, new IntWritable(wordCount));
  }
}

 

Compile and Run

 

$ javac -classpath `hadoop classpath` org/myorg/*.java
Note: org/myorg/WordCount.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
[cloudera@quickstart mr_java]$ jar cvf wc.jar org/myorg/*.class
added manifest
adding: org/myorg/WordCount.class(in = 2253) (out= 1132)(deflated 49%)
adding: org/myorg/WordMapper.class(in = 1915) (out= 810)(deflated 57%)
adding: org/myorg/WordReducer.class(in = 1602) (out= 670)(deflated 58%)
[cloudera@quickstart mr_java]$ hadoop jar wc.jar org.myorg.WordCount input output
14/12/14 05:08:37 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/127.0.0.1:8032
14/12/14 05:08:37 INFO input.FileInputFormat: Total input paths to process : 2
14/12/14 05:08:38 INFO mapreduce.JobSubmitter: number of splits:2
14/12/14 05:08:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1418545807639_0014
14/12/14 05:08:38 INFO impl.YarnClientImpl: Submitted application application_1418545807639_0014
14/12/14 05:08:38 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1418545807639_0014/
14/12/14 05:08:38 INFO mapreduce.Job: Running job: job_1418545807639_0014
14/12/14 05:08:51 INFO mapreduce.Job: Job job_1418545807639_0014 running in uber mode : false
14/12/14 05:08:51 INFO mapreduce.Job:  map 0% reduce 0%
14/12/14 05:09:01 INFO mapreduce.Job:  map 100% reduce 0%
14/12/14 05:09:11 INFO mapreduce.Job:  map 100% reduce 100%
14/12/14 05:09:12 INFO mapreduce.Job: Job job_1418545807639_0014 completed successfully
14/12/14 05:09:12 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=87
FILE: Number of bytes written=319063
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=296
HDFS: Number of bytes written=41
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=13884
Total time spent by all reduces in occupied slots (ms)=4067
Total time spent by all map tasks (ms)=13884
Total time spent by all reduce tasks (ms)=4067
Total vcore-seconds taken by all map tasks=13884
Total vcore-seconds taken by all reduce tasks=4067
Total megabyte-seconds taken by all map tasks=14217216
Total megabyte-seconds taken by all reduce tasks=4164608
Map-Reduce Framework
Map input records=2
Map output records=8
Map output bytes=82
Map output materialized bytes=101
Input split bytes=246
Combine input records=0
Combine output records=0
Reduce input groups=5
Reduce shuffle bytes=101
Reduce input records=8
Reduce output records=5
Spilled Records=16
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=232
CPU time spent (ms)=2460
Physical memory (bytes) snapshot=700850176
Virtual memory (bytes) snapshot=2683498496
Total committed heap usage (bytes)=510656512
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=50
File Output Format Counters
Bytes Written=41

 

 

Results

$ hadoop fs -cat output/part-r-00000
Bye 1
Goodbye 1
Hadoop  2
Hello   2
World   2
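A practical note that is not in the original post: FileOutputFormat refuses to start a job whose output directory already exists, so before re-running the example the old output has to be removed, for instance:

$ hadoop fs -rm -r output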

 

Hadoop Streaming

 

mapper.pl

#!/usr/bin/env perl
while (<>) {
    chomp();
    (@words) = split /\W+/;
    foreach $w (@words) {
        print "$w\t1\n";
    }
}

 

reduce.pl

#!/usr/bin/env perl
$sum = 0;
$last = "";
while (<>) {
    chomp;
    ($key, $value) = split /\t/;
    $last = $key if $last eq "";
    if ($last ne $key) {
        print "$last\t$sum\n";
        $last = $key;
        $sum = 0;
    }
    $sum += $value;
}
print "$key\t$sum\n";

 

Run

$ hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -mapper mapper.pl -reducer reduce.pl -file mapper.pl -file reduce.pl -input input -output streamoutput
14/12/14 05:53:58 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper.pl, reduce.pl] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.5.0-cdh5.2.0.jar] /tmp/streamjob8660928725375064201.jar tmpDir=null
14/12/14 05:53:59 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/127.0.0.1:8032
14/12/14 05:54:00 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/127.0.0.1:8032
14/12/14 05:54:01 INFO mapred.FileInputFormat: Total input paths to process : 2
14/12/14 05:54:01 INFO mapreduce.JobSubmitter: number of splits:3
14/12/14 05:54:01 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1418545807639_0025
14/12/14 05:54:01 INFO impl.YarnClientImpl: Submitted application application_1418545807639_0025
14/12/14 05:54:01 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1418545807639_0025/
14/12/14 05:54:01 INFO mapreduce.Job: Running job: job_1418545807639_0025
14/12/14 05:54:13 INFO mapreduce.Job: Job job_1418545807639_0025 running in uber mode : false
14/12/14 05:54:13 INFO mapreduce.Job:  map 0% reduce 0%
14/12/14 05:54:26 INFO mapreduce.Job:  map 100% reduce 0%
14/12/14 05:54:36 INFO mapreduce.Job:  map 100% reduce 100%
14/12/14 05:54:37 INFO mapreduce.Job: Job job_1418545807639_0025 completed successfully
14/12/14 05:54:37 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=83
FILE: Number of bytes written=437439
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=383
HDFS: Number of bytes written=41
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=3
Launched reduce tasks=1
Data-local map tasks=3
Total time spent by all maps in occupied slots (ms)=33174
Total time spent by all reduces in occupied slots (ms)=3734
Total time spent by all map tasks (ms)=33174
Total time spent by all reduce tasks (ms)=3734
Total vcore-seconds taken by all map tasks=33174
Total vcore-seconds taken by all reduce tasks=3734
Total megabyte-seconds taken by all map tasks=33970176
Total megabyte-seconds taken by all reduce tasks=3823616
Map-Reduce Framework
Map input records=2
Map output records=8
Map output bytes=66
Map output materialized bytes=119
Input split bytes=330
Combine input records=0
Combine output records=0
Reduce input groups=5
Reduce shuffle bytes=119
Reduce input records=8
Reduce output records=5
Spilled Records=16
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=222
CPU time spent (ms)=3050
Physical memory (bytes) snapshot=967741440
Virtual memory (bytes) snapshot=3570974720
Total committed heap usage (bytes)=719847424
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=53
File Output Format Counters
Bytes Written=41
14/12/14 05:54:37 INFO streaming.StreamJob: Output directory: streamoutput

 

Results

$ hadoop fs -cat streamoutput/part-00000
Bye 1
Goodbye 1
Hadoop  2
Hello   2
World   2

 

Hive

 

Next up is Hive; see Programming Hive for reference. In Hive, the text files are exposed as an external table.

HiveQL

DROP TABLE docs;
CREATE EXTERNAL TABLE docs (line STRING) LOCATION '/user/cloudera/input';
SELECT word, count(1) AS count FROM
(SELECT explode(split(line, ' ')) AS word FROM docs) w GROUP BY word ORDER BY word;
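The statements above are typed at the hive> prompt below, but they could equally be saved into a script file, say wordcount.hql (a file name of my own choosing, not from the post), and run non-interactively:

$ hive -f wordcount.hql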

 

 

Run and Results

 

hive> SELECT word, count(1) AS count FROM
>   (SELECT explode(split(line, ' ')) AS word FROM docs) w GROUP BY word
> ORDER BY word;
Total jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1418545807639_0019, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1418545807639_0019/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1418545807639_0019
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2014-12-14 05:23:53,107 Stage-1 map = 0%,  reduce = 0%
2014-12-14 05:24:00,570 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.29 sec
2014-12-14 05:24:10,901 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 2.49 sec
MapReduce Total cumulative CPU time: 2 seconds 490 msec
Ended Job = job_1418545807639_0019
Launching Job 2 out of 2
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1418545807639_0020, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1418545807639_0020/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1418545807639_0020
Hadoop job information for Stage-2: number of mappers: 1; number of reducers: 1
2014-12-14 05:24:26,302 Stage-2 map = 0%,  reduce = 0%
2014-12-14 05:24:33,842 Stage-2 map = 100%,  reduce = 0%, Cumulative CPU 1.04 sec
2014-12-14 05:24:43,114 Stage-2 map = 100%,  reduce = 100%, Cumulative CPU 2.26 sec
MapReduce Total cumulative CPU time: 2 seconds 260 msec
Ended Job = job_1418545807639_0020
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 2.49 sec   HDFS Read: 337 HDFS Write: 217 SUCCESS
Stage-Stage-2: Map: 1  Reduce: 1   Cumulative CPU: 2.26 sec   HDFS Read: 594 HDFS Write: 41 SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 750 msec
OK
Bye 1
Goodbye 1
Hadoop  2
Hello   2
World   2
Time taken: 66.765 seconds, Fetched: 5 row(s)
hive>

 

Pig

 

Next is Pig.

Pig Latin Script

 

docs = LOAD '/user/cloudera/input' AS (line:chararray);
words = FOREACH docs GENERATE FLATTEN(TOKENIZE(line)) AS word;
groupd = GROUP words BY word;
wordcount = FOREACH groupd GENERATE group, COUNT(words);
DUMP wordcount;
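Besides typing the statements into grunt as shown below, the script can be run in batch mode; saved as wordcount.pig (again a file name I made up), it would be submitted with:

$ pig wordcount.pig

To write the result into HDFS instead of printing it to the console, the final DUMP would be replaced with a STORE ... INTO statement.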

 

Run and Results

grunt> docs = LOAD '/user/cloudera/input' AS (line:chararray);
grunt> words = FOREACH docs GENERATE FLATTEN(TOKENIZE(line)) AS word;
grunt> groupd = GROUP words BY word;
grunt> wordcount = FOREACH groupd GENERATE group, COUNT(words);
grunt> DUMP wordcount;
2014-12-14 05:27:00,067 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: GROUP_BY
2014-12-14 05:27:00,112 [main] INFO  org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, NewPartitionFilterOptimizer, PartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier]}
2014-12-14 05:27:00,230 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
(output omitted)
2014-12-14 05:27:42,341 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2014-12-14 05:27:42,341 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(Bye,1)
(Hello,2)
(World,2)
(Hadoop,2)
(Goodbye,1)
grunt>

 

 

This is getting long, so I will continue in the next post.

 
