機械学習に適した大規模分散計算環境Apache Spark

大規模分散計算環境のApache Sparkは、HadoopのMacReduceに比べてメモリ内で効率的に処理を行うことが特長で、機械学習ストリーム処理グラフ解析SQLデータ分析などの機能ライブラリがあります。Spark自体はScala言語で実装されていますが、Scalaの他にPython用のAPIや対話的なシェルも用意されていて、どちらの言語でもプログラミングや動作確認が可能です。

動作確認は簡単にできて、Java6以上がインストールされていれば、Downloadsページから適当なビルド済みパッケージをダウンロードして解凍すると、ScalaまたはPythonの対話的シェルが起動できます:

$ curl -O http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-hadoop1.tgz
$ tar zxf spark-1.0.2-bin-hadoop1.tgz
$ cd spark-1.0.2-bin-hadoop1
$ bin/pyspark
..(中略)..
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.0.2
      /_/

Using Python version 2.7.8 (default, Jul 13 2014 17:11:32)
SparkContext available as sc.
>>> 

ここではPythonの対話的シェルを起動しています。

Sparkは、処理対象のデータをResilient Distributed Dataset (RDD)とよばれる並列処理用のデータ形式に変換します。RDDの内容そのものではなく適用する処理手順を記録しておき、どこかの計算機ノードで処理が失敗しても、直前のRDDの内容を復元して別のノードで処理を継続します。対話的シェルで動作を確認すると、値が必要になるまで実際の処理を行っていないことが分かります。

たとえば、README.mdファイルを読み込んで各行の文字数を合計してみます:

>>> lines = sc.textFile("README.md")
14/08/11 18:54:09 INFO MemoryStore: ensureFreeSpace(32856) called with curMem=0, maxMem=309225062
14/08/11 18:54:09 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.1 KB, free 294.9 MB)
>>> lengths = lines.map(lambda s: len(s))
>>> lengths.reduce(lambda a, b: a + b)
14/08/11 18:55:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/08/11 18:55:03 WARN LoadSnappy: Snappy native library not loaded
14/08/11 18:55:03 INFO FileInputFormat: Total input paths to process : 1
14/08/11 18:55:03 INFO SparkContext: Starting job: reduce at <stdin>:1
14/08/11 18:55:03 INFO DAGScheduler: Got job 0 (reduce at <stdin>:1) with 2 output partitions (allowLocal=false)
14/08/11 18:55:03 INFO DAGScheduler: Final stage: Stage 0(reduce at <stdin>:1)
14/08/11 18:55:03 INFO DAGScheduler: Parents of final stage: List()
14/08/11 18:55:03 INFO DAGScheduler: Missing parents: List()
14/08/11 18:55:03 INFO DAGScheduler: Submitting Stage 0 (PythonRDD[2] at RDD at PythonRDD.scala:37), which has no missing parents
14/08/11 18:55:03 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (PythonRDD[2] at RDD at PythonRDD.scala:37)
14/08/11 18:55:03 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/08/11 18:55:03 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/08/11 18:55:03 INFO TaskSetManager: Serialized task 0.0:0 as 2999 bytes in 3 ms
14/08/11 18:55:03 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/08/11 18:55:03 INFO TaskSetManager: Serialized task 0.0:1 as 2999 bytes in 1 ms
14/08/11 18:55:03 INFO Executor: Running task ID 0
14/08/11 18:55:03 INFO Executor: Running task ID 1
14/08/11 18:55:03 INFO BlockManager: Found block broadcast_0 locally
14/08/11 18:55:03 INFO BlockManager: Found block broadcast_0 locally
14/08/11 18:55:04 INFO HadoopRDD: Input split: file:/.../spark-1.0.2-bin-hadoop1/README.md:2110+2111
14/08/11 18:55:04 INFO HadoopRDD: Input split: file:/.../spark-1.0.2-bin-hadoop1/README.md:0+2110
14/08/11 18:55:04 INFO PythonRDD: Times: total = 1345, boot = 1314, init = 30, finish = 1
14/08/11 18:55:04 INFO PythonRDD: Times: total = 1352, boot = 1310, init = 38, finish = 4
14/08/11 18:55:04 INFO Executor: Serialized size of result for 1 is 638
14/08/11 18:55:04 INFO Executor: Sending result for 1 directly to driver
14/08/11 18:55:04 INFO Executor: Finished task ID 1
14/08/11 18:55:04 INFO Executor: Serialized size of result for 0 is 638
14/08/11 18:55:04 INFO Executor: Sending result for 0 directly to driver
14/08/11 18:55:04 INFO Executor: Finished task ID 0
14/08/11 18:55:04 INFO TaskSetManager: Finished TID 1 in 1427 ms on localhost (progress: 1/2)
14/08/11 18:55:04 INFO DAGScheduler: Completed ResultTask(0, 1)
14/08/11 18:55:04 INFO TaskSetManager: Finished TID 0 in 1456 ms on localhost (progress: 2/2)
14/08/11 18:55:04 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/08/11 18:55:04 INFO DAGScheduler: Completed ResultTask(0, 0)
14/08/11 18:55:04 INFO DAGScheduler: Stage 0 (reduce at <stdin>:1) finished in 1.475 s
14/08/11 18:55:04 INFO SparkContext: Job finished: reduce at <stdin>:1, took 1.656086 s
4094

Sparkに同梱されている機械学習ライブラリはMLlibとよばれていますが、現時点で、線形SVM、ロジスティック回帰、決定木、ナイーブベイズ協調フィルタリング、k-meansクラスタリングなどの機能が提供されています。SparkのGithubにはサンプルのスクリプトとデータが登録されているので、動作確認をする場合はこちらをチェックアウトするとよいでしょう。

なお、Hadoopとの連携など大規模並列処理向け機械学習ライブラリのMahoutは、今後MapReduceアルゴリズムを新規に実装することはやめて、Sparkに対応することになっています。