GlueのクローラでS3のCSVデータを検出し、Athenaでクエリを実行する
AWS GlueはETLのフルマネージドサービスです。Glueを構成する一つの要素にクローラ(Crawler)があります。これまでよくわからないけど自動でデータカタログを作成してくれて便利そうという印象がありつつも、なかなか触れていせんでした。
今回はGlueのクローラを使って、S3上のCSVデータをクロールし、検出したテーブルにAthenaでクエリを実行してみました。
AWS Glue クローラ
クローラは指定したデータストアを調べて、データカタログに登録してくれます。定期実行をする事で、スキーマやパーティションの定期的な自動更新も可能です。
- クローラは分類子を用いてデータストアにあるデータのスキーマを推論/検出します。
- 指定された順序でカスタム分類子を使用し、その後組み込みの分類子が使用されます。
- 分類子が推論したスキーマが100%確実だと判定した場合は、次に使用されるはずだったカスタム分類子もしくは組み込みの分類子は使用されません。
- 分類子: データのスキーマを推論もしくは検出します。そのスキーマがどれほど確実かを示す確信度数(certainty)も返します。
- 指定された順序でカスタム分類子を使用し、その後組み込みの分類子が使用されます。
- スキーマなどのメタデータをGlueのデータカタログ(Data Catalog)に書き込みます。
- メタデータはテーブル単位で保存され、テーブルはGlueのデータカタログ上の1つのデータベースに属します。
- メタデータにはテーブル定義に関するものだけでなく、分類子による情報なども含まれます。
Populating the AWS Glue Data Catalog - AWS Glueより
対象データストア
ネイティブで対応
- Amazon S3
- Amazon DynamoDB
JDBC接続を使う事で対応
- Amazon Redshift
- Amazon Aurora
- MariaDB
- Microsoft SQL Server
- MySQL
- Oracle
- PostgreSQL
やってみる
クローラの理解を深めるため、次のことを実際にやってみます。
- S3にデータを入れる
- クローラを作成する
- クローラを走らせる
- Athenaでクエリを実行する
S3にデータを入れる
まずはクローラで検出させるCSVデータを作成します。 PythonでIrisデータとランダムな数値データを作成し、それぞれCSV形式でS3にアップロードします。 ランダムな数値データの方は0-9までのidでパーティション化します。
import pandas as pd import numpy as np from sklearn import datasets # irisデータセットのcsvを格納 df = pd.DataFrame(iris.data, columns=iris.feature_names) df.to_csv('s3://bucket/glue-test/iris/data.csv', index=False) # ランダムな数値データをパーティションで区切りながら格納 for i in range(10): df = pd.DataFrame( np.random.random((1000, 4)), columns=('a', 'b', 'c', 'd') ) df.to_csv(f's3://bucket/glue-test/random_data/id={i}/data.csv', index=False)
AWS CLIを使ってs3 ls
で確認してみると、11個のオブジェクトが配置されているのがわかります。
aws s3 ls s3://bucket/glue-test/ --recursive
クローラを作成する
マネジメントコンソールに移り、クローラを作成します。 特に引っかかりようがないところは飛ばしながら、作成の様子を見ていきます。
クローラ作成の最初の画面では、オプションとしてタグや暗号化に関する設定、カスタム分類子の設定が可能です。カスタム分類子を事前に作成してある場合は左下に一覧として並び、クローラに使用する分類子を選択します。今回はカスタム分類子は使用しないので、そのまま進めます。
クロール対象のデータストアを選択します。 今回はS3なので、対象のS3パスを指定します。また、オプションとして、対象内のS3パスの中でもクロール対象外としたいパスの指定も可能です。
クロール対象のデータストアを追加することができます。今回は1つのS3パスのみなので、追加しません。
クロールに使用するIAMロールを選択もしくは作成します。
そのまま進めていくと、クローラの実行頻度を選択する画面が出てきます。
今回は都度実行なのでRun on demand
を選択しますが、他にも毎時や日次、曜日指定、週次、月次、カスタムが選択できます。
カスタムの場合だと、cron式で実行タイミングの設定が可能です。
クローラで検出したテーブル情報を格納するデータベースやテーブル名の接頭辞を指定します。また、いくつかオプション設定があるため順に紹介します。
クロール先として設定したS3パスごとに1つのテーブルしか検出しないかどうかを設定できます。 デフォルトでは、S3パスごとにスキーマやファイルフォーマットなどいくつかの類似性を検証し、複数のテーブルを検出を行います。このオプションを有効化する事で、カラムなどスキーマに違いがあるけど同一テーブルとして扱いたいといった場合に便利です。
クロールによってスキーマの変化を検出した場合の挙動を設定できます。設定内容は次の三種類です。
- スキーマの変化を検出した場合にデータカタログのメタ情報をどうするか
- テーブル定義を更新する
- カラムの追加時のみテーブル定義を更新する
- テーブル定義を更新しない
- パーティションを更新するかどうか
- テーブルのデータストアからオブジェクトが削除された場合にどうするか
- データカタログからテーブルとパーティションを削除する
- 削除されてもそのまま何もしない
- そのテーブルを廃止(deprecated)としてマークする
- 廃止としてマークされると、そのテーブルを対象としたGlueのETLジョブは失敗する可能性があります
最後にクローラの設定情報を確認して、クローラを作成します。
クローラを走らせる
クローラが作成できたら、クローラの一覧画面に移動して、作成したクローラを選択し走らせてみます。
今回の場合だと、1分弱程度でクロールが完了しました。
クロールによって意図した通り、2つのテーブルを検出できました。
random_data
テーブルの情報を見てみます。
カラムbだけdouble、それ以外のカラムはstringとして型が検出されています。
Athenaでクエリを実行する
S3をデータストアとしたテーブルはAthenaからも参照できます。Athenaでクエリを実行してみます。
irisテーブルからselectしてみます。
select * from iris limit 10
random_dataテーブルからもselectしてみます。
select * from random_data limit 10
HIVE_PARTITION_SCHEMA_MISMATCH
クローラの作成時にUpdate all new and existing partitions with metadata from the table.
のチェックを外したままにした場合、random_table
テーブルでselectしてみるとHIVE_PARTITION_SCHEMA_MISMATCH
エラーが発生しました。
次のStack Overflowにある通り、クローラのUpdate all new and existing partitions with metadata from the table.
を有効化して、テーブルを削除してクロールし直す事で、エラーが発生しなくなりました。
さいごに
GluwのクローラについてとクローラでS3上のテーブルを検出する方法について紹介しました。型をcastする前提など型が基本stringとして格納されていれば問題ないケースでは非常に便利そうです。また、型がしっかり値にマッチしたスキーマが必要な場合でもカスタム分類子を定義する事で、クローラを用いることができそうです。