VPCフローログをAmazon Data Firehose経由でS3 Tablesに書き込んでみた

VPCフローログをAmazon Data Firehose経由でS3 Tablesに書き込んでみた

VPCフローログをAmazon Data Firehose経由でS3 Tablesに書き込んでみた
Clock Icon2025.03.24

お疲れさまです。とーちです。

今回は、VPCフローログをAmazon Data Firehose(以下、Firehose)経由でAmazon S3 Tablesに書き込む方法について解説したいと思います。

VPCフローログとS3 Tablesの組み合わせ

VPCフローログ(VPC Flow Logs)は、AWSのネットワークモニタリングサービスで、Amazon Virtual Private Cloud(VPC)内のネットワークトラフィックに関する情報を収集・記録するための機能です。今回はこのVPCフローログをFirehoseを使ってS3 Tablesに書き込んでみます。

通常のS3バケットであればVPCフローログから直接出力先として指定できるのですが、S3 Tablesへの出力は2025/3現在サポートされていません。そのためFirehoseを経由して書き込む必要があります。

S3 Tablesにログを置くメリットとしては、S3 Tablesの特徴がそのまま当てはまると思います。すなわち

  • 高いクエリパフォーマンス: S3 Tablesの自動圧縮機能により、VPCフローログのクエリが最大3倍高速化されます。これにより、大量のネットワークトラフィックログを分析する際のレスポンス時間が短縮されます。
  • 運用工数の削減: テーブルから参照されていない孤立ファイルの削除や上記の自動圧縮などのメンテナンス作業を自動で行ってくれます。
  • 高度な分析機能: タイムトラベル(過去の特定時点のデータ状態を参照)やスキーマ進化(列の追加削除等によりスキーマが変化しても、既存データへのアクセスを維持しながら新しいデータ形式に対応できる機能)など、Icebergテーブル形式の特性を活かした高度な分析が可能になります。

VPCフローログのような長期間保存、また保存した大量のログに対して分析をかけたいというケースにはS3 Tablesはマッチするのではないかと思います。

S3 Tablesのメリットについては以下の記事もご参照ください。

https://dev.classmethod.jp/articles/reinvent2024-amazon-s3-tables/

https://dev.classmethod.jp/articles/re-growth-sap-2024-analytics-updates/

https://dev.classmethod.jp/articles/regrowth-osaka-2024-iceberg-festival/

それではVPCフローログをS3 Tablesに出力してみましょう。

前提条件

なお、S3 TablesのテーブルバケットやFirehoseでS3 Tablesに書き込むためのIAMロールの作成、AWS Lake Formationを使った権限付与などは既に完了しているものとします。もしこれらがまだ実施されていない場合は以下の記事を参考に実施してください。

生のVPCフローログをS3 Tablesに出力する

まずはVPCフローログをそのままの形で、Firehose経由でS3 Tablesに出力してみましょう。

S3 Tables用のテーブル作成

Firehoseから起動できるデータ変換用のAWS Lambda等を使ってデバッグしてみると分かるのですが、FirehoseがVPCフローログから受け取るデータは以下のような形式になっています(標準のVPCフローログ形式の場合)。

{
    "invocationId": "a123bcde-f456-7890-g123-4567hijklmno",
    "deliveryStreamArn": "arn:aws:firehose:us-west-2:123456789012:deliverystream/SAMPLE-STREAM",
    "region": "us-west-2",
    "records": [
      {
        "recordId": "shardId-00000000000000000000000000000000000000000000000000000000000000000000000000000000000012345678901234567890123456789012345678901234567890000000000000",
        "approximateArrivalTimestamp": 1642776378962,
        "data": "eyJtZXNzYWdlIjoiMiAxMjM0NTY3ODkwMTIgZW5pLWFiY2RlZjEyMzQ1Njc4OTAgMTAuMC4xLjEwMCAxMC4wLjIuMjAwIDEyMzQ1IDU0MzIxIDYgMyAyNTYgMTY0Mjc3NjI5OCAxNjQyNzc2Mjk4IEFDQ0VQVCBPSyJ9Cg=="
      }
    ]
  }

records配列の中の一つ一つの要素の"data" の値が、VPCフローログが出力する1行のデータになります。"data"の値の部分はbase64でエンコードされているので、デコードすると以下のような形式になっています。

{"message":"2 123456789012 eni-abcdef1234567890 10.0.1.100 10.0.2.200 12345 54321 6 3 256 1642776298 1642776298 ACCEPT OK"}

S3 Tablesにデータを格納するために上記の形式に対応した列とデータ型のテーブルを用意する必要があるので、以下のようなテーブルを作成します。

CREATE TABLE `my_s3_namespace`.`vpc_flow_logs_a` (
message string 
)
TBLPROPERTIES ('table_type' = 'iceberg')

Firehoseストリームの作成

この状態でFirehoseストリームを作成します。

ソースはVPCフローログの場合、Direct PUTになります。

Firehoseソース設定

送信先の設定では"一意のキー設定"を使って、宛先Database名(リソースリンク)と上記で作ったテーブル名を指定します。

Firehose送信先設定

サービスアクセス設定として、上記のS3 Tablesに書き込みができるIAMロールを指定します。IAMロールにつけるIAMポリシーは以下の記事をご参照ください。

S3 テーブルを宛先として使用するための Firehose のロールを作成する

VPCフローログの作成

最後にFirehoseへ出力するよう設定したVPCフローログを作成します。

VPCフローログ設定

結果の確認

ここまで設定できたら、S3 TablesにVPCフローログが流れているはずなので確認してみましょう。

S3 Tablesのデータ確認

ちゃんとVPCフローログが出力されていますね。しかし、messageという一つの列にENIや宛先IPアドレスやポートなどの情報がまとめて入ってしまっているので、このままでは分析も何もあったものではありません。

Lambdaでデータ変換してS3 Tablesに出力する

そこでFirehoseでレコード変換機能を使用します。レコード変換機能はFirehoseの宛先に出力する前に指定したLambdaを使ってデータを変換できる機能です。これを使って列を分割して、分析しやすい形に変更します。

変換用Lambda関数の作成

そのためのLambdaが以下です。

import base64
import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# VPCフローログの列名定義
COLUMN_NAMES = [
    "version", "account_id", "interface_id", "srcaddr", "dstaddr",
    "srcport", "dstport", "protocol", "packets", "bytes",
    "start", "end", "action", "log_status"
]

def lambda_handler(event, context):
    output = []
    
    logger.info(f"Processing {len(event['records'])} records")
    
    for record in event['records']:
        try:
            # Base64でエンコードされたデータをデコード
            payload = base64.b64decode(record['data']).decode('utf-8').strip()
            
            # JSONとして解析
            json_data = json.loads(payload)
            
            # messageフィールドからVPCフローログを抽出
            if 'message' in json_data:
                vpc_flow_log = json_data['message'].strip()
                
                # 空白で分割してフィールドを抽出
                fields = vpc_flow_log.split(' ')
                
                # 辞書を作成
                result = {}
                for i, field in enumerate(fields):
                    if i < len(COLUMN_NAMES):
                        column_name = COLUMN_NAMES[i]
                        # ハイフンは null として扱う
                        if field == '-':
                            result[column_name] = None
                        else:
                            # 数値フィールドを適切な型に変換
                            if column_name in ['version', 'srcport', 'dstport', 'protocol', 'packets', 'bytes', 'start', 'end']:
                                try:
                                    if field.isdigit() or (field.startswith('-') and field[1:].isdigit()):
                                        result[column_name] = int(field)
                                    else:
                                        result[column_name] = None
                                except ValueError:
                                    result[column_name] = None
                            else:
                                result[column_name] = field
                    else:
                        # 予期しないフィールドがある場合
                        result[f"extra_{i}"] = field
                
                # タイムスタンプ情報から日付フィールドを生成
                if 'start' in result and result['start'] is not None:
                    start_timestamp = result['start']
                    
                    # Unix時間(秒)をdatetime型に変換
                    start_date = datetime.fromtimestamp(start_timestamp, tz=timezone.utc)
                    
                    # ISO形式の日付文字列を生成 (YYYY-MM-DD)
                    result['log_date'] = start_date.strftime('%Y-%m-%d')
                    
                    # 年、月、日のフィールドも追加
                    result['log_year'] = start_date.year
                    result['log_month'] = start_date.month
                    result['log_day'] = start_date.day
                
                # 結果をJSON文字列に変換してBase64エンコード
                data_json = json.dumps(result) + '\n'
                encoded_data = base64.b64encode(data_json.encode('utf-8')).decode('utf-8')
                
                output_record = {
                    'recordId': record['recordId'],
                    'result': 'Ok',
                    'data': encoded_data
                }
            else:
                logger.warning("No 'message' field found in the record")
                output_record = {
                    'recordId': record['recordId'],
                    'result': 'ProcessingFailed',
                    'data': record['data']
                }
        except Exception as e:
            # エラーが発生した場合はログに記録し、元のデータを返す
            logger.error(f"Error processing record: {str(e)}")
            output_record = {
                'recordId': record['recordId'],
                'result': 'ProcessingFailed',
                'data': record['data']
            }
        
        output.append(output_record)
    
    logger.info(f"Successfully processed {len(output)} records")
    return {'records': output}

このLambdaはFirehoseからVPCフローログが出力したレコードを受け取り、処理するためのものです。

再掲となりますが、Firehoseからは以下のような形でデータを受け取ります。

{
    "invocationId": "a123bcde-f456-7890-g123-4567hijklmno",
    "deliveryStreamArn": "arn:aws:firehose:us-west-2:123456789012:deliverystream/SAMPLE-STREAM",
    "region": "us-west-2",
    "records": [
      {
        "recordId": "shardId-00000000000000000000000000000000000000000000000000000000000000000000000000000000000012345678901234567890123456789012345678901234567890000000000000",
        "approximateArrivalTimestamp": 1642776378962,
        "data": "eyJtZXNzYWdlIjoiMiAxMjM0NTY3ODkwMTIgZW5pLWFiY2RlZjEyMzQ1Njc4OTAgMTAuMC4xLjEwMCAxMC4wLjIuMjAwIDEyMzQ1IDU0MzIxIDYgMyAyNTYgMTY0Mjc3NjI5OCAxNjQyNzc2Mjk4IEFDQ0VQVCBPSyJ9Cg=="
      }
    ]
  }

recordsのところが配列になっていて、いくつかのレコード(VPCフローログの1行)が入った状態でデータを受け取るので、 for record in event['records']: で1レコードずつ処理する作りになっています。

そして1レコードずつ、デコード、空白区切りでの列の分割、適切な型への変換を行います。

また、VPCフローログのような1日に大量のレコードが出力されるデータの場合、月などの日付データを使ったテーブルのパーティショニングが有効なので、start (集約間隔内にフローの最初のパケットが受信された時間)という列から、date型を持つ列(log_date)も生成しておきます。

こうして作成されたJSON形式のデータを再度base64でエンコードし、結果として出力します。

Lambdaに付与するIAMロールのポリシーは特別なものは不要で、AWSLambdaBasicExecutionRole をつけておけばOKです。

そのほかLambdaの設定はほとんどデフォルトで良いのですが、タイムアウト設定だけは1分以上にしておきましょう(Firehoseで設定する際に1分以上にすることと注意書きが出ます)。

Lambda関数のタイムアウト設定

変換後データ用のテーブル作成

続いてデータが格納されるS3 Tablesのテーブルを作成します。Amazon Athenaから以下のSQLで作成しましょう。

CREATE TABLE `my_s3_namespace`.`vpc_flow_logs` (
  version int,
  account_id string,
  interface_id string,
  srcaddr string,
  dstaddr string,
  srcport int,
  dstport int,
  protocol bigint,
  packets bigint,
  bytes bigint,
  start bigint,
  `end` bigint,
  action string,
  log_status string,
  vpc_id string,
  subnet_id string,
  instance_id string,
  tcp_flags int,
  type string,
  pkt_srcaddr string,
  pkt_dstaddr string,
  region string,
  az_id string,
  sublocation_type string,
  sublocation_id string,
  pkt_src_aws_service string,
  pkt_dst_aws_service string,
  flow_direction string,
  traffic_path int,
  log_date date
)
PARTITIONED BY (month(`log_date`))
TBLPROPERTIES ('table_type' = 'iceberg')

PARTITIONED BY (month(log_date)) により、log_date 列の「月」の値に基づいて物理的に分割(パーティション化)しています。これにより、特定の月のデータだけを効率的にクエリできるようになります。

Firehose用IAMロールの更新

またFirehoseが使用するIAMロールにも変更が必要です。以下のようにLambda関数を実行する権限を追加してください(リージョン(us-east-1)とアカウントID(123456789012)は適宜修正してください)。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3TableAccessViaGlueFederation",
            "Effect": "Allow",
            "Action": [
                "glue:GetTable",
                "glue:GetDatabase",
                "glue:UpdateTable"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:123456789012:catalog/s3tablescatalog/*",
                "arn:aws:glue:us-east-1:123456789012:catalog/s3tablescatalog",
                "arn:aws:glue:us-east-1:123456789012:catalog",
                "arn:aws:glue:us-east-1:123456789012:database/*",
                "arn:aws:glue:us-east-1:123456789012:table/*/*"
            ]
        },
        {
            "Sid": "S3DeliveryErrorBucketPermission",
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::*",
                "arn:aws:s3:::*/*"
            ]
        },
        {
            "Sid": "RequiredWhenDoingMetadataReadsANDDataAndMetadataWriteViaLakeformation",
            "Effect": "Allow",
            "Action": [
                "lakeformation:GetDataAccess"
            ],
            "Resource": "*"
        },
        {
            "Sid": "LoggingInCloudWatch",
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:us-east-1:123456789012:log-group:*:log-stream:log-*"
            ]
        },
        {
            "Sid": "RequiredWhenAttachingLambdaToFirehose",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:GetFunctionConfiguration"
            ],
            "Resource": [
                "arn:aws:lambda:us-east-1:123456789012:function:*"
            ]
        }
    ]
}

Lambda変換付きFirehoseストリームの作成

テーブルまで作成できたらFirehoseでストリームを作成します。先ほどとほとんど設定は同じなので異なる部分だけ記載します。

レコード変換機能の設定

レコード変換機能の設定

データ変換をオンにするのチェックボックスにチェックをつけて作成したLambda関数を指定しましょう。

バッファサイズと間隔は上記ではテストのためにできるだけ短い間隔で出力するように指定していますが、本番環境に展開するときは適したパラメータにしましょう。

送信先の設定

送信先の設定

一意のキー設定のみ指定して、テーブル名は先程作成したテーブル名を指定しましょう。

VPCフローログの作成

最後に先ほどと同じようにFirehoseへ出力するよう設定したVPCフローログを作成します。作り方は先程と同じなので割愛します。

結果の確認

Athenaで見ると見事にS3 TablesにVPCフローログが出力されていました。

S3 Tablesのデータ確認

各フィールドが適切に分割され、型変換も行われていることが確認できます。これでVPCフローログを効率的に分析できる環境が整いました。

まとめ

以上、Firehose経由でVPCフローログをS3 Tablesに出力してみる方法を紹介しました。そのうちVPCフローログから直接S3 Tablesに配信する設定がサポートされるような気もしますが、今すぐS3 TablesにVPCフローログを出力したいという方は上記を参考にしていただければと思います。

以上、とーちでした。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.