CloudWatch Logs に取り込んだ syslog を Kinesis Data Firehose + Lambda で動的パーティショニングしつつ S3 に出力してみた

2022.06.10

こんにちは、大前です。

Kinesis Data Firehose(以下 Kinesis Firehose)は動的パーティショニング機能を提供していますが、json 形式のログのみ対応している形になります。


一方で、CloudWatch Logs に出力した OS ログなど、json 形式でないログファイルを Kinesis Firehose を経由して S3 に出力したい場合もあるかと思います。

公式ドキュメントでは、json 形式以外のログファイルについては Lambda 関数を利用してパーティショニングキーを作成することで動的パーティショニングが可能と書かれているため、今回はこれを試してみました。

For compressed or encrypted data records, or data that is in any file format other than JSON, you can use the integrated AWS Lambda function with your own custom code to decompress, decrypt, or transform the records in order to extract and return the data fields needed for partitioning.

参考 : Dynamic Partitioning in Kinesis Data Firehose - Amazon Kinesis Data Firehose

構成

構成としては以下で検証します。EC2 に CloudWatch Agent をインストールし、syslog を CloudWatch Logs に出力します。

やってみた

このブログでは以下の手順のみ記載します。

  • Lambda 関数の作成
  • Kinesis Firehose の作成
  • ログを流してみる

S3 バケットの用意や EC2 から syslog を CloudWatch Logs に出力する為の手順、IAM ロールの作成等は省きますので、必要に応じて下記ブログやドキュメントを参照ください。

CloudWatch Logs サブスクリプションフィルターの使用 - Amazon CloudWatch Logs


Lambda 関数の作成

まず、動的パーティショニングを行う為のキーを作成する Lambda 関数を作成します。

0 からコーディングしても良いですが、Lambda には kinesis-firehose-cloudwatch-logs-processor-python というブループリントが用意されている為、これを改修する事でコーディング量を最小限にできます。


上記ブループリントがどういった機能を提供しているかは、下記ブログを参照ください。


Lambda 関数名を入力し、IAM ロールに関する設定はデフォルトのままで作成を進めます。


上記ブループリントを利用すると、以下のソースコードがデプロイされます。(2022/6 時点)

kinesis-firehose-cloudwatch-logs-processor-python(クリックで展開されます)
# Copyright 2014, Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#  http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

"""
For processing data sent to Firehose by Cloudwatch Logs subscription filters.

Cloudwatch Logs sends to Firehose records that look like this:

{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789012",
  "logGroup": "log_group_name",
  "logStream": "log_stream_name",
  "subscriptionFilters": [
    "subscription_filter_name"
  ],
  "logEvents": [
    {
      "id": "01234567890123456789012345678901234567890123456789012345",
      "timestamp": 1510109208016,
      "message": "log message 1"
    },
    {
      "id": "01234567890123456789012345678901234567890123456789012345",
      "timestamp": 1510109208017,
      "message": "log message 2"
    }
    ...
  ]
}

The data is additionally compressed with GZIP.

NOTE: It is suggested to test the cloudwatch logs processor lambda function in a pre-production environment to ensure
the 6000000 limit meets your requirements. If your data contains a sizable number of records that are classified as
Dropped/ProcessingFailed, then it is suggested to lower the 6000000 limit within the function to a smaller value
(eg: 5000000) in order to confine to the 6MB (6291456 bytes) payload limit imposed by lambda. You can find Lambda
quotas at https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html

The code below will:

1) Gunzip the data
2) Parse the json
3) Set the result to ProcessingFailed for any record whose messageType is not DATA_MESSAGE, thus redirecting them to the
   processing error output. Such records do not contain any log events. You can modify the code to set the result to
   Dropped instead to get rid of these records completely.
4) For records whose messageType is DATA_MESSAGE, extract the individual log events from the logEvents field, and pass
   each one to the transformLogEvent method. You can modify the transformLogEvent method to perform custom
   transformations on the log events.
5) Concatenate the result from (4) together and set the result as the data of the record returned to Firehose. Note that
   this step will not add any delimiters. Delimiters should be appended by the logic within the transformLogEvent
   method.
6) Any individual record exceeding 6,000,000 bytes in size after decompression and encoding is marked as
   ProcessingFailed within the function. The original compressed record will be backed up to the S3 bucket
   configured on the Firehose.
7) Any additional records which exceed 6MB will be re-ingested back into Firehose.
8) The retry count for intermittent failures during re-ingestion is set 20 attempts. If you wish to retry fewer number
   of times for intermittent failures you can lower this value.
"""

import base64
import json
import gzip
from io import BytesIO
import boto3


def transformLogEvent(log_event):
    """Transform each log event.

    The default implementation below just extracts the message and appends a newline to it.

    Args:
    log_event (dict): The original log event. Structure is {"id": str, "timestamp": long, "message": str}

    Returns:
    str: The transformed log event.
    """
    return log_event['message'] + '\n'


def processRecords(records):
    for r in records:
        data = base64.b64decode(r['data'])
        striodata = BytesIO(data)
        with gzip.GzipFile(fileobj=striodata, mode='r') as f:
            data = json.loads(f.read())

        recId = r['recordId']
        """
        CONTROL_MESSAGE are sent by CWL to check if the subscription is reachable.
        They do not contain actual data.
        """
        if data['messageType'] == 'CONTROL_MESSAGE':
            yield {
                'result': 'Dropped',
                'recordId': recId
            }
        elif data['messageType'] == 'DATA_MESSAGE':
            joinedData = ''.join([transformLogEvent(e) for e in data['logEvents']])
            dataBytes = joinedData.encode("utf-8")
            encodedData = base64.b64encode(dataBytes)
            if len(encodedData) <= 6000000:
                yield {
                    'data': encodedData,
                    'result': 'Ok',
                    'recordId': recId
                }
            else:
                yield {
                    'result': 'ProcessingFailed',
                    'recordId': recId
                }
        else:
            yield {
                'result': 'ProcessingFailed',
                'recordId': recId
            }


def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts):
    failedRecords = []
    codes = []
    errMsg = ''
    # if put_record_batch throws for whatever reason, response['xx'] will error out, adding a check for a valid
    # response will prevent this
    response = None
    try:
        response = client.put_record_batch(DeliveryStreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results
    if not failedRecords and response and response['FailedPutCount'] > 0:
        for idx, res in enumerate(response['RequestResponses']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue

            codes.append(res['ErrorCode'])
            failedRecords.append(records[idx])

        errMsg = 'Individual error codes: ' + ','.join(codes)

    if len(failedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg))
            putRecordsToFirehoseStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))


def putRecordsToKinesisStream(streamName, records, client, attemptsMade, maxAttempts):
    failedRecords = []
    codes = []
    errMsg = ''
    # if put_records throws for whatever reason, response['xx'] will error out, adding a check for a valid
    # response will prevent this
    response = None
    try:
        response = client.put_records(StreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results
    if not failedRecords and response and response['FailedRecordCount'] > 0:
        for idx, res in enumerate(response['Records']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue

            codes.append(res['ErrorCode'])
            failedRecords.append(records[idx])

        errMsg = 'Individual error codes: ' + ','.join(codes)

    if len(failedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecords to Kinesis stream, retrying. %s' % (errMsg))
            putRecordsToKinesisStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))


def createReingestionRecord(isSas, originalRecord):
    if isSas:
        return {'data': base64.b64decode(originalRecord['data']), 'partitionKey': originalRecord['kinesisRecordMetadata']['partitionKey']}
    else:
        return {'data': base64.b64decode(originalRecord['data'])}


def getReingestionRecord(isSas, reIngestionRecord):
    if isSas:
        return {'Data': reIngestionRecord['data'], 'PartitionKey': reIngestionRecord['partitionKey']}
    else:
        return {'Data': reIngestionRecord['data']}


def lambda_handler(event, context):
    isSas = 'sourceKinesisStreamArn' in event
    streamARN = event['sourceKinesisStreamArn'] if isSas else event['deliveryStreamArn']
    region = streamARN.split(':')[3]
    streamName = streamARN.split('/')[1]
    records = list(processRecords(event['records']))
    projectedSize = 0
    dataByRecordId = {rec['recordId']: createReingestionRecord(isSas, rec) for rec in event['records']}
    putRecordBatches = []
    recordsToReingest = []
    totalRecordsToBeReingested = 0

    for idx, rec in enumerate(records):
        if rec['result'] != 'Ok':
            continue
        projectedSize += len(rec['data']) + len(rec['recordId'])
        # 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
        if projectedSize > 6000000:
            totalRecordsToBeReingested += 1
            recordsToReingest.append(
                getReingestionRecord(isSas, dataByRecordId[rec['recordId']])
            )
            records[idx]['result'] = 'Dropped'
            del(records[idx]['data'])

        # split out the record batches into multiple groups, 500 records at max per group
        if len(recordsToReingest) == 500:
            putRecordBatches.append(recordsToReingest)
            recordsToReingest = []

    if len(recordsToReingest) > 0:
        # add the last batch
        putRecordBatches.append(recordsToReingest)

    # iterate and call putRecordBatch for each group
    recordsReingestedSoFar = 0
    if len(putRecordBatches) > 0:
        client = boto3.client('kinesis', region_name=region) if isSas else boto3.client('firehose', region_name=region)
        for recordBatch in putRecordBatches:
            if isSas:
                putRecordsToKinesisStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
            else:
                putRecordsToFirehoseStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
            recordsReingestedSoFar += len(recordBatch)
            print('Reingested %d/%d records out of %d' % (recordsReingestedSoFar, totalRecordsToBeReingested, len(event['records'])))
    else:
        print('No records to be reingested')

    return {"records": records}


デプロイされたコードを一部修正し、動的パーティショニングの為のキーを返却する様にします。

92 行目付近に processRecords という関数があるので、その中で以下の修正を行います。

  • partition_keys を定義
  • レスポンスに 'metadata' を追加し、定義した partition_keys を含める

また、上記に伴い import datetime も追加します。

import base64
import json
import gzip
from io import BytesIO
import boto3
import datetime #追加箇所
elif data['messageType'] == 'DATA_MESSAGE':
    joinedData = ''.join([transformLogEvent(e) for e in data['logEvents']])
    dataBytes = joinedData.encode("utf-8")
    encodedData = base64.b64encode(dataBytes)

    # 追加部分 --ここから--
    timestamp = data['logEvents'][0]['timestamp']
    event_timestamp = datetime.datetime.fromtimestamp(timestamp/1000)
    partition_keys = {
        "instanceId": data['logStream'],
        "year": event_timestamp.strftime('%Y'),
        "month": event_timestamp.strftime('%m'),
        "date": event_timestamp.strftime('%d'),
        "hour": event_timestamp.strftime('%H')
    }
    # 追加部分 --ここまで--

    # レスポンスに 'metadata' を追加
    if len(encodedData) <= 6000000:
        yield {
            'data': encodedData,
            'result': 'Ok',
            'recordId': recId,
            'metadata': { 'partitionKeys': partition_keys }
        }
    else:


CloudWatch Agent 側のログ出力設定に依存するところではありますが、syslog を CloudWatch Logs に出力する際は大抵ログストリームにインスタンス ID が利用されています。今回はそれを data['logStream'] で抜き出し、パーティションキーとして利用しています。

また、パーティションキーのキーに指定している文字列(今回だと "instanceId", "year", "month", "date", "hour")は Kinesis Firehsoe の設定をする際に利用する為、メモしておきます。


続いて、「設定」よりタイムアウト値をデフォルトから変更します。これは、Kinesis Firehose 側の推奨事項で Lambda 関数のタイムアウト値を 1分以上にすることが推奨されている為です。


今回は 5分としました。


以上で Lambda に関する設定は完了です。


Kinesis Firehose の作成

続いて、Kinesis Firehose の作成を行います。

「Source」は "Direct PUT"、「Destination」は "Amazon S3" とし、任意のストリーム名を指定します。


「Data transformation」を "Enabled" にし、先ほど作成した Lambda 関数を指定します。


ログ出力先となる S3 バケットを指定します。ない場合、新規作成してください。


「Dynamic partitioning」を "Enabled" にします。


「S3 bucket prefix」をエラー時の出力先含めそれぞれ指定します。Lambda を利用して動的パーティショニングを設定する際は、動的に変化するキーを {partitionKeyFromLambda:xxx} という形式で設定する必要があります。xxx には Lambda 関数にて指定しているキーの文字列が入ります。

今回設定した prefix は以下となります。エラー時の出力は動的なキーが正しくセットできなかった場合もある為、動的なキーは含めない形で指定します。

# S3 bucket prefix("111122223333"はアカウントID)
AWSLogs/111122223333/ap-northeast-1/AmazonLinux2/!{partitionKeyFromLambda:instanceId}/!{partitionKeyFromLambda:year}/!{partitionKeyFromLambda:month}/!{partitionKeyFromLambda:date}/!{partitionKeyFromLambda:hour}/
# S3 bucket error output prefix
AWSLogs/111122223333/ap-northeast-1/AmazonLinux2/Error/


その他設定はデフォルトのまま、Kinesis Firehose のストリームを作成したら OK です。


ログを流してみる

syslog の出力先である CloudWatch Logs グループにサブスクリプションの設定を行い、先ほど作成した Kinesis Firehose にログが流れる状態になったら、実際にログを流してみます。

また、今回は下記の様な設定で、CloduWatch Agent から /var/log/messages ログを CloudWatch Logs に出力しています。

{
  "agent": {
    "run_as_user": "root"
  },
  "logs":
    {
      "logs_collected":
        {
          "files":
            {
              "collect_list":
                [
                  {
                    "file_path": "/var/log/messages",
                    "log_group_name": "amazonlinux2-syslog-ap-northeast-1-/var/log/messages",
                    "log_stream_name": "{instance_id}",
                    "retention_in_days": 365
                  }
                ]
            }
        }
    }
}


ログ出力を行い少し待つと、指定した通りに出力先の S3 でインスタンス ID のフォルダが作られていることが確認できます。


また、インスタンス ID 配下も 年/月/日/時 でパーティションできています。

おわりに

Lambda を利用して CloudWatch Logs → Kinesis Firehose → S3 の動的パーティショニングをやってみました。json 形式のログファイルであれば Kinesis Firehose の jq パーサー機能が利用できますが、今回の syslog の様に、json 形式以外のログも往々にして存在するかと思います。

Lambda 関数の作成が必要となりますが、用意されているブループリントをうまく利用すれば Lambda 作成のためのコーディング量も減らすことができます。当然 Lambda の管理が発生するため、運用していけるかを含め、Lambda を利用した動的パーティショニングを行うかどうか検討ください。

以上、AWS 事業本部の大前でした。

参考