Amazon Elastic MapReduceを使ってDynamo DBのデータをS3へ保存する

2013.06.28

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

DynamoDB to S3

この記事ではDynamoのデータをRedshiftに移行しています。
今回はコスト的な関係もあり、「DynamoからS3へデータを保存せよ」という指令を受けたため、その方法について調査していました。
また、この記事 にあるように、特定のリージョンではテーブルのexportができるようなのですが、せっかくなのでAmazon Elastic MapReduce(EMR)をつかって自分でS3へDynamoの内容を保存してみます。

環境構築方法

今回使用した動作環境は以下のとおりです。

  • OS : MacOS X 10.7.5
  • ruby : 1.8.7

AWSのアカウントについては登録済みであるとします。

Amazon EMRの概要について少し

ここでEMRやその他技術についてはけっこう解説しているので、ここでは簡単に解説しておきます。

Amazon EMRとは

Amazon Elastic MapReduce(Amazon EMR)は、莫大な量のデータを処理することができるようにするAmazonのウェブサービスです。
これは後述するHadoopを利用して実現しており、Hadoopは、EC2上に構築されます。EC2を立ててHadoopクラスタ構築をしなくても、EMRを使えばそこを全部AWSがやってくれるわけですね。
参考:Amazon EMR

MapReduceとは

MapReduceとは、Googleのバックエンドで利用されている並列計算システムで、大規模なデータを分散処理するためのプログラミングモデルです。
巨大なデータセットに対してクラスターを用いて並列処理させることができます。
MapReduceは、データの処理を「map」と「reduce」に分けて行います。mapは、分割されたデータに処理を行い、必要な情報を抽出します。
reduceでは、mapで抽出した情報をまとめて、データ全体についての整理された処理結果を得ることができます。 巨大データを分割してリニアに処理し、その後結果をまとめることができるということです。
MapReduceについてはこのへんでも。

Hadoopとは

大規模データの分散処理を行うためのJavaフレームワークで、MapReduceとHDFSで構成されています。
HDFSは、MapReduceで処理するデータを扱う分散ストレージで、複数のマシンを1つのストレージとして扱うことができます。
ようするにMapReduceのJava実装がHadoopですね。 詳しくはこのへん を。

Hiveとは

Hive(ハイヴ)は、Hadoop上のデータをSQLに似た命令で操作できるツールです。Hive上で動くSQLライクな言語を「HiveQL」といいます。
HiveQLでの操作はMapReduceのラッパーになっているようで、例えばHiveQLでSELECT文を実行すると、
その裏ではMapReduceのタスクが実行されて結果を取得することができます。
そして、Hiveについてはこのへんを。

DynamoデータをS3に保存するチュートリアル

では、Dynamoにテーブルをつくってデータを登録し、その内容をS3にTSV形式で出力してみましょう。

S3にバケットを用意

最終的にDynamoのデータを出力するためのバケットをS3に用意しておきます。
AWSコンソールでもなんでもいいので、適当なバケットをS3に作成しておきましょう。
とりあえず「my-emr-bucket」としておきます。バケットは各自用意した名前に置き換えてください。

Dynamoにテーブルを用意

まずはAWSコンソールを使用し、data_to_s3というテーブルをDynamo DBで作成しましょう。
Primary Keyはidという名前で、TypeはHash,Number型で作成します。
テーブルを作成したら、適当なデータを登録します。属性はname(String型)とcreateDate(String型)とでもしておきます。

Amazon EMR コマンドラインインターフェイスのインストール

コンソールからEMRの操作を行うためAmazon EMR CLIのインストールを行います。
これはrubyで動くEMR用クライアントで、ここからダウンロードできます。

ダウンロードしたファイルを解凍し、その中credentials.jsonを下記内容で作成しましょう。
EMRにアクセスするため、アクセスキーやシークレットキー、キーファイルの設定を行います。また、ログの出力先も、先ほど作成したバケット下を指定しておきます。

{
"access_id": "<アクセスキー>",
"private_key": "シークレットキー",
"keypair": "key pair名",
"key-pair-file": "pemファイルのパス",
"log_uri": "s3n://<your bucket>/logs",
"region": "ap-northeast-1"
}

credentials.jsonが作成できたら、ためしにemrのコマンドを実行してみましょう。
下記のようにバージョンが表示されれば準備完了です。

%./elastic-mapreduce --version                                                   
Version 2013-03-19

なお、私の環境では、ruby2.0では動作しなかったので、ruby1.8を使用しました。

HiveQLを記述したファイルをS3にアップ

クライアントの準備ができたので、EMR実行時に実行するHiveQLを作成してS3にアップします。
下記内容をexportS3.qという名前で、先ほど作成したS3バケットへアップロードしておきましょう。

SET dynamodb.endpoint=dynamodb.ap-northeast-1.amazonaws.com;

CREATE EXTERNAL TABLE emr_table (id string, name string, createDate string) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "data_to_s3", "dynamodb.column.mapping" = "id:id,name:name,createDate:createDate");

INSERT OVERWRITE DIRECTORY 's3://<your bucket>/fromHive/${DATE}'
SELECT * FROM emr_table;

exportS3.qの中では、リージョンを東京にセットし、emr_tableという名前でテーブルを作成しています。
そのテーブルにDynamoのdata_to_s3テーブルからデータを読み込んでいます。
カラム名は両テーブルとも同じにしてありますが、dynamodb.column.mappingを見てもわかるように、任意のカラムでマッピング可能になっています。

そして、INSERT OVERWRITE DIRECTORY〜から始まる文が、S3へデータを出力している部分です。
${DATE}となっている箇所は、コマンド実行時に渡されるパラメータによって変動する値です。
後述しますが、日時(yyyymmdd形式)をパラメータとして渡すので、その日付のパスにデータが出力されるようになります。

シェル作成

最後に、上記HiveQLを実行するジョブフローを作成して実行してみましょう。
executeJob.shという名前で次のような、elastic-mapreduceコマンドを実行するシェルを作成します。

#!/bin/sh
date=$(date '+%Y%m%d')
./elastic-mapreduce --create --name "exportS3-$date" --hive-script --arg s3://<your bucket>/exportS3.q --Log-uri s3://<your bucket>/logs/ --args -d,DATE=$date

executeJob.shでは日付を取得した後、ジョブフローを作成してEMR処理を実行しています。
「--args -d,DATE=$date」となっている箇所が、取得した日付をexportS3.qにパラメータとして渡している部分です。

実行

では、上記シェルを実行してみましょう。
シェル実行後、AWSコンソールで確認すると、ジョブフローが「exportS3-<日付>」という名前で実行されます。
処理が終了すると、作成したバケットの下にDynamoDB内容がTSV形式で出力されているはずです。

まとめ

今回はHiveをつかってDynamo DBの内容をS3に出力してみました。
これは単純な処理ですが、複数のジョブフローをつなげることで、大量のデータに対して複雑な処理を行うことも可能です。

参考サイトなど