S3のアクセスログをサーバレスに処理してAthenaで解析してみた

S3のアクセスログをサーバレスに処理してAthenaで解析してみた

Clock Icon2019.03.28 20:12

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

AWSチームのすずきです。

APIの署名バージョン情報がログ項目として追加されたS3のサーバアクセスログ、CloudTrailより廉価な利用が可能ですが、
出力されるログファイル数の多さ、正規表現によるログのパースの困難さなどにより、Athenaなどを利用した解析が難しい課題がありました。

https://dev.classmethod.jp/cloud/aws/s3-accesslog-new-fields/

今回この解決のため、ログファイルのパースと集約をLambdaとFirehoseを利用して実施、Athenaによる解析処理が行いやすいログ形式への変換を実現しました。

また、後処理のLambdaを用いて署名バージョン2(SigV2)のアクセスログのみを抽出。移行対応が必要となるログが存在した場合、 CloudWatch Logs で簡単に確認可能とする仕組みも用意しました。

これらの仕組みと、展開するテンプレートを紹介させて頂きます。

構成図

設置方法

CloudFormation展開

  • AWSコンソールにログイン済みのブラウザから、展開リンクをクリックします。

  • 展開用リンク

  • cm-s3accesslog-sigv2.yaml

  • IAMリソースの作成を承認をチェックし「作成」します。

  • 5分程度でリソース設置が完了します。

リソース説明

S3(オリジナルアクセスログ)

s3://<スタック名>-accesslog-<リージョン>-<アカウントID>

  • 調査対象とするS3アクセスログの出力先(ターゲットバケット)として利用します。
  • 非圧縮、テキスト形式のS3アクセスログの一時的な置き場として利用します。

Lambda (前処理)

<スタック名>-LambdaFunction-xxxxxxx

  • アクセスログを複数の正規表現を用いてパース後、JSON形式のログデータをFirehoseに投入します。

Firehose

  • JSON形式のログデータを一定時間(600秒)バッファし、圧縮(GZ形式)して出力します。

S3(JSON変換済)

s3://<スタック名>-firehose-output-<リージョン>-<アカウントID>

  • Athenaで解析可能なJSONファイルの保存場所となります。
  • 非圧縮で100MBのアクセスログ、15MB程度に圧縮された状態で保存されます。
  • 今回のテンプレートでは、以下のS3にGZ圧縮されたログが保存されます。
s3://<スタック名>-firehose-output-<リージョン>-<アカウントID>/firehose/s3_logs/<年>/<月>/<日>/<時間>/
  • FirehoseでS3プリフィックス設定を追加する事で、年月日をAthenaのパーティションとして認識するApache Hiveフォーマットで保存する事も可能です。

https://dev.classmethod.jp/cloud/firehose-custom-s3-prefixes/

  • 2019年3月現在、CloudFormation未対応でした。

Lambda(後処理)

<スタック名>-LambdaFunction2-xxxxxxx

  • S3のイベントトリガーで起動し、Firehoseより出力されたログファイルから、V2署名(SigV2)のログを抽出します。
  • 集計、確認に必要な項目を、標準出力を利用してCloudWatchLogsに出力します。
  • V2署名(SigV2)のアクセスログが少ない場合、簡易なログ確認手段となります。
  • V2署名のログが多く、CloudWatch Logsの費用が問題となる場合や、CloudWatch Logs Insightsを利用した可視化の必要性がない場合には、後処理Lambdaのトリガーを無効にしてください。

CloudWatch Logs (Insights)

ロググループ 「/aws/lambda/<スタック名>-LambdaFunction2-xxxxxxx」

  • V2署名のS3アクセスログが蓄積されます。

https://dev.classmethod.jp/cloud/aws/check-s3-sig2-usage-cloudwatchinsight/

CloudWatch Logs Insights クエリ例

ロググループ 「/aws/lambda/<スタック名>-LambdaFunction2-xxxxxxx」 と、任意の期間を指定してクエリを実行します。

  • ログ20件の確認
fields @message 
| filter SignatureVersion = 'SigV2'
| limit 20

  • IPアドレス、S3バケット、操作内容、ユーザエージェント別件数集計
stats count(*) by RemoteIP, Bucket, Operation, UserAgent
| filter SignatureVersion = 'SigV2'

Athena

JSON形式で集約したログ、Athenaを利用して解析する方法を紹介します。

テーブル作成

  • Firehoseの出力S3より、解析対象となる年月日のパスを確認します。
  • Athenaのテーブル作成時、確認したS3パスをロケーションとして指定してします。
  • DB領域は「default」を利用しました。
CREATE EXTERNAL TABLE IF NOT EXISTS default.s3accesslog (
  `Bucket` string,
  `Timestamp` timestamp,
  `RemoteIP` string,
  `Requester` string,
  `RequestID` string,
  `Operation` string,
  `Key` string,
  `RequestURI` string,
  `HTTPstatus` string,
  `ErrorCode` string,
  `BytesSent` string,
  `ObjectSize` string,
  `TotalTime` string,
  `TurnAroundTime` string,
  `Referrer` string,
  `UserAgent` string,
  `VersionId` string,
  `HostId` string,
  `SignatureVersion` string,
  `CipherSuite` string,
  `AuthenticationType` string,
  `HostHeader` string 
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://<スタック名>-firehose-output-<リージョン>-<アカウントID>/firehose/s3_logs/<年>/<月>/<日>/'
TBLPROPERTIES ('has_encrypted_data'='false');

クエリ例

  • 10件表示
SELECT * FROM "default"."s3accesslog" limit 10;

  • 過去24時間に発生した、V2署名アクセス、接続元IP、IAM、バケット、ユーザエージェント別集計
SELECT count(1) AS cnt,
         RemoteIP,
         Requester,
         Bucket ,
         Operation,
         UserAgent ,
         SignatureVersion
FROM "default"."s3accesslog"
WHERE Timestamp > now() - interval '24' hour
AND SignatureVersion = 'SigV2'
GROUP BY  RemoteIP, Requester, Bucket ,Operation, UserAgent , SignatureVersion
ORDER BY  cnt DESC 

件数が多い場合、RemoteIPなどを除外する事で、利用頻度の高いシステムを絞り込みやすくなります。

  • 過去24時間に発生した、V2署名アクセス、バケット、ユーザエージェント別集計
SELECT count(1) AS cnt,
         Bucket ,
         UserAgent ,
         SignatureVersion
FROM "default"."s3accesslog"
WHERE Timestamp > now() - interval '24' hour
AND SignatureVersion = 'SigV2'
GROUP BY  Bucket ,Operation, UserAgent , SignatureVersion
ORDER BY  cnt DESC 

まとめ

LambdaとFirehoseにより、S3のアクセスログの解析が簡単に実施出来るようになりました。

CloudTrailのオブジェクトログと比較して、S3のアクセスログは廉価に利用する事が可能です。
大量のS3API利用があるシステムや、継続的なV2署名ログの確認が必要な場合、
今回紹介させて頂いた仕組みなどを参考に頂ければと思います。

今回、S3のアクセスログのパースに利用した正規表現は、先のブログ記事に頂いたコメントとリンク先の情報を踏襲させていただきました。
ume 様、ありがとうございました。

bungoume/parser.py

s3accesslog-sigv2

cm-s3accesslog-sigv2.yaml)
AWSTemplateFormatVersion: '2010-09-09'
Description: Convert S3 access log to JSON and output to Firehose with sigv2 check (20190324)
Parameters:
  FirehoseExpiredate:
    Description: Number of days to keep S3 file (Firehose)
    Type: String
    Default: 7
Resources:
  S3bucketAccesslog:
    DependsOn: LambdaInvokePermission
    Type: AWS::S3::Bucket
    DeletionPolicy: Delete
    Properties:
      BucketName: !Sub '${AWS::StackName}-s3-accesslog-${AWS::Region}-${AWS::AccountId}'
      LifecycleConfiguration:
        Rules:
          - Id: AutoDelete
            Status: Enabled
            ExpirationInDays: 1
      NotificationConfiguration:
        LambdaConfigurations:
          - Function: !GetAtt 'LambdaFunction.Arn'
            Event: s3:ObjectCreated:*
  LambdaInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt 'LambdaFunction.Arn'
      Action: lambda:InvokeFunction
      Principal: "s3.amazonaws.com"
      SourceAccount: !Ref 'AWS::AccountId'
      SourceArn: !Sub 'arn:aws:s3:::${AWS::StackName}-s3-accesslog-${AWS::Region}-${AWS::AccountId}'
  LogGroupLambda:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/lambda/${LambdaFunction}'
      RetentionInDays: 7
  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: '*'
              - Effect: Allow
                Action:
                  - s3:GetObject
                Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-s3-accesslog-${AWS::Region}-${AWS::AccountId}/*'
              - Effect: Allow
                Action:
                  - firehose:PutRecordBatch
                Resource: !GetAtt 'Deliverystream.Arn'
  LambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt 'LambdaRole.Arn'
      Code:
        ZipFile: !Sub |
          import json
          import re
          from datetime import datetime
          import boto3
          import os
          import urllib.parse
          s3 = boto3.client('s3')
          firehose = boto3.client('firehose')

          RE_TEXT = r"""
            ^(?P<BucketOwner>\S+)\u0020
            (?P<Bucket>\S+)\u0020
            \[(?P<Timestamp>[^\]]+)\]\u0020
            (?P<RemoteIP>\S+)\u0020
            (?P<Requester>\S+)\u0020
            (?P<RequestID>[0-9A-F]{16})\u0020
            (?P<Operation>[A-Z0-9\.\_]+)\u0020
            (?P<Key>\S+)\u0020
            \"(?P<RequestURI>.*)\"\u0020
            (?P<HTTPstatus>([0-9]{3}|-))\u0020
            (?P<ErrorCode>([A-Za-z]+|-))\u0020
            (?P<BytesSent>([0-9]+|-))\u0020
            (?P<ObjectSize>([0-9]+|-))\u0020
            (?P<TotalTime>([0-9]+|-))\u0020
            (?P<TurnAroundTime>([0-9]+|-))\u0020
            \"(?P<Referrer>.*)\"\u0020
            \"(?P<UserAgent>.*)\"\u0020
            (?P<VersionId>[A-Za-z0-9\_\.]+|-)
            (\u0020(?P<SessionPrincipal>\S+))?
            """
          RE_TEXT2 = r"""
            \u0020
            (?P<HostId>([A-Za-z0-9\/\+\=]+|-))\u0020
            (?P<SignatureVersion>(SigV2|SigV4|-))\u0020
            (?P<CipherSuite>[A-Z0-9\-]+)\u0020
            (?P<AuthenticationType>(AuthHeader|QueryString|-))\u0020
            (?P<HostHeader>\S+)
            (?P<SslProtocol>\u0020.*)?
            """
          RE_TEXT_NOQUOTE = RE_TEXT.replace("\\\"", "").replace(".*", r"\S*")
          RE_FORMAT_V4 = re.compile(RE_TEXT + RE_TEXT2, flags=re.VERBOSE)
          RE_FORMAT_V3 = re.compile(RE_TEXT, flags=re.VERBOSE)
          RE_FORMAT_V2 = re.compile(RE_TEXT_NOQUOTE + RE_TEXT2, flags=re.VERBOSE)
          def parse_line(line):
            line = line.rstrip("\n")
            m = RE_FORMAT_V4.match(line)
            if not m:
              m = RE_FORMAT_V3.match(line)
            if not m:
              m = RE_FORMAT_V2.match(line)
            doc = m.groupdict()
            doc.pop("SessionPrincipal", None)
            dt = datetime.strptime(doc["Timestamp"], "%d/%b/%Y:%H:%M:%S %z")
            doc.pop("Timestamp")
            doc["@Timestamp"] = dt.strftime('%Y-%m-%d %H:%M:%S')
            return doc
          def lambda_handler(event, context):
            bucket = event['Records'][0]['s3']['bucket']['name']
            key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
            r = s3.get_object(Bucket=bucket, Key=key)
            a = r['Body'].read().decode('utf-8','ignore').splitlines()
            l = []
            for b in a:
              c = parse_line(b)
              if len(c) > 1:
                l.append(c)
            if len(l) > 0:
              put_log_firehose(l)
          def log_parse(data):
            a = data
            z = {}
          def put_log_firehose(data):
            s = os.environ['firehose_stream_name']
            b = []
            for a in data:
              b.append({'Data': json.dumps(a) + "\n"})
              if len(b) > 200 or len(str(b)) > 20000:
                r = firehose.put_record_batch(
                  DeliveryStreamName = s,
                  Records = b
                )
                print (json.dumps(r))
                b = []
            if len(b) > 0:
              r = firehose.put_record_batch(
                DeliveryStreamName = s,
                Records = b
              )
              del r['RequestResponses']
              print (json.dumps(r))
      Runtime: python3.7
      MemorySize: 128
      Timeout: 300
      Description: Convert S3 access log to JSON and output to Firehose
      Environment:
        Variables:
          firehose_stream_name: !Ref 'Deliverystream'
      Tags:
        - Key: CloudformationArn
          Value: !Ref 'AWS::StackId'        
  Deliverystream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      ExtendedS3DestinationConfiguration:
        BucketARN: !Sub 'arn:aws:s3:::${S3bucketFirehose}'
        BufferingHints:
          IntervalInSeconds: '600'
          SizeInMBs: '64'
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Sub '/aws/firehose/${AWS::StackName}'
          LogStreamName: 'firehose'        
        CompressionFormat: GZIP
        Prefix: firehose/s3_logs/
        RoleARN: !GetAtt 'DeliveryRole.Arn'
        ProcessingConfiguration:
          Enabled: 'false'
  DeliveryRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
            Condition:
              StringEquals:
                sts:ExternalId: !Ref 'AWS::AccountId'
  DeliveryPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: firehose_delivery_policy
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - s3:AbortMultipartUpload
              - s3:GetBucketLocation
              - s3:GetObject
              - s3:ListBucket
              - s3:ListBucketMultipartUploads
              - s3:PutObject
            Resource:
              - !Sub 'arn:aws:s3:::${S3bucketFirehose}'
              - !Sub 'arn:aws:s3:::${S3bucketFirehose}*'
      Roles:
        - !Ref 'DeliveryRole'
  LogGroupFirehose:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/firehose/${AWS::StackName}'
      RetentionInDays: !Ref 'FirehoseExpiredate'
  S3bucketFirehose:
    DependsOn: LambdaInvokePermission2
    Type: AWS::S3::Bucket
    DeletionPolicy: Delete
    Properties:
      BucketName: !Sub '${AWS::StackName}-firehose-output-${AWS::Region}-${AWS::AccountId}'
      LifecycleConfiguration:
        Rules:
          - Id: AutoDelete
            Status: Enabled
            ExpirationInDays: !Ref 'FirehoseExpiredate'
      NotificationConfiguration:
        LambdaConfigurations:
          - Function: !GetAtt 'LambdaFunction2.Arn'
            Event: s3:ObjectCreated:*
            Filter:
              S3Key:
                Rules:
                  - Name: prefix
                    Value: firehose/
                  - Name: suffix
                    Value: gz
  LambdaInvokePermission2:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt 'LambdaFunction2.Arn'
      Action: lambda:InvokeFunction
      Principal: "s3.amazonaws.com"
      SourceAccount: !Ref 'AWS::AccountId'
      SourceArn: !Sub 'arn:aws:s3:::${AWS::StackName}-firehose-output-${AWS::Region}-${AWS::AccountId}'
  LogGroupLambda2:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/lambda/${LambdaFunction2}'
      RetentionInDays: 7
  LambdaRole2:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: '*'
              - Effect: Allow
                Action:
                  - s3:GetObject
                Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-firehose-output-${AWS::Region}-${AWS::AccountId}/*'
  LambdaFunction2:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt 'LambdaRole2.Arn'
      Code:
        ZipFile: !Sub |
          import json
          import datetime
          import boto3
          import os
          import urllib.parse
          import gzip
          s3 = boto3.client('s3')
          def lambda_handler(event, context):
            bucket = event['Records'][0]['s3']['bucket']['name']
            key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
            r = s3.get_object(Bucket=bucket, Key=key)
            a = gzip.decompress(r['Body'].read()).decode('utf-8').splitlines()
            l = []
            for b in a:
              c = json.loads(b)
              if 'SignatureVersion' in c:
                if c["SignatureVersion"] == 'SigV2':
                  print_log_stdout(c)
          def print_log_stdout(data):
            a = data
            z = {}
            z["Bucket"] = a["Bucket"]
            z["Timestamp"] = a["Timestamp"]
            z["RemoteIP"] = a["RemoteIP"] 
            z["Operation"] = a["Operation"]
            z["Key"] = a["Key"]
            z["RequestURI"] = a["RequestURI"]
            z["UserAgent"] = a["UserAgent"]
            z["SignatureVersion"] = a["SignatureVersion"]
            print (json.dumps(z))
      Runtime: python3.7
      MemorySize: 128
      Timeout: 300
      Description: SigV2 access extraction from S3 access log (JSON)
      Tags:
        - Key: CloudformationArn
          Value: !Ref 'AWS::StackId'        

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.