sshadoopはScalaでHadoopを使う際のコードを簡略化するために書いた簡易なライブラリです。
現在、そこはかとなく開発途中です。
簡単なWordCountはこんな風になります。
import org.apache.hadoop.io._
import jp.mwsoft.sshadoop.mapreduce._
import jp.mwsoft.sshadoop.util.STool
object WordCountSample extends STool {
override def runJob(args: Array[String]): Boolean = {
new SJob(getConf(), "jobName")
.mapper(new MyMapper())
.reducer(new MyReducer())
.combiner(new MyReducer())
.fileInputPaths( "data/in" )
.fileOutputPath( "data/out" )
.waitForCompletion(true)
}
class MyMapper extends SMapper[LongWritable, Text, Text, LongWritable] {
override def map(key: LongWritable, value: Text, context: Context) {
for (str <- value.split("\\s")) context.write(str, 1L)
}
}
class MyReducer extends SReducer[Text, LongWritable, Text, LongWritable] {
override def reduce(key: Text, values: Iterator[LongWritable], context: Context) {
context.write(key, values.map(_.get).sum)
}
}
}
イメージ的にはStreamingでMapReduceを実行するのに近い感じのコードにしたかった。まぁ、だいぶ近づけた感じはする。
ちなみにHadoop Streamingを呼び出す場合は下記のような感じになる。
hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper /home/user/foo \
-reducer /home/user/bar \
-combiner /home/user/baz
たいしたことはしてません。
SJob
・Jobを継承してsetterメソッド全部にthisを返すメソッドを加えた
SMapper/SReducer
・ValueのInとOutのクラスを取得できる
・SJobに渡すとOutputKey(Value)ClassやJarByClassが自動で設定される
・LongWritable⇔LongやText⇔Stringの暗黙の型変換が定義されてる
STool
・ConfiguredとToolを継承したクラス
・LongWritable⇔LongやText⇔Stringの暗黙の型変換が定義されてる
SMapperやSReducerで定義するクラスは、INとOUTの型が固定である必要がある。下記のようなクラスは使えない。
class MyMapper[T] extends SMapper[T, LongWritable, T, LongWritable]
この場合はSJobのmapperやreducerメソッドを使わずに、普通にmapperClassやreducerClassで設定する。
// 引数はMapperClass, OutputKeyClass, OutputValueClass
job.mapperClass( classOf[MyMapper], classOf[Text], classOf[LongWritable] )
あとMapperやReducerはインスタンスじゃなく、Manifestで渡した方がいいような気がしたようなしなかったような。
下記はhadoopもexampleに入ってるgrepを移植したコードの中の、MapReduce処理を実行している部分。grepとsortの2つの処理を呼び出している。
// run grep job
val grepResult = new SJob(conf, "grep-search")
.mapper(new GrepMapper())
.reducer(new LongSumReducer[Text]())
.outputKeyValueClass(clsText, clsLongWritable)
.combinerClass(classOf[LongSumReducer[_]])
.fileInputPaths(args(0))
.fileOutputPath(tempPath)
.outputFormatClass(classOf[SequenceFileOutputFormat[_, _]])
.waitForCompletion(true)
// run sort job
grepResult && new SJob(conf, "grep-sort")
.numReduceTasks(1)
.mapperClass(classOf[InverseMapper[Text, LongWritable]])
.fileInputPaths(tempPath)
.inputFormatClass(classOf[SequenceFileInputFormat[_, _]])
.fileOutputPath(args(1))
.sortComparatorClass(classOf[LongWritable.DecreasingComparator])
.waitForCompletion(true)
Job、FileInputFormat、FileOutputFormat、SequenceFileOutputFormatなどのsetterメソッドはだいたいSJobで連結して呼び出せるようになっている。