AWS Glue の新しいジョブタイプ『Python Shell』を実際に試してみました
はじめに
新しいジョブタイプ『Python Shell』は、単にPythonスクリプトを実行する目的のジョブです。AWS Glueを使っている人であれば、このありがたみが身にしみて感じるはずです。
AWS Glue の Python Shell とは
Python Shellは、Glueジョブに追加されたジョブの種類の一つです。従来は、Apache Sparkのスクリプトのみでしたが、今回新たにPython Shellが追加されました。
従来のジョブタイプ「Spark」は、Sparkのクラスタを制御して大規模分散処理します。このジョブタイプは、ジョブを起動すると1つのDriverと複数のExecutorが起動され、スクリプトはDriverで実行されます。
新たに追加されたジョブタイプ「Python Shell」は、単にPythonスクリプトを実行する目的のジョブです。
Python Shell のスペックは
- 初期状態
- 20秒未満で起動
- ランタイム制限なし
- サイズ
- 1 DPU(16GBを含む)
- 1/16 DPU(1GBを含む)
- 価格
- DPU-hourあたり0.44ドル、最低1分間、1秒あたりの請求
これまでの解説を見るとPython Shellではなく、AWS Lambdaでいいのでは?と思うかもしれません。Python Shellは、従来のジョブと同様にトリガに組み込んだり、スケジュールやJob eventsで他のトリガと連携させて操作させることが可能です。また、データ分析用途のライブラリもビルトインでインストール済みです。
Python 2.7 環境で 以下のライブラリが利用できます。
- Boto3
- collections
- CSV
- gzip
- multiprocessing
- NumPy
- pandas
- pickle
- re
- SciPy
- sklearn
- sklearn.feature_extraction
- sklearn.preprocessing
- xml.etree.ElementTree
- zipfile
Python Shell のユースケース
- 小中規模のタスク向けETL実行環境
- SQLによるETL処理
- S3や3rdパーティサービスとの連携
- MLサービスからPython Shellの起動
GlueのジョブはPython(PySpark)やScalaで、Glue API や Apache Spark API を用いて、ETLのコードを作成します。実際のETLワークフローでは、Glueジョブだけではなく、S3のオブジェクトのハンドリング〜Glueクローラとの連携〜ジョブをまとめたトリガを起動〜パーティション更新〜SageMakerのジョブの実行など、いわゆるオーケストレーションが必要です。これらをオーケーストレーションするコードをPython Shellに作成したり、Python Shellと従来のジョブをTriggerのJob eventsで数珠つなぎにETLワークフローを組み上げていくという方法も考えられます。
Python Shell の例
従来のジョブで以下の処理を実装すると、RedshiftからすべてのレコードをS3にアンロードした後、Glueでフィルタリング、変換する必要がありました。
下記のPython Shell の例では、Redshiftの集計〜Glueのジョブ〜パーティションの追加といった一連の処理を、ETLパイプラインとしてまとめることができます。(以下はあくまでも例なので、プロダクションでは、エラーハンドリングしてください)
- Redshift上のテーブルを集計した結果をS3にCSV出力
- Glueのジョブ(CSVをParquetに変換)の実行
- パーティションを追加する
import boto3 import pg def get_connection(): rs_conn_string = "host=%s port=%s dbname=%s user=%s password=%s" % ( "cm-cluster.abcdefghij.us-east-1.redshift.amazonaws.com", "5439", "cm_db", "cm_user", "cm_password") rs_conn = pg.connect(dbname=rs_conn_string) rs_conn.query("set statement_timeout = 1200000") return rs_conn def query(con): statement = "unload ('select lo_orderdate,sum(lo_quantity) as lo_quantity, sum(lo_ordertotalprice) as lo_ordertotalprice from cm_test.lineorder group by 1 order by 1') to 's3://cm-bucket/lineorder_summary/' iam_role 'arn:aws:iam::1234567890123:role/cm-redshift-role' delimiter ',' header;" res = con.query(statement) return res # ELT in Redshift, then Unload S3 con = get_connection() res = query(con) print res # Glue Client glue = boto3.client(service_name="glue", region_name="us-east-1", endpoint_url="https://glue.ap-northeast-1.amazonaws.com") # Glue JOB Start job_run = glue.start_job_run(JobName=JOB_NAME) job_run_id = job_run["JobRunId"] : : : # Glue JOB End glue.get_job_run(JobName=JOB_NAME, RunId=job_run_id) resp = glue.batch_stop_job_run({JOB_NAME, [job_run_id]}) # Athena Client athena = boto3.client("athena", region_name="us-east-1") # Run Query response = athena.start_query_execution( QueryString="ALTER TABLE {} ADD PARTITION ({}='{}') LOCATION '{}'".format("lineorder", "lo_orderdate", "2019-01-29", "s3://datalake/lineorder/"), QueryExecutionContext={"Database": "cm-database"}, ResultConfiguration={"OutputLocation": "s3://s3-staging-dir/"} )
最後に
これまでは、GlueのジョブにETLワークフローを実装していましたが、Spark固有の実装とETLワークフロー以外を分けて書くことが可能になりました。Python Shellは、1/16DPUの料金からPythonスクリプトが実行できるので、RedshiftのクエリやAWSサービスのAPI呼び出しの同期実行は、SparkのDriverよりもPython Shellで実行するのに向いているはずです。NumPy、Pandas、SciPy、sklearnもすぐに使えるので、アイディア次第で簡易な分析にも活用できそうです。個人的には、Pandasが入っているならpyarrowも利用できると嬉しかったな。