概要

Hadoop0.20(CDH3)で日本語のWordCountをさらっと走らせてみる。形態素解析はKuromoji。

サンプルコードはScalaだけどもちろんJavaでも可。

@Date 2012/03/13
@Versions CDH3, Scala2.9.1, Eclipse3.7, Maven2

Kuromojiについて

KuromojiはJavaの形態素解析器。jarに辞書が内包されており、Mavenのレポジトリも公開されているので導入はとても手軽。

詳細については下記参照。

Java製形態素解析器「Kuromoji」を試してみる
http://www.mwsoft.jp/programming/lucene/kuromoji.html

今回はデフォルトの辞書(IPA)をそのまま利用。辞書を編集したい場合はresourcesにユーザ辞書を入れて呼び出すことにする。

導入

Mavenを利用する場合は下記のようなpom.xmlを書いて、assemblyプラグインで実行jarを作ると楽でした。
https://github.com/mwsoft/sample/blob/master/shadoop/pom.xml

Maven等のパッケージ管理ツールを使用しない場合は、普通にHadoopの環境を用意して、あとはKuromojiのjarを落としてきてlib配下とかに配布します。
http://atilika.org/confluence/pages/viewpage.action?pageId=131141

我が家では下記のような環境で実行してます。

ScalaでMapReduce(導入編)
http://www.mwsoft.jp/programming/hadoop/scala_mapreduce_setup.html

JobTracker1台、TaskTracker2台の貧弱構成でテストしてます。

WordCount Mapper

Mapperはこんな感じで。

class WordCountMapper extends Mapper[LongWritable, Text, Text, LongWritable] {
  type Context = Mapper[LongWritable, Text, Text, LongWritable]#Context

  val tokenizer = Tokenizer.builder().build()
  val one = new LongWritable( 1L )

  override def map( key: LongWritable, value: Text, context: Context ) {
    // 申し訳程度にNFKCでUnicode正規化しとく
    val text = Normalizer.normalize( value.toString, Normalizer.Form.NFKC )

    // Kuromojiを使って形態素解析
    for ( token <- tokenizer.tokenize( text ).iterator() ) {
      // テキトーに助詞と接続詞を省いてみる
      val features = token.getAllFeatures()
      if ( !features.startsWith( "助詞" ) && !features.startsWith( "接続詞" ) )
        context.write( new Text( token.getSurfaceForm() ), one )
    }
  }
}

ユーザ辞書を使う場合はresourcesに辞書を入れて、Tokenizerの初期化を以下のようにすればうまくいきました。

// ユーザ辞書のファイル名がuser.dicの場合
val tokenizer = Tokenizer.builder().
  userDictionary( getClass.getResourceAsStream( "user.dic" ) ).build()

今回は例としてTwitterから収集したテキストに対してワードカウントをかける予定なので、試しにそこ界隈で使われている「よるほー」という言葉をユーザ辞書に入れておきます。

ユーザ辞書の書き方はこんな感じ。

よるほー,よるほー,ヨルホウ,カスタム名詞

WordCount Reducer

Reducerは普通にLongSumReducerを使っておいても良いところですが、全件出すと多いので出現回数が5件より少ない単語は出力しないようにしてみます。

class WordCountReducer extends Reducer[Text, LongWritable, Text, LongWritable] {
  type Context = Reducer[Text, LongWritable, Text, LongWritable]#Context

  override def reduce( key: Text, values: java.lang.Iterable[LongWritable], context: Context ) {
    var sum = 0L
    for ( l <- values.iterator() )
      sum += l.get

    // 全部吐くと多いから5件以上出現する単語だけにしてみる
    if ( sum >= 5 ) context.write( key, new LongWritable( sum ) )
  }
}

実行してみる

上のコードの全文はこんな感じになります。

コード全文はこちら

この手の処理はMapperとReducerだけでは不安なので、Combinerも入れてあります。in-mapper combiningを使うケースな気もするけど。

とりあえずassemblyプラグインでjarにして分散環境で動かしてみます。

$ mvn scala:compile
$ mvn assembly:assembly
$ hadoop jar target/shadoop-0.0.1-jar-with-dependencies.jar jp.mwsoft.sample.scala.hadoop.JaWordCount

TwitterのStreamingAPIで集めた1ヶ月分のツイートに対してワードカウントをしてみたところ、上位10件はこんな結果になりました。

$ hadoop fs -get data/out/part-r-00000 .
$ sort -nrk2 part-r-00000 | head -10

@	4489893
た	3207752
_	1876372
!	1852520
し	1537389
だ	1399632
ない	1320674
(	1186401
/	1181717
.	1137139

上位で取るだけだとだいたいこんな感じの面白みのない結果になります。

12月のデータを解析してみたところ、中位のあたりにクリスマス(77073回)やプレゼント(33622回)が登場してました。時期に応じた特徴的な言葉の抽出とかには使えそうです。

ちなみに辞書に入れた「よるほー」は1585回(1日平均50回強)登場してました。StreamingAPI(全体の1%くらい)で日に50回出現ってことは全体では1日5000くらいですかね。そこそこの出現率かと。

おまけ

最近はScala+MapReduceのコードを短縮できるようなライブラリを自前で作って遊んでます。

sshadoop

WordCountなら下記くらいの長さまでなら短縮できるようになりました。

import org.apache.hadoop.io._
import jp.mwsoft.sshadoop.mapreduce._
import jp.mwsoft.sshadoop.util.STool

object WordCountSample extends STool {

  override def runJob( args: Array[String] ): Boolean = {
    new SJob( getConf(), "jobName" )
      .mapper( new MyMapper() )
      .reducer( new MyReducer() )
      .combiner( new MyReducer() )
      .fileInputPaths( "data/in" )
      .fileOutputPath( "data/out" )
      .waitForCompletion( true )
  }

  class MyMapper extends SMapper[LongWritable, Text, Text, LongWritable] {
    override def map( key: LongWritable, value: Text, context: Context ) =
      for ( str <- value.split( "\\s" ) ) context.write( str, 1 )
  }

  class MyReducer extends SReducer[Text, LongWritable, Text, LongWritable] {
    override def reduce( key: Text, values: Iterator[LongWritable], context: Context ) =
      context.write( key, values.map( _.get ).sum )
  }
}