Step FunctionでAWS Glueのジョブやクローラーを呼び出すワークフローをつくってみた!
おつかれさまです。
今回は、こちら のAWSブログを試してみました。
Step Functionsで
- AWS Glueのジョブを起動
- DynamoDBに保存されいているデータを全件取得
- S3に指定したフォーマットで保存
- Athenaで見れるようにデータカタログとビューの作成
というの一連の流れを制御します。
簡単な構成図
細かい解説は省きますが、
- Step Functionsでワークフローを制御
- Step Functionsから必要に応じて、AWS GlueやLambdaを呼び出す
という点を抑えて貰えればよいです。
やってみる(下準備)
CloudFormationスタックからリソースを作成①
- スタックの作成①
- Step Functions, Lambda, S3Bucket等の作成を行っています。
※デフォルトでリージョンがUS East (N. Virginia)
になっています。私は東京リージョンに変更しました。
- 詳細
論理ID | リソース | 説明 |
---|---|---|
DynamoDBExportsBucket | AWS::S3::Bucket | データのダンプ先としてのS3Bucket |
DynamoDBExportStateMachine | AWS::StepFunctions::StateMachine | ワークフローとして利用するStep Function |
LambdaRole | AWS::IAM::Role | Lambda用のロール |
EventTriggerRole | AWS::IAM::Role | CloudWatch Events用のロール |
GlueCrawlerAndJobRole | AWS::IAM::Role | クローラー用のロール |
StateExecutionRole | AWS::IAM::Role | Step Function用のロール |
StartGlueCrawler | AWS::Lambda::Function | クローラーを起動するLambda |
GetGlueCrawlerStatus | AWS::Lambda::Function | クローラーの状態を取得するLambda |
CreateCurrentView | AWS::Lambda::Function | AthenaでViewを作成するLambda |
GetCurrentViewStatus | AWS::Lambda::Function | AthenaでView作成の実行結果を確認するLambda |
CloudFormationスタックからリソースを作成②
- スタックの作成②
- AWS GlueのJobやCrawlerを作成
- 詳細
論理ID | リソース | 説明 |
---|---|---|
ExportConverterGlueJob | AWS::Glue::Job | DynamoDBのテーブルから取得したデータをS3に保存するジョブ |
GlueCrawler | AWS::Glue::Crawler | S3に保存したファイルからデータカタログを作成するクローラー |
Trigger | AWS::Events::Rule | ワークフローをキックするためのスケジューラ |
- Glue Jobについて補足
コードはリンク先から自動で取得されます。
処理はDynamoDBからScanで全件取得し、S3にsnapshot_timestamp
でパーティション切ってparquet形式で保存という流れです。
DynamoDBのテーブルを手動で作成
- 補足
- テーブル名はさきほどデプロイしたCloudFormationのパラメーターと合わせる
- プロビジョンドモードを選択(Glue Job側の読み込みがオンデマンド対応していないので)
- テーブル作成後は適当にレコードを作成
- 例)
{"id": "001", name: "seiichi" }
ワークフローを実行してみる
- ワークフロー図
Step Functions実行する入力値を取得
まず、実行に必要な入力値をCloudWatch Eventsからコピーしてきます。 ちなみに、スケジューラは無効になっています。
- 入力値
{ "glue_job_name": "sample-table-for-exportExportToparquet", "output_prefix": "s3://dynamodb-exports-0123456789012-ap-northeast-1/parquet/sample-table-for-export", "table_name": "sample-table-for-export", "read_percentage": "0.25", "crawler_name": "sample-table-for-exportparquetCrawler", "output_format": "parquet" }
Step Functionsを手動で実行
Step Functionsの設定画面から実行します。
実行が開始されました。
そのうち実行が完了します。
S3のファイルを確認
完了後、S3にファイルが作成されているか確認してみます。
一応中身を確認しておきます。ほとんど空のデータですが、対象のレコードを見つけました。
結果をAthenaで可視化
ちなみにGlueのCrawlerでデータカタログを作成しているので、AthenaからSQLを実行できます。
AthenからSQLを実行する際は、
- S3バケット全体のデータを含むテーブル(過去にダンプしたデータも含む)
- 直近のダンプしたデータのみ含むView
のどちらを対象にするか選べます。
ワークフローのなかで、なぜViewを作っているのが疑問でしたが納得しました。
まとめ
いかがだったでしょうか?
個人的にはAWS Step Functionsで作るワークフローは自由度が高そうだなと思いました。
以上、どなたかの役に立てば幸いです。