概要

本記事はCHD3を使ってSnappy(犬種はビーグル、趣味は変装)と少しばかり戯れた際のメモ書きです。

尚、本文中に出てくる実行時間はCore2DuoやAthllonⅡなどの割と貧弱なCPUによって実行されています。高性能なCPUで測ったらかなり数字は変わるはずなのでご注意ください。

@Date 2012/04/17
@Versions CDH3u3

Snappyの特徴

SnappyはGoogleが公開したオープンソースの圧縮ライブラリ。

圧縮率はイマイチだけど圧縮・伸長の速度は速い、Hadoopと相性の良い子。

ベンチマークを見た限りでは、圧縮率や速度はLZOと割と似た感じの数値になることが多い。

SnappyとLZOの一番の違いはライセンス。LZOはGPLなのでApacheライセンスのHadoopとは食べ合わせが悪い。対するSnappyはNew BSDなので同梱しやすい。

CDH3u3にはSnappyCodecが入っているので、特に追加のインストール作業をしなくてもHadoopで利用することができる。LZOは別途インストールをしないと使えない。

但しLZOのようなsplittableな圧縮は用意されてないらしいので、Snappyで圧縮したファイルをそのままMapReduceに食べさせるとMapperが1つしか動かない状態になる。まぁ、その辺はシーケンスファイルにするなりなんなりで。

snappy(google code)
http://code.google.com/p/snappy/

Snappy and Hadoop(Cloudera)
http://www.cloudera.com/blog/2011/09/snappy-and-hadoop/

日本語文字列を圧縮してみる

個人的に一番良く使うのは日本語文書なので、まずはその辺と触れ合ってみる。サンプルファイルはTwitterから収集した日本語Tweet843万件(941MB)。

CDH3のSnappyCodecを使う場合、Javaで下記のように書くと圧縮できる。SnappyCodecはJNIを使っているので、snappy.soが存在するパスがクラスパスに含まれている必要がある。

Configuration conf = new Configuration();
SnappyCodec codec = new SnappyCodec();
codec.setConf( conf );
InputStream is = new FileInputStream("sample.txt");
OutputStream os = codec.createOutputStream(new FileOutputStream("sample.snappy"));
IOUtils.copyBytes( is, os, conf );

当該ファイルを圧縮してみたところ、所要時間は下記のようになった。参考までにHadoopにデフォルトで用意されてるGzipCodecとBZip2Codecの数字も並べておく。実行したマシンのCPUはAthlonII X4 640。

形式所要時間サイズ圧縮率
SnappyCodec8.00秒611MB64.9%
GzipCodec93.49秒401MB42.6%
BZip2Codec666.66秒275MB29.2%

SnappyCodecでは圧縮にかかった時間は僅か8秒。速い。試しにcpコマンドで元ファイルをそのままコピーしてみたところ、8.9秒かかった。つまり、圧縮した方がコピーするだけより速かった(JVMの起動時間は除く)。

圧縮するファイルの内容によって圧縮率は変化するので一概には言えないけど、Snappyの圧縮速度は秒間250MB/s出る場合もあるとか。HDDのシーケンシャルなWriteはモノによるけど120MB/s程度だったはずなので、HDDの書き込み速度より圧縮速度が速くなっても不思議ではない。

Bzip2はやけに遅い。この辺はNative Library使えば速くなる気もするし、貧弱なCPUのせいという気もする。

伸長性能を見てみる

逆に今圧縮したものを伸長してみる。先ほど圧縮したものをCodecを使ってInputStreamに入れて最後まで読み終わるまでの時間を計測する。

コード的にはこんな感じ。

Configuration conf = new Configuration();
SnappyCodec codec = new SnappyCodec();
codec.setConf(conf);
BufferedReader reader = new BufferedReader(new InputStreamReader(codec.createInputStream(
    new FileInputStream("sample.snappy"))));

String line = null;
while ((line = reader.readLine()) != null) { }

実行結果はこんな感じ。

形式所要時間
SnappyCodec9.21秒
GzipCodec14.04秒
Bzip2Codec123.34秒

GzipCodecは割と頑張っている。CPUの性能次第では逆転するんじゃないだろうか。

MapReduceのMapperの出力ファイルとして使ってみる

なんとなく性能は分かったので、次はMapReduceの中間ファイルとして使ってみる。

出力する情報量が多くなりそうな処理として、ソート処理を行なってみる。TaskTracker2台の貧弱構成で実行。

SnappyをMapperの出力に使うには、mapred-site.xmlに以下のように記述すれば良いらしい。

<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>
<property>
  <name>mapred.map.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>

今回はCodecを替えながら実行したいので、圧縮形式をコマンドライン引数で指定することにする。ソートするファイルは上でも使ったTwitterのファイル。

$ hadoop jar my.jar -Dmapred.compress.map.output=true -Dmapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec samle.Sort

結果、圧縮しなかった場合と比べて実行時間はほとんど変化しなかった。小さな構成でやってるのでこれがもう少し大きくなった場合にはどうなるだろうか。

Mapperが出力したデータサイズは、圧縮しなかった場合が1,763,804,020、圧縮した場合が1,258,530,991と、約30%削減されていた。

DefaultCodeやBZip2Codeを利用した場合はより情報量は小さくなるけど、実行時間は逆に長くなっていた。

多段MapReduceの中間ファイルとして使ってみる

多段でMapReduceをかける際の中間ファイルをSnappyで圧縮したシーケンスファイルにしてみる。ブロックで圧縮したシーケンスファイルならMapReduceの入力ファイルとして利用した際に、ちゃんとMapperが複数働いてくれる。

試しにテキストファイルを読み込んでそのまま出力するという無意味な行為を2段階実行するコードを書いてみる。

// 第一弾出力
Job job1 = new Job(conf, "job1");
job1.setJarByClass(Sample1.class);
job1.setMapperClass(Mapper.class);
job1.setOutputKeyClass(LongWritable.class);
job1.setOutputValueClass(Text.class);
job1.setNumReduceTasks(4);
FileInputFormat.addInputPath(job1, new Path("data/in/tweet.txt"));
// 出力にSnappyで圧縮したシーケンスファイルを指定
SequenceFileOutputFormat.setOutputPath(job1, new Path("data/out/temp1"));
SequenceFileOutputFormat.setOutputCompressionType(job1, CompressionType.BLOCK);
SequenceFileOutputFormat.setOutputCompressorClass(job1, SnappyCodec.class);
job1.waitForCompletion(true);

// 結果ファイルの名前変更
FileSystem fs = FileSystem.get(conf);
FileStatus[] states = fs.listStatus(new Path("data/out/temp1"));
for (FileStatus status : states) {
  Path path = status.getPath();
  String name = path.getName();
  if (name.endsWith(".snappy"))
    fs.rename(path, new Path(path.getParent(), name.substring(0, name.length() - 7)));
}

// 第二弾出力
Job job2 = new Job(conf, "job2");
job2.setJarByClass(Sample1.class);
job2.setMapperClass(Mapper.class);
job2.setOutputKeyClass(LongWritable.class);
job2.setOutputValueClass(Text.class);
// 入力に第一弾が出力したシーケンスファイルを指定
SequenceFileInputFormat.addInputPath(job2, new Path("data/out/temp1"));
FileOutputFormat.setOutputPath(job2, new Path("data/out/finish1"));
job2.waitForCompletion(true);

途中でファイルの名前を変えているのは、ファイル名の末尾が「.snappy」になってると圧縮ファイルだと判断されてMapperが1つしか走らなかったため。なんか他に書きようがあるような気がする。

このコードを実行した場合、一段目のWriteするデータサイズは、圧縮しない場合は942,509,025だったのが、657,959,732に減少した(約30%減)。

二段目のReadするファイルのサイズも当然同じように減少する。実行速度も若干速くなっていた。

感想

どの圧縮形式が適切かはその時々で変わるのだろうけど、Snappyは少ない負荷でファイルを圧縮伸長できるので、あまり頭を使わず「とりあえず使っとくか」的なノリで使えそうな気がした。