Apache Spark: A Large-Scale Distributed Computing Environment Suited to Machine Learning

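Apache Spark is a large-scale distributed computing environment. Compared with Hadoop's MapReduce, its strength is that it processes data efficiently in memory, and it ships with libraries for machine learning, stream processing, graph analysis, and SQL-based data analysis. Spark itself is implemented in Scala, but besides Scala it also provides a Python API and interactive shells, so you can write programs and try things out in either language. The commands below download the prebuilt Spark 1.0.2 package for Hadoop 1 and start the Python shell.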


$ curl -O http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-hadoop1.tgz
$ tar zxf spark-1.0.2-bin-hadoop1.tgz
$ cd spark-1.0.2-bin-hadoop1
$ bin/pyspark
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.0.2

Using Python version 2.7.8 (default, Jul 13 2014 17:11:32)
SparkContext available as sc.


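Spark converts the data to be processed into a parallel-processing data structure called a Resilient Distributed Dataset (RDD). Instead of storing the contents of an RDD, Spark records the sequence of operations used to produce it, so if processing fails on some compute node, the lost data can be rebuilt from the preceding RDD and processing continues on another node. Trying this in the interactive shell shows that no actual processing happens until a value is needed: textFile and map only set up the RDDs, and the job itself starts only when reduce is called.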
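The record-the-steps idea can be illustrated with a toy, single-process model written for Python 3. This is a minimal sketch, not Spark's implementation: the class ToyRDD, its fail_once parameter, and the helper length are hypothetical names chosen for illustration. Each ToyRDD keeps only a reference to its parent and the function to apply, map merely records a step, and nothing is computed until the action reduce is called. A simulated failure is handled by recomputing the affected partition from its lineage.

```python
from functools import reduce as fold  # aliased to avoid confusion with ToyRDD.reduce
from typing import Any, Callable, Iterable, List, Optional


class ToyRDD:
    """Hypothetical stand-in for an RDD: it stores lineage, not computed data."""

    def __init__(self, partitions: Optional[List[List[Any]]] = None,
                 parent: Optional["ToyRDD"] = None,
                 fn: Optional[Callable[[Any], Any]] = None) -> None:
        self._partitions = partitions  # input data, set only on the root
        self._parent = parent          # lineage: the dataset this one derives from
        self._fn = fn                  # element-wise function applied to the parent

    def map(self, fn: Callable[[Any], Any]) -> "ToyRDD":
        # Transformation: record the step and return without computing anything.
        return ToyRDD(parent=self, fn=fn)

    def num_partitions(self) -> int:
        if self._parent is None:
            return len(self._partitions)
        return self._parent.num_partitions()

    def compute(self, i: int) -> List[Any]:
        # Rebuild partition i by replaying the lineage back to the input data.
        if self._parent is None:
            return list(self._partitions[i])
        return [self._fn(x) for x in self._parent.compute(i)]

    def reduce(self, f: Callable[[Any, Any], Any],
               fail_once: Iterable[int] = ()) -> Any:
        # Action: only now is any data actually computed.
        lost = set(fail_once)  # partitions whose first result is "lost"
        partials = []
        for i in range(self.num_partitions()):
            data = self.compute(i)
            if i in lost:
                # Simulated node failure: discard the result and recompute
                # partition i from its lineage (Spark would rerun the task,
                # possibly on another node).
                data = self.compute(i)
            if data:  # empty partitions contribute nothing
                partials.append(fold(f, data))
        if not partials:
            raise ValueError("cannot reduce an empty dataset")
        return fold(f, partials)


calls: List[str] = []


def length(s: str) -> int:
    calls.append(s)  # record every time the mapped function actually runs
    return len(s)


lines = ToyRDD([["alpha", "beta"], ["gamma"]])  # two partitions of sample data
lengths = lines.map(length)
print(len(calls))  # 0: map() only recorded the step
print(lengths.reduce(lambda a, b: a + b, fail_once={1}))  # 14
print(len(calls))  # 4: partition 1 was computed twice
```

The point of the sketch is that recovery needs no replicated copy of the data: because each dataset knows how it was derived, a lost partition is simply recomputed. Real Spark adds much more (task scheduling across executors, caching, shuffles), none of which is modeled here.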


>>> lines = sc.textFile("README.md")
14/08/11 18:54:09 INFO MemoryStore: ensureFreeSpace(32856) called with curMem=0, maxMem=309225062
14/08/11 18:54:09 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.1 KB, free 294.9 MB)
>>> lengths = lines.map(lambda s: len(s))
>>> lengths.reduce(lambda a, b: a + b)
14/08/11 18:55:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/08/11 18:55:03 WARN LoadSnappy: Snappy native library not loaded
14/08/11 18:55:03 INFO FileInputFormat: Total input paths to process : 1
14/08/11 18:55:03 INFO SparkContext: Starting job: reduce at <stdin>:1
14/08/11 18:55:03 INFO DAGScheduler: Got job 0 (reduce at <stdin>:1) with 2 output partitions (allowLocal=false)
14/08/11 18:55:03 INFO DAGScheduler: Final stage: Stage 0(reduce at <stdin>:1)
14/08/11 18:55:03 INFO DAGScheduler: Parents of final stage: List()
14/08/11 18:55:03 INFO DAGScheduler: Missing parents: List()
14/08/11 18:55:03 INFO DAGScheduler: Submitting Stage 0 (PythonRDD[2] at RDD at PythonRDD.scala:37), which has no missing parents
14/08/11 18:55:03 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (PythonRDD[2] at RDD at PythonRDD.scala:37)
14/08/11 18:55:03 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/08/11 18:55:03 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/08/11 18:55:03 INFO TaskSetManager: Serialized task 0.0:0 as 2999 bytes in 3 ms
14/08/11 18:55:03 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/08/11 18:55:03 INFO TaskSetManager: Serialized task 0.0:1 as 2999 bytes in 1 ms
14/08/11 18:55:03 INFO Executor: Running task ID 0
14/08/11 18:55:03 INFO Executor: Running task ID 1
14/08/11 18:55:03 INFO BlockManager: Found block broadcast_0 locally
14/08/11 18:55:03 INFO BlockManager: Found block broadcast_0 locally
14/08/11 18:55:04 INFO HadoopRDD: Input split: file:/.../spark-1.0.2-bin-hadoop1/README.md:2110+2111
14/08/11 18:55:04 INFO HadoopRDD: Input split: file:/.../spark-1.0.2-bin-hadoop1/README.md:0+2110
14/08/11 18:55:04 INFO PythonRDD: Times: total = 1345, boot = 1314, init = 30, finish = 1
14/08/11 18:55:04 INFO PythonRDD: Times: total = 1352, boot = 1310, init = 38, finish = 4
14/08/11 18:55:04 INFO Executor: Serialized size of result for 1 is 638
14/08/11 18:55:04 INFO Executor: Sending result for 1 directly to driver
14/08/11 18:55:04 INFO Executor: Finished task ID 1
14/08/11 18:55:04 INFO Executor: Serialized size of result for 0 is 638
14/08/11 18:55:04 INFO Executor: Sending result for 0 directly to driver
14/08/11 18:55:04 INFO Executor: Finished task ID 0
14/08/11 18:55:04 INFO TaskSetManager: Finished TID 1 in 1427 ms on localhost (progress: 1/2)
14/08/11 18:55:04 INFO DAGScheduler: Completed ResultTask(0, 1)
14/08/11 18:55:04 INFO TaskSetManager: Finished TID 0 in 1456 ms on localhost (progress: 2/2)
14/08/11 18:55:04 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/08/11 18:55:04 INFO DAGScheduler: Completed ResultTask(0, 0)
14/08/11 18:55:04 INFO DAGScheduler: Stage 0 (reduce at <stdin>:1) finished in 1.475 s
14/08/11 18:55:04 INFO SparkContext: Job finished: reduce at <stdin>:1, took 1.656086 s
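The log shows what the reduce call actually did: README.md was read as two input splits, each handled by its own task, and each task's result was sent back to the driver to be combined. The following sequential Python 3 sketch mirrors that flow without Spark. It is an illustration under stated assumptions, not Spark's code: split_into_partitions and total_length are hypothetical helpers, and the lines are divided by count rather than by byte range as Hadoop does. It computes the same quantity as the transcript, namely the total number of characters over all lines, with line terminators excluded.

```python
from functools import reduce
from operator import add
from typing import List


def split_into_partitions(lines: List[str], n: int) -> List[List[str]]:
    # Assumption: cut the lines into n contiguous chunks by count. Hadoop splits
    # files by byte range instead, but each line is still read by exactly one task.
    size, extra = divmod(len(lines), n)
    parts, start = [], 0
    for i in range(n):
        end = start + size + (1 if i < extra else 0)
        parts.append(lines[start:end])
        start = end
    return parts


def total_length(text: str, num_partitions: int = 2) -> int:
    """Sequential analogue of lines.map(len).reduce(add) from the transcript."""
    lines = text.splitlines()  # one element per line, line terminators dropped
    partitions = split_into_partitions(lines, num_partitions)
    # Each "task" maps and reduces its own partition independently...
    partials = [reduce(add, map(len, part)) for part in partitions if part]
    # ...and the "driver" combines the per-task results.
    return reduce(add, partials, 0)


sample = "alpha\nbeta\r\ngamma\n"  # hypothetical sample text
print(split_into_partitions(sample.splitlines(), 2))  # [['alpha', 'beta'], ['gamma']]
print(total_length(sample))  # 14
```

Because addition is associative and commutative, the result does not depend on how the lines are partitioned, which is also what allows Spark to run the two tasks in parallel and merge their results in any order.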

