ALBのアクセスログ約10分程度のタイムラグでAthena解析可能にする仕組みを紹介します

AWSチームのすずきです。

前回の記事では、Athenaで効率的な解析を可能にするFirehoseについて紹介させて頂きました。

今回、Firehoseへのログ登録と、Athena/Glue のパーティション更新を自動化し、 10分程度のタイムラグで Athena を利用した ALBのアクセスログ解析を可能にする仕組みを紹介します。

Firehoseで Parquet形式に変換したALBのアクセスログをAthenaで解析してみた

構成図

AWS設定

S3

ALBのアクセスログ、5分ごとに各ノードが一斉にログファイルを出力する仕様です。

S3イベントから直接Firehoseの登録を実行した場合、5分ごとのログ出力直後にFirehoseの更新が集中、 帯域超過に伴うログ欠損をさけるため、SQSを利用した非同期処理を行いました。

東京リージョンのS3に出力されたALBログファイル、Firehose、Athenaの利用費が廉価な北米リージョンにレプリケーションで利用する想定でバージョニングを有効としています。

  S3bucketAccesslog:
    Type: AWS::S3::Bucket
    Properties:
      NotificationConfiguration:
        LambdaConfigurations:
          - Function: !GetAtt 'LambdaSqsFunction.Arn'
            Event: s3:ObjectCreated:*
      VersioningConfiguration:
        Status: Enabled
      LifecycleConfiguration:
        Rules:
          - Id: AutoDelete
            Status: Enabled
            ExpirationInDays: 3

SQS

Firehose登録を行うLambdaのリトライ回数管理のため、VisibilityTimeout、DeadLetterQueueの設定を実施しました。

  SqsQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 300
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt 'SqsDeadLetterQueue.Arn'
        maxReceiveCount: 10
  SqsDeadLetterQueue:
    Type: AWS::SQS::Queue

LambdaのリトライをAWS SQSを使ってやってみる

Lambda

SQS Producer

S3のログ作成イベントを元に、SQSのメッセージを作成します。

          def lambda_handler(event, context):
            bucket = event['Records'][0]['s3']['bucket']['name']
            s3key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
            a = {}
            a["bucket"] = bucket
            a["s3key"] = s3key
            elb_info = get_elb_listener_info(s3key)
            a.update(elb_info)
            r = put_message_sqs(a)
          def get_elb_listener_info(s3key):
            elb_ip = s3key.split('_')[-2]
            d = {}
            d["elb_listener_ip"] = elb_ip
            return d
          def put_message_sqs(data):
            q = os.environ['sqs_queue_url']
            d = json.dumps(data)
            r = sqs.send_message(QueueUrl = q, MessageBody = d)

S3 イベント を SQS のProducer として設定する事も可能ですが、 ELBノードの情報をS3のキー情報から取得する処理のため、今回は Lambdaを利用しました。

ALBのアクセスログにアベイアビリティゾーン情報を追加してみた

SQS Consumer

「ReservedConcurrentExecutions」(同時実行の予約数)は、2と設定しました。

今回、Firehoseへのログ登録を実施するLambda関数(割当メモリ128MB)、Firehoseの書き込みスループットの実測値は毎秒2〜3MBでした。 Firehose の帯域を1→10MBまで拡張した環境では、書き込みスループットの超過エラーは回避出来るようになりました。

  LambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      ReservedConcurrentExecutions: 2

ReservedConcurrentExecutions によるLambda実行数の制限が発動した場合、Lambda関数のスロットルが発生します。

アカウントレベルのメトリクスでLambdaのスロットル数の監視を実施している環境では、当設定がノイズとなる可能性がある点にご注意ください。

  • アカウントレベルのLambdaのCloudWatchメトリクス

スロットルが常時発生し続ける場合、処理の遅延や、リトライがDLQで設定した規定回数失敗した場合ログ欠落する事になります。 Firehoseの上限緩和手続きを実施した上で、Lambdaの同時実行数を増やす対応が必要となります。

今回、シンプルな実装とするためSQSのイベントとしてLambdaを起動していますが、 より大規模なログ処理が必要となる環境では、メッセージの受信、削除処理をLambda関数に追加、SQSをポーリングする利用や、先日並列処理に対応したStepFunctionを Consumerとする実装をお試しください。

  • SQSポーリング利用イメージ

[アップデート]Step Functionsで動的並列処理がサポートされました!

Firehose

Firehoseの S3出力バッファーの時間は120秒としています。

直近のアクセスログを確認する必要がない場合、バッファ時間は長く設定し(最長900秒)、S3に保存されるファイルの断片化を避ける事をおすすめします。

  
  Deliverystream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      ExtendedS3DestinationConfiguration:
        BufferingHints:
          IntervalInSeconds: 120

Partition 追加

現在時刻を元に、Athena/Glue テーブルのパーティションを追加するLambda関数を用意しました。

          def lambda_handler(event, context):
            dt = datetime.now().strftime("%Y-%m-%d")
            hour = datetime.now().strftime("%H")
            s3bucket = os.environ['s3bucket']
            s3key = os.environ['s3key']
            gluedb = os.environ['gluedb']
            gluetable = os.environ['gluetable']
            athena = boto3.client('athena')
            sql = "ALTER TABLE " + gluetable + " ADD IF NOT EXISTS PARTITION (dt='" + dt + "',hour='" + hour + "') location 's3://" + s3bucket + "/" + gluedb + "/" + gluetable + "/dt=" + dt + "/hour=" + hour + "'"
            athena.start_query_execution(
              QueryString= sql ,
              QueryExecutionContext={
                'Database': gluedb
              },
              ResultConfiguration={
                'OutputLocation': 's3://' + s3bucket + '/athena-output' 
              }
            )

CloudWatch Event

1時間毎に パーティション 追加Lambdaを実行します。

  ScheduledRule:
    Type: AWS::Events::Rule
    Properties:
      Description: ScheduledRule
      ScheduleExpression: cron(1 * * * ? *)
      State: ENABLED
      Targets:
        - Arn: !GetAtt 'LambdaFunctionAddPartition.Arn'
          Id: TargetFunctionV1

2019年現在、AWS Glue データカタログ に移行済みのAWS環境では、アカウントあたりのパーティションの数は 20,000,000。 1時間毎のパーティション作成を繰り返しても上限に到達するのは2283年後の計算になりますが、 同じAWSアカウント内にパーティションを多く作成するシステムが存在する場合、パーティション数の上限にご留意ください。

AWS サービスの制限(AWS Glue の制限)

Athena View

直近(70分)のログを確認するViewを作成します。 現在時刻と1時間前に一致するパーティションを指定し、Athenaのスキャン量を限定しています。

CREATE OR REPLACE VIEW alblog_1h AS
SELECT *
FROM 
    (SELECT *
    FROM alblog
    WHERE dt = cast((current_timestamp - INTERVAL '1' HOUR) AS date)
            AND hour = hour((current_timestamp - INTERVAL '1' HOUR))
    UNION ALL
    SELECT *
    FROM alblog
    WHERE dt = cast(current_timestamp AS date)
            AND hour = hour(current_timestamp) 
    )
WHERE from_iso8601_timestamp(timestamp) > (current_timestamp - INTERVAL '70' minute)

利用例

アクセスログの最終タイムスタンプ確認

SELECT current_timestamp ,
         max (request_creation_time) AS newest_request_creation_time ,
         sum(1) AS sum_logs
FROM alblog_1h

クエリの実行時刻(current_timestamp)「8:13:59 UTC」に対し、4分前のタイムスタンプを持つアクセスログの確認ができました。

5XX 件数確認

SELECT elb_status_code,
         target_status_code,
         count(1) AS row_count
FROM alblog_1h
WHERE elb_status_code LIKE '5%'
GROUP BY  elb_status_code, target_status_code
ORDER BY  row_count

直近1時間のアクセスログ、5XXレスポンスの内訳を確認します。

ELB起因(ELB5XX)、レスポンスコード500のエラーが出ている場合、詳細確認をおすすめします。

SELECT
    request_creation_time,
    elb_status_code,
    target_status_code,
    actions_executed
FROM
    alblog_1h
WHERE
    elb_status_code LIKE '500'
AND target_status_code NOT LIKE '5%'
ORDER BY
    request_creation_time

target_processing_time

ターゲットとなるEC2の応答時間を集計、AP、DB負荷が高い可能性がある要素を確認します。

所要時間の合計値(Sum)に加え、外れ値による誤判定を避けるためパーセンタイル値も求めています。

IP、UserAgent別集計

IP、UserAgent別の集計を実施、悪質なBotなど望まないリクエスト元であった場合、AWS WAFなどによる対策を検討します。

SELECT
    count(1) AS count_records,
    client_ip,
    sum(target_processing_time) AS sum_target_processing_time,
    approx_percentile(target_processing_time, 0.50) AS target_processing_time_p50,
    approx_percentile(target_processing_time, 0.90) AS target_processing_time_p90,
    approx_percentile(target_processing_time, 0.99) AS target_processing_time_p99,
    sum(received_bytes) AS sum_received_bytes,
    sum(sent_bytes) AS sum_sent_bytes,
    user_agent
FROM
    alblog_1h
GROUP BY
    client_ip,
    user_agent
ORDER BY
    sum_target_processing_time DESC
limit 50

URL別集計

URL別に所要時間の集計を実施します。特定のURLの所要時間の悪化が認められた場合、該当ページの動作状況を確認して改善を検討します。

SELECT
    count(1) AS count_records,
    request_url,
    sum(target_processing_time) AS sum_target_processing_time,
    avg(target_processing_time) AS avg_target_processing_time,
    approx_percentile(target_processing_time, 0.50) AS target_processing_time_p50,
    approx_percentile(target_processing_time, 0.90) AS target_processing_time_p90,
    approx_percentile(target_processing_time, 0.95) AS target_processing_time_p95,
    approx_percentile(target_processing_time, 0.99) AS target_processing_time_p99,
    sum(received_bytes) AS sum_received_bytes,
    sum(sent_bytes) AS sum_sent_bytes
FROM
    alblog_1h
GROUP BY
    request_url
ORDER BY
    sum_target_processing_time DESC
limit 50

まとめ

AWS標準で提供されるFirehoseを始めとする機能 Lambdaで組み合わせる事で効率的な利用が実現できました。

システムの性能管理や障害調査のため、ALBのアクセスログを頻繁にAthenaで解析する必要がある場合、 今回の紹介させて頂いた手法をお試し下さい。

テンプレート

今回紹介した、S3、SQS、Lambda、Firehose、Glueを展開する CloudFormation テンプレートです。