Wordcount on Hadoop in various languages (1)

Permalink: https://www.askmaclean.com/archives/hadoop-wordcount-1.html

 

 


 

While looking into Apache Crunch, I took the opportunity to run wordcount on Hadoop in several different languages. This first part covers MapReduce, Hadoop Streaming, Hive, and Pig.

(Addendum) The code is on GitHub: https://github.com/kawamon/wordcount.git

 

 

A digression on wordcount

 

Wordcount is often the first example used to explain Hadoop MapReduce; it plays the role of "Hello World".

 

The reason wordcount is used as the sample for Hadoop MapReduce is rarely spelled out, and you may well wonder why wordcount, of all things, is the example.

 

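Processing large volumes of data, so-called big data, raises two problems.

  1. Moving the data takes a very long time, because the large amount of data held in storage has to be read in before the CPU can process it.
  2. Running on a single machine takes too long (the data is too large to fit in memory, or more than one machine's CPU can handle).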

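Wordcount is a small task that can be split across machines: each line is tokenized independently and the per-word counts are summed afterward. So let's try running it on the Cloudera QuickStart VM installed earlier.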

 

Preparation

First, copy the test data into HDFS. This time I use the sample from Crunch: two simple files (file01 and file02), which makes the results easy to compare.

 

$ hadoop fs -cat input/file01
Hello World Bye World

$ hadoop fs -cat input/file02
Hello Hadoop Goodbye Hadoop

 

 

 

MapReduce (Java)

 

 

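First up is MapReduce (Java): a WordCount written against the New API. I based it on the tutorial below, but that tutorial uses the Old API, so a few changes were needed. StringTokenizer is also not used here; the mapper splits each line with a regular expression instead.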

http://www.cloudera.com/content/cloudera/en/documentation/hadoop-tutorial/CDH5/Hadoop-Tutorial/ht_wordcount1_source.html
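To make the StringTokenizer remark concrete, here is a minimal sketch in plain Java (no Hadoop required) that compares whitespace tokenizing with the regular-expression split used in the mapper. The class name TokenizeDemo, the method names, and the sample line are my own assumptions for illustration, not part of the tutorial or of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

public class TokenizeDemo {
    // Whitespace-delimited tokens: punctuation stays attached to the words.
    static List<String> withTokenizer(String line) {
        List<String> out = new ArrayList<>();
        StringTokenizer st = new StringTokenizer(line);
        while (st.hasMoreTokens()) {
            out.add(st.nextToken());
        }
        return out;
    }

    // Split on runs of non-word characters and skip empty strings,
    // which appear when the line starts with a non-word character.
    static List<String> withSplit(String line) {
        List<String> out = new ArrayList<>();
        for (String w : line.split("\\W+")) {
            if (w.length() > 0) {
                out.add(w);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String line = " Hello, World! Bye World"; // hypothetical sample input
        System.out.println(withTokenizer(line)); // [Hello,, World!, Bye, World]
        System.out.println(withSplit(line));     // [Hello, World, Bye, World]
    }
}
```

With the two sample files used in this post there is no punctuation, so both approaches would give the same words; the difference only shows up on messier text.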

 

 

WordCount.java

 

package org.myorg;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {

        if (args.length != 2) {
            System.out.printf(
                    "Usage: %s [generic options] <input dir> <output dir>\n", getClass()
                            .getSimpleName());
            ToolRunner.printGenericCommandUsage(System.out);
            return -1;
        }

        Job job = new Job(getConf());
        job.setJarByClass(WordCount.class);
        job.setJobName(this.getClass().getName());

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        if (job.waitForCompletion(true)) {
            return 0;
        }
        return 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WordCount(), args);
        System.exit(exitCode);
    }
}

 

WordMapper.java

 

package org.myorg;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String s = value.toString();
        // Split on runs of non-word characters (the backslash must be escaped in Java).
        for (String w : s.split("\\W+")) {
            // Skip the empty string produced when a line starts with a non-word character.
            if (w.length() > 0) {
                word.set(w);
                context.write(word, one);
            }
        }
    }
}

 

WordReducer.java

 

package org.myorg;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int wordCount = 0;
        for (IntWritable value : values) {
            wordCount += value.get();
        }
        context.write(key, new IntWritable(wordCount));
    }
}
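To see what the mapper and reducer compute without a cluster, the data flow can be mimicked in plain Java. The following is only a sketch under my own assumptions: the class LocalWordCount and its methods are hypothetical, it uses no Hadoop classes, and the "shuffle" is simply a sorted in-memory map. It shows the map, group-by-key, and reduce steps on the two sample lines.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LocalWordCount {
    // "Map" step: emit one (word, 1) pair per token, as WordMapper does.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String w : line.split("\\W+")) {
            if (w.length() > 0) {
                pairs.add(new SimpleEntry<>(w, 1));
            }
        }
        return pairs;
    }

    // "Shuffle" step: collect the values of each key; TreeMap keeps keys sorted.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // "Reduce" step: sum the values of one key, as WordReducer does.
    static int reduce(List<Integer> values) {
        int wordCount = 0;
        for (int v : values) {
            wordCount += v;
        }
        return wordCount;
    }

    // Run all three steps in memory over the given lines.
    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.addAll(map(line));
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : shuffle(pairs).entrySet()) {
            result.put(e.getKey(), reduce(e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // The contents of file01 and file02.
        List<String> lines = Arrays.asList("Hello World Bye World", "Hello Hadoop Goodbye Hadoop");
        run(lines).forEach((word, count) -> System.out.println(word + "\t" + count));
    }
}
```

On a real cluster the map calls run in parallel on the nodes that hold the data and the grouping happens over the network; this sketch keeps only the logic.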

 

Compile and run

 

$ javac -classpath `hadoop classpath` org/myorg/*.java
Note: org/myorg/WordCount.java uses or overrides a deprecated API.

Note: Recompile with -Xlint:deprecation for details.

[cloudera@quickstart mr_java]$ jar cvf wc.jar org/myorg/*.class

added manifest

adding: org/myorg/WordCount.class(in = 2253) (out= 1132)(deflated 49%)

adding: org/myorg/WordMapper.class(in = 1915) (out= 810)(deflated 57%)

adding: org/myorg/WordReducer.class(in = 1602) (out= 670)(deflated 58%)

[cloudera@quickstart mr_java]$ hadoop jar wc.jar org.myorg.WordCount input output

14/12/14 05:08:37 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/127.0.0.1:8032

14/12/14 05:08:37 INFO input.FileInputFormat: Total input paths to process : 2

14/12/14 05:08:38 INFO mapreduce.JobSubmitter: number of splits:2

14/12/14 05:08:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1418545807639_0014

14/12/14 05:08:38 INFO impl.YarnClientImpl: Submitted application application_1418545807639_0014

14/12/14 05:08:38 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1418545807639_0014/

14/12/14 05:08:38 INFO mapreduce.Job: Running job: job_1418545807639_0014

14/12/14 05:08:51 INFO mapreduce.Job: Job job_1418545807639_0014 running in uber mode : false

14/12/14 05:08:51 INFO mapreduce.Job:  map 0% reduce 0%

14/12/14 05:09:01 INFO mapreduce.Job:  map 100% reduce 0%

14/12/14 05:09:11 INFO mapreduce.Job:  map 100% reduce 100%

14/12/14 05:09:12 INFO mapreduce.Job: Job job_1418545807639_0014 completed successfully

14/12/14 05:09:12 INFO mapreduce.Job: Counters: 49

File System Counters

FILE: Number of bytes read=87

FILE: Number of bytes written=319063

FILE: Number of read operations=0

FILE: Number of large read operations=0

FILE: Number of write operations=0

HDFS: Number of bytes read=296

HDFS: Number of bytes written=41

HDFS: Number of read operations=9

HDFS: Number of large read operations=0

HDFS: Number of write operations=2

Job Counters

Launched map tasks=2

Launched reduce tasks=1

Data-local map tasks=2

Total time spent by all maps in occupied slots (ms)=13884

Total time spent by all reduces in occupied slots (ms)=4067

Total time spent by all map tasks (ms)=13884

Total time spent by all reduce tasks (ms)=4067

Total vcore-seconds taken by all map tasks=13884

Total vcore-seconds taken by all reduce tasks=4067

Total megabyte-seconds taken by all map tasks=14217216

Total megabyte-seconds taken by all reduce tasks=4164608

Map-Reduce Framework

Map input records=2

Map output records=8

Map output bytes=82

Map output materialized bytes=101

Input split bytes=246

Combine input records=0

Combine output records=0

Reduce input groups=5

Reduce shuffle bytes=101

Reduce input records=8

Reduce output records=5

Spilled Records=16

Shuffled Maps =2

Failed Shuffles=0

Merged Map outputs=2

GC time elapsed (ms)=232

CPU time spent (ms)=2460

Physical memory (bytes) snapshot=700850176

Virtual memory (bytes) snapshot=2683498496

Total committed heap usage (bytes)=510656512

Shuffle Errors

BAD_ID=0

CONNECTION=0

IO_ERROR=0

WRONG_LENGTH=0

WRONG_MAP=0

WRONG_REDUCE=0

File Input Format Counters

Bytes Read=50

File Output Format Counters

Bytes Written=41

 

 

Result

$ hadoop fs -cat output/part-r-00000
Bye     1
Goodbye 1
Hadoop  2
Hello   2
World   2

 

HadoopStreaming

 

mapper.pl

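#!/usr/bin/env perl

# Emit "word<TAB>1" for every word on every input line.
while (<>) {
    chomp();
    (@words) = split /\W+/;
    foreach $w (@words) {
        # A line starting with a non-word character yields an empty first field.
        next if $w eq "";
        print "$w\t1\n"
    }
}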

 

reduce.pl

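#!/usr/bin/env perl

# Input arrives sorted by key, so lines with the same key are adjacent.
$sum = 0;
$last = "";
while (<>) {
    chomp;
    ($key, $value) = split /\t/;
    $last = $key if $last eq "";
    if ($last ne $key) {
        # Key changed: print the total for the previous key.
        print "$last\t$sum\n";
        $last = $key;
        $sum = 0;
    }
    $sum += $value;
}
# Print the total for the final key (skipped when there was no input).
print "$last\t$sum\n" if $last ne "";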
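Unlike the Java Reducer, which is handed one key together with all of its values, a streaming reducer reads plain "key<TAB>value" lines sorted by key and has to detect the key boundaries itself. The sketch below restates that logic in Java so it can be compared with WordReducer; the class SortedStreamReduce and the in-memory input list are assumptions of mine, not part of the original scripts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SortedStreamReduce {
    // Sums the values of "key\tvalue" lines that are already sorted by key.
    static List<String> reduce(List<String> sortedLines) {
        List<String> out = new ArrayList<>();
        String last = null; // key currently being accumulated
        int sum = 0;
        for (String line : sortedLines) {
            String[] kv = line.split("\t", 2);
            String key = kv[0];
            int value = Integer.parseInt(kv[1].trim());
            if (last != null && !last.equals(key)) {
                // Key changed: emit the total for the previous key.
                out.add(last + "\t" + sum);
                sum = 0;
            }
            last = key;
            sum += value;
        }
        if (last != null) {
            // Emit the total for the final key.
            out.add(last + "\t" + sum);
        }
        return out;
    }

    public static void main(String[] args) {
        // What the mapper output for the sample files looks like after sorting.
        List<String> sorted = Arrays.asList(
                "Bye\t1", "Goodbye\t1", "Hadoop\t1", "Hadoop\t1",
                "Hello\t1", "Hello\t1", "World\t1", "World\t1");
        reduce(sorted).forEach(System.out::println);
    }
}
```

The approach only works because the input is sorted; if equal keys were not adjacent, the same word would be printed more than once with partial counts.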

 

Run

$ hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -mapper mapper.pl -reducer reduce.pl -file mapper.pl -file reduce.pl -input input -output streamoutput
14/12/14 05:53:58 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.

packageJobJar: [mapper.pl, reduce.pl] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.5.0-cdh5.2.0.jar] /tmp/streamjob8660928725375064201.jar tmpDir=null

14/12/14 05:53:59 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/127.0.0.1:8032

14/12/14 05:54:00 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/127.0.0.1:8032

14/12/14 05:54:01 INFO mapred.FileInputFormat: Total input paths to process : 2

14/12/14 05:54:01 INFO mapreduce.JobSubmitter: number of splits:3

14/12/14 05:54:01 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1418545807639_0025

14/12/14 05:54:01 INFO impl.YarnClientImpl: Submitted application application_1418545807639_0025

14/12/14 05:54:01 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1418545807639_0025/

14/12/14 05:54:01 INFO mapreduce.Job: Running job: job_1418545807639_0025

14/12/14 05:54:13 INFO mapreduce.Job: Job job_1418545807639_0025 running in uber mode : false

14/12/14 05:54:13 INFO mapreduce.Job:  map 0% reduce 0%

14/12/14 05:54:26 INFO mapreduce.Job:  map 100% reduce 0%

14/12/14 05:54:36 INFO mapreduce.Job:  map 100% reduce 100%

14/12/14 05:54:37 INFO mapreduce.Job: Job job_1418545807639_0025 completed successfully

14/12/14 05:54:37 INFO mapreduce.Job: Counters: 49

File System Counters

FILE: Number of bytes read=83

FILE: Number of bytes written=437439

FILE: Number of read operations=0

FILE: Number of large read operations=0

FILE: Number of write operations=0

HDFS: Number of bytes read=383

HDFS: Number of bytes written=41

HDFS: Number of read operations=12

HDFS: Number of large read operations=0

HDFS: Number of write operations=2

Job Counters

Launched map tasks=3

Launched reduce tasks=1

Data-local map tasks=3

Total time spent by all maps in occupied slots (ms)=33174

Total time spent by all reduces in occupied slots (ms)=3734

Total time spent by all map tasks (ms)=33174

Total time spent by all reduce tasks (ms)=3734

Total vcore-seconds taken by all map tasks=33174

Total vcore-seconds taken by all reduce tasks=3734

Total megabyte-seconds taken by all map tasks=33970176

Total megabyte-seconds taken by all reduce tasks=3823616

Map-Reduce Framework

Map input records=2

Map output records=8

Map output bytes=66

Map output materialized bytes=119

Input split bytes=330

Combine input records=0

Combine output records=0

Reduce input groups=5

Reduce shuffle bytes=119

Reduce input records=8

Reduce output records=5

Spilled Records=16

Shuffled Maps =3

Failed Shuffles=0

Merged Map outputs=3

GC time elapsed (ms)=222

CPU time spent (ms)=3050

Physical memory (bytes) snapshot=967741440

Virtual memory (bytes) snapshot=3570974720

Total committed heap usage (bytes)=719847424

Shuffle Errors

BAD_ID=0

CONNECTION=0

IO_ERROR=0

WRONG_LENGTH=0

WRONG_MAP=0

WRONG_REDUCE=0

File Input Format Counters

Bytes Read=53

File Output Format Counters

Bytes Written=41

14/12/14 05:54:37 INFO streaming.StreamJob: Output directory: streamoutput

 

Result

$ hadoop fs -cat streamoutput/part-00000
Bye     1
Goodbye 1
Hadoop  2
Hello   2
World   2

 

Hive

 

Next is Hive, with Programming Hive as the reference. The text files are read in Hive as an external table.

HiveQL

DROP TABLE IF EXISTS docs;
CREATE EXTERNAL TABLE docs (line STRING) LOCATION '/user/cloudera/input';
SELECT word, count(1) AS count FROM
  (SELECT explode(split(line, ' ')) AS word FROM docs) w GROUP BY word ORDER BY word;
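For readers more at home in Java than SQL, the query can be read as a flat-map followed by a group-and-count. The sketch below is a rough in-memory analogy using Java streams, not how Hive executes the query; the class ExplodeGroupBy is a name I made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class ExplodeGroupBy {
    static Map<String, Long> wordCount(List<String> lines) {
        return lines.stream()
                // explode(split(line, ' ')): one row per space-separated token
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // GROUP BY word with count(1); the TreeMap stands in for ORDER BY word
                .collect(Collectors.groupingBy(w -> w, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // The contents of file01 and file02.
        List<String> docs = Arrays.asList("Hello World Bye World", "Hello Hadoop Goodbye Hadoop");
        wordCount(docs).forEach((word, count) -> System.out.println(word + "\t" + count));
    }
}
```

Note that both the query and this sketch split on a single space, whereas the MapReduce and streaming versions split on runs of non-word characters, so the results would differ on text with punctuation or repeated spaces.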

 

 

Run and result

 

hive> SELECT word, count(1) AS count FROM
    >   (SELECT explode(split(line, ' ')) AS word FROM docs) w GROUP BY word
    > ORDER BY word;

Total jobs = 2

Launching Job 1 out of 2

Number of reduce tasks not specified. Estimated from input data size: 1

In order to change the average load for a reducer (in bytes):

set hive.exec.reducers.bytes.per.reducer=<number>

In order to limit the maximum number of reducers:

set hive.exec.reducers.max=<number>

In order to set a constant number of reducers:

set mapreduce.job.reduces=<number>

Starting Job = job_1418545807639_0019, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1418545807639_0019/

Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1418545807639_0019

Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1

2014-12-14 05:23:53,107 Stage-1 map = 0%,  reduce = 0%

2014-12-14 05:24:00,570 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.29 sec

2014-12-14 05:24:10,901 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 2.49 sec

MapReduce Total cumulative CPU time: 2 seconds 490 msec

Ended Job = job_1418545807639_0019

Launching Job 2 out of 2

Number of reduce tasks determined at compile time: 1

In order to change the average load for a reducer (in bytes):

set hive.exec.reducers.bytes.per.reducer=<number>

In order to limit the maximum number of reducers:

set hive.exec.reducers.max=<number>

In order to set a constant number of reducers:

set mapreduce.job.reduces=<number>

Starting Job = job_1418545807639_0020, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1418545807639_0020/

Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1418545807639_0020

Hadoop job information for Stage-2: number of mappers: 1; number of reducers: 1

2014-12-14 05:24:26,302 Stage-2 map = 0%,  reduce = 0%

2014-12-14 05:24:33,842 Stage-2 map = 100%,  reduce = 0%, Cumulative CPU 1.04 sec

2014-12-14 05:24:43,114 Stage-2 map = 100%,  reduce = 100%, Cumulative CPU 2.26 sec

MapReduce Total cumulative CPU time: 2 seconds 260 msec

Ended Job = job_1418545807639_0020

MapReduce Jobs Launched:

Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 2.49 sec   HDFS Read: 337 HDFS Write: 217 SUCCESS

Stage-Stage-2: Map: 1  Reduce: 1   Cumulative CPU: 2.26 sec   HDFS Read: 594 HDFS Write: 41 SUCCESS

Total MapReduce CPU Time Spent: 4 seconds 750 msec

OK

Bye 1

Goodbye 1

Hadoop  2

Hello   2

World   2

Time taken: 66.765 seconds, Fetched: 5 row(s)

hive>

 

Pig

 

Then Pig.

PigLatin Script

 

docs = LOAD '/user/cloudera/input' AS (line:chararray);
words = FOREACH docs GENERATE FLATTEN(TOKENIZE(line)) AS word;
groupd = GROUP words BY word;
wordcount = FOREACH groupd GENERATE group, COUNT(words);
DUMP wordcount;

 

Run and result

grunt> docs = LOAD '/user/cloudera/input' AS (line:chararray);
grunt> words = FOREACH docs GENERATE FLATTEN(TOKENIZE(line)) AS word;
grunt> groupd = GROUP words BY word;
grunt> wordcount = FOREACH groupd GENERATE group, COUNT(words);
grunt> DUMP wordcount;

2014-12-14 05:27:00,067 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: GROUP_BY

2014-12-14 05:27:00,112 [main] INFO  org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, NewPartitionFilterOptimizer, PartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier]}

2014-12-14 05:27:00,230 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false

(omitted)

2014-12-14 05:27:42,341 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1

2014-12-14 05:27:42,341 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1

(Bye,1)

(Hello,2)

(World,2)

(Hadoop,2)

(Goodbye,1)

grunt>

 

 

This has gotten long, so I'll continue tomorrow.

 
