バッチ処理のコスト削減に EC2 インスタンスプール方式を実装してみた

バッチ処理のコスト削減に EC2 インスタンスプール方式を実装してみた

2025.10.22

はじめに

短時間バッチ処理でジョブごとに専用 EC2 インスタンスを用意していると、EBS ストレージコストが発生し続けて非効率です。

業務で同様の課題に遭遇したため、少数のインスタンスを複数ジョブで共有するリソースプール方式を設計して実装してみました。
少数のインスタンス (例: 3 台) をプールして複数のジョブで共有することで、コスト削減と運用効率化を実現できます。

リソースプール方式の設計

アーキテクチャ概要

アーキテクチャの全体像は以下のような感じです。
インスタンスは停止状態でプールしておき、バッチ処理時のみ起動するという構成です。

アーキテクチャ図.png

主要コンポーネント

コンポーネント 役割 詳細
EC2 インスタンスプール バッチ処理の実行環境 少数のインスタンス (例: 3 台) を停止状態でプールし、複数のテナント/ジョブで共有
Amazon DynamoDB インスタンス状態管理 インスタンスの利用状況 (available / in-use) とテナント紐付けを管理
Amazon SQS ジョブキューイング バッチジョブリクエストをキューイングし、インスタンスが空くまで待機
Amazon EventBridge イベント駆動処理 EC2 の状態変更イベント (stopped / running) を検知し、DynamoDB 更新をトリガー
AWS Lambda 制御ロジック実行 インスタンスの割り当て、起動、停止、状態更新などの制御処理を実行

処理フロー

  1. ジョブリクエスト: バッチ処理リクエストが SQS キューに送信される
  2. インスタンス検索: Lambda 関数が DynamoDB から available なインスタンスを検索
  3. インスタンス割り当て: 楽観的ロックを使用してインスタンスを in-use 状態に更新
  4. EC2 起動: 停止中のインスタンスを起動 (stoppedrunning)
  5. バッチ処理実行: 起動したインスタンスでバッチ処理を実行
  6. EC2 停止: 処理完了後、インスタンスを停止 (runningstopped)
  7. 状態更新: EventBridge が EC2 停止イベントを検知し、DynamoDB を available 状態に更新

実装の詳細

処理の流れに沿って、各コンポーネントの実装を説明します。

① ジョブのキューイング

バッチ処理リクエストを SQS でキューイングします。
VisibilityTimeout はバッチ処理の最大実行時間に合わせて設定し、本番環境ではデッドレターキュー (DLQ) も設定しておきましょう。

② インスタンスの割り当てと起動

Lambda 関数が SQS からメッセージを受信し、DynamoDB から利用可能なインスタンスを検索して割り当て、EC2 インスタンスを起動します。

DynamoDB スキーマ

DynamoDB でインスタンスの利用状況を管理します。
今回は必要最小限の設計で実装してみます。

項目名 説明
instance_id String (PK) EC2 インスタンス ID (例: i-0123456789abcdef0)
status String インスタンスの状態 (available / in-use)
tenant_id String 使用中のテナント ID (available 時は null)
job_id String 実行中のジョブ ID (available 時は null)
version Number 楽観的ロック用のバージョン番号
updated_at String 最終更新日時 (ISO 8601 形式)

job_idversion の使いどころについては後述します。

ステータス遷移

DynamoDB の status は、EC2 インスタンスのステータスと連動する形になります。

[DynamoDB] available → in-use → (処理完了後) → available
[EC2 実機] stopped → running → (処理完了後) → stopped

処理完了後は EventBridge が EC2 の停止イベントを検知することで、DynamoDB の状態を自動的に available 状態に戻します。

処理の実装

さて、ここから実装内容の紹介です。
SQS からメッセージを受信し、インスタンスを割り当てて起動する一連の処理を実装してみました。

import json
import boto3
from datetime import datetime
from botocore.exceptions import ClientError

ec2 = boto3.client('ec2')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('InstancePool')

def lambda_handler(event, context):
    # SQS イベントソースから Records を取得
    for record in event.get('Records', []):
        message_body = json.loads(record['body'])
        tenant_id = message_body.get('tenant_id')
        job_id = message_body.get('job_id')

        instance_id = find_and_allocate_instance(tenant_id, job_id)

        if instance_id is None:
            # インスタンスが空くまで待機(SQS が自動リトライ)
            raise RuntimeError("No available instances in pool")

        start_instance(instance_id)

        execute_batch_job(instance_id, message_body)

def find_and_allocate_instance(tenant_id, job_id):
    response = table.scan(
        FilterExpression='#status = :available',
        ExpressionAttributeNames={'#status': 'status'},
        ExpressionAttributeValues={':available': 'available'}
    )

    instances = response.get('Items', [])
    if not instances:
        return None

    # 複数見つかった場合は順番に試すことで、競合時の成功率を上げる
    for instance in instances:
        try:
            # version を使うことで、読み取りから更新までの間に他のプロセスが
            # 変更していないことを保証する
            table.update_item(
                Key={'instance_id': instance['instance_id']},
                UpdateExpression='SET #status = :in_use, tenant_id = :tenant, job_id = :job, version = version + :inc, updated_at = :time',
                ConditionExpression='#status = :available AND version = :version',
                ExpressionAttributeNames={
                    '#status': 'status'  # status は予約語のため、ExpressionAttributeNames を使用
                },
                ExpressionAttributeValues={
                    ':in_use': 'in-use',
                    ':available': 'available',
                    ':tenant': tenant_id,
                    ':job': job_id,  # トレーサビリティのため、実行中のジョブ ID を記録
                    ':version': instance['version'],  # 読み取り時の version と一致することを確認
                    ':inc': 1,  # 更新のたびに version をインクリメントして競合を検知可能に
                    ':time': datetime.now().isoformat()
                }
            )
            return instance['instance_id']
        except ClientError as e:
            if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
                # これにより、全インスタンスが使用中になるまで試行を続ける
                continue
            raise

    return None

def start_instance(instance_id):
    ec2.start_instances(InstanceIds=[instance_id])

    # 起動完了を待たないと、バッチ処理が実行できない
    waiter = ec2.get_waiter('instance_running')
    waiter.wait(
        InstanceIds=[instance_id],
        WaiterConfig={
            'Delay': 15,  # 15 秒ごとにチェック
            'MaxAttempts': 40  # 最大 10 分待機
        }
    )

    print(f"Instance {instance_id} is now running")

楽観的ロックによる排他制御

上記コードの find_and_allocate_instance() 関数では、楽観的ロックを使用して排他制御を行っています。

ConditionExpressionversion をチェックすることで、複数の Lambda が同時に実行されても、同じインスタンスに複数のジョブが割り当てられることを防ぎます。
もし他のプロセスが先に割り当てていた場合、ConditionalCheckFailedException が発生し、次のインスタンスを試行する、という流れになります。

③ バッチ処理の実行

起動したインスタンスでバッチ処理を実行し、バッチ処理が完了したらインスタンスを停止します。
バッチの実行方法や EC2 インスタンスの停止方法は割愛しますが、SSM Run Command を利用すると手軽に実装できそうですね。

④ 処理完了後の状態更新

EC2 インスタンスが停止すると、EventBridge が停止イベントを検知し、Lambda 関数が DynamoDB の状態を更新します。

EventBridge ルール設定

EC2 の状態変更イベントを検知する EventBridge ルールを設定します。

{
  "source": ["aws.ec2"],
  "detail-type": ["EC2 Instance State-change Notification"],
  "detail": {
    "state": ["stopped", "running"]
  }
}

Lambda 関数による状態更新

EventBridge からのイベントを受け取り、DynamoDB を更新します。

import boto3
from datetime import datetime

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('InstancePool')

def lambda_handler(event, context):
    """EC2 状態変更イベントを処理して DynamoDB を更新"""
    detail = event.get('detail', {})
    instance_id = detail.get('instance-id')
    state = detail.get('state')

    if not instance_id or not state:
        print(f"Invalid event: missing instance-id or state")
        return {'statusCode': 400}

    if state == 'stopped':
        # tenant_id と job_id をクリアすることで、次のジョブで使用可能にする
        table.update_item(
            Key={'instance_id': instance_id},
            UpdateExpression='SET #status = :available, tenant_id = :null, job_id = :null, updated_at = :time',
            ExpressionAttributeNames={'#status': 'status'},
            ExpressionAttributeValues={
                ':available': 'available',
                ':null': None,
                ':time': datetime.now().isoformat()
            }
        )
        print(f"Instance {instance_id} marked as available")

    elif state == 'running':
        # 実際の状態更新は割り当て時に完了しているため、ここでは何もしない
        print(f"Instance {instance_id} is now running")

これにより、インスタンスが停止すると自動的に available 状態に戻り、次のジョブで使用可能になります。

まとめ

短時間バッチ処理において、リソースプール方式による EC2 インスタンス共有の設計と実装を紹介しました。

実際に構築してみて、既存の構成を大きく崩さず、追加コストも最小限に抑えながら実装できることがわかりました。
なお、インスタンス起動時に「InsufficientInstanceCapacity」エラーが発生した場合は、リージョンや AZ やインスタンスタイプの変更を検討しましょう。

この記事をシェアする

FacebookHatena blogX

関連記事