概要

Impalaさんと親しくなろうという事で(この子を仕事で使う機会があるかは知らない)、とりあえずimpala-shellから「あれやってこれやって」と命令を発してみる。

@CretedDate 2013/10/24
@Versions CDH4.3

導入

Clouderaが公開しているVMを使って導入した。詳細は下記参照。

VirtualBoxでCloudera QuickStart VM(CDH4)を動かしてみる

CSVからテーブルを作ってみる

適当なCSVファイルをHDFSに置いて、そこからテーブルを作ってSELECTしてみる。CSVの内容は下記みたいな感じにしておく。

1,文字列1,true,101.2,2013-10-24 10:20:05
2,文字列2,false,53.15,2013-10-25 10:20:05
3,文字列3,true,-18.5,2013-10-26 10:20:05
4,文字列4,false,0,2013-10-27 10:20:05
5,文字列5,true,-12.35,2013-10-28 10:20:05

test1.csvとでも名付けて、HDFSにあげて、CREATE TABLEする。

// 適当なディレクトリ作って
$ hadoop fs -mkdir /user/cloudera/tmp

// ファイルを上げて
$ hadoop fs -put test1.csv /user/cloudera/tmp/

// ちゃんと上がったか確認して
$ hadoop fs -ls /user/cloudera/tmp/

// impala-shellに入って
$ impala-shell

// LOCATIONや区切り文字を指定しつつCREATE文を投げる
CREATE EXTERNAL TABLE example_table1 (
   id INT, str_column STRING, bool_column BOOLEAN, double_column DOUBLE, date_column TIMESTAMP
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/cloudera/tmp/';

LOCATION(CSVファイルを置いた場所)はディレクトリで指定する。指定されたディレクトリ配下は全部データとして見られる。

EXTERNAL TABLEはCSVなどの外部ファイルから作成する時に利用する。外部ファイルが変わったら、 refreshコマンドを打つと反映される。

んじゃ、SELECTしてみましょか。

> SELECT * FROM example_table1 WHERE id = 3;

+----+------------+-------------+---------------+---------------------+
| id | str_column | bool_column | double_column | date_column         |
+----+------------+-------------+---------------+---------------------+
| 3  | 文字列3    | true        | -18.5         | 2013-10-26 10:20:05 |
+----+------------+-------------+---------------+---------------------+
Returned 1 row(s) in 0.22s

0.22secで結果が返ってきた。1台構成で5行しかレコードがない状態。

データ型は、boolean, tinyint, smallint, int, bigint, float, double, timestamp, stringが設定できるらしい。

INSERTしてみる

テーブルに対してINSERTも実行できる。さっそくクエリ打ってみようか。

> insert into example_tables1 values( 6, '文字列6', false, -10.31, '2013-11-01 03:50:35' );

ERROR: Failed to open HDFS file for writing: hdfs://localhost.localdomain:8020/user/cloudera/tmp/...

怒られた。impalaのコマンドはおそらくimpalaユーザで行われているので、writeの権限がないようだ。面倒な時、人は777と言う。

$ hadoop fs -chmod 777 /user/cloudera/tmp

これでもう1回INSERTコマンドを打ったら、今度は通った。全部SELECTしてみる。

> SELECT * FROM example_table1;

+----+------------+-------------+---------------+---------------------+
| id | str_column | bool_column | double_column | date_column         |
+----+------------+-------------+---------------+---------------------+
| 6  | 文字列6    | false       | -10.31        | 2013-11-01 03:50:35 |
| 1  | 文字列1    | true        | 101.2         | 2013-10-24 10:20:05 |
| 2  | 文字列2    | false       | 53.15         | 2013-10-25 10:20:05 |
| 3  | 文字列3    | true        | -18.5         | 2013-10-26 10:20:05 |
| 4  | 文字列4    | false       | 0             | 2013-10-27 10:20:05 |
| 5  | 文字列5    | true        | -12.35        | 2013-10-28 10:20:05 |
+----+------------+-------------+---------------+---------------------+

INSERTしたのが一番上にいますね。

SELECTしてみる

普通にWHERE句を使ったり、AVGやSUM、JOINなども使える。

とりあえずCOUNT、SUM、AVGあたりを使ってみる。

> select count(double_column), sum(double_column), avg(double_column) from example_table1;

+----------------------+--------------------+--------------------+
| count(double_column) | sum(double_column) | avg(double_column) |
+----------------------+--------------------+--------------------+
| 6                    | 113.19             | 18.865             |
+----------------------+--------------------+--------------------+

普通のSQLと同じ感覚で使える。SQLは世界共通言語だからね。使いやすくていいね。

WHERE句、ORDER BY、LIMITなども使ってみる。尚、ORDER BYはLIMITと合わせないと使えないらしい。

> select * from example_table1 where bool_column = false order by id limit 100;

+----+------------+-------------+---------------+---------------------+
| id | str_column | bool_column | double_column | date_column         |
+----+------------+-------------+---------------+---------------------+
| 2  | 文字列2    | false       | 53.15         | 2013-10-25 10:20:05 |
| 4  | 文字列4    | false       | 0             | 2013-10-27 10:20:05 |
| 6  | 文字列6    | false       | -10.31        | 2013-11-01 03:50:35 |
+----+------------+-------------+---------------+---------------------+

BETWEENとかも普通に使える。

> select * from example_table1 where date_column between '2013-10-27' and '2013-10-29';

+----+------------+-------------+---------------+---------------------+
| id | str_column | bool_column | double_column | date_column         |
+----+------------+-------------+---------------+---------------------+
| 4  | 文字列4    | false       | 0             | 2013-10-27 10:20:05 |
| 5  | 文字列5    | true        | -12.35        | 2013-10-28 10:20:05 |
+----+------------+-------------+---------------+---------------------+

JOINしてみる

次はJOINいってみよう。まずは別テーブルを1つ作ってデータも入れておく。

> CREATE EXTERNAL TABLE example_table2 ( id INT, str_column STRING );
> INSERT INTO example_table2 VALUES( 1, 'JOIN 文字列1' );
> INSERT INTO example_table2 VALUES( 3, 'JOIN 文字列3' );

今作ったテーブルと前に作ったテーブルとでJOINしてみる。

> select * from example_table1 t1, example_table2 t2 where t1.id = t2.id;

+----+------------+-------------+---------------+---------------------+----+--------------+
| id | str_column | bool_column | double_column | date_column         | id | str_column   |
+----+------------+-------------+---------------+---------------------+----+--------------+
| 1  | 文字列1    | true        | 101.2         | 2013-10-24 10:20:05 | 1  | JOIN 文字列1 |
| 3  | 文字列3    | true        | -18.5         | 2013-10-26 10:20:05 | 3  | JOIN 文字列3 |
| 5  | 文字列5    | true        | -12.35        | 2013-10-28 10:20:05 | 5  | JOIN 文字列5 |
+----+------------+-------------+---------------+---------------------+----+--------------+

LEFT JOINとかもばっちり。

> select * from example_table1 t1 left join example_table2 t2 on t1.id = t2.id ;

+----+------------+-------------+---------------+---------------------+------+--------------+
| id | str_column | bool_column | double_column | date_column         | id   | str_column   |
+----+------------+-------------+---------------+---------------------+------+--------------+
| 6  | 文字列6    | false       | -10.31        | 2013-11-01 03:50:35 | NULL | NULL         |
| 1  | 文字列1    | true        | 101.2         | 2013-10-24 10:20:05 | 1    | JOIN 文字列1 |
| 2  | 文字列2    | false       | 53.15         | 2013-10-25 10:20:05 | NULL | NULL         |
| 3  | 文字列3    | true        | -18.5         | 2013-10-26 10:20:05 | 3    | JOIN 文字列3 |
| 4  | 文字列4    | false       | 0             | 2013-10-27 10:20:05 | NULL | NULL         |
| 5  | 文字列5    | true        | -12.35        | 2013-10-28 10:20:05 | 5    | JOIN 文字列5 |
+----+------------+-------------+---------------+---------------------+------+--------------+

EXPLAINで実行計画を見てみる

JOINとかAVGとか打つと、裏でどうやってるんだろうと気になってしまうもの。そんな時はEXPLAINを打てば良いらしい。

単純なWHEREでidを指定するSQLをexplainしてみる。

> explain select * from example_table1 where id = 1;

Explain query: select * from example_table1 where id = 1
PLAN FRAGMENT 0
  PARTITION: UNPARTITIONED

  1:EXCHANGE
     tuple ids: 0 

PLAN FRAGMENT 1
  PARTITION: RANDOM

  STREAM DATA SINK
    EXCHANGE ID: 1
    UNPARTITIONED

  0:SCAN HDFS
     table=default.example_table1 #partitions=1 size=265B
     predicates: id = 1
     tuple ids: 0 

見方がわからない・・・。とりあえずHDFSからデータを読んで、coordinatorでまとめてといったことをしているはず。たぶん。

COUNTとかSUMとかAVGとかをしているクエリもexplainしてみる。

> explain select count(double_column), sum(double_column), avg(double_column) from example_table1;

Explain query: select count(double_column), sum(double_column), avg(double_column) from example_table1
PLAN FRAGMENT 0
  PARTITION: UNPARTITIONED

  3:AGGREGATE
  |  output: SUM(<slot 1>), SUM(<slot 2>)
  |  group by: 
  |  tuple ids: 1 
  |  
  2:EXCHANGE
     tuple ids: 1 

PLAN FRAGMENT 1
  PARTITION: RANDOM

  STREAM DATA SINK
    EXCHANGE ID: 2
    UNPARTITIONED

  1:AGGREGATE
  |  output: COUNT(double_column), SUM(double_column)
  |  group by: 
  |  tuple ids: 1 
  |  
  0:SCAN HDFS
     table=default.example_table1 #partitions=1 size=265B
     tuple ids: 0 

SCANしてAGGREGATEしますよ、と。多分、下から順に0, 1, 2, 3と処理をしていく。各nodeでaggregateして、最後にcoodinator nodeでaggregateするとかそんな感じだろうか。

JOINについても見てみよう。

> explain select * from example_table1 t1, example_table2 t2 where t1.id = t2.id;
Explain query: select * from example_table1 t1, example_table2 t2 where t1.id = t2.id
PLAN FRAGMENT 0
  PARTITION: UNPARTITIONED

  4:EXCHANGE
     tuple ids: 0 1 

PLAN FRAGMENT 1
  PARTITION: RANDOM

  STREAM DATA SINK
    EXCHANGE ID: 4
    UNPARTITIONED

  2:HASH JOIN
  |  join op: INNER JOIN (BROADCAST)
  |  hash predicates:
  |    t1.id = t2.id
  |  tuple ids: 0 1 
  |  
  |----3:EXCHANGE
  |       tuple ids: 1 
  |    
  0:SCAN HDFS
     table=default.example_table1 #partitions=1 size=265B
     tuple ids: 0 

PLAN FRAGMENT 2
  PARTITION: RANDOM

  STREAM DATA SINK
    EXCHANGE ID: 3
    UNPARTITIONED

  1:SCAN HDFS
     table=default.example_table2 #partitions=1 size=54B compact
     tuple ids: 1 

2つのテーブルをSCANして、HASH JOINしてといった処理が見える。資料を見ると、JOINには「broadcast joins」と「partitioned joins」があるらしい。

Hint文を使えばどちらのJOINを使うか指定できるらしい。たとえば下記のように[shuffle]を指定すればpartitioned joinsが、[broadcast]を指定すればbroadcast joinsが使われる。

>  explain select * from example_table1 t1 join [shuffle] example_table2 t2 on t1.id = t2.id;

両JOINの詳細はClouderaの下記ページに書いてある。broadcastの方がデフォルトで、partitionedの方は同じくらいのサイズの大きいテーブルをJOINする時に使うのだとか。

http://www.cloudera.com/content/cloudera-content/cloudera-docs/Impala/latest/Installing-and-Using-Impala/ciiu_performance.html#perf_joins_unique_1

この手のデータでJOINは使いたくないものではないので、クエリを工夫よりデータの形式をうまいことやる方向で頑張りたい。

Hiveのテーブルを使ってみる

Impalaは今のところHiveと同じmetadata storeを使っているそうなので、refreshするだけでHiveのデータにアクセスできる。

試しにHiveでテーブルを作ってデータを少し入れてみる。

$ hive

// show tablesするとimpalaで作ったテーブルが見える
hive> show tables;

example_table1
example_table2

// 適当にテーブルを作ってデータを入れる
hive> create table example_table3 ( id Int, first_name String, last_name String, age Int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
hive> load data local inpath '/tmp/test.csv' into table example_table3;

※ /tmp/test.csvには下記のような内容のCSVを置いておいた
1,william,shakespeare,52
2,wolfgang,mozart,35

hive> exit;

これでHiveにデータができたので、impalaに戻ってみる。

$ impala-shell

// テーブルを確認
> show tables;

+----------------+
| name           |
+----------------+
| example_table1 |
| example_table2 |
+----------------+

refreshする前なので、Hiveで作ったexample_table3はいない。

> refresh;
> show tables;

+----------------+
| name           |
+----------------+
| example_table1 |
| example_table2 |
| example_table3 |
+----------------+

refreshすると見えるようになった。もちろんデータもSELECTできる。Hive側でデータを追加した際も、やっぱりrefreshしないと増えた分のデータは見えない。

ALTERでテーブルの情報を変えてみる

まずはCREATE DATABASEでもしてみる。何もしてないとdefaultという名前のdatabaseが使われている。

> create database db1;
> show databases;

+---------+
| name    |
+---------+
| db1     |
| default |
+---------+

show tablesとかも使える。

> show tables;

+----------------+
| name           |
+----------------+
| example_table1 |
| example_table2 |
+----------------+

使うDBを変える時はuse。MySQL使ってるのと同じ気持ちでコマンドを打つとたいてい通る。

> use db1;

テーブルの名前を変えてみる。default.example_table1をdb1.table1にしてみる。

> alter table default.example_table1 rename to db1.table1;
> show tables;

+--------+
| name   |
+--------+
| table1 |
+--------+

DBまたいでも通った。