Impalaさんと親しくなろうという事で(この子を仕事で使う機会があるかは知らない)、とりあえずimpala-shellから「あれやってこれやって」と命令を発してみる。
適当なCSVファイルをHDFSに置いて、そこからテーブルを作ってSELECTしてみる。CSVの内容は下記みたいな感じにしておく。
1,文字列1,true,101.2,2013-10-24 10:20:05 2,文字列2,false,53.15,2013-10-25 10:20:05 3,文字列3,true,-18.5,2013-10-26 10:20:05 4,文字列4,false,0,2013-10-27 10:20:05 5,文字列5,true,-12.35,2013-10-28 10:20:05
test1.csvとでも名付けて、HDFSにあげて、CREATE TABLEする。
// 適当なディレクトリ作って $ hadoop fs -mkdir /user/cloudera/tmp // ファイルを上げて $ hadoop fs -put test1.csv /user/cloudera/tmp/ // ちゃんと上がったか確認して $ hadoop fs -ls /user/cloudera/tmp/ // impala-shellに入って $ impala-shell // LOCATIONや区切り文字を指定しつつCREATE文を投げる CREATE EXTERNAL TABLE example_table1 ( id INT, str_column STRING, bool_column BOOLEAN, double_column DOUBLE, date_column TIMESTAMP ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/user/cloudera/tmp/';
LOCATION(CSVファイルを置いた場所)はディレクトリで指定する。指定されたディレクトリ配下は全部データとして見られる。
EXTERNAL TABLEはCSVなどの外部ファイルから作成する時に利用する。外部ファイルが変わったら、 refreshコマンドを打つと反映される。
んじゃ、SELECTしてみましょか。
> SELECT * FROM example_table1 WHERE id = 3; +----+------------+-------------+---------------+---------------------+ | id | str_column | bool_column | double_column | date_column | +----+------------+-------------+---------------+---------------------+ | 3 | 文字列3 | true | -18.5 | 2013-10-26 10:20:05 | +----+------------+-------------+---------------+---------------------+ Returned 1 row(s) in 0.22s
0.22secで結果が返ってきた。1台構成で5行しかレコードがない状態。
データ型は、boolean, tinyint, smallint, int, bigint, float, double, timestamp, stringが設定できるらしい。
テーブルに対してINSERTも実行できる。さっそくクエリ打ってみようか。
> insert into example_tables1 values( 6, '文字列6', false, -10.31, '2013-11-01 03:50:35' ); ERROR: Failed to open HDFS file for writing: hdfs://localhost.localdomain:8020/user/cloudera/tmp/...
怒られた。impalaのコマンドはおそらくimpalaユーザで行われているので、writeの権限がないようだ。面倒な時、人は777と言う。
$ hadoop fs -chmod 777 /user/cloudera/tmp
これでもう1回INSERTコマンドを打ったら、今度は通った。全部SELECTしてみる。
> SELECT * FROM example_table1; +----+------------+-------------+---------------+---------------------+ | id | str_column | bool_column | double_column | date_column | +----+------------+-------------+---------------+---------------------+ | 6 | 文字列6 | false | -10.31 | 2013-11-01 03:50:35 | | 1 | 文字列1 | true | 101.2 | 2013-10-24 10:20:05 | | 2 | 文字列2 | false | 53.15 | 2013-10-25 10:20:05 | | 3 | 文字列3 | true | -18.5 | 2013-10-26 10:20:05 | | 4 | 文字列4 | false | 0 | 2013-10-27 10:20:05 | | 5 | 文字列5 | true | -12.35 | 2013-10-28 10:20:05 | +----+------------+-------------+---------------+---------------------+
INSERTしたのが一番上にいますね。
普通にWHERE句を使ったり、AVGやSUM、JOINなども使える。
とりあえずCOUNT、SUM、AVGあたりを使ってみる。
> select count(double_column), sum(double_column), avg(double_column) from example_table1; +----------------------+--------------------+--------------------+ | count(double_column) | sum(double_column) | avg(double_column) | +----------------------+--------------------+--------------------+ | 6 | 113.19 | 18.865 | +----------------------+--------------------+--------------------+
普通のSQLと同じ感覚で使える。SQLは世界共通言語だからね。使いやすくていいね。
WHERE句、ORDER BY、LIMITなども使ってみる。尚、ORDER BYはLIMITと合わせないと使えないらしい。
> select * from example_table1 where bool_column = false order by id limit 100; +----+------------+-------------+---------------+---------------------+ | id | str_column | bool_column | double_column | date_column | +----+------------+-------------+---------------+---------------------+ | 2 | 文字列2 | false | 53.15 | 2013-10-25 10:20:05 | | 4 | 文字列4 | false | 0 | 2013-10-27 10:20:05 | | 6 | 文字列6 | false | -10.31 | 2013-11-01 03:50:35 | +----+------------+-------------+---------------+---------------------+
BETWEENとかも普通に使える。
> select * from example_table1 where date_column between '2013-10-27' and '2013-10-29'; +----+------------+-------------+---------------+---------------------+ | id | str_column | bool_column | double_column | date_column | +----+------------+-------------+---------------+---------------------+ | 4 | 文字列4 | false | 0 | 2013-10-27 10:20:05 | | 5 | 文字列5 | true | -12.35 | 2013-10-28 10:20:05 | +----+------------+-------------+---------------+---------------------+
次はJOINいってみよう。まずは別テーブルを1つ作ってデータも入れておく。
> CREATE EXTERNAL TABLE example_table2 ( id INT, str_column STRING ); > INSERT INTO example_table2 VALUES( 1, 'JOIN 文字列1' ); > INSERT INTO example_table2 VALUES( 3, 'JOIN 文字列3' );
今作ったテーブルと前に作ったテーブルとでJOINしてみる。
> select * from example_table1 t1, example_table2 t2 where t1.id = t2.id; +----+------------+-------------+---------------+---------------------+----+--------------+ | id | str_column | bool_column | double_column | date_column | id | str_column | +----+------------+-------------+---------------+---------------------+----+--------------+ | 1 | 文字列1 | true | 101.2 | 2013-10-24 10:20:05 | 1 | JOIN 文字列1 | | 3 | 文字列3 | true | -18.5 | 2013-10-26 10:20:05 | 3 | JOIN 文字列3 | | 5 | 文字列5 | true | -12.35 | 2013-10-28 10:20:05 | 5 | JOIN 文字列5 | +----+------------+-------------+---------------+---------------------+----+--------------+
LEFT JOINとかもばっちり。
> select * from example_table1 t1 left join example_table2 t2 on t1.id = t2.id ; +----+------------+-------------+---------------+---------------------+------+--------------+ | id | str_column | bool_column | double_column | date_column | id | str_column | +----+------------+-------------+---------------+---------------------+------+--------------+ | 6 | 文字列6 | false | -10.31 | 2013-11-01 03:50:35 | NULL | NULL | | 1 | 文字列1 | true | 101.2 | 2013-10-24 10:20:05 | 1 | JOIN 文字列1 | | 2 | 文字列2 | false | 53.15 | 2013-10-25 10:20:05 | NULL | NULL | | 3 | 文字列3 | true | -18.5 | 2013-10-26 10:20:05 | 3 | JOIN 文字列3 | | 4 | 文字列4 | false | 0 | 2013-10-27 10:20:05 | NULL | NULL | | 5 | 文字列5 | true | -12.35 | 2013-10-28 10:20:05 | 5 | JOIN 文字列5 | +----+------------+-------------+---------------+---------------------+------+--------------+
JOINとかAVGとか打つと、裏でどうやってるんだろうと気になってしまうもの。そんな時はEXPLAINを打てば良いらしい。
単純なWHEREでidを指定するSQLをexplainしてみる。
> explain select * from example_table1 where id = 1; Explain query: select * from example_table1 where id = 1 PLAN FRAGMENT 0 PARTITION: UNPARTITIONED 1:EXCHANGE tuple ids: 0 PLAN FRAGMENT 1 PARTITION: RANDOM STREAM DATA SINK EXCHANGE ID: 1 UNPARTITIONED 0:SCAN HDFS table=default.example_table1 #partitions=1 size=265B predicates: id = 1 tuple ids: 0
見方がわからない・・・。とりあえずHDFSからデータを読んで、coordinatorでまとめてといったことをしているはず。たぶん。
COUNTとかSUMとかAVGとかをしているクエリもexplainしてみる。
> explain select count(double_column), sum(double_column), avg(double_column) from example_table1; Explain query: select count(double_column), sum(double_column), avg(double_column) from example_table1 PLAN FRAGMENT 0 PARTITION: UNPARTITIONED 3:AGGREGATE | output: SUM(<slot 1>), SUM(<slot 2>) | group by: | tuple ids: 1 | 2:EXCHANGE tuple ids: 1 PLAN FRAGMENT 1 PARTITION: RANDOM STREAM DATA SINK EXCHANGE ID: 2 UNPARTITIONED 1:AGGREGATE | output: COUNT(double_column), SUM(double_column) | group by: | tuple ids: 1 | 0:SCAN HDFS table=default.example_table1 #partitions=1 size=265B tuple ids: 0
SCANしてAGGREGATEしますよ、と。多分、下から順に0, 1, 2, 3と処理をしていく。各nodeでaggregateして、最後にcoodinator nodeでaggregateするとかそんな感じだろうか。
JOINについても見てみよう。
> explain select * from example_table1 t1, example_table2 t2 where t1.id = t2.id; Explain query: select * from example_table1 t1, example_table2 t2 where t1.id = t2.id PLAN FRAGMENT 0 PARTITION: UNPARTITIONED 4:EXCHANGE tuple ids: 0 1 PLAN FRAGMENT 1 PARTITION: RANDOM STREAM DATA SINK EXCHANGE ID: 4 UNPARTITIONED 2:HASH JOIN | join op: INNER JOIN (BROADCAST) | hash predicates: | t1.id = t2.id | tuple ids: 0 1 | |----3:EXCHANGE | tuple ids: 1 | 0:SCAN HDFS table=default.example_table1 #partitions=1 size=265B tuple ids: 0 PLAN FRAGMENT 2 PARTITION: RANDOM STREAM DATA SINK EXCHANGE ID: 3 UNPARTITIONED 1:SCAN HDFS table=default.example_table2 #partitions=1 size=54B compact tuple ids: 1
2つのテーブルをSCANして、HASH JOINしてといった処理が見える。資料を見ると、JOINには「broadcast joins」と「partitioned joins」があるらしい。
Hint文を使えばどちらのJOINを使うか指定できるらしい。たとえば下記のように[shuffle]を指定すればpartitioned joinsが、[broadcast]を指定すればbroadcast joinsが使われる。
> explain select * from example_table1 t1 join [shuffle] example_table2 t2 on t1.id = t2.id;
両JOINの詳細はClouderaの下記ページに書いてある。broadcastの方がデフォルトで、partitionedの方は同じくらいのサイズの大きいテーブルをJOINする時に使うのだとか。
この手のデータでJOINは使いたくないものではないので、クエリを工夫よりデータの形式をうまいことやる方向で頑張りたい。
Impalaは今のところHiveと同じmetadata storeを使っているそうなので、refreshするだけでHiveのデータにアクセスできる。
試しにHiveでテーブルを作ってデータを少し入れてみる。
$ hive // show tablesするとimpalaで作ったテーブルが見える hive> show tables; example_table1 example_table2 // 適当にテーブルを作ってデータを入れる hive> create table example_table3 ( id Int, first_name String, last_name String, age Int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; hive> load data local inpath '/tmp/test.csv' into table example_table3; ※ /tmp/test.csvには下記のような内容のCSVを置いておいた 1,william,shakespeare,52 2,wolfgang,mozart,35 hive> exit;
これでHiveにデータができたので、impalaに戻ってみる。
$ impala-shell // テーブルを確認 > show tables; +----------------+ | name | +----------------+ | example_table1 | | example_table2 | +----------------+
refreshする前なので、Hiveで作ったexample_table3はいない。
> refresh; > show tables; +----------------+ | name | +----------------+ | example_table1 | | example_table2 | | example_table3 | +----------------+
refreshすると見えるようになった。もちろんデータもSELECTできる。Hive側でデータを追加した際も、やっぱりrefreshしないと増えた分のデータは見えない。
まずはCREATE DATABASEでもしてみる。何もしてないとdefaultという名前のdatabaseが使われている。
> create database db1; > show databases; +---------+ | name | +---------+ | db1 | | default | +---------+
show tablesとかも使える。
> show tables; +----------------+ | name | +----------------+ | example_table1 | | example_table2 | +----------------+
使うDBを変える時はuse。MySQL使ってるのと同じ気持ちでコマンドを打つとたいてい通る。
> use db1;
テーブルの名前を変えてみる。default.example_table1をdb1.table1にしてみる。
> alter table default.example_table1 rename to db1.table1; > show tables; +--------+ | name | +--------+ | table1 | +--------+
DBまたいでも通った。