ScalaからHBaseのテーブル追加、ファミリー追加、検索、行追加などをしてみる。
1. HBaseが起動していること
2. 以下のjarがクラスパスに入っていること(バージョンは適宜読み替え)
・hbase-0.90.4-cdh3u3.jar
・hadoop-0.20.2-cdh3u3.jar
・commons-logging-1.0.4.jar
・log4j-1.2.15.jar
主に利用するクラスは以下の2つ
1. HBaseAdmin
http://hbase.apache.org/docs/current/api/org/apache/hadoop/hbase/client/HBaseAdmin.html
2. HTable
http://hbase.apache.org/docs/current/api/org/apache/hadoop/hbase/client/HTable.html
HBaseAdminはテーブルの生成、削除、Disable、Enable、Familyの追加、テーブル一覧の取得、サーバの状態確認や終了処理など、管理っぽい処理を行う。SQLで言うと、CREATEやDROP、ALTER TABLEあたり。
HTableは1行検索、範囲検索、行の追加、削除などを行う。SQLで言うと、SELECT、INSERT、DELETEあたり。
まずはHBaseAdminの初期化。hbase-site.xmlを読み込む場合は下記のような感じ。
// Configurationにhbase-site.xmlのパスを指定する
val conf = HBaseConfiguration.create()
conf.addResource( new Path( "conf/hbase-site.xml" ) )
val admin = new HBaseAdmin(conf)
リモートのHBaseに繋ぐ場合は、下記のような感じ。
// Configurationに対象サーバを指定
val conf = HBaseConfiguration.create()
conf.set( "hbase.zookeeper.quorum", "nezumi" );
conf.set( "hbase.zookeeper.property.clientPort", "2181" );
val admin = new HBaseAdmin( conf )
テーブルを作成する。tableNameというテーブル名で、2つのFamilyを持つテーブルを生成してみる。
val descriptor = new HTableDescriptor( "tableName" )
descriptor.addFamily( new HColumnDescriptor( "family1" ) )
descriptor.addFamily( new HColumnDescriptor( "family2" ) )
admin.createTable( descriptor )
テーブルを削除する。削除はdeleteTableで行う。けど、その前にdisableTableでテーブル操作を無効状態にしておかないとエラーになる。
admin.disableTable( "tableName" )
admin.deleteTable( "tableName" )
disableTableでテーブルを無効(takes it off-line)にする
admin.disableTable( "tableName" )
enableTableで無効になったテーブルを有効(on-line)にする。
admin.enableTable( "tableName" )
listTablesでテーブルの一覧を取得する。戻り値はCREATEでも使った、HTableDescriptorというテーブルの情報を保持するクラスの配列で返る。
for( descriptor <- admin.listTables )
println( descriptor.getNameAsString )
テーブルを再作成してみる。getTableDescriptorでテーブル情報を取得し、それを利用してテーブルを削除後に再作成。
val descriptor = admin.getTableDescriptor( Bytes.toBytes( "tableName" ) )
admin.disableTable( "tableName" )
admin.deleteTable( "tableName" )
admin.createTable( descriptor )
tableExistsでテーブルの存在チェック。存在すればtrueが、存在しなければfalseが返る。
hAdmin.tableExists("tableName")
addColumnでカラム追加をする。追加する際はテーブルのdisableが必要。
val column = new HColumnDescriptor( "family3" )
admin.disableTable( "tableName" )
admin.addColumn( "tableName", column )
admin.enableTable( "tableName" )
deleteColumnでカラムの削除。削除する際はテーブルのdisableが必要。
admin.disableTable( "tableName" )
admin.deleteColumn( "tableName", "family3" )
admin.enableTable( "tableName" )
この他にもサーバのshutdownとか、flushとかcompactなんかも実行できる。でも、compactとmajorCompactの差がイマイチわかってない。いつかちゃんと勉強する。
HBaseに値を渡す時はByteの配列に変換してから渡す。変換にはorg.apache.hadoop.hbase.util.Bytesを使う。
// Bytes.toBytesでArray[Byte]に変換される
val bytes = Bytes.toBytes( "abc" )
bytes foreach println
//=> 97\n98\n99
// 元に戻す時はBytes.toString
println( Bytes.toString( bytes ) )
//=> abc
String以外にも、BigDecimal, Char, Array[Char], Double, Float, Int, Long, Shortの変換ができる。
http://hbase.apache.org/docs/current/api/org/apache/hadoop/hbase/util/Bytes.html
毎度変換するのは面倒なので、下記のような暗黙の型変換を定義しておくとそこはかとなく便利。
implicit def boolean2hbaseBytes( value: Boolean ) = Bytes.toBytes( value )
implicit def double2hbaseBytes( value: Double ) = Bytes.toBytes( value )
implicit def float2hbaseBytes( value: Float ) = Bytes.toBytes( value )
implicit def int2hbaseBytes( value: Int ) = Bytes.toBytes( value )
implicit def long2hbaseBytes( value: Long ) = Bytes.toBytes( value )
implicit def short2hbaseBytes( value: Short ) = Bytes.toBytes( value )
implicit def string2hbaseBytes( value: String ) = Bytes.toBytes( value )
implicit def hbaseBytes2boolean( value: Array[Byte] ) = Bytes.toBoolean( value )
implicit def hbaseBytes2double( value: Array[Byte] ) = Bytes.toDouble( value )
implicit def hbaseBytes2float( value: Array[Byte] ) = Bytes.toFloat( value )
implicit def hbaseBytes2int( value: Array[Byte] ) = Bytes.toInt( value )
implicit def hbaseBytes2long( value: Array[Byte] ) = Bytes.toLong( value )
implicit def hbaseBytes2short( value: Array[Byte] ) = Bytes.toShort( value )
implicit def hbaseBytes2string( value: Array[Byte] ) = Bytes.toString( value )
HBaseAdminと同じくHBaseConfigurationを引数に指定する。
val conf = HBaseConfiguration.create()
val table = new HTable( conf, "tableName" )
データの登録にはPutクラスとputメソッドを使う。
val put = new Put( Bytes.toBytes( "row" ) )
put.add( Bytes.toBytes( "family1" ), Bytes.toBytes( "qualifier" ), Bytes.toBytes( "value" ) )
table.put( put )
checkAndPutを使うと、指定したカラムが指定値の場合のみ、Putが実行される。指定した値でなかった場合はPutは実行されずfalseが返る。
val put = new Put(Bytes.toBytes("row"))
put.add(Bytes.toBytes("family1"), Bytes.toBytes("qualifierB"), Bytes.toBytes("valueB"))
// カラム(row/family1/qualifier)の値がvalueの場合のみ、putが実行される
table.checkAndPut( Byts.toBytes( "row" ), Byts.toBytes( "family1" ), Byts.toBytes( "qualifier" ),
Byts.toBytes( "value" ), put)
指定したカラムのデータを取得する場合は、Getクラスとgetメソッドを使う。
val result = table.get( new Get( Bytes.toBytes( "row" ) ) )
val value = result.getColumnLatest( Bytes.toBytes( "family1" ), Bytes.toBytes( "qualifier" ) )
println( Bytes.toString( value.getValue() ) )
//=> value
incrementColumnValueで指定カラムのインクリメントができる。カラムの内容はLong型である必要があるらしい。データが存在しない場合は新規にレコードが登録される。下記の例では値が3インクリメントされる。
table.incrementColumnValue( Bytes.toBytes( "row" ),
Bytes.toBytes( "family1" ), Bytes.toBytes( "qualifier" ), 3L )
existsで行の存在チェックを行う。指定したRowが存在すればtrue、存在しなければfalse。
table.exists(new Get(Bytes.toBytes("row")))
deleteで指定したRowやカラムを削除する。
// Row全体を削除
val delete = new Delete( Bytes.toBytes( "row" ) )
table.delete( delete )
// 指定カラムを削除
val delete = new Delete( Bytes.toBytes( "row" ) )
delete.deleteFamily( Bytes.toBytes( "family" ) )
table.delete( delete )
scanを使うと範囲検索ができる。startRow〜endRowを範囲に指定する。startRowは指定した値自体が範囲に入る。endRowは指定した値自体は範囲に入らない。つまりstartに10、endに15を指定すると、10,11,12,13,14が範囲になる。
val scan = new Scan( Bytes.toBytes( "startRow" ), Bytes.toBytes( "endRow" )
val resultScanner = table.getScanner( scan )
for ( result <- resultScanner ) {
for ( keyValue <- result.raw ) {
println( Bytes.toString( result.getRow ) )
println( Bytes.toString( keyValue.getFamily ) )
println( Bytes.toString( keyValue.getQualifier ) )
println( Bytes.toString( keyValue.getValue ) )
}
}
flushCommitsかcloseを実行するとPutを実行した内容が反映される。autoCommitがtrueになってる場合は、自動的に実行される。
// コミットする
table.flushCommits
// ソースを見る限り、closeもflushCommitsと同じ処理
table.close
この他にもgetTableNameとか、setAutoFlushなどのメソッドもある。