落とし忘れたAmazon EMRクラスターを通知する

2017.10.07

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Amazon EMRを使った開発をしていると、うっかりクラスターを落とし忘れて運用費を圧迫してしまうことがあります。

AWS Lambdaでクラスターのランニング状況を監視し、落とし忘れたクラスターを通知する方法を紹介します。

アーキテクチャ

lambda-cloudwatch-events-architecture

処理の流れは以下です

  1. Amazon CloudWatch Events で定期的に AWS Lambdaを呼び出す
  2. AWS Lambda は EMR クラスター一覧の起動状況をチェック
  3. クラスターの起動時間が閾値を超えていると、Amazon SNS に通知

ウォークスルー

それでは、実際に通知システムを作成します。

Step 1 : 通知用Amazon SNSの作成

通知用のAmazon SNS を用意します。 SNS より先の購読先は、E-Mailや Lambda 関数など各自用意してください。

Step 2 : Lambda 向け IAM Role の作成

Lambda 向け IAM Role を作成します。

Lambda が利用するロールのため、Trusted entities は 以下のようになります。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

また、Lambda 関数は

  • SNS へのメッセージ送信
  • EMR へのクラスター一覧の取得
  • CloudWatch Logs へのログ出力

を行うため、以下のようなポリシーを用意します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sns:Publish"
            ],
            "Resource": [
                "arn:aws:sns:REGION:AWS-ACCOUNT-ID:SNS-Name"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "elasticmapreduce:ListClusters"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

sns:Publish の送信先SNS(Resource)には、Step 1で作成したSNSのARNに置き換えてください。

Step 3 : Lambda 関数の作成

EMRクラスターを監視する Python 2.7のLambda関数を作成します。

Lambda 関数の Execution role

Lambda の Execution role には Step 2 で作成した IAM Role を設定します。

Basic settings

以下の環境変数を利用します

  • THRESHOLD_CLUSTER_HOUR : EMRランニング時間の閾値
  • SNS_ARN : 送信先SNS

Lambda 関数のコード

追加でインストールするライブラリは存在しないため、AWS管理コンソールのインライン編集で完結します。

check_running_emr

import boto3
import datetime
import json
import os

from dateutil.tz import tzlocal

THRESHOLD_CLUSTER_HOUR = int(os.environ['THRESHOLD_CLUSTER_HOUR'])
SNS_ARN = os.environ['SNS_ARN']

sns = boto3.client('sns')
emr = boto3.client('emr')

def json_serial(obj):
    """JSON serializer for objects not serializable by default json code"""

    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()
    raise TypeError ("Type %s not serializable" % type(obj))

def lambda_handler(event, context):
    current_time = datetime.datetime.now(tz = tzlocal())

    response = emr.list_clusters(
        ClusterStates=[
          'STARTING',
          'BOOTSTRAPPING',
          'RUNNING',
          'WAITING',
          'TERMINATING',
          ]
    )
    for cluster in response['Clusters']:
        creatim_date_time = cluster['Status']['Timeline']['CreationDateTime']

        subject = "[{}:{}] running {} Normalized Instance Hours".format(
            cluster['Name'],
            cluster['Id'],
            cluster['NormalizedInstanceHours'])
        main_body = json.dumps(cluster, indent = 2, default = json_serial)

        if current_time > creatim_date_time + datetime.timedelta(hours = THRESHOLD_CLUSTER_HOUR):
            sns.publish(
                TargetArn = SNS_ARN,
                Subject = subject,
                Message = main_body,
            )
    return 0

やっていることは以下の3つです

(1) EMR のクラスター一覧をAPI で取得

response = emr.list_clusters(
    ClusterStates=['STARTING','BOOTSTRAPPING','RUNNING','WAITING','TERMINATING']
)

停止状態のクラスターは除外しています。

(2) クラスターの起動時間が閾値を超えていないかチェック

クラスターの起動日時を EMR:ListClusters のレスポンスにある CreationDateTimeから取得し、現在日時との差分を求めます

creatim_date_time = cluster['Status']['Timeline']['CreationDateTime']
...
current_time > creatim_date_time + datetime.timedelta(hours=THRESHOLD_CLUSTER_HOUR)

(3) 閾値を超えていたらSNSに通知

sns.publish(
    TargetArn=SNS_ARN,
    Subject=subject,
    Message=main_body,
)

Step 4 : Lambda を呼び出すイベントスケジューラーの作成

CloudWatch Events を使って Lambda を定期的に呼びだします。

CloudWatch の画面で Events を選択し、新規ルールを作成します。

  • 定期実行のため Event Source は Schedule を選び、 Fixed rate of 2 Hours を設定します。
  • Target には Step 3 で作成した Lambda 関数を設定します。

cloudwatch_event_rule_for_lambda

Lambda 関数の Triggers タブを確認すると CloudWatch Eventsがトリガー登録されているはずです。

check_cloudwatch_event_rule_for_lambda

Step 5 : Lambda 関数のテスト実行

CloudWatch Events のスケジューラーで Lambda 関数が呼び出されると、次のようなイベント情報が Lambda 関数に渡ります。

{
  "source": "aws.events",
  "account": "123456789012",
  "version": "0",
  "time": "2017-10-07T10:25:30Z",
  "id": "1234-1234-1234-1234-1234",
  "region": "eu-central-1",
  "detail": {},
  "resources": [
    "arn:aws:events:eu-central-1:123456789012:rule/emr_cluster_check_rule"
  ],
  "detail-type": "Scheduled Event"
}

この JSON をテストイベントの入力情報にして、実際に Lambda 関数の動作確認を行います。

Lambda 関数のメニュー ”Configure test events” からイベントのテストデータを設定します。

configure-test-event-for-lambda

テストイベントを設定後は「Test」からLambda関数を手動実行します。

エラーが起きた場合は、エラーメッセージに従い、不具合修正して下さい。

Step 6 : 通知確認

監視対象のEMRクラスターが見つかると、次のような通知が SNS 経由で飛ぶようになります。

件名

[Test:j-1L2I1H32ROFEN] running 8 Normalized Instance Hours

本文

{
  "Status": {
    "Timeline": {
      "ReadyDateTime": "2017-10-07T09:20:24.324000+00:00",
      "CreationDateTime": "2017-10-07T09:09:05.442000+00:00"
    },
    "State": "WAITING",
    "StateChangeReason": {
      "Message": "Cluster ready after last step completed."
    }
  },
  "NormalizedInstanceHours": 8,
  "Id": "j-1L2I1H32ROFEN",
  "Name": "Test"
}

まとめ

クラウドの強みを活かしてEMRを富豪的に使うのも楽しいですが、一方で、必要以上にリソースを使ってしまうリスクもあります。 利用費の高いサービスに対しては、無駄なリソースを定期的に監視すると、運用費の削減が期待できます。

なお、今回作成した通知システムは最低限の機能しか持ち合わせていません。

実際の運用に合わせて、以下のような機能改善が考えられるでしょう。

  • 長時間起動しているクラスターはターミネートさせる
  • 対象クラスターが50を超える場合に備えて、ページング処理を追加
  • SNS から Slack などのチャットに通知させる
  • Lambda の失敗時には DLQ に飛ばす
  • 全リージョンに対して実行

参考

  • EMR API Reference :ListClusters http://docs.aws.amazon.com/ElasticMapReduce/latest/API/API_ListClusters.html