Amazon EMR: Spark を使用してクラスターを起動し、Amazon S3 バケットに格納されたPySparkスクリプトを実行してみた
こんにちは、yagiです。
Amazon EMRでSpark を使用してクラスターを起動し、Amazon S3 バケットに格納されたPySparkスクリプトを実行してみました。
Amazon EMR とは
Amazon EMR を使用すると、ビッグデータフレームワークを使用してデータを処理および分析するクラスターをわずか数分でセットアップできます。このチュートリアルでは、Spark を使用してサンプルクラスターを起動する方法と、Amazon S3 バケットに格納された簡単なPySparkスクリプトを実行する方法について説明します。計画と設定、管理、およびクリーンアップという 3 つの主要なワークフローカテゴリにおける Amazon EMR の必須タスクを取り上げます。
使ってみる
上記のドキュメントに従って、サンプルデータを利用して進めていきます。
まずはS3バケットを作成します。注意点としては以下があります。
- Amazon EMR クラスターを起動する AWS リージョンと同じリージョンに作成する(今回は米国西部 (オレゴン) の us-west-2)
- バケット名に使用できるのは、小文字、数字、ピリオド (.)、およびハイフン (-) のみ。末尾を数字にすることは不可。
- 出力フォルダは空にしておく
PySparkスクリプトと入力データを Amazon S3 バケットにアップロードします。
このスクリプトは食品施設の検査データを処理し、S3 バケットに結果ファイルを返すものとなっています。結果ファイルには、「red」タイプの違反が最も多い上位 10 施設がリストされます。
PySparkスクリプトの内容は以下となっています。
DOC-EXAMPLE-BUCKET の箇所はS3バケットの名前に置き換えておきます。
health_violations.py
import argparse from pyspark.sql import SparkSession def calculate_red_violations(data_source, output_uri): """ Processes sample food establishment inspection data and queries the data to find the top 10 establishments with the most Red violations from 2006 to 2020. :param data_source: The URI of your food establishment data CSV, such as 's3://DOC-EXAMPLE-BUCKET/food-establishment-data.csv'. :param output_uri: The URI where output is written, such as 's3://DOC-EXAMPLE-BUCKET/restaurant_violation_results'. """ with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark: # Load the restaurant violation CSV data if data_source is not None: restaurants_df = spark.read.option("header", "true").csv(data_source) # Create an in-memory DataFrame to query restaurants_df.createOrReplaceTempView("restaurant_violations") # Create a DataFrame of the top 10 restaurants with the most Red violations top_red_violation_restaurants = spark.sql("""SELECT name, count(*) AS total_red_violations FROM restaurant_violations WHERE violation_type = 'RED' GROUP BY name ORDER BY total_red_violations DESC LIMIT 10""") # Write the results to the specified output URI top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( '--data_source', help="The URI for you CSV restaurant data, like an S3 bucket location.") parser.add_argument( '--output_uri', help="The URI where output is saved, like an S3 bucket location.") args = parser.parse_args() calculate_red_violations(args.data_source, args.output_uri)
S3バケットへアップロードしました。
Amazon EMR クラスターを起動します。
先ほどS3バケットを作成したリージョン(国西部 (オレゴン) の us-west-2)に切り替え、コンソールの「クラスターを作成」ボタンを押下します。
クラスター作成画面でクラスター名を入力し、アプリケーションバンドルにSparkを指定します。そのほかはデフォルトのままとします。
クラスター設定もデフォルトの入力のままとします。
クラスターログのオプションでS3にログを出力するように設定します。
その他、キーペアとサービスロールの作成、インスタンスのプロファイルの作成を指定して、クラスターを作成ボタンを押下します。 ステータスが待機中に変わると、作業を受け付ける準備ができたこととなります。
試しにEMRにSSHしてみます。該当のセキュリティグループのインバウンドルールにポート22でソース0.0.0.0/0を追加しておきます。
下記の通り、無事入れました!
% ssh -i ~/*.pem hadoop@ec2-*******.us-west-2.compute.amazonaws.com Last login: Thu Jun 29 12:06:08 2023 __| __|_ ) _| ( / Amazon Linux 2 AMI ___|\___|___| https://aws.amazon.com/amazon-linux-2/ 10 package(s) needed for security, out of 12 available Run "sudo yum update" to apply all updates. -bash: warning: setlocale: LC_CTYPE: cannot change locale (UTF-8): No such file or directory EEEEEEEEEEEEEEEEEEEE MMMMMMMM MMMMMMMM RRRRRRRRRRRRRRR E::::::::::::::::::E M:::::::M M:::::::M R::::::::::::::R EE:::::EEEEEEEEE:::E M::::::::M M::::::::M R:::::RRRRRR:::::R E::::E EEEEE M:::::::::M M:::::::::M RR::::R R::::R E::::E M::::::M:::M M:::M::::::M R:::R R::::R E:::::EEEEEEEEEE M:::::M M:::M M:::M M:::::M R:::RRRRRR:::::R E::::::::::::::E M:::::M M:::M:::M M:::::M R:::::::::::RR E:::::EEEEEEEEEE M:::::M M:::::M M:::::M R:::RRRRRR::::R E::::E M:::::M M:::M M:::::M R:::R R::::R E::::E EEEEE M:::::M MMM M:::::M R:::R R::::R EE:::::EEEEEEEE::::E M:::::M M:::::M R:::R R::::R E::::::::::::::::::E M:::::M M:::::M RR::::R R::::R EEEEEEEEEEEEEEEEEEEE MMMMMMM MMMMMMM RRRRRRR RRRRRR
EMR に作業を送信します。
ステップタブから「ステップの追加」ボタンを押下し、追加するステップの設定内容を入力します。
「ステップを追加」ボタンを押下します。 ステータスが完了になるのを待って、結果を確認します。
S3のコンソールから、myOutputFolder を確認します。
CSVの結果ファイルと、_SUCCESS というオブジェクトが作成されていることを確認します。
結果のファイルを開き、上記10施設がリストされていることを確認しました。
感想
チュートリアルにしたがって、EMRのクラスターをセットアップ、コンソール上からステップを送信してS3から結果を取得までを実施しました。
なお作業後はEMRクラスターは停止して、再度利用したい場合は、アクションからクラスターを複製を選択すれば、同じ設定内容を複製して簡単に起動ができたり、クラスターの終了オプションで指定したアイドル時間を超えると自動終了できるように設定もできるので、その辺の配慮も安心して利用できる点かと思います。