EC2インスタンスが稼働しっぱなしの場合にしつっこく通知してみた

EC2インスタンスが稼働しっぱなしの場合にしつっこく通知してみた

悲しみの連鎖はここで断ち切る。
Clock Icon2024.02.20

こんにちは。AWS事業本部コンサルティング部の戸川です。

人生って忘却の連続ですよね。

と言うわけで今回は、使い終わったら停止しておくべきEC2インスタンスをついつい止め忘れてしまう全人類のために、対策を考えてみました。

構成

方法は色々あると思いますが、今回は以下のような構成で実現しました。

処理の説明

EC2 Instance State-change NotificationイベントをAmazon EventBridgeで検知しinstance_state_change_functionを呼び出す

② インスタンスのstateに応じて起動時刻、停止時刻をinstance_operating_time_tableに書き込む

check_instance_operating_time_functionを定期実行する

instance_operating_time_tableを参照し、インスタンスの稼働時間を計算する

⑤ インスタンス稼働時間が規定を超えている場合にSNSトピックへメッセージを送信する

⑥ 超過稼働中のインスタンスIDと稼働時間をメールで通知する

この構成にした理由

  • インスタンスのOSに依存しない汎用的な仕組みを作りたかった
  • 新規で構築したインスタンスも自動的に検知対象へ加えたかった
  • 定期的にしつこく通知したかった

Amazon EventBridge

instance_state_change_rule

Lambda関数instance_state_change_functionを呼び出すためのルールです。

イベントパターン

{
  "source": ["aws.ec2"],
  "detail-type": ["EC2 Instance State-change Notification"]
}

ターゲット

instance_state_change_function

check_instance_operating_time_scheduler

Lambda関数check_instance_operating_time_functionを定期実行するためのルールです。

スケジュール

お好みでどうぞ。

ターゲット

check_instance_operating_time_function

Lambda

instance_state_change_function

instance_state_change_ruleから呼び出され、インスタンスの状態に応じた処理を行うための関数です。

running:
インスタンス起動時刻の書き込み

stopped:
インスタンス停止時刻の書き込み

terminated:
テーブルから該当アイテムを削除

import boto3
import logging

logger = logging.getLogger()
dynamodb = boto3.client('dynamodb')
table_name = 'instance_operating_time_table'

# メイン処理
def lambda_handler(event, context):
    instance_state = event['detail']['state']
    if instance_state == 'running':
        instance_start(event)
    elif instance_state == 'stopped':
        instance_stop(event)
    elif instance_state == 'terminated':
        instance_terminate(event)
    else:
        return


# インスタンス起動時、DynamoDBに起動時刻を登録
def instance_start(event):
    instance_id = event['detail']['instance-id']
    start_time = event['time']
    try:
        response = dynamodb.put_item(
            TableName=table_name,
            Item={
                'instance_id': {'S': instance_id},
                'start_time': {'S': start_time},
                'stop_time': {'S': ''}
            }
        )
        logger.info('書き込みが成功しました:', response)
    except Exception as e:
        logger.error('書き込み中にエラーが発生しました: %s', e)


# インスタンス停止時、DynamoDBに停止時刻を追加
def instance_stop(event):
    instance_id = event['detail']['instance-id']
    stop_time = event['time']
    try:
        response = dynamodb.update_item(
            TableName=table_name,
            Key={
                'instance_id': {
                    'S': instance_id
                }
            },
            UpdateExpression='SET stop_time = :val1',
            ExpressionAttributeValues={
                ':val1': {
                    'S': stop_time
                }
            }
        )
        logger.info('更新が成功しました:', response)
    except Exception as e:
        logger.error('更新中にエラーが発生しました: %s', e)


# インスタンス削除時、DynamoDBからitemを削除
def instance_terminate(event):
    instance_id = event['detail']['instance-id']
    try:
        response = dynamodb.delete_item(
            TableName=table_name,
            Key={
                'instance_id': {
                    'S': instance_id
                }
            },
        )
        logger.info('削除が成功しました:', response)
    except Exception as e:
        logger.error('更新中にエラーが発生しました: %s', e)

check_instance_operating_time_function

check_instance_operating_time_schedulerから定期実行され、実行中のインスタンス稼働時間チェックと通知を行う関数です。

import boto3
import logging
import os
from datetime import datetime, timedelta, timezone

logger = logging.getLogger()
dynamodb = boto3.client('dynamodb')
table_name = 'instance_operating_time_table'

# メイン処理
def lambda_handler(event, context):
    try:
        response = dynamodb.scan(
            TableName=table_name
        )
        items = response.get('Items', [])
        check_operating_time(items)
        logger.info('スキャン結果: %s', response)
    except Exception as e:
        logger.error('スキャン中にエラーが発生しました: %s', e)


# インスタンスの連続稼働時間を確認
def check_operating_time(items):
    utc_now = datetime.now(timezone.utc)
    # 規定時間を6時間に設定
    specified_time = timedelta(hours=6)
    for item in items:
        if not item['stop_time']['S']:
            instance_id = item['instance_id']['S']
            start_time_object = datetime.strptime(item['start_time']['S'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc)
            operating_time = utc_now - start_time_object
            # 稼働時間が規定時間を超えている場合、通知処理を実行
            if operating_time >= specified_time:
                notify_excess_operation(instance_id, operating_time)


# 超過稼働しているインスタンスについてメール通知
def notify_excess_operation(instance_id, operating_time):
    sns = boto3.client("sns", region_name="ap-northeast-1")
    topic_arn = os.environ["TOPIC_ARN"]
    message = (
        "instance_id: {}\noperating_time: {}"
    ).format(instance_id, operating_time)
    response = sns.publish(
        TopicArn=topic_arn,
        Message=message,
        Subject="instance_excess_operation"
    )

    return response

DynamoDB

instance_operating_time_table

インスタンスの起動時刻、停止時刻を記録するためのテーブルです。

テーブル設計

instance_id start_time stop_time
Partition key × ×
Sort key × × ×
Data type String String String

しつっこく通知した様子


しつこいな。

最後に

人々がこれ以上インスタンスの止め忘れという悲しみを背負わないように対策を考えてみました。
冒頭にも書きましたが、方法は色々あると思うのでご自身の状況に合ったやり方を見つけていただければと思います。

この記事が少しでもどなたかのお役に立てば幸いです。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.