概要

sshadoopはScalaでHadoopを使う際のコードを簡略化するために書いた簡易なライブラリです。

現在、そこはかとなく開発途中です。

@Date 2012/03/13
@Versions CDH3, Scala2.9.1

サンプルコード

簡単なWordCountはこんな風になります。

import org.apache.hadoop.io._
import jp.mwsoft.sshadoop.mapreduce._
import jp.mwsoft.sshadoop.util.STool

object WordCountSample extends STool {

  override def runJob(args: Array[String]): Boolean = {
    new SJob(getConf(), "jobName")
      .mapper(new MyMapper())
      .reducer(new MyReducer())
      .combiner(new MyReducer())
      .fileInputPaths( "data/in" )
      .fileOutputPath( "data/out" )
      .waitForCompletion(true)
  }

  class MyMapper extends SMapper[LongWritable, Text, Text, LongWritable] {
    override def map(key: LongWritable, value: Text, context: Context) {
      for (str <- value.split("\\s")) context.write(str, 1L)
    }
  }

  class MyReducer extends SReducer[Text, LongWritable, Text, LongWritable] {
    override def reduce(key: Text, values: Iterator[LongWritable], context: Context) {
      context.write(key, values.map(_.get).sum)
    }
  }
}

イメージ的にはStreamingでMapReduceを実行するのに近い感じのコードにしたかった。まぁ、だいぶ近づけた感じはする。

ちなみにHadoop Streamingを呼び出す場合は下記のような感じになる。

hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /home/user/foo \
    -reducer /home/user/bar \
    -combiner /home/user/baz

導入

とりあえずコードとjarだけ転がしておきます。

コード : この辺

jar : この辺

やってること

たいしたことはしてません。

SJob
・Jobを継承してsetterメソッド全部にthisを返すメソッドを加えた

SMapper/SReducer
・ValueのInとOutのクラスを取得できる
・SJobに渡すとOutputKey(Value)ClassやJarByClassが自動で設定される
・LongWritable⇔LongやText⇔Stringの暗黙の型変換が定義されてる

STool
・ConfiguredとToolを継承したクラス
・LongWritable⇔LongやText⇔Stringの暗黙の型変換が定義されてる

問題点

SMapperやSReducerで定義するクラスは、INとOUTの型が固定である必要がある。下記のようなクラスは使えない。

class MyMapper[T] extends SMapper[T, LongWritable, T, LongWritable] 

この場合はSJobのmapperやreducerメソッドを使わずに、普通にmapperClassやreducerClassで設定する。

// 引数はMapperClass, OutputKeyClass, OutputValueClass
job.mapperClass( classOf[MyMapper], classOf[Text], classOf[LongWritable] )

あとMapperやReducerはインスタンスじゃなく、Manifestで渡した方がいいような気がしたようなしなかったような。

少し複雑なサンプルコード

下記はhadoopもexampleに入ってるgrepを移植したコードの中の、MapReduce処理を実行している部分。grepとsortの2つの処理を呼び出している。

    // run grep job
    val grepResult = new SJob(conf, "grep-search")
      .mapper(new GrepMapper())
      .reducer(new LongSumReducer[Text]())
      .outputKeyValueClass(clsText, clsLongWritable)
      .combinerClass(classOf[LongSumReducer[_]])
      .fileInputPaths(args(0))
      .fileOutputPath(tempPath)
      .outputFormatClass(classOf[SequenceFileOutputFormat[_, _]])
      .waitForCompletion(true)

    // run sort job
    grepResult && new SJob(conf, "grep-sort")
      .numReduceTasks(1)
      .mapperClass(classOf[InverseMapper[Text, LongWritable]])
      .fileInputPaths(tempPath)
      .inputFormatClass(classOf[SequenceFileInputFormat[_, _]])
      .fileOutputPath(args(1))
      .sortComparatorClass(classOf[LongWritable.DecreasingComparator])
      .waitForCompletion(true)

Job、FileInputFormat、FileOutputFormat、SequenceFileOutputFormatなどのsetterメソッドはだいたいSJobで連結して呼び出せるようになっている。