CloudTrailログからAmazon Athenaを使ってログイン監査レポートを作成する

2017.02.22

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

西澤です。CloudTrailって本当に素晴らしい機能なのですが、ログ出力の形式がJSONなので、そのままだと扱いづらかったりします。これに対する対処法については、まず下記の素晴らしいブログをご覧ください。

Amazon Athenaを使ってJSONファイルを検索してみる

AWS AthenaでCloudTrailのS3オブジェクトログを解析をしてみました!

で、CloudTrailログに対するクエリをいざやってみようと思ったのですが、HadoopもPrestoも触ったことがなかった私にはまだちょっと難しい。今回こちらに記録しておくことで、自分が再利用できるようにしておきたいと考えました。

前提

本記事では、下記を前提とさせていただきますので、ご不明点がある方は事前にご確認ください。

  • CloudTrailログがS3バケットに出力されるよう設定されている
  • Ahtenaのクエリ性能や課金に関しては考慮しない、なので、Athenaパーティションについては考えない

CloudTrailログにAthenaからクエリする

CloudTrailログのフォーマットを確認して出力したい項目を決める

まずは、CloudTrailログのフォーマットを確認しておきましょう。下記ハイライトされた項目のみを出力しようと考えました。

{
	"Records": [{
		"eventVersion": "1.01",
		"userIdentity": {
			"type": "IAMUser",
			"principalId": "AIDAJDPLRKLG7UEXAMPLE",
			"arn": "arn:aws:iam::123456789012:user/Alice",
			"accountId": "123456789012",
			"accessKeyId": "AKIAIOSFODNN7EXAMPLE",
			"userName": "Alice",
			"sessionContext": {
				"attributes": {
					"mfaAuthenticated": "false",
					"creationDate": "2014-03-18T14:29:23Z"
				}
			}
		},
		"eventTime": "2014-03-18T14:30:07Z",
		"eventSource": "cloudtrail.amazonaws.com",
		"eventName": "StartLogging",
		"awsRegion": "us-west-2",
		"sourceIPAddress": "72.21.198.64",
		"userAgent": "signin.amazonaws.com",
		"requestParameters": {
			"name": "Default"
		},
		"responseElements": null,
		"requestID": "cdc73f9d-aea9-11e3-9d5a-835b769c0d9c",
		"eventID": "3074414d-c626-42aa-984b-68ff152d6ab7"
	},
    ... additional entries ...
	]

Athenaでテーブルを作成する

records以下がARRAY、userIdentity以下が1つ階層が深いのでSTRUCTになるところがポイントだと思います。LOCATIONに指定するS3のフォルダ配下の全てがスキャン範囲(課金範囲)になるのでご注意ください。

CREATE EXTERNAL TABLE IF NOT EXISTS records_201702 (
  records ARRAY<
    STRUCT<
      eventTime:STRING,
      eventSource:STRING,
      eventName:STRING,
      awsRegion:STRING,
      sourceIPAddress:STRING,
      userIdentity:STRUCT<
        type:STRING,
        arn:STRING
      >
    >
  >
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://[CloudTrailログ出力バケット名]/[prefix指定があれば]/AWSLogs/[AWSアカウントID]/CloudTrail/[Region]/2017/02';

athena_create_table

無事にテーブル作成に成功しました。

JSONにクエリを投げる

まずは、そのままクエリを実行します。

SELECT
  *
FROM records_201702
LIMIT 10;

このままだとrecords以下の配列がそのまま出てきます。結局S3にあるファイル1つ分が1レコードになった状態です。

athena_query_1

次にrecords配下の配列を分解して別々のレコードにしてあげます。

SELECT
  record
FROM records_201702
CROSS JOIN UNNEST(records) AS t (record)
LIMIT 10;

records配下が配列ではなくなり、分解されたレコードになりました。

athena_query_2

今度はカラムをそれぞれ表示させるようにします。階層が深いものは"."を付けて表現するだけでOKです。

SELECT
  record.eventTime,
  record.eventSource,
  record.eventName,
  record.awsRegion,
  record.sourceIPAddress,
  record.userIdentity.type,
  record.userIdentity.arn
FROM records_201702
CROSS JOIN UNNEST(records) AS t (record)
LIMIT 10;

ここまで来ればもうJSONは忘れて大丈夫ですね。

athena_query_3

もう大丈夫かと思いますが、ConsoleLoginだけで絞り込んでみましょう。

SELECT
  record.eventTime,
  record.eventSource,
  record.eventName,
  record.awsRegion,
  record.sourceIPAddress,
  record.userIdentity.type,
  record.userIdentity.arn
FROM records_201702
CROSS JOIN UNNEST(records) AS t (record)
WHERE record.eventName = 'ConsoleLogin'
LIMIT 10;

ここまで来れば、もうWHERE句で自由にフィルタできると思います。ログイン監査レポートのできあがりです。

athena_query_4

まとめ

CREATE TABLE文と最後に紹介したSELECT文だけコピペしてもらえれば、10分もかけずに実行結果が得られるはずです。Athenaを試してみたことが無い方にもぜひ試してみていただきたいです。調査の度にJSONと格闘するのもなかなか辛いと思いますので、やってみたブログをぜひご活用いただければと思います。

どこかの誰かのお役に立てば嬉しいです。