この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
AWSチームのすずきです。
前回の記事では、Athenaで効率的な解析を可能にするFirehoseについて紹介させて頂きました。
今回、Firehoseへのログ登録と、Athena/Glue のパーティション更新を自動化し、 10分程度のタイムラグで Athena を利用した ALBのアクセスログ解析を可能にする仕組みを紹介します。
構成図
AWS設定
S3
ALBのアクセスログ、5分ごとに各ノードが一斉にログファイルを出力する仕様です。
S3イベントから直接Firehoseの登録を実行した場合、5分ごとのログ出力直後にFirehoseの更新が集中、 帯域超過に伴うログ欠損をさけるため、SQSを利用した非同期処理を行いました。
東京リージョンのS3に出力されたALBログファイル、Firehose、Athenaの利用費が廉価な北米リージョンにレプリケーションで利用する想定でバージョニングを有効としています。
S3bucketAccesslog:
Type: AWS::S3::Bucket
Properties:
NotificationConfiguration:
LambdaConfigurations:
- Function: !GetAtt 'LambdaSqsFunction.Arn'
Event: s3:ObjectCreated:*
VersioningConfiguration:
Status: Enabled
LifecycleConfiguration:
Rules:
- Id: AutoDelete
Status: Enabled
ExpirationInDays: 3
SQS
Firehose登録を行うLambdaのリトライ回数管理のため、VisibilityTimeout、DeadLetterQueueの設定を実施しました。
SqsQueue:
Type: AWS::SQS::Queue
Properties:
VisibilityTimeout: 300
RedrivePolicy:
deadLetterTargetArn: !GetAtt 'SqsDeadLetterQueue.Arn'
maxReceiveCount: 10
SqsDeadLetterQueue:
Type: AWS::SQS::Queue
Lambda
SQS Producer
S3のログ作成イベントを元に、SQSのメッセージを作成します。
def lambda_handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
s3key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
a = {}
a["bucket"] = bucket
a["s3key"] = s3key
elb_info = get_elb_listener_info(s3key)
a.update(elb_info)
r = put_message_sqs(a)
def get_elb_listener_info(s3key):
elb_ip = s3key.split('_')[-2]
d = {}
d["elb_listener_ip"] = elb_ip
return d
def put_message_sqs(data):
q = os.environ['sqs_queue_url']
d = json.dumps(data)
r = sqs.send_message(QueueUrl = q, MessageBody = d)
S3 イベント を SQS のProducer として設定する事も可能ですが、 ELBノードの情報をS3のキー情報から取得する処理のため、今回は Lambdaを利用しました。
SQS Consumer
「ReservedConcurrentExecutions」(同時実行の予約数)は、2と設定しました。
今回、Firehoseへのログ登録を実施するLambda関数(割当メモリ128MB)、Firehoseの書き込みスループットの実測値は毎秒2〜3MBでした。 Firehose の帯域を1→10MBまで拡張した環境では、書き込みスループットの超過エラーは回避出来るようになりました。
LambdaFunction:
Type: AWS::Lambda::Function
Properties:
ReservedConcurrentExecutions: 2
ReservedConcurrentExecutions によるLambda実行数の制限が発動した場合、Lambda関数のスロットルが発生します。
アカウントレベルのメトリクスでLambdaのスロットル数の監視を実施している環境では、当設定がノイズとなる可能性がある点にご注意ください。
- アカウントレベルのLambdaのCloudWatchメトリクス
スロットルが常時発生し続ける場合、処理の遅延や、リトライがDLQで設定した規定回数失敗した場合ログ欠落する事になります。 Firehoseの上限緩和手続きを実施した上で、Lambdaの同時実行数を増やす対応が必要となります。
今回、シンプルな実装とするためSQSのイベントとしてLambdaを起動していますが、 より大規模なログ処理が必要となる環境では、メッセージの受信、削除処理をLambda関数に追加、SQSをポーリングする利用や、先日並列処理に対応したStepFunctionを Consumerとする実装をお試しください。
- SQSポーリング利用イメージ
Firehose
Firehoseの S3出力バッファーの時間は120秒としています。
直近のアクセスログを確認する必要がない場合、バッファ時間は長く設定し(最長900秒)、S3に保存されるファイルの断片化を避ける事をおすすめします。
Deliverystream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
ExtendedS3DestinationConfiguration:
BufferingHints:
IntervalInSeconds: 120
Partition 追加
現在時刻を元に、Athena/Glue テーブルのパーティションを追加するLambda関数を用意しました。
def lambda_handler(event, context):
dt = datetime.now().strftime("%Y-%m-%d")
hour = datetime.now().strftime("%H")
s3bucket = os.environ['s3bucket']
s3key = os.environ['s3key']
gluedb = os.environ['gluedb']
gluetable = os.environ['gluetable']
athena = boto3.client('athena')
sql = "ALTER TABLE " + gluetable + " ADD IF NOT EXISTS PARTITION (dt='" + dt + "',hour='" + hour + "') location 's3://" + s3bucket + "/" + gluedb + "/" + gluetable + "/dt=" + dt + "/hour=" + hour + "'"
athena.start_query_execution(
QueryString= sql ,
QueryExecutionContext={
'Database': gluedb
},
ResultConfiguration={
'OutputLocation': 's3://' + s3bucket + '/athena-output'
}
)
CloudWatch Event
1時間毎に パーティション 追加Lambdaを実行します。
ScheduledRule:
Type: AWS::Events::Rule
Properties:
Description: ScheduledRule
ScheduleExpression: cron(1 * * * ? *)
State: ENABLED
Targets:
- Arn: !GetAtt 'LambdaFunctionAddPartition.Arn'
Id: TargetFunctionV1
2019年現在、AWS Glue データカタログ に移行済みのAWS環境では、アカウントあたりのパーティションの数は 20,000,000。 1時間毎のパーティション作成を繰り返しても上限に到達するのは2283年後の計算になりますが、 同じAWSアカウント内にパーティションを多く作成するシステムが存在する場合、パーティション数の上限にご留意ください。
Athena View
直近(70分)のログを確認するViewを作成します。 現在時刻と1時間前に一致するパーティションを指定し、Athenaのスキャン量を限定しています。
CREATE OR REPLACE VIEW alblog_1h AS
SELECT *
FROM
(SELECT *
FROM alblog
WHERE dt = cast((current_timestamp - INTERVAL '1' HOUR) AS date)
AND hour = hour((current_timestamp - INTERVAL '1' HOUR))
UNION ALL
SELECT *
FROM alblog
WHERE dt = cast(current_timestamp AS date)
AND hour = hour(current_timestamp)
)
WHERE from_iso8601_timestamp(timestamp) > (current_timestamp - INTERVAL '70' minute)
利用例
アクセスログの最終タイムスタンプ確認
SELECT current_timestamp ,
max (request_creation_time) AS newest_request_creation_time ,
sum(1) AS sum_logs
FROM alblog_1h
クエリの実行時刻(current_timestamp)「8:13:59 UTC」に対し、4分前のタイムスタンプを持つアクセスログの確認ができました。
5XX 件数確認
SELECT elb_status_code,
target_status_code,
count(1) AS row_count
FROM alblog_1h
WHERE elb_status_code LIKE '5%'
GROUP BY elb_status_code, target_status_code
ORDER BY row_count
直近1時間のアクセスログ、5XXレスポンスの内訳を確認します。
ELB起因(ELB5XX)、レスポンスコード500のエラーが出ている場合、詳細確認をおすすめします。
SELECT
request_creation_time,
elb_status_code,
target_status_code,
actions_executed
FROM
alblog_1h
WHERE
elb_status_code LIKE '500'
AND target_status_code NOT LIKE '5%'
ORDER BY
request_creation_time
target_processing_time
ターゲットとなるEC2の応答時間を集計、AP、DB負荷が高い可能性がある要素を確認します。
所要時間の合計値(Sum)に加え、外れ値による誤判定を避けるためパーセンタイル値も求めています。
IP、UserAgent別集計
IP、UserAgent別の集計を実施、悪質なBotなど望まないリクエスト元であった場合、AWS WAFなどによる対策を検討します。
SELECT
count(1) AS count_records,
client_ip,
sum(target_processing_time) AS sum_target_processing_time,
approx_percentile(target_processing_time, 0.50) AS target_processing_time_p50,
approx_percentile(target_processing_time, 0.90) AS target_processing_time_p90,
approx_percentile(target_processing_time, 0.99) AS target_processing_time_p99,
sum(received_bytes) AS sum_received_bytes,
sum(sent_bytes) AS sum_sent_bytes,
user_agent
FROM
alblog_1h
GROUP BY
client_ip,
user_agent
ORDER BY
sum_target_processing_time DESC
limit 50
URL別集計
URL別に所要時間の集計を実施します。特定のURLの所要時間の悪化が認められた場合、該当ページの動作状況を確認して改善を検討します。
SELECT
count(1) AS count_records,
request_url,
sum(target_processing_time) AS sum_target_processing_time,
avg(target_processing_time) AS avg_target_processing_time,
approx_percentile(target_processing_time, 0.50) AS target_processing_time_p50,
approx_percentile(target_processing_time, 0.90) AS target_processing_time_p90,
approx_percentile(target_processing_time, 0.95) AS target_processing_time_p95,
approx_percentile(target_processing_time, 0.99) AS target_processing_time_p99,
sum(received_bytes) AS sum_received_bytes,
sum(sent_bytes) AS sum_sent_bytes
FROM
alblog_1h
GROUP BY
request_url
ORDER BY
sum_target_processing_time DESC
limit 50
まとめ
AWS標準で提供されるFirehoseを始めとする機能 Lambdaで組み合わせる事で効率的な利用が実現できました。
システムの性能管理や障害調査のため、ALBのアクセスログを頻繁にAthenaで解析する必要がある場合、 今回の紹介させて頂いた手法をお試し下さい。
テンプレート
今回紹介した、S3、SQS、Lambda、Firehose、Glueを展開する CloudFormation テンプレートです。