2023年7月版 EMR on EC2のチュートリアルを試しました

EMR on EC2のチュートリアルを試しました
2023.07.07

4年近くEMRを触っていなかったので久しぶりにEMRを触ることにしました。ということで、まずはEMR on EC2の Tutorial: Getting started with Amazon EMR - Amazon EMR を試してみることにしました。

前提

  • EMR version: emr-6.11.0
  • Applicaiton: Spark(実際には PySpark を利用する)
  • ハードウェア構成: m6a.xlarge を 1 台(検証用なのでプライマリーノードのみ)
  • リージョン: 東京リージョン
  • VPC: Default VPC
  • EMR service role: EMR_DefaultRole
  • EC2 instance profile: EMR_EC2_DefaultRole
  • AWS CLI: aws-cli/2.12.6 Python/3.11.4 Linux/4.14.255-314-253.539.amzn2.x86_64 exec-env/CloudShell exe/x86_64.amzn.2 prompt/off(CloudShell を利用)

S3バケットのフォルダ構成

検証用にS3バケットを用意します。DOC-EXAMPLE-BUCKET の箇所を自身の環境に読み替えてください。health_violations.py が PySpark のスクリプト、food_establishment_data.csv が入力ファイル、myOutputFolder が処理結果の出力先フォルダになります。

  • s3://DOC-EXAMPLE-BUCKET/health_violations.py
  • s3://DOC-EXAMPLE-BUCKET/food_establishment_data.csv
  • s3://DOC-EXAMPLE-BUCKET/myOutputFolder

事前準備

ドキュメントに記載されている通り health_violations.py と food_establishment_data.csv を用意してS3バケットにアップロードします。

health_violations.py

処理内容は violation_type = 'RED' であるレコードについて name と total_red_violations で集約して、降順にソートした上で先頭の10レコードを出力するという内容です。検証用に利用するS3バケットの直下にアップロードします。

import argparse

from pyspark.sql import SparkSession

def calculate_red_violations(data_source, output_uri):
    """
    Processes sample food establishment inspection data and queries the data to find the top 10 establishments
    with the most Red violations from 2006 to 2020.

    :param data_source: The URI of your food establishment data CSV, such as 's3://DOC-EXAMPLE-BUCKET/food-establishment-data.csv'.
    :param output_uri: The URI where output is written, such as 's3://DOC-EXAMPLE-BUCKET/restaurant_violation_results'.
    """
    with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark:
        # Load the restaurant violation CSV data
        if data_source is not None:
            restaurants_df = spark.read.option("header", "true").csv(data_source)

        # Create an in-memory DataFrame to query
        restaurants_df.createOrReplaceTempView("restaurant_violations")

        # Create a DataFrame of the top 10 restaurants with the most Red violations
        top_red_violation_restaurants = spark.sql("""SELECT name, count(*) AS total_red_violations 
          FROM restaurant_violations 
          WHERE violation_type = 'RED' 
          GROUP BY name 
          ORDER BY total_red_violations DESC LIMIT 10""")

        # Write the results to the specified output URI
        top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--data_source', help="The URI for you CSV restaurant data, like an S3 bucket location.")
    parser.add_argument(
        '--output_uri', help="The URI where output is saved, like an S3 bucket location.")
    args = parser.parse_args()

    calculate_red_violations(args.data_source, args.output_uri)

food_establishment_data.csv

food_establishment_data.zip からダウンロードします。解凍した food_establishment_data.csv を検証用に利用するS3バケット直下にアップロードします。中身はワシントン州キング郡における保健局の2006年から2020年の検査結果です。以下のようなデータが入っています。

name, inspection_result, inspection_closed_business, violation_type, violation_points
100 LB CLAM, Unsatisfactory, FALSE, BLUE, 5
100 PERCENT NUTRICION, Unsatisfactory, FALSE, BLUE, 5
7-ELEVEN #2361-39423A, Complete, FALSE, , 0

EMRクラスターの作成

手順の再現が容易なのでCLIベースで進めます。まずはIAM Roleを作成します。次にEMRのクラスターを作成します。

IAM Roleの作成

EMRクラスターで利用する各種IAM Roleを作成します。

aws emr create-default-roles

コマンドの詳細は create-default-roles — AWS CLI 2.12.7 Command Reference を参照してください。なお、すでに作成済みの場合はこの手順は不要です。また、再実行しても特に問題はありません。以下の3つのIAM Roleが作成されます。

  • EMR_EC2_DefaultRole
  • EMR_DefaultRole
  • EMR_AutoScaling_DefaultRole

EMRクラスターの作成

次にEMRクラスターを作成します。

aws emr create-cluster \
--name "My First EMR Cluster" \
--release-label emr-6.11.0 \
--applications Name=Spark \
--instance-type m6a.xlarge \
--instance-count 1 \
--use-default-roles

コマンドの詳細は create-cluster — AWS CLI 2.12.7 Command Reference を参照してください。公式ドキュメントとの違いは以下のとおりです。

  • instance-type: 更に安い m6a.xlarge に変更した
  • instance-count: プライマリーノードのみの構成にするため 1 に変更した
  • --ec2-attributes KeyName: SSHで接続しないため記述自体を削除した

実行すると以下のような内容が出力されます。

{
    "ClusterId": "j-XXXXXXXX",
    "ClusterArn": "arn:aws:elasticmapreduce:ap-northeast-1:XXXXXXXX:cluster/j-XXXXXXX"
}

クラスターの状態の確認

作成したクラスターの状態を確認します。<myClusterId> の箇所に create-cluster コマンドの処理結果の ClusterId の箇所の値を入れてください。

aws emr describe-cluster --cluster-id <myClusterId>

実行すると以下のような内容が出力されます。コマンドの結果が長いため一部割愛しています。ポイントは State が WAITING になっていることです。WAITING はアプリケーションの実行待ちの状態なのでジョブを送信すれば実行されます。

{
    "Cluster": {
        "Id": "j-XXXXXXXX",
        "Name": "My First EMR Cluster",
        "Status": {
            "State": "WAITING",
            "StateChangeReason": {
                "Message": "Cluster ready to run steps."
            },
            "Timeline": {
                "CreationDateTime": "2023-07-07T10:45:48.757000+00:00",
                "ReadyDateTime": "2023-07-07T10:50:13.655000+00:00"
            }
        },
...

もしくはクラスターの一覧を出力するコマンドで WAITING のもののみ出力するという確認の仕方もあります。

aws emr list-clusters --cluster-states WAITING

実行して以下のような内容が出力されればクラスターが WAITING になっていると判断できます。複数のクラスターを起動している場合は idが今回起動したクラスターのIDであることを確認してください。

{
    "Clusters": [
        {
            "Id": "j-XXXXXXX",
            "Name": "My First EMR Cluster",
            "Status": {
                "State": "WAITING",
                "StateChangeReason": {
                    "Message": "Cluster ready to run steps."
                },
                "Timeline": {
                    "CreationDateTime": "2023-07-07T10:45:48.757000+00:00",
                    "ReadyDateTime": "2023-07-07T10:50:13.655000+00:00"
                }
            },
            "NormalizedInstanceHours": 0,
            "ClusterArn": "arn:aws:elasticmapreduce:ap-northeast-1:XXXXXXX:cluster/j-XXXXXXX"
        }
    ]
}

ジョブの送信と結果の確認

待機中のEMRクラスターにジョブを送信しS3バケットに出力される処理結果を確認します。

ジョブの送信

クラスターにジョブを送信することで実際に処理を実行させることができます。<> で囲まれている箇所を自身の環境に書き換えてください。DOC-EXAMPLE-BUCKET の箇所は検証用S3バケットになります。また書き換えた際に <> の文字の削除を忘れないようにしてください。

aws emr add-steps \
--cluster-id <myClusterId> \
--steps Type=Spark,Name="My Spark Application",ActionOnFailure=CONTINUE,Args=[<s3://DOC-EXAMPLE-BUCKET/health_violations.py>,--data_source,<s3://DOC-EXAMPLE-BUCKET/food_establishment_data.csv>,--output_uri,<s3://DOC-EXAMPLE-BUCKET/MyOutputFolder>]

実行すると以下のような内容が出力されます。コマンド名が add-steps であったようにEMRでは個々のジョブのことを Step と呼びます。

{
    "StepIds": [
        "s-XXXXXXXXXXXXX"
    ]
}

STEPの処理状況の確認

cluster-id と step-id を指定することでSTEPの処理状況を確認できます。

aws emr describe-step --cluster-id <myClusterId> --step-id <s-1XXXXXXXXXXA>

実行すると以下のような内容が出力されます。State が COMPLETED になれば処理終了です。RUNNING は処理中を意味します。COMPLETED になるまで何度かコマンドを実行して処理状況を確認してください。

{
    "Step": {
        "Id": "s-XXXXXXXX",
        "Name": "My Spark Application",
        "Config": {
            "Jar": "command-runner.jar",
            "Properties": {},
            "Args": [
                "spark-submit",
                "s3://XXXXXXXX/health_violations.py",
                "--data_source",
                "s3://XXXXXXXX/food_establishment_data.csv",
                "--output_uri",
                "s3://XXXXXXXX/MyOutputFolder"
            ]
        },
        "ActionOnFailure": "CONTINUE",
        "Status": {
            "State": "RUNNING",
            "StateChangeReason": {},
            "Timeline": {
                "CreationDateTime": "2023-07-07T11:03:54.358000+00:00",
                "StartDateTime": "2023-07-07T11:04:11.121000+00:00"
            }
        }
    }
}

結果の確認

検証用に用意したS3バケットの MyOutputFolder 配下に処理結果のCSVファイルが出力されます。part-00000 から始まるファイルです。

CSVファイルの中身は以下のとおりです。

name,total_red_violations
SUBWAY,322
T-MOBILE PARK,315
WHOLE FOODS MARKET,299
PCC COMMUNITY MARKETS,251
TACO TIME,240
MCDONALD'S,177
THAI GINGER,153
SAFEWAY INC #1508,143
TAQUERIA EL RINCONSITO,134
HIMITSU TERIYAKI,128

クラスターの削除と後処理

以上でチュートリアルは終了です。起動したEMRクラスターと検証用のS3バケットを削除します。

クラスターの削除

クラスターを削除しないと利用費が発生し続けるため削除します。なお、このコマンドは処理に成功した場合は処理結果として何も出力されません。

aws emr terminate-clusters --cluster-ids <myClusterId>

クラスター作成時にも利用した describe-cluster コマンドを実行してクラスターの状態を確認します。

aws emr describe-cluster --cluster-id <myClusterId>

State が TERMINATED になれば削除完了です。

{
    "Cluster": {
        "Id": "j-XXXXX",
        "Name": "My First EMR Cluster",
        "Status": {
            "State": "TERMINATED",
            "StateChangeReason": {
                "Code": "USER_REQUEST",
                "Message": "Terminated by user request"
            },
            "Timeline": {
                "CreationDateTime": "2023-07-07T10:45:48.757000+00:00",
                "ReadyDateTime": "2023-07-07T10:50:13.655000+00:00",
                "EndDateTime": "2023-07-07T11:22:50.819000+00:00"
            }
        },
...

検証用S3バケットの削除

必要に応じて検証用に作成したS3バケットを削除してください。

最後に

4年ぶりに触ってみましたが基本的な使い方は変わっていないことを確認できました。EMRの利用方法は他にもEMR on EKSやEMR Serverlessも増えているのでそれぞれ触って違いを確認できればと考えています。