Amplify HostingのアクセスログをS3とCloudWatch Logsに定期エクスポートしてみる

Amplify HostingのアクセスログをS3とCloudWatch Logsに定期エクスポートしてみる

2025.10.02

はじめに

かつまたです。
Amplify HostingでアプリをデプロイするとマネージドなCloudFrontディストリビューションが自動的に作成され、アクセスログが取得可能になります。
このアクセスログは最大2週間分のCSV形式でのエクスポートが可能となっています。一方でS3やCloudWatch Logsへのログ配信は現状機能としてはサポートされていません。

そこで今回はGenerateAccessLogs APIを利用した、S3やCloudWatch Logsへの定期アクセスログ配信を行うカスタムソリューションについて紹介します。

z4evOXtkxaGgw95h0t1Nz.md.drawio.png

https://docs.aws.amazon.com/ja_jp/amplify/latest/userguide/deploy-website-from-s3.html

やってみる

ソリューションとしては、GenerateAccessLogs APIを利用したLambda関数をEventBridgeにより定期的に呼び出すことでログ配信を実現します。

  1. 前提条件
  • ログ配信先のS3バケットおよび、CloudWatch Logsが作成済み

Lambda実行ロール

  • AWSLambdaBasicExecutionRole
  • Amplify・S3用カスタムポリシー
			
			{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AmplifyAccessLogsPermissions",
            "Effect": "Allow",
            "Action": [
                "amplify:GenerateAccessLogs"
            ],
            "Resource": "arn:aws:amplify:*:*:apps/*"
        },
        {
            "Sid": "S3PutObjectPermissions",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject"
            ],
            "Resource": "arn:aws:s3:::ログ配信先バケット名/amplify-access-logs/*"
        }
    ]
}

		
  1. Lambda関数を作成します。
  • ランタイム:Python3.12
  • 実行ロール:先ほど作成したロール
    スクリーンショット 2025-10-01 17.21.09.png
  1. 以下コードをデプロイします。

コード概要

  • generate_access_logsでAmplifyのアクセスログを取得するためのpresigned URLを生成
  • urllib.request.urlopenでpresigned URLからログファイルをダウンロード
  • put_objectおよびput_log_eventsでS3とCloudWatch Logsにログエクスポート
Python例
			
			import json
import logging
import os
import sys
import urllib.request
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional
from urllib.error import URLError, HTTPError

import boto3
from botocore.exceptions import ClientError, BotoCoreError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

if not logger.handlers:
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(
        logging.Formatter(
            "%(asctime)s %(levelname)s %(name)s - %(message)s"
        )
    )
    logger.addHandler(handler)

class ConfigurationError(Exception):
    """設定エラー"""
    pass

class AmplifyAPIError(Exception):
    """Amplify API呼び出しエラー"""
    pass

class LogDownloadError(Exception):
    """ログダウンロードエラー"""
    pass

class CloudWatchExportError(Exception):
    """CloudWatch Logs エクスポートエラー"""
    pass

class S3ExportError(Exception):
    """S3 エクスポートエラー"""
    pass

# 定数定義
DEFAULT_LOG_GROUP = '/aws/amplify/access-logs'
DEFAULT_TIME_RANGE_HOURS = 24
MAX_LOG_EVENTS_PER_BATCH = 10000
URLLIB_TIMEOUT_SECONDS = 30

class Config:

    def __init__(self):
        self.amplify_app_id = self._get_required_env('AMPLIFY_APP_ID')
        self.amplify_domain_name = self._get_required_env('AMPLIFY_DOMAIN_NAME')
        self.s3_bucket_name = os.environ.get('S3_BUCKET_NAME', '')
        self.cloudwatch_log_group = os.environ.get(
            'CLOUDWATCH_LOG_GROUP',
            DEFAULT_LOG_GROUP
        )
        self.time_range_hours = int(
            os.environ.get('TIME_RANGE_HOURS', str(DEFAULT_TIME_RANGE_HOURS))
        )

    @staticmethod
    def _get_required_env(key: str) -> str:
        """必須環境変数を取得"""
        value = os.environ.get(key)
        if not value:
            raise ConfigurationError(f"必須の環境変数が設定されていません: {key}")
        return value

amplify_client = boto3.client('amplify')
s3_client = boto3.client('s3')
logs_client = boto3.client('logs')


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Lambda関数のエントリーポイント
    Amplify のアクセスログを取得して CloudWatch Logs および S3 にエクスポート

    Args:
        event: Lambda イベント
        context: Lambda コンテキスト

    Returns:
        ステータスコードとレスポンスボディを含む辞書

    Raises:
        ConfigurationError: 設定エラー時
        AmplifyAPIError: Amplify API エラー時
    """
    try:
        config = Config()

        logger.info(
            "アクセスログのエクスポートを開始 - AppID: %s, Domain: %s",
            config.amplify_app_id,
            config.amplify_domain_name
        )

        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(hours=config.time_range_hours)

        logger.info(
            "時間範囲: %s から %s (UTC)",
            start_time.isoformat(),
            end_time.isoformat()
        )

        # Amplify からログを取得
        log_data = fetch_amplify_logs(
            config.amplify_app_id,
            config.amplify_domain_name,
            start_time,
            end_time
        )

        if not log_data:
            logger.info("指定された時間範囲にログが存在しません")
            return create_response(200, 'No logs available')

        logger.info("ログを取得しました: %d バイト", len(log_data))

        # CloudWatch Logs にエクスポート
        export_to_cloudwatch(log_data, start_time, config.cloudwatch_log_group)

        if config.s3_bucket_name:
            export_to_s3(log_data, start_time, config.s3_bucket_name)
        else:
            logger.info("S3バケット名が未設定のため、S3へのエクスポートをスキップ")

        logger.info("アクセスログのエクスポートが正常に完了しました")
        return create_response(200, 'Logs exported successfully')

    except ConfigurationError as e:
        logger.error("設定エラー: %s", e)
        return create_response(400, f'Configuration error: {str(e)}')

    except AmplifyAPIError as e:
        logger.error("Amplify API エラー: %s", e)
        return create_response(500, f'Amplify API error: {str(e)}')

    except (CloudWatchExportError, S3ExportError) as e:
        logger.error("エクスポートエラー: %s", e)
        return create_response(500, f'Export error: {str(e)}')

    except Exception as e:
        logger.exception("予期しないエラーが発生: %s", e)
        return create_response(500, f'Unexpected error: {str(e)}')


def fetch_amplify_logs(
    app_id: str,
    domain_name: str,
    start_time: datetime,
    end_time: datetime
) -> Optional[str]:
    """
    Amplify の GenerateAccessLogs API を呼び出してログを取得

    Args:
        app_id: Amplify アプリ ID
        domain_name: ドメイン名
        start_time: 開始時刻(UTC)
        end_time: 終了時刻(UTC)

    Returns:
        ログデータ(文字列)または None

    Raises:
        AmplifyAPIError: API 呼び出しエラー時
        LogDownloadError: ログダウンロードエラー時
    """
    try:
        start_timestamp = int(start_time.timestamp())
        end_timestamp = int(end_time.timestamp())

        logger.info("Amplify GenerateAccessLogs API を呼び出し中...")
        response = amplify_client.generate_access_logs(
            appId=app_id,
            domainName=domain_name,
            startTime=start_timestamp,
            endTime=end_timestamp
        )

        presigned_url = response.get('logUrl')
        if not presigned_url:
            logger.warning("presigned URL が取得できませんでした")
            return None

        logger.debug("presigned URL を取得: %s...", presigned_url[:100])

        return download_log_from_url(presigned_url)

    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code', 'Unknown')
        error_message = e.response.get('Error', {}).get('Message', str(e))
        logger.error(
            "Amplify API エラー - Code: %s, Message: %s",
            error_code,
            error_message
        )
        raise AmplifyAPIError(
            f"Amplify API 呼び出しに失敗しました: {error_code} - {error_message}"
        ) from e

    except BotoCoreError as e:
        logger.error("boto3 エラー: %s", e)
        raise AmplifyAPIError(f"AWS SDK エラー: {str(e)}") from e


def download_log_from_url(url: str) -> str:
    """
    presigned URL からログをダウンロード

    Args:
        url: presigned URL

    Returns:
        ログデータ(文字列)

    Raises:
        LogDownloadError: ダウンロードエラー時
    """
    try:
        logger.info("ログをダウンロード中...")
        with urllib.request.urlopen(url, timeout=URLLIB_TIMEOUT_SECONDS) as response:
            log_data = response.read().decode('utf-8')
        return log_data

    except HTTPError as e:
        logger.error("HTTP エラー: %s - %s", e.code, e.reason)
        raise LogDownloadError(
            f"ログのダウンロードに失敗しました(HTTP {e.code}): {e.reason}"
        ) from e

    except URLError as e:
        logger.error("URL エラー: %s", e.reason)
        raise LogDownloadError(
            f"ログのダウンロードに失敗しました: {e.reason}"
        ) from e

    except UnicodeDecodeError as e:
        logger.error("デコードエラー: %s", e)
        raise LogDownloadError(
            "ログデータのデコードに失敗しました"
        ) from e


def export_to_cloudwatch(
    log_data: str,
    start_time: datetime,
    log_group: str
) -> None:
    """
    ログデータを CloudWatch Logs にエクスポート

    Args:
        log_data: ログデータ
        start_time: 開始時刻
        log_group: ロググループ名

    Raises:
        CloudWatchExportError: CloudWatch Logs エクスポートエラー時
    """
    try:
        log_stream_name = start_time.strftime('%Y/%m/%d/%H%M%S')

        logger.info("CloudWatch Logs にエクスポート中 - Stream: %s", log_stream_name)

        # ログストリームを作成
        create_log_stream_if_not_exists(log_group, log_stream_name)

        log_events = create_log_events(log_data, start_time)

        if not log_events:
            logger.warning("送信するログイベントがありません")
            return

        send_log_events_in_batches(log_group, log_stream_name, log_events)

        logger.info(
            "CloudWatch Logs へのエクスポートが完了: %s/%s",
            log_group,
            log_stream_name
        )

    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code', 'Unknown')
        error_message = e.response.get('Error', {}).get('Message', str(e))
        logger.error(
            "CloudWatch Logs API エラー - Code: %s, Message: %s",
            error_code,
            error_message
        )
        raise CloudWatchExportError(
            f"CloudWatch Logs へのエクスポートに失敗: {error_code} - {error_message}"
        ) from e

    except Exception as e:
        logger.exception("CloudWatch Logs エクスポート中の予期しないエラー: %s", e)
        raise CloudWatchExportError(
            f"CloudWatch Logs へのエクスポートに失敗: {str(e)}"
        ) from e


def create_log_stream_if_not_exists(log_group: str, log_stream: str) -> None:
    """ログストリームが存在しない場合は作成"""
    try:
        logs_client.create_log_stream(
            logGroupName=log_group,
            logStreamName=log_stream
        )
        logger.info("ログストリームを作成: %s", log_stream)
    except logs_client.exceptions.ResourceAlreadyExistsException:
        logger.debug("ログストリームは既に存在します: %s", log_stream)


def create_log_events(log_data: str, start_time: datetime) -> list:
    """ログデータからログイベントのリストを作成"""
    log_events = []
    timestamp = int(start_time.timestamp() * 1000) 

    for line in log_data.splitlines():
        if line.strip(): 
            log_events.append({
                'timestamp': timestamp,
                'message': line
            })
            timestamp += 1 

    logger.info("ログイベントを作成: %d 件", len(log_events))
    return log_events


def send_log_events_in_batches(
    log_group: str,
    log_stream: str,
    log_events: list
) -> None:
    """ログイベントをバッチで送信(最大10,000イベント/バッチ)"""
    total_batches = (len(log_events) + MAX_LOG_EVENTS_PER_BATCH - 1) // MAX_LOG_EVENTS_PER_BATCH

    for i in range(0, len(log_events), MAX_LOG_EVENTS_PER_BATCH):
        batch = log_events[i:i + MAX_LOG_EVENTS_PER_BATCH]
        batch_number = (i // MAX_LOG_EVENTS_PER_BATCH) + 1

        logs_client.put_log_events(
            logGroupName=log_group,
            logStreamName=log_stream,
            logEvents=batch
        )

        logger.info(
            "バッチ %d/%d 送信完了: %d イベント",
            batch_number,
            total_batches,
            len(batch)
        )


def export_to_s3(
    log_data: str,
    start_time: datetime,
    bucket_name: str
) -> None:
    """
    ログデータを S3 にエクスポート

    Args:
        log_data: ログデータ
        start_time: 開始時刻
        bucket_name: S3バケット名

    Raises:
        S3ExportError: S3 エクスポートエラー時
    """
    try:
        s3_key = (
            f"amplify-access-logs/"
            f"{start_time.strftime('%Y/%m/%d/%H')}/"
            f"access-logs-{start_time.strftime('%Y%m%d-%H%M%S')}.log"
        )

        logger.info("S3 にエクスポート中 - Bucket: %s, Key: %s", bucket_name, s3_key)

        s3_client.put_object(
            Bucket=bucket_name,
            Key=s3_key,
            Body=log_data.encode('utf-8'),
            ContentType='text/plain',
            ServerSideEncryption='AES256'
        )

        logger.info("S3 へのエクスポートが完了: s3://%s/%s", bucket_name, s3_key)

    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code', 'Unknown')
        error_message = e.response.get('Error', {}).get('Message', str(e))
        logger.error(
            "S3 API エラー - Code: %s, Message: %s",
            error_code,
            error_message
        )
        raise S3ExportError(
            f"S3 へのエクスポートに失敗: {error_code} - {error_message}"
        ) from e

    except Exception as e:
        logger.exception("S3 エクスポート中の予期しないエラー: %s", e)
        raise S3ExportError(
            f"S3 へのエクスポートに失敗: {str(e)}"
        ) from e


def create_response(status_code: int, message: str) -> Dict[str, Any]:
    """
    Lambda レスポンスを生成

    Args:
        status_code: HTTP ステータスコード
        message: メッセージ

    Returns:
        レスポンス辞書
    """
    return {
        'statusCode': status_code,
        'body': json.dumps({
            'message': message
        })
    }

		
  1. 以下の環境変数を設定します。
  • Amplifyアプリ名
  • Amplifyドメイン名
  • ログ配信先S3バケット名
  • ログ配信先CloudWatch Logs名

オプションとして、ログ取得時間範囲であるTIME_RANGE_HOURSも設定可能です。

スクリーンショット 2025-10-01 16.50.45-1.png

  1. EventBridgeを設定します。今回はLambdaのトリガー設定から新規作成し、トリガーとして設定しました。任意の起動スケジュールを設定します。
    スクリーンショット 2025-10-01 16.34.10.png

  2. EventBridgeでのスケジュール実行、またはテストイベントによりを確かめてみます。

  3. S3、CloudWatch Logsともにアクセスログがエクスポートされていることを確認できました。
    スクリーンショット 2025-10-02 10.36.50.png

スクリーンショット 2025-10-02 10.43.41.png

おわりに

ご覧いただきありがとうございました。
Amplify Hostingのアクセスログを定期的にS3やCloudWatch Logsへエクスポートするカスタムソリューションを紹介しました。

GenerateAccessLogs APIとLambda、EventBridgeを組み合わせることで、Amplify Hostingでネイティブにサポートされていないログの長期保存が実現できます。

Amplify Hostingでログの長期保存などが必要になった際の参考になれば幸いです。

参考資料

https://docs.aws.amazon.com/ja_jp/amplify/latest/userguide/deploy-website-from-s3.html
https://docs.aws.amazon.com/ja_jp/amplify/latest/APIReference/API_GenerateAccessLogs.html

アノテーション株式会社について

アノテーション株式会社はクラスメソッドグループのオペレーション専門特化企業です。サポート・運用・開発保守・情シス・バックオフィスの専門チームが、最新 IT テクノロジー、高い技術力、蓄積されたノウハウをフル活用し、お客様の課題解決を行っています。当社は様々な職種でメンバーを募集しています。「オペレーション・エクセレンス」と「らしく働く、らしく生きる」を共に実現するカルチャー・しくみ・働き方にご興味がある方は、アノテーション株式会社 採用サイトをぜひご覧ください。

この記事をシェアする

FacebookHatena blogX

関連記事