Amazon EMRでAmazon CloudWatchがデフォルトで使用できるようになりました
おのやんです。
みなさん、Amazon EMR(以下、EMR)で分散処理を行う際に、EMRクラスターで稼働するAmazon EC2(以下、EC2)インスタンスをAmazon CloudWatch(以下、CloudWatch)で監視してみたくないですか?私はめちゃくちゃ監視してみたいです。
EMRクラスタをCloudWatchで監視する場合、今まではユーザーデータなどを経由して、CloudWatchエージェントを手動でインストールする必要がありました。
しかし先日のアップデートにより、CloudWatchエージェントがインストールされた状態でEC2インスタンスを使ったEMRクラスタを作成することが可能になりました。
ということで、今回はこちらのアップデート内容を実際にやってみたいと思います。
アップデート内容
先日、EMRに関するアップデートが発表されました。EMRクラスターで起動するEC2インスタンスがAmazon Linux 2023に対応したほか、Apache Spark 3.5、Python 3.9などが搭載されています。
これらのアップデートの一環で、EMRがCloudWatchエージェントにも対応しました。これにより、EMRクラスターをデフォルトでCloudWatch監視できるようになりました。アップデート内容でも言及されています。
Amazon EMR リリース 7.0 には、クラスターを自動的に観察したり監視したりできる新しい拡張機能が同梱されています。Amazon CloudWatch、Amazon Managed Service for Prometheus、またはセルフマネージド型 Prometheus クラスターにシステムメトリクスをパブリッシュし、CloudWatch ダッシュボード、Amazon Managed Grafana、または API を使用してメトリクスを分析できます。
検証
それでは、実際に構築してみます。今回は公式ドキュメントのチュートリアルを例に、簡単なSparkアプリケーションを実行していきます。この際、EC2インスタンスをCloudWatchで監視します。
EMR用のファイルを準備
まずは、EMRに関する入力データ・出力データ・ログファイルの保存先としてS3バケットを作成します。
次に、health_violations.py
ファイルを作成します。health_violations.py
の内容は以下の通りです。
import argparse
from pyspark.sql import SparkSession
def calculate_red_violations(data_source, output_uri):
"""
Processes sample food establishment inspection data and queries the data to find the top 10 establishments
with the most Red violations from 2006 to 2020.
:param data_source: The URI of your food establishment data CSV, such as 's3://DOC-EXAMPLE-BUCKET/food-establishment-data.csv'.
:param output_uri: The URI where output is written, such as 's3://DOC-EXAMPLE-BUCKET/restaurant_violation_results'.
"""
with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark:
# Load the restaurant violation CSV data
if data_source is not None:
restaurants_df = spark.read.option("header", "true").csv(data_source)
# Create an in-memory DataFrame to query
restaurants_df.createOrReplaceTempView("restaurant_violations")
# Create a DataFrame of the top 10 restaurants with the most Red violations
top_red_violation_restaurants = spark.sql("""SELECT name, count(*) AS total_red_violations
FROM restaurant_violations
WHERE violation_type = 'RED'
GROUP BY name
ORDER BY total_red_violations DESC LIMIT 10""")
# Write the results to the specified output URI
top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
'--data_source', help="The URI for you CSV restaurant data, like an S3 bucket location.")
parser.add_argument(
'--output_uri', help="The URI where output is saved, like an S3 bucket location.")
args = parser.parse_args()
calculate_red_violations(args.data_source, args.output_uri)
また、こちらのページに掲載のリンクからfood_establishment_data.zip
をダウンロードします。
これを解凍して得られたfood_establishment_data.csv
ファイルを、同時にS3バケットに保存します。これで準備は完了です。
EMRクラスターの構築
それでは、実際に分散処理を実行するクラスタを作成してみます。なお、VPC・サブネット・EC2インスタンス用のキーペアは作成されているものとします。
まずはEMRクラスター作成画面に移動します。EMRリリースの7.0.0を選択すると、バンドルできるアプリケーションとしてCloudWatchエージェントを選択できます。今回は、デフォルトのアプリケーションの方には手を加えず、CloudWatchエージェントを追加するのみで構成します。
EMRクラスターの作成画面は設定項目が多いので、下にスクロールして適宜設定していきます。
ネットワークは既存のVPC・サブネットを選択します。このサブネットにEC2が作成されます。
クラスターログの項目では、EMRクラスターのアプリケーションを実行した際のログの格納場所を指定します。今回はEMR用に用意したS3バケットの直下に/logsのフォルダを作成して、ログの格納先とします。
s3://<S3バケット名>/logs
EC2のセキュリティに関する設定項目では、既存のキーペアを設定します。
EC2のインスタンスプロファイルでは「インスタンスプロファイルを作成」を選択します。S3バケットアクセスでは、「読み取りおよび書き込みのアクセス権を持つ、このアカウントのすべてのS3バケット」を選択します。
これらが設定できたら、EMRクラスターを作成します。作成直後は、こちらのようにステータスが「開始中」で表示されます。
EMRクラスターでアプリケーション実行の準備が整うと、「待機中」のステータスに変わります。
EMRクラスターにてアプリケーションを実行
EMRクラスターが準備できたら、ここから実際にアプリケーションを動かし、CloudWatchで監視してみましょう。
EMRクラスター詳細画面の「ステップ」タブを開くと、「ステップを追加」のボタンがあります。こちらを押下します。
ここでは、実行するアプリケーションをステップとして設定できます。今回はSparkアプリケーションを実行するので、タイプの項目には「Sparkアプリケーション」を選択します。また、名前は適用なものをつけて、デプロイモードは「クラスターノード」に設定します。
「アプリケーションの場所」の項目では、先ほど作成したS3バケットの直下に作成したhealth_violations.py
ファイルを選択します。
s3://<S3バケット名>/health_violations.py
Spark-submitオプションはそのまま空白にしておきます。
引数の部分ですが、こちらは以下の文字列を入力します。今回はSparkアプリケーションの出力の格納先として/outputs
フォルダを指定します。
--data_source s3://<S3バケット名>/food_establishment_data.csv
--output_uri s3://<S3バケット名>/outputs
ステップアクションは「続行」に設定しておきます。
これらの設定をまとめたのが、以下の画像です。こちらの設定が完了したら、「ステップを追加」を押下します。
正常にステップが送信された場合、ステップ一覧画面にてステータスが「Pending」になるのが確認できます。
アプリケーションが正常に処理を実行できれば、ステータスが「Completed」に変化します。
正常に/outputs
フォルダの中に出力としてCSVファイルが作成されているのが確認できます。
CloudWatchをメトリクスも確認できます。EMRクラスター詳細画面の「モニタリング」タブを開くと、CloudWatchによって取得された書くメトリクスを確認できます。今回は簡易的なSparkアプリケーションのサンプル処理を実行しているため時間があまり長くは取れていませんが、しっかりEMRクラスターのEC2インスタンスのメトリクスが取れているのがわかります。
EMRリリース7.0.0からはデフォルトでCloudWatchが使える
今回はAWS公式ドキュメントに記載のあるEMRチュートリアルの内容を、CloudWatchありで実施してみました。
CloudWatchエージェントの追加方法も、チェックボックスにチェックをつけるだけなので非常にシンプルです。皆さんもEMRを使用する際には、ぜひCloudWatchを使ってみてください。
では!