Amazon EMR: Spark を使用してクラスターを起動し、Amazon S3 バケットに格納されたPySparkスクリプトを実行してみた

2023.07.24

こんにちは、yagiです。

Amazon EMRでSpark を使用してクラスターを起動し、Amazon S3 バケットに格納されたPySparkスクリプトを実行してみました。

Amazon EMR とは

Amazon EMR の使用開始

Amazon EMR を使用すると、ビッグデータフレームワークを使用してデータを処理および分析するクラスターをわずか数分でセットアップできます。このチュートリアルでは、Spark を使用してサンプルクラスターを起動する方法と、Amazon S3 バケットに格納された簡単なPySparkスクリプトを実行する方法について説明します。計画と設定、管理、およびクリーンアップという 3 つの主要なワークフローカテゴリにおける Amazon EMR の必須タスクを取り上げます。

使ってみる

上記のドキュメントに従って、サンプルデータを利用して進めていきます。
まずはS3バケットを作成します。注意点としては以下があります。

  • Amazon EMR クラスターを起動する AWS リージョンと同じリージョンに作成する(今回は米国西部 (オレゴン) の us-west-2)
  • バケット名に使用できるのは、小文字、数字、ピリオド (.)、およびハイフン (-) のみ。末尾を数字にすることは不可。
  • 出力フォルダは空にしておく

PySparkスクリプトと入力データを Amazon S3 バケットにアップロードします。
このスクリプトは食品施設の検査データを処理し、S3 バケットに結果ファイルを返すものとなっています。結果ファイルには、「red」タイプの違反が最も多い上位 10 施設がリストされます。

PySparkスクリプトの内容は以下となっています。
DOC-EXAMPLE-BUCKET の箇所はS3バケットの名前に置き換えておきます。

health_violations.py

import argparse

from pyspark.sql import SparkSession

def calculate_red_violations(data_source, output_uri):
    """
    Processes sample food establishment inspection data and queries the data to find the top 10 establishments
    with the most Red violations from 2006 to 2020.

    :param data_source: The URI of your food establishment data CSV, such as 's3://DOC-EXAMPLE-BUCKET/food-establishment-data.csv'.
    :param output_uri: The URI where output is written, such as 's3://DOC-EXAMPLE-BUCKET/restaurant_violation_results'.
    """
    with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark:
        # Load the restaurant violation CSV data
        if data_source is not None:
            restaurants_df = spark.read.option("header", "true").csv(data_source)

        # Create an in-memory DataFrame to query
        restaurants_df.createOrReplaceTempView("restaurant_violations")

        # Create a DataFrame of the top 10 restaurants with the most Red violations
        top_red_violation_restaurants = spark.sql("""SELECT name, count(*) AS total_red_violations 
          FROM restaurant_violations 
          WHERE violation_type = 'RED' 
          GROUP BY name 
          ORDER BY total_red_violations DESC LIMIT 10""")

        # Write the results to the specified output URI
        top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--data_source', help="The URI for you CSV restaurant data, like an S3 bucket location.")
    parser.add_argument(
        '--output_uri', help="The URI where output is saved, like an S3 bucket location.")
    args = parser.parse_args()

    calculate_red_violations(args.data_source, args.output_uri)

S3バケットへアップロードしました。

Amazon EMR クラスターを起動します。
先ほどS3バケットを作成したリージョン(国西部 (オレゴン) の us-west-2)に切り替え、コンソールの「クラスターを作成」ボタンを押下します。

クラスター作成画面でクラスター名を入力し、アプリケーションバンドルにSparkを指定します。そのほかはデフォルトのままとします。

クラスター設定もデフォルトの入力のままとします。

クラスターログのオプションでS3にログを出力するように設定します。

その他、キーペアとサービスロールの作成、インスタンスのプロファイルの作成を指定して、クラスターを作成ボタンを押下します。   ステータスが待機中に変わると、作業を受け付ける準備ができたこととなります。

試しにEMRにSSHしてみます。該当のセキュリティグループのインバウンドルールにポート22でソース0.0.0.0/0を追加しておきます。
下記の通り、無事入れました!

% ssh -i ~/*.pem hadoop@ec2-*******.us-west-2.compute.amazonaws.com
Last login: Thu Jun 29 12:06:08 2023

       __|  __|_  )
       _|  (     /   Amazon Linux 2 AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-2/
10 package(s) needed for security, out of 12 available
Run "sudo yum update" to apply all updates.
-bash: warning: setlocale: LC_CTYPE: cannot change locale (UTF-8): No such file or directory
                                                                    
EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR    
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R   
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R 
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R 
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR   
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R  
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR

EMR に作業を送信します。
ステップタブから「ステップの追加」ボタンを押下し、追加するステップの設定内容を入力します。

「ステップを追加」ボタンを押下します。  ステータスが完了になるのを待って、結果を確認します。

S3のコンソールから、myOutputFolder を確認します。

CSVの結果ファイルと、_SUCCESS というオブジェクトが作成されていることを確認します。

結果のファイルを開き、上記10施設がリストされていることを確認しました。

感想

チュートリアルにしたがって、EMRのクラスターをセットアップ、コンソール上からステップを送信してS3から結果を取得までを実施しました。

なお作業後はEMRクラスターは停止して、再度利用したい場合は、アクションからクラスターを複製を選択すれば、同じ設定内容を複製して簡単に起動ができたり、クラスターの終了オプションで指定したアイドル時間を超えると自動終了できるように設定もできるので、その辺の配慮も安心して利用できる点かと思います。