DuckDBでイベント履歴からダウンロードしたAWS CloudTrailのイベントを分析するための取り込み方法と簡単なクエリを作ってみた
こんにちは、臼田です。
みなさん、AWSのイベント分析してますか?(挨拶
今回は先日書いたAWS CloudTrailのイベント分析の続きです。実際にイベントをDuckDBで取り込んで分析してみます。
前置き
AWS CloudTrailのイベントはよくAmazon Athenaを利用して分析されます。それでもちろん全く問題ないのですが、例えばS3に保存されていなかったりS3のデータやAmazon Athenaに対する権限がない場合など、その手段を利用できないケースがあります。
そんな時でも、AWS CloudTrailの「イベント履歴」ではイベントの閲覧やその場での簡単なフィルタなどは活用できます。
しかし、複数項目にまたがった条件を利用したいときなどはやはりAmazon Athenaが無いと厳しい。という時にDuckDBを利用してローカルで分析する手法が使えそうです。
イベント取得の性質などは上述したブログで詳しく書いてあるので、イベント取得についてはそちらを参照してください。
DuckDBにデータを取り込む
いざデータを取り込んでいきますが、細かいデータ構造を定義していくのは大変です。
DuckDBではいい感じに取り込んでくれるread_json()
関数などがありますのでこれを積極的に活用します。対象のイベントはjsonです。なぜCSVではないのか、は上述ブログを参照してください。
試しにそのまま関数を使ってみます。
D SELECT * FROM read_json('event_history.json');
┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ Records │
│ struct(eventversion varchar, useridentity struct("type" varchar, principalid varchar, arn varchar, accountid varchar, accesskeyi… │
├────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ [{'eventVersion': 1.11, 'userIdentity': {'type': AssumedRole, 'principalId': AROAXXXXXXXXXXXXXXXXX:8d606eed3f3735d5a49a976df6dfc… │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
イベント履歴からダウンロードしたイベントのjsonでは、そのままread_jsonしてもすべてのログが1つの構造体となりうまく取り込めません。適度なレベルにunnestしてあげる必要があります。
次のパターンを試してみます。
D SELECT * FROM
(SELECT unnest(Records, max_depth := 2) AS event
FROM read_json('event_history.json'));
┌──────────────┬──────────────────────┬─────────────────────┬───┬────────────────────┬───────────────┬──────────────────────┐
│ eventVersion │ userIdentity │ eventTime │ … │ recipientAccountId │ eventCategory │ additionalEventData │
│ varchar │ struct("type" varc… │ timestamp │ │ varchar │ varchar │ struct(configrulen… │
├──────────────┼──────────────────────┼─────────────────────┼───┼────────────────────┼───────────────┼──────────────────────┤
│ 1.11 │ {'type': AssumedRo… │ 2025-05-04 13:11:32 │ … │ 12345678912 │ Management │ NULL │
│ 1.11 │ {'type': AssumedRo… │ 2025-05-04 13:11:32 │ … │ 12345678912 │ Management │ NULL │
│ 1.08 │ {'type': AssumedRo… │ 2025-05-04 13:11:29 │ … │ 12345678912 │ Management │ {'configRuleName':… │
[手動の省略]
│ 1.08 │ {'type': AssumedRo… │ 2025-05-04 13:09:58 │ … │ 12345678912 │ Management │ {'configRuleName':… │
│ 1.08 │ {'type': AssumedRo… │ 2025-05-04 13:09:58 │ … │ 12345678912 │ Management │ {'configRuleName':… │
│ 1.08 │ {'type': AssumedRo… │ 2025-05-04 13:09:58 │ … │ 12345678912 │ Management │ {'configRuleName':… │
├──────────────┴──────────────────────┴─────────────────────┴───┴────────────────────┴───────────────┴──────────────────────┤
│ 61 rows (40 shown) 18 columns (6 shown) │
└───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
いい感じになりましたね。しかし、このクエリではAWS CloudTrailのイベントを最大まで取得するとエラーとなります。
D SELECT * FROM
(SELECT unnest(Records, max_depth := 2) AS event
FROM read_json('event_history (4).json'));
Invalid Input Error:
"maximum_object_size" of 16777216 bytes exceeded while reading file "event_history (4).json" (>33554428 bytes).
Try increasing "maximum_object_size".
というわけで、容量も考慮した取り込みは以下のように行います。ついでにこれでテーブルを作りましょう。
D SELECT * FROM
(SELECT unnest(Records, max_depth := 2) AS event
FROM read_json('event_history (4).json', maximum_object_size=367001600));
┌──────────────┬──────────────────────┬─────────────────────┬───┬──────────────────────┬────────────┬──────────────────────┐
│ eventVersion │ userIdentity │ eventTime │ … │ vpcEndpointAccountId │ apiVersion │ serviceEventDetails │
│ varchar │ struct("type" varc… │ timestamp │ │ varchar │ varchar │ struct(repositoryn… │
├──────────────┼──────────────────────┼─────────────────────┼───┼──────────────────────┼────────────┼──────────────────────┤
│ 1.11 │ {'type': AssumedRo… │ 2025-05-04 17:24:12 │ … │ NULL │ NULL │ NULL │
│ 1.08 │ {'type': AssumedRo… │ 2025-05-04 17:24:12 │ … │ NULL │ NULL │ NULL │
│ 1.10 │ {'type': AssumedRo… │ 2025-05-04 17:24:12 │ … │ NULL │ NULL │ NULL │
[手動の省略]
│ 1.11 │ {'type': AssumedRo… │ 2025-04-19 19:52:13 │ … │ NULL │ NULL │ NULL │
│ 1.08 │ {'type': AWSServic… │ 2025-04-19 19:52:09 │ … │ NULL │ NULL │ NULL │
│ 1.08 │ {'type': AWSServic… │ 2025-04-19 19:52:04 │ … │ NULL │ NULL │ NULL │
├──────────────┴──────────────────────┴─────────────────────┴───┴──────────────────────┴────────────┴──────────────────────┤
│ 200000 rows (40 shown) 28 columns (6 shown) │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
ちなみに、取り込むイベントの種類にもよりますが、だいたい以下のようなカラムの構造になります。
D DESCRIBE (SELECT unnest(Records, max_depth := 2) AS event
FROM read_json('event_history (4).json', maximum_object_size=367001600));
┌──────────────────────┬─────────────────────────────────────────────────────────────────────┬─────────┬─────────┬─────────┬─────────┐
│ column_name │ column_type │ null │ key │ default │ extra │
│ varchar │ varchar │ varchar │ varchar │ varchar │ varchar │
├──────────────────────┼─────────────────────────────────────────────────────────────────────┼─────────┼─────────┼─────────┼─────────┤
│ eventVersion │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ userIdentity │ STRUCT("type" VARCHAR, principalId VARCHAR, arn VARCHAR, accountI… │ YES │ NULL │ NULL │ NULL │
│ eventTime │ TIMESTAMP │ YES │ NULL │ NULL │ NULL │
│ eventSource │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ eventName │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ awsRegion │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ sourceIPAddress │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ userAgent │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ requestParameters │ MAP(VARCHAR, JSON) │ YES │ NULL │ NULL │ NULL │
│ responseElements │ MAP(VARCHAR, JSON) │ YES │ NULL │ NULL │ NULL │
│ requestID │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ eventID │ UUID │ YES │ NULL │ NULL │ NULL │
│ readOnly │ BOOLEAN │ YES │ NULL │ NULL │ NULL │
│ eventType │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ managementEvent │ BOOLEAN │ YES │ NULL │ NULL │ NULL │
│ recipientAccountId │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ eventCategory │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ tlsDetails │ STRUCT(tlsVersion VARCHAR, cipherSuite VARCHAR, clientProvidedHos… │ YES │ NULL │ NULL │ NULL │
│ sessionCredentialF… │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ errorCode │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ errorMessage │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ resources │ STRUCT(accountId VARCHAR, "type" VARCHAR, ARN VARCHAR)[] │ YES │ NULL │ NULL │ NULL │
│ sharedEventID │ UUID │ YES │ NULL │ NULL │ NULL │
│ additionalEventData │ STRUCT(SignatureVersion VARCHAR, CipherSuite VARCHAR, bytesTransf… │ YES │ NULL │ NULL │ NULL │
│ vpcEndpointId │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ vpcEndpointAccountId │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ apiVersion │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ serviceEventDetails │ STRUCT(repositoryName VARCHAR, lifecycleEventPolicy STRUCT(lifecy… │ YES │ NULL │ NULL │ NULL │
├──────────────────────┴─────────────────────────────────────────────────────────────────────┴─────────┴─────────┴─────────┴─────────┤
│ 28 rows 6 columns │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
イベントの調査クエリ
AWS CloudTrailのイベント分析は、本当に様々な用途で行われますが、以下はあまり自分が把握していない環境で何が起きているのか全体感を掴むところから始めていく感じのクエリ、というコンセプトでいくつか調査クエリを紹介します。
とりあえず1行取得
一応書いておきますね
D SELECT * FROM cloudtrail_events LIMIT 1;
┌──────────────┬──────────────────────┬─────────────────────┬───┬────────────────────┬───────────────┬──────────────────────┐
│ eventVersion │ userIdentity │ eventTime │ … │ recipientAccountId │ eventCategory │ additionalEventData │
│ varchar │ struct("type" varc… │ timestamp │ │ varchar │ varchar │ struct(configrulen… │
├──────────────┼──────────────────────┼─────────────────────┼───┼────────────────────┼───────────────┼──────────────────────┤
│ 1.11 │ {'type': AssumedRo… │ 2025-05-04 13:11:32 │ … │ 123456789012 │ Management │ NULL │
├──────────────┴──────────────────────┴─────────────────────┴───┴────────────────────┴───────────────┴──────────────────────┤
│ 1 rows 18 columns (6 shown) │
└───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
ユーザー/ロール毎の実行回数
どんなユーザーやロールが存在するか、行動しているかを確認します。
D SELECT
CASE userIdentity.type
WHEN 'AssumedRole' THEN COALESCE(userIdentity.sessionContext.sessionIssuer.userName, 'Unknown Role')
ELSE 'Other-' || userIdentity.type
END AS user_name,
COUNT(*) AS event_count
FROM cloudtrail_events
GROUP BY user_name
ORDER BY event_count DESC;
┌─────────────────────────────────────────────────────┬─────────────┐
│ user_name │ event_count │
│ varchar │ int64 │
├─────────────────────────────────────────────────────┼─────────────┤
│ cm-config-role-all-regions │ 45 │
│ cm-alert-eventrule-role │ 8 │
│ Amazon_EventBridge_Invoke_Step_Functions_1079076398 │ 8 │
└─────────────────────────────────────────────────────┴─────────────┘
上記イベントにはIAMUser
が存在しなかったので含めていませんが、いる場合は下記クエリになります。だれかいい書き方あったら教えて下さい。
D SELECT
CASE userIdentity.type
WHEN 'AssumedRole' THEN COALESCE(userIdentity.sessionContext.sessionIssuer.userName, 'Unknown Role')
WHEN 'IAMUser' THEN COALESCE(userIdentity.userName, 'Unknown User')
ELSE 'Other-' || userIdentity.type
END AS user_name,
COUNT(*) AS event_count
FROM cloudtrail_events
GROUP BY user_name
ORDER BY event_count DESC;
イベント名毎の実行回数
今度はどんなAPIイベントが実行されているかを集計します。
D SELECT
eventName,
COUNT(*) AS event_count
FROM cloudtrail_events
GROUP BY eventName
ORDER BY event_count DESC;
┌────────────────┬─────────────┐
│ eventName │ event_count │
│ varchar │ int64 │
├────────────────┼─────────────┤
│ PutEvaluations │ 45 │
│ StartExecution │ 16 │
└────────────────┴─────────────┘
サービス毎の実行回数
サービス(eventSource
)毎でも見てみます。上記のイベント名だけだとどのサービスか分かりづらいことがありますからね。
D SELECT
eventSource,
COUNT(*) AS event_count
FROM cloudtrail_events
GROUP BY eventSource
ORDER BY event_count DESC;
┌──────────────────────┬─────────────┐
│ eventSource │ event_count │
│ varchar │ int64 │
├──────────────────────┼─────────────┤
│ config.amazonaws.com │ 45 │
│ states.amazonaws.com │ 16 │
└──────────────────────┴─────────────┘
イベント名 + サービス名の実行回数
それならまとめちゃおう、というクエリ。多分最初からこれがいいですね。
D SELECT
eventName,
COUNT(*) AS event_count,
eventSource
FROM cloudtrail_events
WHERE readOnly = 'false'
GROUP BY eventName, eventSource
ORDER BY event_count DESC;
┌────────────────┬─────────────┬──────────────────────┐
│ eventName │ event_count │ eventSource │
│ varchar │ int64 │ varchar │
├────────────────┼─────────────┼──────────────────────┤
│ PutEvaluations │ 45 │ config.amazonaws.com │
│ StartExecution │ 16 │ states.amazonaws.com │
└────────────────┴─────────────┴──────────────────────┘
変更系のイベント実行数
readOnly
を見ることで、変更系のイベントがあるか確認できます。true
の場合、変更系(作成や削除など状態が変わるもの)のイベントです。(この実行例では含んでいませんね)
D SELECT
readOnly,
COUNT(*) AS event_count
FROM cloudtrail_events
GROUP BY readOnly
ORDER BY event_count DESC;
┌──────────┬─────────────┐
│ readOnly │ event_count │
│ boolean │ int64 │
├──────────┼─────────────┤
│ false │ 61 │
└──────────┴─────────────┘
送信元IP別実行回数
送信元IPアドレスを集計します。最近はAWSサービスからの実行の場合はサービスのeventSource
になるので、下記はそれだけが表示されています。
D SELECT
sourceIPAddress,
COUNT(*) AS event_count
FROM cloudtrail_events
GROUP BY sourceIPAddress
ORDER BY event_count DESC;
┌──────────────────────┬─────────────┐
│ sourceIPAddress │ event_count │
│ varchar │ int64 │
├──────────────────────┼─────────────┤
│ config.amazonaws.com │ 45 │
│ events.amazonaws.com │ 16 │
└──────────────────────┴─────────────┘
イベント名 + 送信元IP
どこから実行されているかも加えて分けてみる。
D SELECT
eventName,sourceIPAddress,
COUNT(*) AS event_count,
eventSource
FROM cloudtrail_events
GROUP BY eventName, sourceIPAddress, eventSource
ORDER BY event_count DESC;
┌────────────────┬──────────────────────┬─────────────┬──────────────────────┐
│ eventName │ sourceIPAddress │ event_count │ eventSource │
│ varchar │ varchar │ int64 │ varchar │
├────────────────┼──────────────────────┼─────────────┼──────────────────────┤
│ PutEvaluations │ config.amazonaws.com │ 45 │ config.amazonaws.com │
│ StartExecution │ events.amazonaws.com │ 16 │ states.amazonaws.com │
└────────────────┴──────────────────────┴─────────────┴──────────────────────┘
おまけ: 省略しないで全部出力する
DuckDBのデフォルトの出力方式であるduckbox
は非常にフレンドリーで嬉しいのですが、分析中は省略しないで全部出してほしいときがあります。
一方で、ファイル出力や出力フォーマットをCSVや詰めた形に変えるなどではなくあくまで手元で見れる状態のまま、特に縦の行数を減らさずに見たい、という場合もあります。
そんなときは.output
だけ実行してあげると省略されなくなります。理由はドキュメントを見てもよくわからなかったです。参照先ドキュメントはフォーマットの話とドットコマンドの話。本来デフォルトの標準出力タイプに戻すコマンドのはずなんですが、全部出してくれるようになります。
横が間延びするのは仕方ないので、クエリでカラム数を減らします。
なお、戻し方がわからないので実行するときはお気をつけください。(誰か戻し方教えて下さい)
まとめ
他にも主要なカラムをかけ合わせて状況を見ていく、という形であればいろんな組み合わせのクエリがありますが、一旦こんなところにしておきましょう。
ちょっとした分析をしたい時に強力に動作してくれるDuckDBはこういった時に心強いですね。
ぜひパッと使えるようにしておきましょう。