CloudWatch LogsのログデータをStep Functions + LambdaでS3バケットにエクスポートしてみた

CloudWatch LogsのデータをS3にエクスポートするには、Kinesis Data Firehoseを利用するのが簡易的ではありますが、既になんらかの処理でサブスクリプションフィルタを利用していた場合、追加でサブスクリプションフィルタを設定することができません。 *1

本エントリでは、サブスクリプションフィルタを利用せず、CloudWatch Logsのロググループを、S3バケットにエクスポートしてみたいと思います。

SQS + Lambda等、実装方法は複数考えられますが、ここでは、Step Functions + Lambdaで実装してみたいと思います。

構成

以下のようなイメージです。

00

CloudWatch Eventsにて、日次で特定時刻にステートマシンを呼び出します。ステートマシンが実行されると、指定したS3バケット内に前日分のログデータがエクスポートされます。日付毎にフォルダが作成され、配下にログが集約されていきます。(タスクIDはCloudWatch Logsをエクスポートした際のタスクのIDです。)

設定

構築順とは異なりますが、処理が行われる順に設定や動作等を説明していきたいと思います。

CloudWatch Events

CloudWatch Eventsでルールを作成しています。イベントソースではCron式のスケジュールを設定し、ターゲットにはこの後紹介します、ステートマシンを設定しています。

スケジュールされたイベントはUTCタイムゾーンが使用されますので、上記設定(16:00)の場合は、日本時刻では1:00にステートマシンが起動されます。

Step Functions & Lambda

ステートマシンのワークフローは以下となります。

定義は以下になります。少々長かったので折りたたんでいます。


ステートマシン定義
{
  "StartAt": "Configure",
  "TimeoutSeconds": 82800,
  "States": {
    "Configure": {
      "Type": "Pass",
      "Result": {
        "index": 0
      },
      "ResultPath": "$.iterator",
      "Next": "DescribeLogGroups"
    },
    "DescribeLogGroups": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:DescribeLogGroups",
      "ResultPath": "$.describe_log_groups",
      "Next": "ExportLogGroup"
    },
    "ExportLogGroup": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:ExportLogGroup",
      "ResultPath": "$.iterator",
      "Next": "DescribeExportTask"
    },
    "DescribeExportTask": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:DescribeExportTask",
      "ResultPath": "$.describe_export_task",
      "Next": "IsExportTask"
    },
    "IsExportTask": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.describe_export_task.status_code",
          "StringEquals": "COMPLETED",
          "Next": "IsComplete"
        },
        {
          "Or": [
            {
              "Variable": "$.describe_export_task.status_code",
              "StringEquals": "PENDING"
            },
            {
              "Variable": "$.describe_export_task.status_code",
              "StringEquals": "RUNNING"
            }
          ],
          "Next": "WaitSeconds"
        }
      ],
      "Default": "Fail"
    },
    "WaitSeconds": {
      "Type": "Wait",
      "Seconds": 1,
      "Next": "DescribeExportTask"
    },
    "IsComplete": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.iterator.end_flg",
          "BooleanEquals": true,
          "Next": "Succeed"
        }
      ],
      "Default": "ExportLogGroup"
    },
    "Succeed": {
      "Type": "Succeed"
    },
    "Fail": {
      "Type": "Fail"
    }
  }
}
  

ここでは、アカウント内に存在するロググループすべてをエクスポートの対象にしたいので、ロググループ数、データ量により、エクスポート時間の増加が考えられます。そして、Lambda関数には実行時間の制限があります。 *2

1回のLambda関数の実行で、すべての処理を行ってしまうと、実行時間の制限に抵触するおそれがあります。 そのため、エクスポートするロググループひとつひとつに対し、都度Lambda関数が起動するよう、ステートマシン側で制御しています。

ループ処理等、処理ロジックをステートマシンで実装しているので、 少々複雑にみえるかもしれませんが、その分、Lambd関数はシンプルになっていると思います。

次は、各ステートと、Lambda関数をみていきたいと思います。

Configureステート

ステートタイプにPassを利用しているので処理自体はなく、パスを定義しています。

後続のステートでは、ロググループ名をリストに格納し、Lambda関数が順次リストの値を読み取りエクスポートを行っていきます。以下、リストのイメージです。

00

この値(パス)はその際のインデックスとして使用していきます。このステートが実行されると、ステートマシンには以下の値が返ります。

"iterator": {
  "index": 0
}

この値を後続のステートで増分さて、処理対象を変化させています。

DescribeLogGroupsステート

ロググループ名を取得するLambda関数を呼び出しています。取得したロググループ名をリストに格納し、そのリストと要素数をステートマシンに返しています。

Lambda関数のコードは以下となります。

DescribeLogGroups関数

import boto3
import jmespath
import logging

logs_client = boto3.client('logs')
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    #CloudWatch Logsロググループ名取得(ロググループ名をリストに格納)
    response = logs_client.describe_log_groups()
    log_groups = jmespath.search('logGroups[].logGroupName',response)

    logger.info('Log list : '.join(log_groups))

    return {
        'element_num':len(log_groups),
        'log_groups':log_groups
    }

このステートが実行されると、ステートマシンには以下のような値が返ります。(値は環境により異なります)

"describe_log_groups": {
  "element_num": 4,
  "log_groups": [
    "/aws/lambda/DescribeExportTask",
    "/aws/lambda/DescribeLogGroups",
    "/aws/lambda/ExportLogGroup",
    "/aws/lambda/TestFunc",
  ]
}

ExportLogGroupステート

エクスポート処理を行うLamda関数を呼び出しています。前述の通り、1回の実行でエクスポートを行うのは、1つのロググループです。ロググループが複数ある場合は、後続のステートからこのステートに処理が戻ってきます。

Lambda関数のコードは以下となります。

ExportLogGroup関数

import os
import boto3
import datetime
import logging

logs_client = boto3.client('logs')
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    export_bucket = os.environ['EXPORT_BUCKET']
    index = event['iterator']['index']
    count = event['describe_log_groups']['element_num']
    target_log_group = event['describe_log_groups']['log_groups']

    today = datetime.date.today()                    #実行日取得(例:2019-09-12)
    yesterday = today - datetime.timedelta(days=1)   #前日取得(例:2019-09-11)
    #出力日時(from)取得(例:2019-09-11 00:00:00)
    from_time = datetime.datetime(year=today.year, month=today.month, day=yesterday.day, hour=0, minute=0,second=0)
    #出力日時(to)取得(例:2019-09-11 23:59:59.999999)
    to_time = datetime.datetime(year=today.year, month=today.month, day=yesterday.day, hour=23, minute=59,second=59,microsecond=999999)

    #エポック時刻取得(float型)
    epoc_from_time = from_time.timestamp()
    epoc_to_time = to_time.timestamp()
    #エポック時刻をミリ秒にしint型にキャスト(create_export_taskメソッドにintで渡すため)
    m_epoc_from_time = int(epoc_from_time * 1000)
    m_epoc_epoc_to_time = int(epoc_to_time * 1000)

    #CloudWatch Logsエクスポート
    response = logs_client.create_export_task(
        logGroupName = target_log_group[index],
        fromTime = m_epoc_from_time,
        to = m_epoc_epoc_to_time,
        destination = export_bucket,
        destinationPrefix = yesterday.strftime('%Y%m%d')
    )

    logger.info('Target log group : ' + target_log_group[index])
    logger.info('Task ID : ' + response['taskId'])

    index += 1

    return {
        'index':index,
        'end_flg':count == index,
        'task_id':response['taskId']
    }

このステートが実行されると、ステートマシンには以下のような値が返ります。(値は実行したタイミングにより異なります)

"iterator": {
  "index": 1,
  "end_flg": false,
  "task_id": "765f9bfb-e65a-42ff-a2fb-5a38d52ee03b"
}

リストに格納されたロググループすべてをエクスポートし終えると、 処理完了フラグ(end_flg)を設定し、後続のフローが変化するようになります。

DescribeExportTaskステート

実行したエクスポートのステータスを取得するLamda関数を呼び出しています。 CloudWatch Logsのエクスポートには制限があり、アカウントごとに、一度に1つのアクティブ(実行中または保留中)のエクスポートしか実施できません。

そのため、前段で実施したエクスポートが完了しているか確認したいので、ここではエクスポートタスクのステータスを取得しています。

DescribeExportTask関数

import boto3
import logging

logs_client = boto3.client('logs')
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    task_id = event['iterator']['task_id']

    #エクスポートタスク ステータス取得
    response = logs_client.describe_export_tasks(
        taskId=task_id,
    )
    status_code = response['exportTasks'][0]['status']['code']

    logger.info('Task ID : ' + task_id)
    logger.info('Status code : ' + status_code)

    return {
        'status_code': status_code
    }

このステートが実行されると、ステートマシンには以下のような値が返ります。(値は実行したタイミングにより異なります)

"describe_export_task": {
  "status_code": "RUNNING"
}

IsExportTaskステート

ステートタイプChoiceを利用し、エクスポートタスクのステータス値を確認しています。

ステータスが"COMPLETED"の場合、エクスポートは完了しているため、すべてのロググループに対しエクスポートを終えているのか確認するフローに移ります。

ステータスが"PENDING"または"RUNNING"の場合、エクスポートは未完なので、再度ステータス取得のフローに移ります。

それ以外のステータスが返ってきた場合は、ステートマシンを失敗で終了するフローに移ります。

WaitSecondsステート

ステートタイプWaitを利用し、指定した時間待機させます。 このステートは、エクスポートが未完の場合に呼び出しされるので、再度エクスポートタスクのステータスを確認する必要があります。

間髪を入れずに処理を戻した場合、エクスポート実行中であることが想定されます。また、その分だけステートマシンの状態遷移が発生するので、利用料金も気になってきます。そのため、エクスポート完了までの一定時間(ここでは1秒)処理を待機させています。

IsCompleteステート

ステートタイプChoiceを利用し、すべてのロググループに対しエクスポートを終えているのか確認しています。ロググループすべてをエクスポートし終えていると、"end_flg"が"true"になりますので、その値を確認しています。"true"でなければ、エクスポート対象のロググループがまだ存在しているので、再度エクスポート処理を行うステートに戻ります。

実行結果確認

CloudWatch Eventsルールで実行された、9/18 1:00(JST)の実行結果をみてみたいと思います。

UTCでは、9/17 16:00が実行日になるので、その前日(9/16)がエクスポートの対象になります。出力対象のバケットを確認すると、前日日付のフォルダが作成されています。

日付フォルダ配下には、タスクID毎にフォルダが作成されています。

タスクIDだけではどのロググループのデータか判別できないので、ExportLogGroup関数のログを参照してください。タスクIDと紐づくようにロググループ名がロギングされています。 例えば、タスクID(フォルダ名)が「765f9bfb-e65a-42ff-a2fb-5a38d52ee03b」の場合、ロググループ名は「/aws/lambda/DescribeLogGroups」となります。

タスクIDフォルダ配下には、該当期間のログストリームが保管されています。

該当のロググループを確認すると、9/16(UTC)のデータが出力されていることがわかります。

最後に

CloudWatch Logsをエクスポートする構成の一例をご紹介しました。Kinesis Data Firehoseを利用できない場合等に、参考にしていただければ幸いです。

本環境をサクッとデプロイできるよう、Serverless Frameworkを用意したので、検証する方はご利用ください。

参考

脚注