AWS Glue の新しいジョブタイプ『Python Shell』を実際に試してみました

はじめに

新しいジョブタイプ『Python Shell』は、単にPythonスクリプトを実行する目的のジョブです。AWS Glueを使っている人であれば、このありがたみが身にしみて感じるはずです。

AWS Glue の Python Shell とは

Python Shellは、Glueジョブに追加されたジョブの種類の一つです。従来は、Apache Sparkのスクリプトのみでしたが、今回新たにPython Shellが追加されました。

従来のジョブタイプ「Spark」は、Sparkのクラスタを制御して大規模分散処理します。このジョブタイプは、ジョブを起動すると1つのDriverと複数のExecutorが起動され、スクリプトはDriverで実行されます。

新たに追加されたジョブタイプ「Python Shell」は、単にPythonスクリプトを実行する目的のジョブです。

Python Shell のスペックは

  • 初期状態
    • 20秒未満で起動
    • ランタイム制限なし
  • サイズ
    • 1 DPU(16GBを含む)
    • 1/16 DPU(1GBを含む)
  • 価格
    • DPU-hourあたり0.44ドル、最低1分間、1秒あたりの請求

これまでの解説を見るとPython Shellではなく、AWS Lambdaでいいのでは?と思うかもしれません。Python Shellは、従来のジョブと同様にトリガに組み込んだり、スケジュールやJob eventsで他のトリガと連携させて操作させることが可能です。また、データ分析用途のライブラリもビルトインでインストール済みです。

Python 2.7 環境で 以下のライブラリが利用できます。

  • Boto3
  • collections
  • CSV
  • gzip
  • multiprocessing
  • NumPy
  • pandas
  • pickle
  • re
  • SciPy
  • sklearn
  • sklearn.feature_extraction
  • sklearn.preprocessing
  • xml.etree.ElementTree
  • zipfile

Python Shell のユースケース

  • 小中規模のタスク向けETL実行環境
    • SQLによるETL処理
    • S3や3rdパーティサービスとの連携
    • MLサービスからPython Shellの起動

GlueのジョブはPython(PySpark)やScalaで、Glue API や Apache Spark API を用いて、ETLのコードを作成します。実際のETLワークフローでは、Glueジョブだけではなく、S3のオブジェクトのハンドリング〜Glueクローラとの連携〜ジョブをまとめたトリガを起動〜パーティション更新〜SageMakerのジョブの実行など、いわゆるオーケストレーションが必要です。これらをオーケーストレーションするコードをPython Shellに作成したり、Python Shellと従来のジョブをTriggerのJob eventsで数珠つなぎにETLワークフローを組み上げていくという方法も考えられます。

Python Shell の例

従来のジョブで以下の処理を実装すると、RedshiftからすべてのレコードをS3にアンロードした後、Glueでフィルタリング、変換する必要がありました。

下記のPython Shell の例では、Redshiftの集計〜Glueのジョブ〜パーティションの追加といった一連の処理を、ETLパイプラインとしてまとめることができます。(以下はあくまでも例なので、プロダクションでは、エラーハンドリングしてください)

  • Redshift上のテーブルを集計した結果をS3にCSV出力
  • Glueのジョブ(CSVをParquetに変換)の実行
  • パーティションを追加する
import boto3
import pg

def get_connection():
    rs_conn_string = "host=%s port=%s dbname=%s user=%s password=%s" % (
        "cm-cluster.abcdefghij.us-east-1.redshift.amazonaws.com",
        "5439",
        "cm_db",
        "cm_user",
        "cm_password")

    rs_conn = pg.connect(dbname=rs_conn_string)
    rs_conn.query("set statement_timeout = 1200000")
    return rs_conn


def query(con):
    statement = "unload ('select lo_orderdate,sum(lo_quantity) as lo_quantity, sum(lo_ordertotalprice) as lo_ordertotalprice from cm_test.lineorder group by 1 order by 1') to 's3://cm-bucket/lineorder_summary/' iam_role 'arn:aws:iam::1234567890123:role/cm-redshift-role' delimiter ',' header;"
    res = con.query(statement)
    return res


# ELT in Redshift, then Unload S3
con = get_connection()
res = query(con)
print res

# Glue Client
glue = boto3.client(service_name="glue", region_name="us-east-1", endpoint_url="https://glue.ap-northeast-1.amazonaws.com")

# Glue JOB Start
job_run = glue.start_job_run(JobName=JOB_NAME)
job_run_id = job_run["JobRunId"]

:
:
:

# Glue JOB End
glue.get_job_run(JobName=JOB_NAME, RunId=job_run_id)
resp = glue.batch_stop_job_run({JOB_NAME, [job_run_id]})

# Athena Client
athena = boto3.client("athena", region_name="us-east-1")

# Run Query
response = athena.start_query_execution(
    QueryString="ALTER TABLE {} ADD PARTITION ({}='{}') LOCATION '{}'".format("lineorder", "lo_orderdate", "2019-01-29", "s3://datalake/lineorder/"),
    QueryExecutionContext={"Database": "cm-database"},
    ResultConfiguration={"OutputLocation": "s3://s3-staging-dir/"}
)

最後に

これまでは、GlueのジョブにETLワークフローを実装していましたが、Spark固有の実装とETLワークフロー以外を分けて書くことが可能になりました。Python Shellは、1/16DPUの料金からPythonスクリプトが実行できるので、RedshiftのクエリやAWSサービスのAPI呼び出しの同期実行は、SparkのDriverよりもPython Shellで実行するのに向いているはずです。NumPy、Pandas、SciPy、sklearnもすぐに使えるので、アイディア次第で簡易な分析にも活用できそうです。個人的には、Pandasが入っているならpyarrowも利用できると嬉しかったな。

合わせて読みたい

[レポート] ANT308 : AWS Glue のサーバレスアナリティクスパイプライン構築する #reinvent