// ScalaからHBaseを使ってみる(0.20.6) — Trying out HBase from Scala (0.20.6)

package jp.mwsoft.common.hbase

import scala.collection.JavaConversions._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.{ HTable, HBaseAdmin, Get, Put, Delete, Scan, Result }
import org.apache.hadoop.hbase.{
    HBaseConfiguration,
    HTableDescriptor,
    HColumnDescriptor
}

/**
 * Convenience wrapper around a single HBase table.
 *
 * Row keys, families, qualifiers and values are accepted as `Any`. A value
 * that is already an `Array[Byte]` is used as-is; everything else is encoded
 * by `HBase.toBytes`.
 *
 * @param tableName name of the table to open
 * @param confPath  path of the configuration resource to load; `null` selects
 *                  the default configuration of the companion object
 */
class HBase(tableName: String, confPath: String = null) {
    /** Handle of the wrapped table. */
    val table: HTable = new HTable(HBase.getConf(confPath), tableName)

    /**
     * Encodes a key or value. Byte arrays pass through untouched: sending them
     * through `HBase.toBytes(Any)` could stringify the array reference instead
     * of using its content, which is what the stop keys computed by
     * `getNextBytes` must never be subjected to.
     */
    private def bytesOf(value: Any): Array[Byte] = value match {
        case raw: Array[Byte] => raw
        case other => HBase.toBytes(other)
    }

    /** Stores `value` in the cell addressed by `row`, `family` and `qualifier`. */
    def put(row: Any, family: Any, qualifier: Any, value: Any): Unit = {
        val mutation = new Put(bytesOf(row))
        mutation.add(bytesOf(family), bytesOf(qualifier), bytesOf(value))
        table.put(mutation)
    }

    /** Adds `value` to the counter cell addressed by `row`, `family` and `qualifier`. */
    def incr(row: Any, family: Any, qualifier: Any, value: Long): Unit = {
        table.incrementColumnValue(bytesOf(row), bytesOf(family), bytesOf(qualifier), value)
    }

    /**
     * Reads one cell as raw bytes.
     *
     * @return whatever `Result.getValue` yields for the cell
     *         (presumably `null` when the cell is absent -- confirm against the client API)
     */
    def get(row: Any, family: Any, qualifier: Any): Array[Byte] =
        table.get(new Get(bytesOf(row))).getValue(bytesOf(family), bytesOf(qualifier))

    /** Returns whether the row exists. */
    def exists(row: Any): Boolean = table.exists(new Get(bytesOf(row)))

    /** Returns whether the given column of the row exists. */
    def exists(row: Any, family: Any, qualifier: Any): Boolean = {
        val probe = new Get(bytesOf(row))
        probe.addColumn(bytesOf(family), bytesOf(qualifier))
        table.exists(probe)
    }

    /**
     * Reads one cell and decodes it as a Long.
     * NOTE(review): the raw value is handed straight to `Bytes.toLong`; a
     * missing cell is not handled here -- check `exists` first if it may be absent.
     */
    def getLong(row: Any, family: Any, qualifier: Any): Long =
        Bytes.toLong(get(row, family, qualifier))

    /** Reads one cell and decodes it as a Double (same missing-cell caveat as `getLong`). */
    def getDouble(row: Any, family: Any, qualifier: Any): Double =
        Bytes.toDouble(get(row, family, qualifier))

    /** Reads one cell and decodes it as a String. */
    def getString(row: Any, family: Any, qualifier: Any): String =
        Bytes.toString(get(row, family, qualifier))

    /** Calls `func(family, qualifier, value)` with raw bytes for every cell of the row. */
    def get(row: Any,
        func: (Array[Byte], Array[Byte], Array[Byte]) => Unit): Unit = {
        for (kv <- table.get(new Get(bytesOf(row))).raw)
            func(kv.getFamily, kv.getQualifier, kv.getValue)
    }

    /**
     * Shared body of the typed per-row getters: family and qualifier are
     * decoded as Strings, the value with `decode`.
     */
    private def eachCell[T](row: Any, decode: Array[Byte] => T,
        func: (String, String, T) => Unit): Unit =
        get(row, (family: Array[Byte], qualifier: Array[Byte], value: Array[Byte]) =>
            func(Bytes.toString(family), Bytes.toString(qualifier), decode(value)))

    /** Calls `func(family, qualifier, value)` for every cell of the row; the value stays raw. */
    def getBytes(row: Any,
        func: (String, String, Array[Byte]) => Unit): Unit =
        eachCell(row, (value: Array[Byte]) => value, func)

    /** Calls `func(family, qualifier, value)` for every cell of the row; the value is a String. */
    def getString(row: Any,
        func: (String, String, String) => Unit): Unit =
        eachCell(row, (value: Array[Byte]) => Bytes.toString(value), func)

    /** Calls `func(family, qualifier, value)` for every cell of the row; the value is a Long. */
    def getLong(row: Any,
        func: (String, String, Long) => Unit): Unit =
        eachCell(row, (value: Array[Byte]) => Bytes.toLong(value), func)

    /** Calls `func(family, qualifier, value)` for every cell of the row; the value is a Double. */
    def getDouble(row: Any,
        func: (String, String, Double) => Unit): Unit =
        eachCell(row, (value: Array[Byte]) => Bytes.toDouble(value), func)

    /**
     * Opens exactly one scanner over `startRow`..`endRow`, feeds every result
     * to `body` and closes the scanner even when `body` throws.
     */
    private def eachResult(startRow: Any, endRow: Any)(body: Result => Unit): Unit = {
        val scanner = table.getScanner(new Scan(bytesOf(startRow), bytesOf(endRow)))
        try {
            // Drive the Java iterator directly so no implicit collection conversion is needed.
            val results = scanner.iterator
            while (results.hasNext)
                body(results.next())
        } finally {
            scanner.close()
        }
    }

    /** Scans `startRow`..`endRow` and calls `func(row, family, qualifier, value)` with raw bytes per cell. */
    def scan(startRow: Any, endRow: Any,
        func: (Array[Byte], Array[Byte], Array[Byte], Array[Byte]) => Unit): Unit =
        eachResult(startRow, endRow) { item =>
            for (kv <- item.raw)
                func(item.getRow, kv.getFamily, kv.getQualifier, kv.getValue)
        }

    /**
     * Scans from `startRow` up to the stop key derived by `getNextBytes`,
     * i.e. the rows whose key starts with the encoded `startRow`.
     */
    def scan(startRow: Any,
        func: (Array[Byte], Array[Byte], Array[Byte], Array[Byte]) => Unit): Unit =
        scan(startRow, getNextBytes(bytesOf(startRow)), func)

    /**
     * Shared body of the typed range scans: row, family and qualifier are
     * decoded as Strings, the value with `decode`.
     */
    private def scanDecoded[T](startRow: Any, endRow: Any, decode: Array[Byte] => T,
        func: (String, String, String, T) => Unit): Unit =
        scan(startRow, endRow, (row: Array[Byte], family: Array[Byte],
            qualifier: Array[Byte], value: Array[Byte]) =>
            func(Bytes.toString(row), Bytes.toString(family),
                Bytes.toString(qualifier), decode(value)))

    /** Range scan delivering each value as raw bytes. */
    def scanBytes(startRow: Any, endRow: Any,
        func: (String, String, String, Array[Byte]) => Unit): Unit =
        scanDecoded(startRow, endRow, (value: Array[Byte]) => value, func)

    /** Prefix scan (see the two-argument `scan`) delivering each value as raw bytes. */
    def scanBytes(startRow: Any,
        func: (String, String, String, Array[Byte]) => Unit): Unit =
        scanBytes(startRow, getNextBytes(bytesOf(startRow)), func)

    /** Range scan delivering each value as a String. */
    def scanString(startRow: Any, endRow: Any,
        func: (String, String, String, String) => Unit): Unit =
        scanDecoded(startRow, endRow, (value: Array[Byte]) => Bytes.toString(value), func)

    /** Prefix scan (see the two-argument `scan`) delivering each value as a String. */
    def scanString(startRow: Any,
        func: (String, String, String, String) => Unit): Unit =
        scanString(startRow, getNextBytes(bytesOf(startRow)), func)

    /** Range scan delivering each value as a Long. */
    def scanLong(startRow: Any, endRow: Any,
        func: (String, String, String, Long) => Unit): Unit =
        scanDecoded(startRow, endRow, (value: Array[Byte]) => Bytes.toLong(value), func)

    /** Prefix scan (see the two-argument `scan`) delivering each value as a Long. */
    def scanLong(startRow: Any,
        func: (String, String, String, Long) => Unit): Unit =
        scanLong(startRow, getNextBytes(bytesOf(startRow)), func)

    /** Range scan delivering each value as a Double. */
    def scanDouble(startRow: Any, endRow: Any,
        func: (String, String, String, Double) => Unit): Unit =
        scanDecoded(startRow, endRow, (value: Array[Byte]) => Bytes.toDouble(value), func)

    /** Prefix scan (see the two-argument `scan`) delivering each value as a Double. */
    def scanDouble(startRow: Any,
        func: (String, String, String, Double) => Unit): Unit =
        scanDouble(startRow, getNextBytes(bytesOf(startRow)), func)

    /** Scans `startRow`..`endRow` and calls `func` once per row with the whole `Result`. */
    def scanRow(startRow: Any, endRow: Any, func: (Result) => Unit): Unit =
        eachResult(startRow, endRow)(func)

    /** Prefix scan (see the two-argument `scan`) calling `func` once per row. */
    def scanRow(startRow: Any, func: (Result) => Unit): Unit =
        scanRow(startRow, getNextBytes(bytesOf(startRow)), func)

    /** Deletes the whole row. */
    def delete(row: Any): Unit = {
        table.delete(new Delete(bytesOf(row)))
    }

    /** Deletes one column family of the row. */
    def delete(row: Any, family: Any): Unit = {
        val removal = new Delete(bytesOf(row))
        removal.deleteFamily(bytesOf(family))
        table.delete(removal)
    }

    /** Deletes one column of the row. */
    def delete(row: Any, family: Any, qualifier: Any): Unit = {
        val removal = new Delete(bytesOf(row))
        removal.deleteColumn(bytesOf(family), bytesOf(qualifier))
        table.delete(removal)
    }

    /** Flushes buffered commits of the underlying table. */
    def commit: Unit = table.flushCommits()

    /** Closes the underlying table. */
    def close: Unit = table.close()

    /**
     * Computes the stop key for a prefix scan over `bytes`: a copy of the key
     * with its last byte incremented.
     *
     * Trailing 0xFF bytes cannot be incremented, so they are dropped and the
     * byte before them is incremented instead. When no byte can be incremented
     * (empty key or all 0xFF) an empty array is returned, which is intended to
     * act as an unbounded stop row (assumes the scan treats an empty stop row
     * as "until the end of the table" -- confirm against the Scan documentation).
     * The input array is never modified.
     */
    private def getNextBytes(bytes: Array[Byte]): Array[Byte] = {
        val maxByte = 0xFF.toByte
        // Position of the last byte that can still be incremented.
        val pivot = bytes.lastIndexWhere(_ != maxByte)
        if (pivot < 0) {
            new Array[Byte](0)
        } else {
            val nextBytes = new Array[Byte](pivot + 1)
            System.arraycopy(bytes, 0, nextBytes, 0, pivot + 1)
            nextBytes(pivot) = (nextBytes(pivot) + 1).toByte
            nextBytes
        }
    }

}

/**
 * Companion helpers for the HBase wrapper: configuration loading, value
 * encoding and table administration.
 */
object HBase {
    /** Path of the configuration resource loaded into `DEFAULT_CONF`. */
    val DEFAULT_CONF_PATH: String = "conf/hbase-site.xml"

    /** Configuration used whenever no explicit configuration path is supplied. */
    val DEFAULT_CONF: HBaseConfiguration = new HBaseConfiguration
    DEFAULT_CONF.addResource(new Path(DEFAULT_CONF_PATH))

    /**
     * Encodes a value as bytes.
     *
     * To keep the number of stored representations small, Int is widened to
     * Long and Float to Double before encoding. A byte array is returned
     * unchanged (it is already encoded), `null` yields `null`, and any other
     * value is encoded through its `toString` form.
     */
    def toBytes(value: Any): Array[Byte] = value match {
        case null => null
        case raw: Array[Byte] => raw
        case i: Int => Bytes.toBytes(i.toLong)
        case l: Long => Bytes.toBytes(l)
        case f: Float => Bytes.toBytes(f.toDouble)
        case d: Double => Bytes.toBytes(d)
        case other => Bytes.toBytes(other.toString)
    }

    /**
     * Resolves a configuration.
     *
     * @param confPath path of a configuration resource, or `null` for `DEFAULT_CONF`
     * @return `DEFAULT_CONF` when `confPath` is `null`, otherwise a new
     *         configuration with `confPath` added as a resource
     */
    def getConf(confPath: String = null): HBaseConfiguration = {
        if (confPath == null) {
            DEFAULT_CONF
        } else {
            val conf = new HBaseConfiguration()
            conf.addResource(new Path(confPath))
            conf
        }
    }

    /**
     * Disables the table, runs `work`, then enables the table again. The
     * enable step sits in a `finally` so a failure inside `work` does not
     * leave the table disabled.
     */
    private def whileDisabled(hAdmin: HBaseAdmin, tableName: String)(work: => Unit): Unit = {
        hAdmin.disableTable(tableName)
        try {
            work
        } finally {
            hAdmin.enableTable(tableName)
        }
    }

    /** Creates a table with one column family per entry of `families`. */
    def create(tableName: String, families: List[String],
        conf: HBaseConfiguration = getConf()): Unit = {
        val hAdmin = new HBaseAdmin(conf)
        val descriptor = new HTableDescriptor(tableName)
        for (family <- families)
            descriptor.addFamily(new HColumnDescriptor(Bytes.toBytes(family)))

        hAdmin.createTable(descriptor)
    }

    /** Disables and then deletes the table. */
    def drop(tableName: String, conf: HBaseConfiguration = getConf()): Unit = {
        val hAdmin = new HBaseAdmin(conf)
        hAdmin.disableTable(tableName)
        hAdmin.deleteTable(tableName)
    }

    /**
     * Drops the table and creates it again from its previous descriptor.
     * All data in the table is lost.
     */
    def recreate(tableName: String, conf: HBaseConfiguration = getConf()): Unit = {
        val hAdmin = new HBaseAdmin(conf)
        val descriptor = hAdmin.getTableDescriptor(Bytes.toBytes(tableName))
        // Forward conf so the drop targets the same cluster as the re-create.
        drop(tableName, conf)
        hAdmin.createTable(descriptor)
    }

    /** Returns whether the table exists. */
    def exists(tableName: String, conf: HBaseConfiguration = getConf()): Boolean =
        new HBaseAdmin(conf).tableExists(tableName)

    /** Disables the table. */
    def disable(tableName: String, conf: HBaseConfiguration = getConf()): Unit = {
        new HBaseAdmin(conf).disableTable(tableName)
    }

    /** Enables the table. */
    def enable(tableName: String, conf: HBaseConfiguration = getConf()): Unit = {
        new HBaseAdmin(conf).enableTable(tableName)
    }

    /**
     * Disables the table, runs `func`, then enables the table again. The table
     * is re-enabled even when `func` throws; the exception still propagates.
     */
    def disableWork(tableName: String, func: () => Unit, conf: HBaseConfiguration = getConf()): Unit =
        whileDisabled(new HBaseAdmin(conf), tableName) {
            func()
        }

    /**
     * Lists the table names.
     *
     * The names come back in the reverse of the order reported by
     * `HBaseAdmin.listTables`; that ordering is kept for existing callers.
     */
    def list(conf: HBaseConfiguration = getConf()): List[String] =
        new HBaseAdmin(conf).listTables.foldLeft(List[String]()) {
            (names, descriptor) => descriptor.getNameAsString :: names
        }

    /** Adds a column family; the table is disabled for the change and re-enabled afterwards. */
    def addColumn(tableName: String, familyName: String, conf: HBaseConfiguration = getConf()): Unit = {
        val hAdmin = new HBaseAdmin(conf)
        val column = new HColumnDescriptor(familyName)
        whileDisabled(hAdmin, tableName) {
            hAdmin.addColumn(tableName, column)
        }
    }

    /** Removes a column family; the table is disabled for the change and re-enabled afterwards. */
    def deleteColumn(tableName: String, familyName: String, conf: HBaseConfiguration = getConf()): Unit = {
        val hAdmin = new HBaseAdmin(conf)
        whileDisabled(hAdmin, tableName) {
            hAdmin.deleteColumn(tableName, familyName)
        }
    }

    /** Entry point placeholder; intentionally does nothing. */
    def main(args: Array[String]): Unit = {
    }
}