用Hadoop的各种语言进行wordcount(2):Apache Spark

本文永久链接地址:https://www.askmaclean.com/archives/spark-wordcount-2.html

 

dbDao.com 引导式IT在线教育
Hadoop 技术学习QQ群号  : 134115150

 

继续昨天的内容,今天也是进行wordcount。今天是用Apache Spark (ScalaPythonJava来执行wordcount。

Spark是用Scala、Python、Java来进行wordcount。Scala与Python是用REPL,Java是用Spark应用来执行。

Spark中的wordcount是在spark站点张有的样本,我参考了Cloudera的博客。

https://spark.apache.org/examples.html

http://blog.cloudera.com/blog/2014/04/how-to-run-a-simple-apache-spark-app-in-cdh-5/

github 上的位置 https://github.com/kawamon/wordcount.git

 

Spark (Scala)

 

首先从Scala开始。

 

Cloudera Quickstart VM的Spark有版本问题,在spark-shell启动时会出现版本错误。

参考信息:http://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/5-1-Quickstart-VM-issue/td-p/16684

这次我们就无视安全性,来进行以下变更。

 

$ sudo -u hdfs hdfs dfs -chmod -R 777 /user/spark

 

 

另外,终止Quickstart VM进行启动的情况下,会有不能顺利与Spark Maste连接的情况。这时请试着重启Master与Worker(与History Server)

 

 

代码

 

val file = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/input")
val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _, 1)  

counts.saveAsTextFile("hdfs://quickstart.cloudera/user/cloudera/spark_scala.output")

spark-shell(REPL)执行

$ spark-shell --master spark://quickstart.cloudera:7077
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/assembly/lib/spark-assembly-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/12/14 21:57:22 INFO SecurityManager: Changing view acls to: cloudera
14/12/14 21:57:22 INFO SecurityManager: Changing modify acls to: cloudera
14/12/14 21:57:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
14/12/14 21:57:22 INFO HttpServer: Starting HTTP Server
14/12/14 21:57:22 INFO Utils: Successfully started service 'HTTP class server' on port 35439.
Welcome to
____              __
/ __/__  ___ _____/ /__
_\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 1.1.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
14/12/14 21:57:27 WARN Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 192.168.2.219 instead (on interface eth1)
14/12/14 21:57:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
(略)
14/12/14 21:57:34 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@192.168.2.219:48741/user/Executor#1486090800] with ID 0
14/12/14 21:57:34 INFO BlockManagerMasterActor: Registering block manager 192.168.2.219:52876 with 265.4 MB RAM
scala>
scala> val file = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/input")
14/12/14 22:15:47 INFO MemoryStore: ensureFreeSpace(258443) called with curMem=279030, maxMem=278302556
14/12/14 22:15:47 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 252.4 KB, free 264.9 MB)
14/12/14 22:15:47 INFO MemoryStore: ensureFreeSpace(20659) called with curMem=537473, maxMem=278302556
14/12/14 22:15:47 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 20.2 KB, free 264.9 MB)
14/12/14 22:15:47 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.219:52096 (size: 20.2 KB, free: 265.4 MB)
14/12/14 22:15:47 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
file: org.apache.spark.rdd.RDD[String] = hdfs://quickstart.cloudera/user/cloudera/input MappedRDD[5] at textFile at <console>:12
scala> val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _, 1)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:14
scala> counts.saveAsTextFile("hdfs://quickstart.cloudera/user/cloudera/spark_scala.output")
14/12/14 22:15:47 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/12/14 22:15:47 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/12/14 22:15:47 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/12/14 22:15:47 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/12/14 22:15:47 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/12/14 22:15:47 INFO SparkContext: Starting job: saveAsTextFile at <console>:17
14/12/14 22:15:47 INFO FileInputFormat: Total input paths to process : 2
14/12/14 22:15:48 INFO DAGScheduler: Registering RDD 7 (map at <console>:14)
14/12/14 22:15:48 INFO DAGScheduler: Got job 0 (saveAsTextFile at <console>:17) with 1 output partitions (allowLocal=false)
14/12/14 22:15:48 INFO DAGScheduler: Final stage: Stage 0(saveAsTextFile at <console>:17)
14/12/14 22:15:48 INFO DAGScheduler: Parents of final stage: List(Stage 1)
14/12/14 22:15:48 INFO DAGScheduler: Missing parents: List(Stage 1)
14/12/14 22:15:48 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[7] at map at <console>:14), which has no missing parents
14/12/14 22:15:48 INFO MemoryStore: ensureFreeSpace(3448) called with curMem=558132, maxMem=278302556
14/12/14 22:15:48 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.4 KB, free 264.9 MB)
14/12/14 22:15:48 INFO MemoryStore: ensureFreeSpace(2074) called with curMem=561580, maxMem=278302556
14/12/14 22:15:48 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.0 KB, free 264.9 MB)
14/12/14 22:15:48 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.2.219:52096 (size: 2.0 KB, free: 265.4 MB)
14/12/14 22:15:48 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
14/12/14 22:15:48 INFO DAGScheduler: Submitting 3 missing tasks from Stage 1 (MappedRDD[7] at map at <console>:14)
14/12/14 22:15:48 INFO TaskSchedulerImpl: Adding task set 1.0 with 3 tasks
14/12/14 22:15:48 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, 192.168.2.219, ANY, 1198 bytes)
14/12/14 22:15:48 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 1, 192.168.2.219, ANY, 1198 bytes)
14/12/14 22:15:48 INFO ConnectionManager: Accepted connection from [192.168.2.219/192.168.2.219:49660]
14/12/14 22:15:48 INFO SendingConnection: Initiating connection to [/192.168.2.219:58599]
14/12/14 22:15:48 INFO SendingConnection: Connected to [/192.168.2.219:58599], 1 messages pending
14/12/14 22:15:48 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.2.219:58599 (size: 2.0 KB, free: 265.4 MB)
14/12/14 22:15:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.219:58599 (size: 20.2 KB, free: 265.4 MB)
14/12/14 22:15:50 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 2, 192.168.2.219, ANY, 1198 bytes)
14/12/14 22:15:50 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 1) in 2152 ms on 192.168.2.219 (1/3)
14/12/14 22:15:50 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 0) in 2172 ms on 192.168.2.219 (2/3)
14/12/14 22:15:50 INFO DAGScheduler: Stage 1 (map at <console>:14) finished in 2.182 s
14/12/14 22:15:50 INFO DAGScheduler: looking for newly runnable stages
14/12/14 22:15:50 INFO DAGScheduler: running: Set()
14/12/14 22:15:50 INFO DAGScheduler: waiting: Set(Stage 0)
14/12/14 22:15:50 INFO DAGScheduler: failed: Set()
14/12/14 22:15:50 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 2) in 57 ms on 192.168.2.219 (3/3)
14/12/14 22:15:50 INFO DAGScheduler: Missing parents for Stage 0: List()
14/12/14 22:15:50 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/12/14 22:15:50 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[9] at saveAsTextFile at <console>:17), which is now runnable
14/12/14 22:15:50 INFO MemoryStore: ensureFreeSpace(65904) called with curMem=563654, maxMem=278302556
14/12/14 22:15:50 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 64.4 KB, free 264.8 MB)
14/12/14 22:15:50 INFO MemoryStore: ensureFreeSpace(23147) called with curMem=629558, maxMem=278302556
14/12/14 22:15:50 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 22.6 KB, free 264.8 MB)
14/12/14 22:15:50 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.2.219:52096 (size: 22.6 KB, free: 265.3 MB)
14/12/14 22:15:50 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0
14/12/14 22:15:50 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[9] at saveAsTextFile at <console>:17)
14/12/14 22:15:50 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/12/14 22:15:50 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 3, 192.168.2.219, PROCESS_LOCAL, 948 bytes)
14/12/14 22:15:50 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.2.219:58599 (size: 22.6 KB, free: 265.4 MB)
14/12/14 22:15:50 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@192.168.2.219:38221
14/12/14 22:15:50 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 144 bytes
14/12/14 22:15:50 INFO DAGScheduler: Stage 0 (saveAsTextFile at <console>:17) finished in 0.392 s
14/12/14 22:15:50 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 3) in 391 ms on 192.168.2.219 (1/1)
14/12/14 22:15:50 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/12/14 22:15:50 INFO SparkContext: Job finished: saveAsTextFile at <console>:17, took 2.786326655 s

 

 

 

結果

hadoop fs -cat spark_scala.output/part-00000
(Bye,1)
(Goodbye,1)
(Hello,2)
(World,2)
(Hadoop,2)

 

 

Spark (Python)

 

与Scala相同,用pyspark进行对话性地执行。

 

 

 

Code

file = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/input")
counts = file.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b, 1)
counts.saveAsTextFile("hdfs://quickstart.cloudera/user/cloudera/pyspark.output")

 

 

pyspark(REPL)执行

$ MASTER=spark://quickstart.cloudera:7077 pyspark
Python 2.6.6 (r266:84292, Jan 22 2014, 09:42:36)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/assembly/lib/spark-assembly-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/12/14 22:30:23 WARN Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 192.168.2.219 instead (on interface eth1)
(略)
Welcome to
____              __
/ __/__  ___ _____/ /__
_\ \/ _ \/ _ `/ __/  '_/
/__ / .__/\_,_/_/ /_/\_\   version 1.1.0
/_/
Using Python version 2.6.6 (r266:84292, Jan 22 2014 09:42:36)
SparkContext available as sc.
>>> 14/12/14 22:30:26 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20141214223026-0005
14/12/14 22:30:26 INFO AppClient$ClientActor: Executor added: app-20141214223026-0005/0 on worker-20141214215634-192.168.2.219-7078 (192.168.2.219:7078) with 2 cores
14/12/14 22:30:26 INFO SparkDeploySchedulerBackend: Granted executor ID app-20141214223026-0005/0 on hostPort 192.168.2.219:7078 with 2 cores, 512.0 MB RAM
14/12/14 22:30:26 INFO AppClient$ClientActor: Executor updated: app-20141214223026-0005/0 is now LOADING
14/12/14 22:30:26 INFO AppClient$ClientActor: Executor updated: app-20141214223026-0005/0 is now RUNNING
14/12/14 22:30:29 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@192.168.2.219:48258/user/Executor#133680320] with ID 0
14/12/14 22:30:29 INFO BlockManagerMasterActor: Registering block manager 192.168.2.219:40153 with 265.4 MB RAM
>>>
>>> file = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/input")
14/12/14 22:32:12 INFO MemoryStore: ensureFreeSpace(258371) called with curMem=0, maxMem=278302556
14/12/14 22:32:12 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 252.3 KB, free 265.2 MB)
14/12/14 22:32:13 INFO MemoryStore: ensureFreeSpace(20662) called with curMem=258371, maxMem=278302556
14/12/14 22:32:13 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.2 KB, free 265.1 MB)
14/12/14 22:32:13 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.2.219:43600 (size: 20.2 KB, free: 265.4 MB)
14/12/14 22:32:13 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
>>> counts = file.flatMap(lambda line: line.split(" ")) \
...              .map(lambda word: (word, 1)) \
...              .reduceByKey(lambda a, b: a + b, 1)
>>> counts.saveAsTextFile("hdfs://quickstart.cloudera/user/cloudera/pyspark.output")
14/12/14 22:32:14 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/12/14 22:32:14 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/12/14 22:32:14 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/12/14 22:32:14 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/12/14 22:32:14 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/12/14 22:32:14 INFO SparkContext: Starting job: saveAsTextFile at NativeMethodAccessorImpl.java:-2
14/12/14 22:32:14 INFO FileInputFormat: Total input paths to process : 2
14/12/14 22:32:14 INFO DAGScheduler: Registering RDD 3 (RDD at PythonRDD.scala:261)
14/12/14 22:32:14 INFO DAGScheduler: Got job 0 (saveAsTextFile at NativeMethodAccessorImpl.java:-2) with 1 output partitions (allowLocal=false)
14/12/14 22:32:14 INFO DAGScheduler: Final stage: Stage 0(saveAsTextFile at NativeMethodAccessorImpl.java:-2)
14/12/14 22:32:14 INFO DAGScheduler: Parents of final stage: List(Stage 1)
14/12/14 22:32:14 INFO DAGScheduler: Missing parents: List(Stage 1)
14/12/14 22:32:14 INFO DAGScheduler: Submitting Stage 1 (PairwiseRDD[3] at RDD at PythonRDD.scala:261), which has no missing parents
14/12/14 22:32:14 INFO MemoryStore: ensureFreeSpace(7728) called with curMem=279033, maxMem=278302556
14/12/14 22:32:14 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 7.5 KB, free 265.1 MB)
14/12/14 22:32:14 INFO MemoryStore: ensureFreeSpace(4967) called with curMem=286761, maxMem=278302556
14/12/14 22:32:14 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.9 KB, free 265.1 MB)
14/12/14 22:32:14 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.219:43600 (size: 4.9 KB, free: 265.4 MB)
14/12/14 22:32:14 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
14/12/14 22:32:14 INFO DAGScheduler: Submitting 3 missing tasks from Stage 1 (PairwiseRDD[3] at RDD at PythonRDD.scala:261)
14/12/14 22:32:14 INFO TaskSchedulerImpl: Adding task set 1.0 with 3 tasks
14/12/14 22:32:15 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, 192.168.2.219, ANY, 1198 bytes)
14/12/14 22:32:15 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 1, 192.168.2.219, ANY, 1198 bytes)
14/12/14 22:32:15 INFO ConnectionManager: Accepted connection from [192.168.2.219/192.168.2.219:56227]
14/12/14 22:32:15 INFO SendingConnection: Initiating connection to [/192.168.2.219:40153]
14/12/14 22:32:15 INFO SendingConnection: Connected to [/192.168.2.219:40153], 1 messages pending
14/12/14 22:32:15 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.219:40153 (size: 4.9 KB, free: 265.4 MB)
14/12/14 22:32:15 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.2.219:40153 (size: 20.2 KB, free: 265.4 MB)
14/12/14 22:32:18 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 2, 192.168.2.219, ANY, 1198 bytes)
14/12/14 22:32:18 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 0) in 3098 ms on 192.168.2.219 (1/3)
14/12/14 22:32:18 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 1) in 3132 ms on 192.168.2.219 (2/3)
14/12/14 22:32:18 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 2) in 77 ms on 192.168.2.219 (3/3)
14/12/14 22:32:18 INFO DAGScheduler: Stage 1 (RDD at PythonRDD.scala:261) finished in 3.157 s
14/12/14 22:32:18 INFO DAGScheduler: looking for newly runnable stages
14/12/14 22:32:18 INFO DAGScheduler: running: Set()
14/12/14 22:32:18 INFO DAGScheduler: waiting: Set(Stage 0)
14/12/14 22:32:18 INFO DAGScheduler: failed: Set()
14/12/14 22:32:18 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/12/14 22:32:18 INFO DAGScheduler: Missing parents for Stage 0: List()
14/12/14 22:32:18 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[8] at saveAsTextFile at NativeMethodAccessorImpl.java:-2), which is now runnable
14/12/14 22:32:18 INFO MemoryStore: ensureFreeSpace(69336) called with curMem=291728, maxMem=278302556
14/12/14 22:32:18 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 67.7 KB, free 265.1 MB)
14/12/14 22:32:18 INFO MemoryStore: ensureFreeSpace(25656) called with curMem=361064, maxMem=278302556
14/12/14 22:32:18 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 25.1 KB, free 265.0 MB)
14/12/14 22:32:18 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.2.219:43600 (size: 25.1 KB, free: 265.4 MB)
14/12/14 22:32:18 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
14/12/14 22:32:18 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[8] at saveAsTextFile at NativeMethodAccessorImpl.java:-2)
14/12/14 22:32:18 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/12/14 22:32:18 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 3, 192.168.2.219, PROCESS_LOCAL, 948 bytes)
14/12/14 22:32:18 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.2.219:40153 (size: 25.1 KB, free: 265.4 MB)
14/12/14 22:32:18 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@192.168.2.219:35770
14/12/14 22:32:18 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 144 bytes
14/12/14 22:32:19 INFO DAGScheduler: Stage 0 (saveAsTextFile at NativeMethodAccessorImpl.java:-2) finished in 0.859 s
14/12/14 22:32:19 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 3) in 852 ms on 192.168.2.219 (1/1)
14/12/14 22:32:19 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/12/14 22:32:19 INFO SparkContext: Job finished: saveAsTextFile at NativeMethodAccessorImpl.java:-2, took 4.276595226 s
>>>

 

结果

$ hdfs dfs -cat pyspark.output/part-00000
(u'World', 2)
(u'Bye', 1)
(u'Hello', 2)
(u'Goodbye', 1)
(u'Hadoop', 2)

 

 

Spark (Java)

 

最后是Java。因为java无法在对话性地(REPL)执行,所以就作为spark应用来进行构建,来执行。

另外,这次文章参考了下述代码与pom.xml。
http://blog.cloudera.com/blog/2014/04/how-to-run-a-simple-apache-spark-app-in-cdh-5/

 

 

pom.xml

 

 

<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
Cloudera, Inc. licenses this file to you under the Apache License,
Version 2.0 (the "License"). You may not use this file except in
compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for
the specific language governing permissions and limitations under the
License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cloudera.sparkwordcount</groupId>
<artifactId>sparkwordcount</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>"Spark Word Count"</name>
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
<repository>
<id>maven-hadoop</id>
<name>Hadoop Releases</name>
<url>https://repository.cloudera.com/content/repositories/releases/</url>
</repository>
<repository>
<id>cloudera-repos</id>
<name>Cloudera Repos</name>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.1.0-cdh5.2.1</version>
</dependency>
</dependencies>
</project>

 

 

 

src/main/java/com/example/sparkwordcount/JavaWordCount.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.sparkwordcount;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;
public class JavaWordCount {
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));
// split each document into words
JavaRDD<String> tokenized = sc.textFile(args[0]).flatMap(
new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) {
return Arrays.asList(s.split(" "));
}
}
);
// count the occurrence of each word
JavaPairRDD<String, Integer> counts = tokenized.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}
).reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
}, 1      //number of reducers = 1
);
counts.sortByKey(true).saveAsTextFile(args[1]);
System.exit(0);
}
}

 

 

 

构建

$ mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building "Spark Word Count" 0.0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.5:resources (default-resources) @ sparkwordcount ---
[debug] execute contextualize
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/cloudera/work/spark/java/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ sparkwordcount ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to /home/cloudera/work/spark/java/target/classes
[INFO]
[INFO] --- maven-scala-plugin:2.15.2:compile (default) @ sparkwordcount ---
[INFO] Checking for multiple versions of scala
[WARNING]  Expected all dependencies to require Scala version: 2.10.4
[WARNING]  com.cloudera.sparkwordcount:sparkwordcount:0.0.1-SNAPSHOT requires scala version: 2.10.4
[WARNING]  com.twitter:chill_2.10:0.3.6 requires scala version: 2.10.3
[WARNING] Multiple versions of scala libraries detected!
[INFO] includes = [**/*.scala,**/*.java,]
[INFO] excludes = []
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-resources-plugin:2.5:testResources (default-testResources) @ sparkwordcount ---
[debug] execute contextualize
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/cloudera/work/spark/java/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ sparkwordcount ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.10:test (default-test) @ sparkwordcount ---
[INFO] No tests to run.
[INFO] Surefire report directory: /home/cloudera/work/spark/java/target/surefire-reports
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Results :
Tests run: 0, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ sparkwordcount ---
[INFO] Building jar: /home/cloudera/work/spark/java/target/sparkwordcount-0.0.1-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 8.589s
[INFO] Finished at: Mon Dec 15 00:16:23 PST 2014
[INFO] Final Memory: 33M/364M
[INFO] ------------------------------------------------------------------------

 

执行

$ spark-submit --class com.example.sparkwordcount.JavaWordCount target/sparkwordcount-0.0.1-SNAPSHOT.jar hdfs://quickstart.cloudera/user/cloudera/input hdfs://quickstart.cloudera/user/cloudera/javaoutput
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/assembly/lib/spark-assembly-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/12/15 00:17:48 WARN Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 192.168.2.219 instead (on interface eth1)
14/12/15 00:17:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
14/12/15 00:17:48 INFO SecurityManager: Changing view acls to: cloudera
14/12/15 00:17:48 INFO SecurityManager: Changing modify acls to: cloudera
14/12/15 00:17:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
14/12/15 00:17:48 INFO Slf4jLogger: Slf4jLogger started
14/12/15 00:17:48 INFO Remoting: Starting remoting
14/12/15 00:17:48 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.2.219:60706]
14/12/15 00:17:48 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@192.168.2.219:60706]
14/12/15 00:17:48 INFO Utils: Successfully started service 'sparkDriver' on port 60706.
14/12/15 00:17:48 INFO SparkEnv: Registering MapOutputTracker
14/12/15 00:17:48 INFO SparkEnv: Registering BlockManagerMaster
14/12/15 00:17:49 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20141215001749-261d
14/12/15 00:17:49 INFO Utils: Successfully started service 'Connection manager for block manager' on port 41211.
14/12/15 00:17:49 INFO ConnectionManager: Bound socket to port 41211 with id = ConnectionManagerId(192.168.2.219,41211)
14/12/15 00:17:49 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
14/12/15 00:17:49 INFO BlockManagerMaster: Trying to register BlockManager
14/12/15 00:17:49 INFO BlockManagerMasterActor: Registering block manager 192.168.2.219:41211 with 265.4 MB RAM
14/12/15 00:17:49 INFO BlockManagerMaster: Registered BlockManager
14/12/15 00:17:49 INFO HttpFileServer: HTTP File server directory is /tmp/spark-e161b29e-2a24-489a-9b0c-0268ee515c24
14/12/15 00:17:49 INFO HttpServer: Starting HTTP Server
14/12/15 00:17:49 INFO Utils: Successfully started service 'HTTP file server' on port 41763.
14/12/15 00:17:49 INFO Utils: Successfully started service 'SparkUI' on port 4040.
14/12/15 00:17:49 INFO SparkUI: Started SparkUI at http://192.168.2.219:4040
14/12/15 00:17:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/12/15 00:17:51 INFO EventLoggingListener: Logging events to hdfs://quickstart.cloudera:8020/user/spark/applicationHistory/spark-count-1418631470263
14/12/15 00:17:51 INFO SparkContext: Added JAR file:/home/cloudera/work/spark/java/target/sparkwordcount-0.0.1-SNAPSHOT.jar at http://192.168.2.219:41763/jars/sparkwordcount-0.0.1-SNAPSHOT.jar with timestamp 1418631471473
14/12/15 00:17:51 INFO AppClient$ClientActor: Connecting to master spark://quickstart.cloudera:7077...
14/12/15 00:17:51 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
14/12/15 00:17:52 INFO MemoryStore: ensureFreeSpace(258371) called with curMem=0, maxMem=278302556
14/12/15 00:17:52 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 252.3 KB, free 265.2 MB)
14/12/15 00:17:52 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20141215001752-0012
14/12/15 00:17:52 INFO AppClient$ClientActor: Executor added: app-20141215001752-0012/0 on worker-20141214215634-192.168.2.219-7078 (192.168.2.219:7078) with 2 cores
14/12/15 00:17:52 INFO SparkDeploySchedulerBackend: Granted executor ID app-20141215001752-0012/0 on hostPort 192.168.2.219:7078 with 2 cores, 512.0 MB RAM
14/12/15 00:17:52 INFO AppClient$ClientActor: Executor updated: app-20141215001752-0012/0 is now LOADING
14/12/15 00:17:52 INFO MemoryStore: ensureFreeSpace(20659) called with curMem=258371, maxMem=278302556
14/12/15 00:17:52 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.2 KB, free 265.1 MB)
14/12/15 00:17:52 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.2.219:41211 (size: 20.2 KB, free: 265.4 MB)
14/12/15 00:17:52 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
14/12/15 00:17:52 INFO AppClient$ClientActor: Executor updated: app-20141215001752-0012/0 is now RUNNING
14/12/15 00:17:52 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/12/15 00:17:52 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/12/15 00:17:52 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/12/15 00:17:52 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/12/15 00:17:52 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/12/15 00:17:52 INFO SparkContext: Starting job: saveAsTextFile at JavaWordCount.java:61
14/12/15 00:17:52 INFO FileInputFormat: Total input paths to process : 2
14/12/15 00:17:52 INFO DAGScheduler: Registering RDD 3 (mapToPair at JavaWordCount.java:45)
14/12/15 00:17:52 INFO DAGScheduler: Registering RDD 4 (reduceByKey at JavaWordCount.java:45)
14/12/15 00:17:52 INFO DAGScheduler: Got job 0 (saveAsTextFile at JavaWordCount.java:61) with 1 output partitions (allowLocal=false)
14/12/15 00:17:52 INFO DAGScheduler: Final stage: Stage 0(saveAsTextFile at JavaWordCount.java:61)
14/12/15 00:17:52 INFO DAGScheduler: Parents of final stage: List(Stage 2)
14/12/15 00:17:52 INFO DAGScheduler: Missing parents: List(Stage 2)
14/12/15 00:17:52 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[3] at mapToPair at JavaWordCount.java:45), which has no missing parents
14/12/15 00:17:52 INFO MemoryStore: ensureFreeSpace(4160) called with curMem=279030, maxMem=278302556
14/12/15 00:17:52 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.1 KB, free 265.1 MB)
14/12/15 00:17:52 INFO MemoryStore: ensureFreeSpace(2461) called with curMem=283190, maxMem=278302556
14/12/15 00:17:52 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.4 KB, free 265.1 MB)
14/12/15 00:17:52 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.219:41211 (size: 2.4 KB, free: 265.4 MB)
14/12/15 00:17:52 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
14/12/15 00:17:52 INFO DAGScheduler: Submitting 3 missing tasks from Stage 1 (MappedRDD[3] at mapToPair at JavaWordCount.java:45)
14/12/15 00:17:52 INFO TaskSchedulerImpl: Adding task set 1.0 with 3 tasks
14/12/15 00:17:55 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@192.168.2.219:47684/user/Executor#2106146677] with ID 0
14/12/15 00:17:55 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, 192.168.2.219, ANY, 1273 bytes)
14/12/15 00:17:55 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 1, 192.168.2.219, ANY, 1273 bytes)
14/12/15 00:17:55 INFO BlockManagerMasterActor: Registering block manager 192.168.2.219:53891 with 265.4 MB RAM
14/12/15 00:17:56 INFO ConnectionManager: Accepted connection from [192.168.2.219/192.168.2.219:49202]
14/12/15 00:17:56 INFO SendingConnection: Initiating connection to [/192.168.2.219:53891]
14/12/15 00:17:56 INFO SendingConnection: Connected to [/192.168.2.219:53891], 1 messages pending
14/12/15 00:17:56 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.219:53891 (size: 2.4 KB, free: 265.4 MB)
14/12/15 00:17:56 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.2.219:53891 (size: 20.2 KB, free: 265.4 MB)
14/12/15 00:17:57 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 2, 192.168.2.219, ANY, 1273 bytes)
14/12/15 00:17:57 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 1) in 2378 ms on 192.168.2.219 (1/3)
14/12/15 00:17:57 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 0) in 2472 ms on 192.168.2.219 (2/3)
14/12/15 00:17:57 INFO DAGScheduler: Stage 1 (mapToPair at JavaWordCount.java:45) finished in 5.202 s
14/12/15 00:17:57 INFO DAGScheduler: looking for newly runnable stages
14/12/15 00:17:57 INFO DAGScheduler: running: Set()
14/12/15 00:17:57 INFO DAGScheduler: waiting: Set(Stage 0, Stage 2)
14/12/15 00:17:57 INFO DAGScheduler: failed: Set()
14/12/15 00:17:57 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 2) in 54 ms on 192.168.2.219 (3/3)
14/12/15 00:17:58 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/12/15 00:17:58 INFO DAGScheduler: Missing parents for Stage 0: List(Stage 2)
14/12/15 00:17:58 INFO DAGScheduler: Missing parents for Stage 2: List()
14/12/15 00:17:58 INFO DAGScheduler: Submitting Stage 2 (ShuffledRDD[4] at reduceByKey at JavaWordCount.java:45), which is now runnable
14/12/15 00:17:58 INFO MemoryStore: ensureFreeSpace(3032) called with curMem=285651, maxMem=278302556
14/12/15 00:17:58 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.0 KB, free 265.1 MB)
14/12/15 00:17:58 INFO MemoryStore: ensureFreeSpace(1892) called with curMem=288683, maxMem=278302556
14/12/15 00:17:58 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1892.0 B, free 265.1 MB)
14/12/15 00:17:58 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.2.219:41211 (size: 1892.0 B, free: 265.4 MB)
14/12/15 00:17:58 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
14/12/15 00:17:58 INFO DAGScheduler: Submitting 1 missing tasks from Stage 2 (ShuffledRDD[4] at reduceByKey at JavaWordCount.java:45)
14/12/15 00:17:58 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
14/12/15 00:17:58 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 3, 192.168.2.219, PROCESS_LOCAL, 1012 bytes)
14/12/15 00:17:58 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.2.219:53891 (size: 1892.0 B, free: 265.4 MB)
14/12/15 00:17:58 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to sparkExecutor@192.168.2.219:53792
14/12/15 00:17:58 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 144 bytes
14/12/15 00:17:58 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 3) in 115 ms on 192.168.2.219 (1/1)
14/12/15 00:17:58 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
14/12/15 00:17:58 INFO DAGScheduler: Stage 2 (reduceByKey at JavaWordCount.java:45) finished in 0.112 s
14/12/15 00:17:58 INFO DAGScheduler: looking for newly runnable stages
14/12/15 00:17:58 INFO DAGScheduler: running: Set()
14/12/15 00:17:58 INFO DAGScheduler: waiting: Set(Stage 0)
14/12/15 00:17:58 INFO DAGScheduler: failed: Set()
14/12/15 00:17:58 INFO DAGScheduler: Missing parents for Stage 0: List()
14/12/15 00:17:58 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[6] at saveAsTextFile at JavaWordCount.java:61), which is now runnable
14/12/15 00:17:58 INFO MemoryStore: ensureFreeSpace(66464) called with curMem=290575, maxMem=278302556
14/12/15 00:17:58 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 64.9 KB, free 265.1 MB)
14/12/15 00:17:58 INFO MemoryStore: ensureFreeSpace(23685) called with curMem=357039, maxMem=278302556
14/12/15 00:17:58 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 23.1 KB, free 265.0 MB)
14/12/15 00:17:58 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.2.219:41211 (size: 23.1 KB, free: 265.4 MB)
14/12/15 00:17:58 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0
14/12/15 00:17:58 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[6] at saveAsTextFile at JavaWordCount.java:61)
14/12/15 00:17:58 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/12/15 00:17:58 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 4, 192.168.2.219, PROCESS_LOCAL, 1023 bytes)
14/12/15 00:17:58 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.2.219:53891 (size: 23.1 KB, free: 265.4 MB)
14/12/15 00:17:58 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@192.168.2.219:53792
14/12/15 00:17:58 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 131 bytes
14/12/15 00:17:58 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 4) in 754 ms on 192.168.2.219 (1/1)
14/12/15 00:17:58 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/12/15 00:17:58 INFO DAGScheduler: Stage 0 (saveAsTextFile at JavaWordCount.java:61) finished in 0.748 s
14/12/15 00:17:58 INFO SparkContext: Job finished: saveAsTextFile at JavaWordCount.java:61, took 6.452954756 s

 

 

 

结果

 

$ hadoop fs -cat javaoutput/part-00000
(Bye,1)
(Goodbye,1)
(Hadoop,2)
(Hello,2)
(World,2)

 

今天到此为止。明天继续!

 

Comment

*

沪ICP备14014813号

沪公网安备 31010802001379号