Amazon EMR の Avro フォーマットのデータを Amazon Redshift にロードする

2015.08.13

先日、Amazon Redshift で Avro フォーマットのデータをロードできるように COPY コマンドが拡張されました。Avro といえば、Hadoop のシリアライズフォーマットの一つです。Avro ファイルを実際に作成したり、Pig から Avro ファイルをエクスポート〜 Redshift に投入する方法をご紹介したいと思います。

Amazon RedshiftでAvroフォーマットのデータロードをサポートしました

新たに COPY 文で FORMAT が追加になり、このオプションとして AVRO が指定できるようになっています。

COPY

COPY table_name
[ column_list ]
FROM data_source
[ WITH ] CREDENTIALS [AS] 'aws_access_credentials'
[ [ FORMAT ] [ AS ] data_format ]
[ [ parameter [ argument ] [, ... ] ]

※ 執筆時点(2015/08)では、日本語のサイトでは反映されていませんので、英語サイトをご覧下さい。

Apache Avro とは

Avro は,Thrift や Protocol Buffers と同じく、データ交換のプロトコルおよびフレームワークで,サーバ間でのデータのやりとりなどに利用します。Hadoop の生みの親である Doug Cutting 氏により開発が始まり,Apache のプロジェクトの一つとして開発が進められています。よりコンパクトでシリアライズ/デシリアライズが高速なことが特長です。

Avro ファイルを作成してみる

実際に Avro ファイルの作成してみます。Avroのスキーマファイル(users.avsc)は以下の様な JSON 形式で定義します。

{
"name": "users",
"type": "record",
"fields": [
{"name": "id", "type": "int"},
{"name": "guid", "type": "string"},
{"name": "name", "type": "string"},
{"name": "address", "type": "string"}
]}

avro(内部的にはjackson)を利用して、2レコードしか無いデータファイル(users.avro)を作成してみます。

public class AvroUtil {
    public static void main(String... args) throws Exception {
        // Schema Definition
        Schema schema = new Schema.Parser().parse(new File("data/users.avsc"));

        // User Records
        GenericRecord user1 = new GenericData.Record(schema);
        user1.put("id", 1);
        user1.put("guid", "100");
        user1.put("name", "Alice");
        user1.put("address", "Sapporo, Hokkaido");
        GenericRecord user2 = new GenericData.Record(schema);
        user2.put("id", 2);
        user2.put("guid", "200");
        user2.put("name", "Bob");
        user2.put("address", "Musashino, Tokyo");
        
        // Serialize
        File file = new File("data/users.avro");
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
        dataFileWriter.create(schema, file);
        dataFileWriter.append(user1);
        dataFileWriter.append(user2);
        dataFileWriter.close();
    }
}

イメージを掴んでもらうために、生成された users.avro ファイルをバイナリエディタで開いてみます。

users-avro-sample-dump

ファイルの最初の方にデータの定義があり、その後にデータが連続して登録されています。jsonやxmlのように定義名が何度も繰り返すような冗長さもなく、ファイルの最初を読み込んだ段階でデータ構造が理解できる「マシン・リーダブル」なフォーマットなので、コンパクトかつ高速になる理由が理解できたと思います。

RedshiftへAvroファイルのロード

テーブルの定義

まず、ロードするテーブルの定義します。

DROP table users;
CREATE table users (
  id int,
  guid varchar(16),
  name varchar(64),
  address varchar(128)
);

Avroファイルのロード(フォーマット指定)

フォーマットファイルは、先ほど.avroファイルのスキーマ指定に使った users.avsc です。このフォーマットに従い、データをロードします。このファイルで個別に調整する事も可能です。

-- TRUNCATE users;
COPY users
FROM 's3://cm-avro/users.avro'
CREDENTIALS 'aws_access_key_id=xxx;aws_secret_access_key=yyy'
FORMAT AVRO 's3://cm-avro/users.avsc';
INFO:  Load into table 'users' completed, 2 record(s) loaded successfully.
COPY
SELECT * FROM users;
 id | guid | name  |      address
----+------+-------+-------------------
  1 | 100  | Alice | Sapporo, Hokkaido
  2 | 200  | Bob   | Musashino, Tokyo

Avroファイルのロード(フォーマットを自動認識)

フォーマットファイルに、 auto と指定すると自動認識になります。先ほど、users.avro をバイナリエディタで見た時に、スキーマ情報も含まれている Avro のスキーマと Redshift の列名を自動でマッチングして COPY するようになっています。

-- TRUNCATE users;
COPY users
FROM 's3://cm-avro/users.avro'
CREDENTIALS 'aws_access_key_id=xxx;aws_secret_access_key=yyy'
FORMAT AVRO 'auto'
;
SELECT * FROM users;
INFO:  Load into table 'users' completed, 2 record(s) loaded successfully.
COPY
cmawsteamdb=> SELECT * FROM users;
 id | guid | name  |      address
----+------+-------+-------------------
  1 | 100  | Alice | Sapporo, Hokkaido
  2 | 200  | Bob   | Musashino, Tokyo
(2 rows)

EMR の Avro データを Redshift にロードする

では、実際のユースケースでよくある EMR から Avro データ をエクスポートして、Redshiftにロードしてみたいと思います。

Pig の準備

grunt シェルを起動して、必要なストレージハンドラを起動するために必要な JAR を登録します。

$ pig

grunt> REGISTER /usr/lib/hadoop/lib/avro-1.7.4.jar; 
grunt> REGISTER /usr/lib/pig/lib/piggybank.jar;
grunt> REGISTER /usr/lib/hadoop/lib/jackson-mapper-asl-1.9.13.jar; 
grunt> REGISTER /usr/lib/hadoop/lib/jackson-core-asl-1.9.13.jar; 
grunt> REGISTER /usr/lib/pig/lib/json-simple-1.1.jar;

※ 上記のJARファイルは、執筆時点(2015/08)の最新環境(Amazon EMR 4.0.0)となりますので、適宜変更してください。

Pig にデータのロード

EMR(Hadoop) 上の Pig にデータをロードします。Pig には先ほどと同じように JSON 形式のスキーマ(top_nhl_scorers.avsc)を利用して、テキストファイル(top_nhl_scorers.txt)をロードします。

スキーマ(top_nhl_scorers.avsc)

{"namespace": "top_nhl_scorers.avro",
"type": "record",
"name": "Names",
"fields": [
{"name": "name", "type": "string"},
{"name": "team", "type": "string"},
{"name": "games_played", "type": "int"},
{"name": "points", "type": "int"}
]
}

テキストファイル(top_nhl_scorers.txt)

Gordie Howe Detroit Red Wings 1767 1850
Jean Beliveau Montreal Canadiens 1125 1219
Alex Delvecchio Detroit Red Wings 1969 1281
Bobby Hull Chicago Black Hawks 1063 1170
Norm Ullman Toronto Maple Leafs 1410 1229
Stan Mikita Chicago Black Hawks 1394 1467
Johnny Bucyk Boston Bruins 556 1369
Frank Mahovlich Montreal Canadiens 1973 1103
Henri Richard Montreal Canadiens 1256 1046
Phil Esposito Boston Bruins 717 1590

データファイルのロード

data = LOAD 's3://cm-avro/hockey_stats/input/top_nhl_scorers.txt'
USING PigStorage('\t')
AS (name:chararray,team:chararray,games_played:int,points:int);

Pig から S3 に Avro ファイルをストア

STORE data INTO 's3://cm-avro/avro/output/'
USING org.apache.pig.piggybank.storage.avro.AvroStorage('schema_file','s3://cm-avro/hockey_stats/schemas/top_nhl_scorers.avsc');

生成された part-m-00000.avro ファイルをバイナリエディタで開いてみます。

part-m-00000-avro-sample-dump

テーブルの定義

まず、ロードするテーブルの定義します。

CREATE TABLE top_nhl_scorers (
  name varchar(32),
  team varchar(32),
  games_played int,
  points int
);

Avroファイルのロード

先ほど、part-m-00000.avro をバイナリエディタで見た時に、ファイルの最初の方にスキーマ情報が含まれていましたので、フォーマットは自動認識 'auto' でロードします。

-- TRUNCATE top_nhl_scorers;
COPY top_nhl_scorers
FROM 's3://cm-avro/avro/output/part-m-00000.avro'
CREDENTIALS 'aws_access_key_id=xxx;aws_secret_access_key=yyy'
FORMAT AVRO 'auto'
;
INFO:  Load into table 'top_nhl_scorers' completed, 10 record(s) loaded successfully.
COPY

SELECT * FROM top_nhl_scorers;
      name       |        team         | games_played | points
-----------------+---------------------+--------------+--------
 Jean Beliveau   | Montreal Canadiens  |         1125 |   1219
 Stan Mikita     | Chicago Black Hawks |         1394 |   1467
 Phil Esposito   | Boston Bruins       |          717 |   1590
 Alex Delvecchio | Detroit Red Wings   |         1969 |   1281
 Johnny Bucyk    | Boston Bruins       |          556 |   1369
 Bobby Hull      | Chicago Black Hawks |         1063 |   1170
 Frank Mahovlich | Montreal Canadiens  |         1973 |   1103
 Gordie Howe     | Detroit Red Wings   |         1767 |   1850
 Norm Ullman     | Toronto Maple Leafs |         1410 |   1229
 Henri Richard   | Montreal Canadiens  |         1256 |   1046
(10 rows)

最後に

今回は、EMR/Pigによる例をご紹介しました。Avroファイルを介してRedshiftにデータがロード出来るようになりますので、これまで以上に選択肢の幅が広がりました。非定型データをHadoop(EMR)で処理して、定型化したデータをAvroファイルで出力、Redshiftにロードして、集計分析といった流れをイメージして、この新機能をご活用していただけたらと思います。