package jp.mwsoft.common.hbase
import scala.collection.JavaConversions._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.{ HTable, HBaseAdmin, Get, Put, Delete, Scan, Result }
import org.apache.hadoop.hbase.{
HBaseConfiguration,
HTableDescriptor,
HColumnDescriptor
}
/**
 * Thin convenience wrapper around a single HBase table.
 *
 * Row keys, families, qualifiers and values are accepted as `Any` and converted to bytes:
 * an `Array[Byte]` is used as-is, everything else goes through `HBase.toBytes`.
 *
 * @param tableName name of the table to open
 * @param confPath  path of an HBase configuration file, or `null` to use the default configuration
 */
class HBase(tableName: String, confPath: String = null) {

  /** Handle of the wrapped table. */
  val table: HTable = new HTable(HBase.getConf(confPath), tableName)

  /**
   * Converts a key or value to bytes.
   * Byte arrays are passed through untouched; without this, an `Array[Byte]` handed to a
   * parameter typed `Any` would be stringified (e.g. "[B@1a2b3c") before being encoded.
   */
  private def asBytes(value: Any): Array[Byte] = value match {
    case bytes: Array[Byte] => bytes
    case other              => HBase.toBytes(other)
  }

  /** Fetches every cell stored under `row`. */
  private def cells(row: Any) = table.get(new Get(asBytes(row))).raw()

  /**
   * Opens a scanner over `[startRow, endRow)`, feeds every result to `handle`, and always
   * closes the scanner, even when `handle` throws.
   */
  private def scanResults(startRow: Any, endRow: Any)(handle: Result => Unit): Unit = {
    val scanner = table.getScanner(new Scan(asBytes(startRow), asBytes(endRow)))
    try {
      val results = scanner.iterator()
      while (results.hasNext) handle(results.next())
    } finally {
      scanner.close()
    }
  }

  /** Stores `value` in the cell addressed by `row` / `family` / `qualifier`. */
  def put(row: Any, family: Any, qualifier: Any, value: Any): Unit = {
    val op = new Put(asBytes(row))
    op.add(asBytes(family), asBytes(qualifier), asBytes(value))
    table.put(op)
  }

  /** Increments the counter cell addressed by `row` / `family` / `qualifier` by `value`. */
  def incr(row: Any, family: Any, qualifier: Any, value: Long): Unit = {
    table.incrementColumnValue(asBytes(row), asBytes(family), asBytes(qualifier), value)
  }

  /**
   * Reads the raw bytes of a single cell.
   *
   * @return whatever `Result.getValue` yields for the cell
   *         (NOTE(review): presumably `null` when the cell does not exist; confirm for the HBase version in use)
   */
  def get(row: Any, family: Any, qualifier: Any): Array[Byte] = {
    val result = table.get(new Get(asBytes(row)))
    result.getValue(asBytes(family), asBytes(qualifier))
  }

  /** Checks whether the row exists. */
  def exists(row: Any): Boolean = table.exists(new Get(asBytes(row)))

  /** Checks whether the given column of the row exists. */
  def exists(row: Any, family: Any, qualifier: Any): Boolean = {
    val probe = new Get(asBytes(row))
    probe.addColumn(asBytes(family), asBytes(qualifier))
    table.exists(probe)
  }

  /** Reads a single cell and decodes it as a Long. */
  def getLong(row: Any, family: Any, qualifier: Any): Long =
    Bytes.toLong(get(row, family, qualifier))

  /** Reads a single cell and decodes it as a Double. */
  def getDouble(row: Any, family: Any, qualifier: Any): Double =
    Bytes.toDouble(get(row, family, qualifier))

  /** Reads a single cell and decodes it as a String. */
  def getString(row: Any, family: Any, qualifier: Any): String =
    Bytes.toString(get(row, family, qualifier))

  /** Calls `func(family, qualifier, value)` with raw bytes for every cell of `row`. */
  def get(row: Any, func: (Array[Byte], Array[Byte], Array[Byte]) => Unit): Unit =
    cells(row).foreach(kv => func(kv.getFamily, kv.getQualifier, kv.getValue))

  /** Calls `func` for every cell of `row`; family and qualifier as Strings, value as raw bytes. */
  def getBytes(row: Any, func: (String, String, Array[Byte]) => Unit): Unit =
    cells(row).foreach { kv =>
      func(Bytes.toString(kv.getFamily), Bytes.toString(kv.getQualifier), kv.getValue)
    }

  /** Calls `func` for every cell of `row`; family, qualifier and value all decoded as Strings. */
  def getString(row: Any, func: (String, String, String) => Unit): Unit =
    cells(row).foreach { kv =>
      func(Bytes.toString(kv.getFamily), Bytes.toString(kv.getQualifier),
        Bytes.toString(kv.getValue))
    }

  /** Calls `func` for every cell of `row`; value decoded as a Long. */
  def getLong(row: Any, func: (String, String, Long) => Unit): Unit =
    cells(row).foreach { kv =>
      func(Bytes.toString(kv.getFamily), Bytes.toString(kv.getQualifier),
        Bytes.toLong(kv.getValue))
    }

  /** Calls `func` for every cell of `row`; value decoded as a Double. */
  def getDouble(row: Any, func: (String, String, Double) => Unit): Unit =
    cells(row).foreach { kv =>
      func(Bytes.toString(kv.getFamily), Bytes.toString(kv.getQualifier),
        Bytes.toDouble(kv.getValue))
    }

  /**
   * Scans the rows from `startRow` up to `endRow` and calls
   * `func(row, family, qualifier, value)` with raw bytes for every cell found.
   */
  def scan(startRow: Any, endRow: Any,
    func: (Array[Byte], Array[Byte], Array[Byte], Array[Byte]) => Unit): Unit =
    scanResults(startRow, endRow) { item =>
      item.raw().foreach(kv => func(item.getRow, kv.getFamily, kv.getQualifier, kv.getValue))
    }

  /** Scans from `startRow` up to the key produced by `getNextBytes(startRow)`. */
  def scan(startRow: Any,
    func: (Array[Byte], Array[Byte], Array[Byte], Array[Byte]) => Unit): Unit =
    scan(startRow, getNextBytes(asBytes(startRow)), func)

  /** Range scan; row, family and qualifier are decoded as Strings, the value stays raw bytes. */
  def scanBytes(startRow: Any, endRow: Any,
    func: (String, String, String, Array[Byte]) => Unit): Unit =
    scan(startRow, endRow, (row: Array[Byte], family: Array[Byte],
      qualifier: Array[Byte], value: Array[Byte]) =>
      func(Bytes.toString(row), Bytes.toString(family), Bytes.toString(qualifier), value))

  /** Same as the three-argument `scanBytes`, ending at `getNextBytes(startRow)`. */
  def scanBytes(startRow: Any,
    func: (String, String, String, Array[Byte]) => Unit): Unit =
    scanBytes(startRow, getNextBytes(asBytes(startRow)), func)

  /** Range scan; row, family, qualifier and value are all decoded as Strings. */
  def scanString(startRow: Any, endRow: Any,
    func: (String, String, String, String) => Unit): Unit =
    scan(startRow, endRow, (row: Array[Byte], family: Array[Byte],
      qualifier: Array[Byte], value: Array[Byte]) =>
      func(Bytes.toString(row), Bytes.toString(family), Bytes.toString(qualifier),
        Bytes.toString(value)))

  /** Same as the three-argument `scanString`, ending at `getNextBytes(startRow)`. */
  def scanString(startRow: Any,
    func: (String, String, String, String) => Unit): Unit =
    scanString(startRow, getNextBytes(asBytes(startRow)), func)

  /** Range scan; the value is decoded as a Long. */
  def scanLong(startRow: Any, endRow: Any,
    func: (String, String, String, Long) => Unit): Unit =
    scan(startRow, endRow, (row: Array[Byte], family: Array[Byte],
      qualifier: Array[Byte], value: Array[Byte]) =>
      func(Bytes.toString(row), Bytes.toString(family), Bytes.toString(qualifier),
        Bytes.toLong(value)))

  /** Same as the three-argument `scanLong`, ending at `getNextBytes(startRow)`. */
  def scanLong(startRow: Any,
    func: (String, String, String, Long) => Unit): Unit =
    scanLong(startRow, getNextBytes(asBytes(startRow)), func)

  /** Range scan; the value is decoded as a Double. */
  def scanDouble(startRow: Any, endRow: Any,
    func: (String, String, String, Double) => Unit): Unit =
    scan(startRow, endRow, (row: Array[Byte], family: Array[Byte],
      qualifier: Array[Byte], value: Array[Byte]) =>
      func(Bytes.toString(row), Bytes.toString(family), Bytes.toString(qualifier),
        Bytes.toDouble(value)))

  /** Same as the three-argument `scanDouble`, ending at `getNextBytes(startRow)`. */
  def scanDouble(startRow: Any,
    func: (String, String, String, Double) => Unit): Unit =
    scanDouble(startRow, getNextBytes(asBytes(startRow)), func)

  /**
   * Scans the rows from `startRow` up to `endRow` and hands each whole `Result` to `func`.
   * Only one scanner is opened, and it is closed when the scan finishes or fails.
   */
  def scanRow(startRow: Any, endRow: Any, func: (Result) => Unit): Unit =
    scanResults(startRow, endRow)(func)

  /** Same as the three-argument `scanRow`, ending at `getNextBytes(startRow)`. */
  def scanRow(startRow: Any, func: (Result) => Unit): Unit = {
    val startBytes = asBytes(startRow)
    scanRow(startBytes, getNextBytes(startBytes), func)
  }

  /** Deletes the whole row. */
  def delete(row: Any): Unit =
    table.delete(new Delete(asBytes(row)))

  /** Deletes one column family of the row. */
  def delete(row: Any, family: Any): Unit = {
    val op = new Delete(asBytes(row))
    op.deleteFamily(asBytes(family))
    table.delete(op)
  }

  /** Deletes one column of the row. */
  def delete(row: Any, family: Any, qualifier: Any): Unit = {
    val op = new Delete(asBytes(row))
    op.deleteColumn(asBytes(family), asBytes(qualifier))
    table.delete(op)
  }

  /** Flushes buffered writes to the table. */
  def commit: Unit = table.flushCommits()

  /** Closes the table handle. */
  def close: Unit = table.close()

  /**
   * Returns the smallest key that sorts after every key starting with `bytes`:
   * a copy of `bytes` whose last byte is incremented by one.
   *
   * Trailing 0xFF bytes cannot be incremented, so they are dropped and the byte before
   * them is incremented instead. If nothing is left (empty input, or all bytes 0xFF) an
   * empty array is returned, which HBase's `Scan` treats as "no stop row".
   * The input array is never modified.
   */
  private def getNextBytes(bytes: Array[Byte]): Array[Byte] = {
    val keep = bytes.lastIndexWhere(_ != 0xFF.toByte) + 1
    if (keep == 0) {
      Array.empty[Byte]
    } else {
      val next = bytes.take(keep)
      next(keep - 1) = (next(keep - 1) + 1).toByte
      next
    }
  }
}
/** Companion utilities for [[HBase]]: byte conversion, configuration lookup and table administration. */
object HBase {

  /** Location of the configuration file loaded into [[DEFAULT_CONF]]. */
  val DEFAULT_CONF_PATH: String = "conf/hbase-site.xml"

  /**
   * Shared default configuration, with `DEFAULT_CONF_PATH` added as a resource.
   * NOTE(review): later HBase releases deprecate this constructor in favour of
   * `HBaseConfiguration.create()`; kept because the public type here is `HBaseConfiguration`.
   */
  val DEFAULT_CONF: HBaseConfiguration = new HBaseConfiguration
  DEFAULT_CONF.addResource(new Path(DEFAULT_CONF_PATH))

  /**
   * Converts a value to the bytes stored in HBase.
   *
   * To keep the number of stored encodings small, `Int` is widened to `Long` and `Float`
   * to `Double` before encoding. Byte arrays are returned unchanged (they used to be
   * stringified via `toString`, producing bytes of "[B@..."). Any other value is encoded
   * through its `toString`.
   *
   * @return the encoded bytes, or `null` when `value` is `null`
   */
  def toBytes(value: Any): Array[Byte] = value match {
    case null               => null
    case bytes: Array[Byte] => bytes
    case i: Int             => Bytes.toBytes(i.toLong)
    case l: Long            => Bytes.toBytes(l)
    case f: Float           => Bytes.toBytes(f.toDouble)
    case d: Double          => Bytes.toBytes(d)
    case other              => Bytes.toBytes(other.toString)
  }

  /**
   * Resolves a configuration.
   *
   * @param confPath path of a configuration file, or `null` for the shared default
   * @return `DEFAULT_CONF` when `confPath` is `null`, otherwise a new configuration with
   *         `confPath` added as a resource
   */
  def getConf(confPath: String = null): HBaseConfiguration =
    Option(confPath) match {
      case None => DEFAULT_CONF
      case Some(path) =>
        val conf = new HBaseConfiguration()
        conf.addResource(new Path(path))
        conf
    }

  /** Creates a table with one column family per entry of `families`. */
  def create(tableName: String, families: List[String],
    conf: HBaseConfiguration = getConf()): Unit = {
    val hAdmin = new HBaseAdmin(conf)
    val descriptor = new HTableDescriptor(tableName)
    families.foreach(family => descriptor.addFamily(new HColumnDescriptor(Bytes.toBytes(family))))
    hAdmin.createTable(descriptor)
  }

  /** Disables and then deletes the table. */
  def drop(tableName: String, conf: HBaseConfiguration = getConf()): Unit = {
    val hAdmin = new HBaseAdmin(conf)
    hAdmin.disableTable(tableName)
    hAdmin.deleteTable(tableName)
  }

  /**
   * Recreates the table from its current descriptor. All data is deleted.
   * The drop uses the same `conf` as the create; previously it always ran against the
   * default configuration.
   */
  def recreate(tableName: String, conf: HBaseConfiguration = getConf()): Unit = {
    val hAdmin = new HBaseAdmin(conf)
    val descriptor = hAdmin.getTableDescriptor(Bytes.toBytes(tableName))
    drop(tableName, conf)
    hAdmin.createTable(descriptor)
  }

  /** Checks whether the table exists. */
  def exists(tableName: String, conf: HBaseConfiguration = getConf()): Boolean =
    new HBaseAdmin(conf).tableExists(tableName)

  /** Disables the table. */
  def disable(tableName: String, conf: HBaseConfiguration = getConf()): Unit =
    new HBaseAdmin(conf).disableTable(tableName)

  /** Enables the table. */
  def enable(tableName: String, conf: HBaseConfiguration = getConf()): Unit =
    new HBaseAdmin(conf).enableTable(tableName)

  /**
   * Disables the table, runs `func`, then enables the table again.
   * The table is re-enabled even when `func` throws; the exception still propagates.
   */
  def disableWork(tableName: String, func: () => Unit, conf: HBaseConfiguration = getConf()): Unit = {
    val hAdmin = new HBaseAdmin(conf)
    hAdmin.disableTable(tableName)
    try {
      func()
    } finally {
      hAdmin.enableTable(tableName)
    }
  }

  /**
   * Lists the names of all tables.
   * Names are prepended while iterating, so the result is in the reverse of the order
   * returned by `HBaseAdmin.listTables`.
   */
  def list(conf: HBaseConfiguration = getConf()): List[String] =
    new HBaseAdmin(conf).listTables
      .foldLeft(List.empty[String])((names, descriptor) => descriptor.getNameAsString :: names)

  /**
   * Adds a column family to the table. The table is disabled for the change and
   * re-enabled afterwards, even when adding the family fails.
   */
  def addColumn(tableName: String, familyName: String, conf: HBaseConfiguration = getConf()): Unit = {
    val hAdmin = new HBaseAdmin(conf)
    val column = new HColumnDescriptor(familyName)
    hAdmin.disableTable(tableName)
    try {
      hAdmin.addColumn(tableName, column)
    } finally {
      hAdmin.enableTable(tableName)
    }
  }

  /**
   * Removes a column family from the table. The table is disabled for the change and
   * re-enabled afterwards, even when removing the family fails.
   */
  def deleteColumn(tableName: String, familyName: String, conf: HBaseConfiguration = getConf()): Unit = {
    val hAdmin = new HBaseAdmin(conf)
    hAdmin.disableTable(tableName)
    try {
      hAdmin.deleteColumn(tableName, familyName)
    } finally {
      hAdmin.enableTable(tableName)
    }
  }

  /** Entry point placeholder; intentionally does nothing. */
  def main(args: Array[String]): Unit = {
  }
}