この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
西澤です。CloudTrailって本当に素晴らしい機能なのですが、ログ出力の形式がJSONなので、そのままだと扱いづらかったりします。これに対する対処法については、まず下記の素晴らしいブログをご覧ください。
で、CloudTrailログに対するクエリをいざやってみようと思ったのですが、HadoopもPrestoも触ったことがなかった私にはまだちょっと難しい。今回こちらに記録しておくことで、自分が再利用できるようにしておきたいと考えました。
前提
本記事では、下記を前提とさせていただきますので、ご不明点がある方は事前にご確認ください。
- CloudTrailログがS3バケットに出力されるよう設定されている
- Ahtenaのクエリ性能や課金に関しては考慮しない、なので、Athenaパーティションについては考えない
CloudTrailログにAthenaからクエリする
CloudTrailログのフォーマットを確認して出力したい項目を決める
まずは、CloudTrailログのフォーマットを確認しておきましょう。下記ハイライトされた項目のみを出力しようと考えました。
{
"Records": [{
"eventVersion": "1.01",
"userIdentity": {
"type": "IAMUser",
"principalId": "AIDAJDPLRKLG7UEXAMPLE",
"arn": "arn:aws:iam::123456789012:user/Alice",
"accountId": "123456789012",
"accessKeyId": "AKIAIOSFODNN7EXAMPLE",
"userName": "Alice",
"sessionContext": {
"attributes": {
"mfaAuthenticated": "false",
"creationDate": "2014-03-18T14:29:23Z"
}
}
},
"eventTime": "2014-03-18T14:30:07Z",
"eventSource": "cloudtrail.amazonaws.com",
"eventName": "StartLogging",
"awsRegion": "us-west-2",
"sourceIPAddress": "72.21.198.64",
"userAgent": "signin.amazonaws.com",
"requestParameters": {
"name": "Default"
},
"responseElements": null,
"requestID": "cdc73f9d-aea9-11e3-9d5a-835b769c0d9c",
"eventID": "3074414d-c626-42aa-984b-68ff152d6ab7"
},
... additional entries ...
]
Athenaでテーブルを作成する
records以下がARRAY
、userIdentity以下が1つ階層が深いのでSTRUCT
になるところがポイントだと思います。LOCATIONに指定するS3のフォルダ配下の全てがスキャン範囲(課金範囲)になるのでご注意ください。
CREATE EXTERNAL TABLE IF NOT EXISTS records_201702 (
records ARRAY<
STRUCT<
eventTime:STRING,
eventSource:STRING,
eventName:STRING,
awsRegion:STRING,
sourceIPAddress:STRING,
userIdentity:STRUCT<
type:STRING,
arn:STRING
>
>
>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://[CloudTrailログ出力バケット名]/[prefix指定があれば]/AWSLogs/[AWSアカウントID]/CloudTrail/[Region]/2017/02';
無事にテーブル作成に成功しました。
JSONにクエリを投げる
まずは、そのままクエリを実行します。
SELECT
*
FROM records_201702
LIMIT 10;
このままだとrecords以下の配列がそのまま出てきます。結局S3にあるファイル1つ分が1レコードになった状態です。
次にrecords配下の配列を分解して別々のレコードにしてあげます。
SELECT
record
FROM records_201702
CROSS JOIN UNNEST(records) AS t (record)
LIMIT 10;
records配下が配列ではなくなり、分解されたレコードになりました。
今度はカラムをそれぞれ表示させるようにします。階層が深いものは"."を付けて表現するだけでOKです。
SELECT
record.eventTime,
record.eventSource,
record.eventName,
record.awsRegion,
record.sourceIPAddress,
record.userIdentity.type,
record.userIdentity.arn
FROM records_201702
CROSS JOIN UNNEST(records) AS t (record)
LIMIT 10;
ここまで来ればもうJSONは忘れて大丈夫ですね。
もう大丈夫かと思いますが、ConsoleLogin
だけで絞り込んでみましょう。
SELECT
record.eventTime,
record.eventSource,
record.eventName,
record.awsRegion,
record.sourceIPAddress,
record.userIdentity.type,
record.userIdentity.arn
FROM records_201702
CROSS JOIN UNNEST(records) AS t (record)
WHERE record.eventName = 'ConsoleLogin'
LIMIT 10;
ここまで来れば、もうWHERE句で自由にフィルタできると思います。ログイン監査レポートのできあがりです。
まとめ
CREATE TABLE文と最後に紹介したSELECT文だけコピペしてもらえれば、10分もかけずに実行結果が得られるはずです。Athenaを試してみたことが無い方にもぜひ試してみていただきたいです。調査の度にJSONと格闘するのもなかなか辛いと思いますので、やってみたブログをぜひご活用いただければと思います。
どこかの誰かのお役に立てば嬉しいです。