一定期間利用のないQuickSightのリーダー(閲覧者)アカウントを判別し、自動で削除する
データ事業本部インテグレーション部機械学習チーム・新納(にいの)です。
2024年5月にQuickSightの料金体系が変更されるとアナウンスされました。既存アカウントは2025年5月1日まで旧料金体系で利用できますが、新しく作成したQuickSightアカウントには新料金体系が適用されます。
中でも、リーダー(閲覧者)の課金体系の変更はインパクトが大きいのではないでしょうか。これまでは実際にダッシュボードを見た分だけ料金がかかっていましたが、今後は使用頻度に関係なく、全てのリーダーアカウントに月額3ドルの固定料金がかかることになります。そのため、あまり使っていないリーダーアカウントをそのままにしておくと、使用していなくても毎月料金が発生してしまいます。
思いもよらぬ課金を防ぐため、一定期間利用のないリーダーアカウントを自動で削除する仕組みを作成してみました。
利用環境・前提
今回は以下の仕組みで自動削除を実現しています。
ポイント:
-
CloudTrailのログをAthenaでテーブル化しておく
-
CloudTrailログのQuickSightのGetDashboardイベントをLambdaからクエリし、一定期間にアクセスがあるかどうかをチェック
- アクセスがないユーザーは削除
-
EventBridgeからLambdaを定期的に実行する
QuickSightの最終アクセス時間はQuickSightの画面上から確認可能ですが、2024年10月現在、API経由では取得できません。このため、CloudTrailログからQuickSightのGetDashboardのイベントが最後に発生した時間を最終アクセス時間とみなします。
LambdaからCloudTrailのAPIを直接使用する方法もありますが、APIの使用回数に制限があることと、1回のリクエストで50件までしか取得できないという制限があります。これらの制限に対応するには少し複雑な処理が必要になるため、より簡単に実装できるAthenaへのクエリを採用することにしました。
CloudTrailログをAthenaでテーブル化し、ビューを作成
CloudTrailログをパーティションを設定してテーブル化
CloudTrailのログをAthenaのテーブルとして設定していない方は、監査やイベント追跡など、普段の運用や障害対応でも役立つのでこの機会に設定することをおすすめします。
今回は以下のブログを参考に、日付でパーティション化したテーブルを作成します。パーティション化することでクエリした時のスキャン量を減らし、余計なデータをスキャンして無駄な課金を発生させないようにできます。
通常、パーティションは新しい日付が追加されるたびにロードが必要ですが、設定した内容で自動でパーティション作成をしてくれるPartiton Projectionという仕組みを使えばロードが不要になります。
この仕組みを使ってCloudTrailログをテーブル化してみましょう。マネジメントコンソールに移動し、CloudTailの画面から「イベント履歴」→「Athenaテーブルを作成」を押すと以下のように自動でCREATE TABLE文を作成してくれます。このままでもテーブル自体は作れますが、Partition Projectionの設定がされていないので作成は押さずにコピーしてAthenaの画面へ遷移します。
先ほどコピーしたCREATE TABLE文にPartition Projectionの設定を追加し、テーブルを作成します。以下は設定を追加したクエリです。
CREATE EXTERNAL TABLE <任意のテーブル名> (
eventVersion STRING,
userIdentity STRUCT<
type: STRING,
principalId: STRING,
arn: STRING,
accountId: STRING,
invokedBy: STRING,
accessKeyId: STRING,
userName: STRING,
sessionContext: STRUCT<
attributes: STRUCT<
mfaAuthenticated: STRING,
creationDate: STRING>,
sessionIssuer: STRUCT<
type: STRING,
principalId: STRING,
arn: STRING,
accountId: STRING,
username: STRING>,
ec2RoleDelivery: STRING,
webIdFederationData: MAP<STRING,STRING>>>,
eventTime STRING,
eventSource STRING,
eventName STRING,
awsRegion STRING,
sourceIpAddress STRING,
userAgent STRING,
errorCode STRING,
errorMessage STRING,
requestParameters STRING,
responseElements STRING,
additionalEventData STRING,
requestId STRING,
eventId STRING,
resources ARRAY<STRUCT<
arn: STRING,
accountId: STRING,
type: STRING>>,
eventType STRING,
apiVersion STRING,
readOnly STRING,
recipientAccountId STRING,
serviceEventDetails STRING,
sharedEventID STRING,
vpcEndpointId STRING,
tlsDetails STRUCT<
tlsVersion: STRING,
cipherSuite: STRING,
clientProvidedHostHeader: STRING>
)
COMMENT 'CloudTrail table for <S3バケット名> bucket'
PARTITIONED BY (region string, date string)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://<S3バケット名>/AWSLogs/<AWSアカウントID>/CloudTrail/'
TBLPROPERTIES (
'projection.enabled' = 'true',
'projection.date.type' = 'date',
'projection.date.range' = 'NOW-1YEARS,NOW',
'projection.date.format' = 'yyyy/MM/dd',
'projection.date.interval' = '1',
'projection.date.interval.unit' = 'DAYS',
'projection.region.type' = 'enum',
'projection.region.values'='us-east-1,us-east-2,us-west-1,us-west-2,af-south-1,ap-east-1,ap-south-1,ap-northeast-2,ap-southeast-1,ap-southeast-2,ap-northeast-1,ca-central-1,eu-central-1,eu-west-1,eu-west-2,eu-south-1,eu-west-3,eu-north-1,me-south-1,sa-east-1',
'storage.location.template' = 's3://<S3バケット名>/AWSLogs/<AWSアカウントID>/CloudTrail/${region}/${date}',
'classification'='cloudtrail',
'compressionType'='gzip',
'typeOfData'='file'
);
ポイント:
- PARTITIONED BY句でregionとdateでパーティション化
- ROW FORMAT SERDEを 'org.apache.hive.hcatalog.data.JsonSerDe'に変更
- TBLPROPERTIES句の'projection'でPartition Projectionの設定を追加
Partition Projection自体の詳細な以下のエントリをご参照ください。
- 【全リージョン対応】CloudTrailのログをAthenaのPartition Projectionなテーブルで作る | DevelopersIO
- [新機能]Amazon Athena ルールベースでパーティションプルーニングを自動化する Partition Projection の徹底解説 | DevelopersIO
実行するとAthenaでパーティション化されたテーブルの作成が確認できます。
QuickSightのイベントソースに特化したビューを作成
上記のテーブルを直接クエリしてもいいのですが、今回はQuickSightでのダッシュボードアクセスの履歴さえわかればいいので、ある程度条件でフィルタし、クエリしやすいようにuserIdentityカラムをフラット化したビューを作成します。
CREATE VIEW <任意のビュー名> AS
SELECT eventVersion,
userIdentity.type AS userIdentity_type,
userIdentity.principalId AS userIdentity_principalId,
userIdentity.arn AS userIdentity_arn,
userIdentity.accountId AS userIdentity_accountId,
userIdentity.invokedBy AS userIdentity_invokedBy,
userIdentity.accessKeyId AS userIdentity_accessKeyId,
userIdentity.userName AS userIdentity_userName,
userIdentity.sessionContext.attributes.mfaAuthenticated AS userIdentity_sessionContext_attributes_mfaAuthenticated,
userIdentity.sessionContext.attributes.creationDate AS userIdentity_sessionContext_attributes_creationDate,
userIdentity.sessionContext.sessionIssuer.type AS userIdentity_sessionContext_sessionIssuer_type,
userIdentity.sessionContext.sessionIssuer.principalId AS userIdentity_sessionContext_sessionIssuer_principalId,
userIdentity.sessionContext.sessionIssuer.arn AS userIdentity_sessionContext_sessionIssuer_arn,
userIdentity.sessionContext.sessionIssuer.accountId AS userIdentity_sessionContext_sessionIssuer_accountId,
userIdentity.sessionContext.sessionIssuer.userName AS userIdentity_sessionContext_sessionIssuer_userNam,
eventTime,
eventSource,
eventName,
awsRegion,
sourceIpAddress,
userAgent,
errorCode,
errorMessage,
requestParameters,
responseElements,
additionalEventData,
requestId,
eventId,
resources,
eventType,
apiVersion,
readOnly,
recipientAccountId,
serviceEventDetails,
sharedEventID,
vpcEndpointId
FROM <CloudTrailログのテーブル名>
WHERE eventsource = 'quicksight.amazonaws.com'
AND eventName = 'GetDashboard'
AND awsRegion = 'ap-northeast-1'
実行イメージの一部はこちら。今回の用途以外にも、QuickSightのダッシュボードがどれくらい見られているかチェックするのにも役立ちます。
Lambdaでログをクエリし、リーダーアカウントを削除
Lambdaを利用し、上記のビューをクエリして一定期間利用のないリーダーアカウントを削除します。
処理の流れ
- QuickSightユーザーの一覧を取得
- READERロールのユーザーのみを処理
- 除外グループのメンバーはスキップ
- Athenaで最終アクセス日時を確認
- 非アクティブユーザーを削除
判定条件
- READERロールのみが対象
- 指定日数以内にアクセスがないユーザーを削除
- 除外グループのメンバーは削除対象外
IAMロール
今回の実行に必要なIAMポリシーは以下の通りです。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "logs:CreateLogGroup",
"Resource": "arn:aws:logs:ap-northeast-1:<AWSアカウントID>:*"
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:ap-northeast-1:<AWSアカウントID>:log-group:/aws/lambda/<Lambda関数名>:*"
]
},
{
"Effect": "Allow",
"Action": [
"quicksight:ListUsers",
"quicksight:ListUserGroups",
"quicksight:DeleteUser"
],
"Resource": [
"arn:aws:quicksight:ap-northeast-1:<AWSアカウントID>:user/<QuickSightの名前空間名>/*",
"arn:aws:quicksight:ap-northeast-1:<AWSアカウントID>:group/<QuickSightの名前空間名>/*"
]
},
{
"Effect": "Allow",
"Action": [
"athena:StartQueryExecution",
"athena:GetQueryExecution",
"athena:GetQueryResults"
],
"Resource": [
"arn:aws:athena:ap-northeast-1:<AWSアカウントID>:workgroup/*"
]
},
{
"Effect": "Allow",
"Action": [
"glue:GetTable",
"glue:GetDatabase"
],
"Resource": [
"arn:aws:glue:ap-northeast-1:<AWSアカウントID>:catalog",
"arn:aws:glue:ap-northeast-1:<AWSアカウントID>:database/*",
"arn:aws:glue:ap-northeast-1:<AWSアカウントID>:table/*/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:PutObject",
"s3:CreateBucket"
],
"Resource": [
"arn:aws:s3:::<S3バケット名>",
"arn:aws:s3:::<S3バケット名>/*",
"arn:aws:s3:::<Athenaのoutput locationに指定したS3バケット名>",
"arn:aws:s3:::<Athenaのoutput locationに指定したS3バケット名>/*"
]
}
]
}
Lambda関数
今回はPythonで作成しました。全体のコードは以下の通りです。
import boto3
import json
import os
from datetime import datetime, timezone, timedelta
def lambda_handler(event, context):
print("Starting Lambda execution")
# 環境変数から設定を読み込む
AWS_ACCOUNT_ID = os.environ["AWS_ACCOUNT_ID"]
QUICKSIGHT_NAMESPACE = os.environ["QUICKSIGHT_NAMESPACE"]
INACTIVITY_THRESHOLD = int(os.environ["INACTIVITY_THRESHOLD"])
EXCLUDED_GROUPS = os.environ["EXCLUDED_GROUPS"].split(",")
ATHENA_DATABASE = os.environ["ATHENA_DATABASE"]
ATHENA_VIEW = os.environ["ATHENA_VIEW"]
ATHENA_WORKGROUP = os.environ["ATHENA_WORKGROUP"]
print(f"Loaded environment variables: Database={ATHENA_DATABASE}, View={ATHENA_VIEW}, Workgroup={ATHENA_WORKGROUP}")
# AWS クライアントの初期化
quicksight = boto3.client("quicksight")
athena = boto3.client("athena")
# 現在の日時とINACTIVITY_THRESHOLD日前の日時を計算
current_date = datetime.now(timezone.utc)
start_date = current_date - timedelta(days=INACTIVITY_THRESHOLD)
def get_username_from_path(full_username):
"""
ユーザー名のパスから実際のユーザー名を取得する
例: 'aaaa/bbbb' -> 'bbbb'
"""
return full_username.split('/')[-1] if '/' in full_username else full_username
def is_user_in_excluded_group(username):
"""ユーザーが除外グループに属しているかチェックする関数"""
try:
response = quicksight.list_user_groups(
AwsAccountId=AWS_ACCOUNT_ID,
Namespace=QUICKSIGHT_NAMESPACE,
UserName=username,
)
user_groups = [group["GroupName"] for group in response["GroupList"]]
return any(group in EXCLUDED_GROUPS for group in user_groups)
except Exception as e:
print(f"Error checking groups for user {username}: {str(e)}")
return False
def get_user_last_activity(username):
"""Athenaを使用してユーザーの最後のアクティビティを取得"""
query = f"""
SELECT
parse_datetime(eventTime,'yyyy-MM-dd''T''HH:mm:ss''Z') as eventTime
FROM {ATHENA_DATABASE}.{ATHENA_VIEW}
WHERE COALESCE(userIdentity_userName,userIdentity_sessionContext_sessionIssuer_userNam) = '{username}'
AND from_iso8601_timestamp(eventTime) >= from_iso8601_timestamp('{start_date.strftime("%Y-%m-%dT%H:%M:%SZ")}')
ORDER BY eventTime DESC
LIMIT 1
"""
try:
# クエリ実行
response = athena.start_query_execution(
QueryString=query,
QueryExecutionContext={
"Database": ATHENA_DATABASE
},
WorkGroup=ATHENA_WORKGROUP
)
query_execution_id = response["QueryExecutionId"]
print(f"Query execution ID: {query_execution_id}")
# クエリ完了を待機
while True:
response = athena.get_query_execution(QueryExecutionId=query_execution_id)
state = response["QueryExecution"]["Status"]["State"]
if state in ["SUCCEEDED", "FAILED", "CANCELLED"]:
break
if state == "FAILED":
error_message = response["QueryExecution"]["Status"].get("StateChangeReason", "Unknown error")
print(f"Query failed for {username}. Error: {error_message}")
return False
if state == "SUCCEEDED":
# 結果を取得
results = athena.get_query_results(QueryExecutionId=query_execution_id)
# ヘッダー行を除いた結果の行数をチェック
if len(results["ResultSet"]["Rows"]) > 1:
# 最新のアクティビティ時間を取得
last_activity = results["ResultSet"]["Rows"][1]["Data"][0].get("VarCharValue")
if last_activity:
print(f"Last activity found for {username}: {last_activity}")
try:
# タイムスタンプ文字列をパース
last_activity_date = datetime.strptime(
last_activity.split('.')[0], # ミリ秒部分を削除
"%Y-%m-%d %H:%M:%S"
)
last_activity_date = last_activity_date.replace(tzinfo=timezone.utc)
# INACTIVITY_THRESHOLD日以内のアクティビティかチェック
is_active = last_activity_date > start_date
print(f"Is user active: {is_active}")
return is_active
except Exception as e:
print(f"Error parsing date for {username}: {str(e)}")
print(f"Raw last_activity value: {last_activity}")
return False
print(f"No activity found in results for {username}")
return False
else:
print(f"Query cancelled for {username}")
return False
except Exception as e:
print(f"Error executing query for {username}: {str(e)}")
return False
print("Starting user processing")
# QuickSightユーザーのリストを取得
paginator = quicksight.get_paginator("list_users")
for page in paginator.paginate(
AwsAccountId=AWS_ACCOUNT_ID, Namespace=QUICKSIGHT_NAMESPACE
):
for user in page["UserList"]:
full_username = user["UserName"]
username = get_username_from_path(full_username)
role = user["Role"]
# READERロールのユーザーのみを処理
if role != "READER":
print(f"Skipping non-READER user: {full_username} (Role: {role})")
continue
# 除外グループのチェック
if is_user_in_excluded_group(full_username):
print(f"User {full_username} is in an excluded group. Skipping.")
continue
print(f"Processing user: {full_username} (Actual username: {username})")
# ユーザーの最後のアクティビティを確認
has_recent_activity = get_user_last_activity(username)
if not has_recent_activity:
print(f"No recent activity found for user {full_username}. Deleting...")
try:
# ユーザーを削除
quicksight.delete_user(
AwsAccountId=AWS_ACCOUNT_ID,
Namespace=QUICKSIGHT_NAMESPACE,
UserName=full_username
)
pass
except Exception as e:
print(f"Error deleting user {full_username}: {str(e)}")
else:
print(
f"User {full_username} has been active within {INACTIVITY_THRESHOLD} days. Skipping."
)
print("Lambda execution completed")
return {"statusCode": 200, "body": json.dumps("Processing completed")}
Lamdbaの環境変数には以下の値を入力してください。
環境変数 | 値 |
---|---|
AWS_ACCOUNT_ID | AWSアカウントID |
QUICKSIGHT_NAMESPACE | QuickSightの名前空間名 |
INACTIVITY_THRESHOLD | 最終アクセスからの経過日数 |
EXCLUDED_GROUPS | 除外対象グループ(複数ある場合は,区切りで入力) |
ATHENA_DATABASE | CloudTrailログのテーブルがあるAthenaのデータベース名 |
ATHENA_VIEW | CloudTrailログのビュー名 |
ATHENA_WORKGROUP | Athenaのワークグループ名(事前にAthenaからoutput locationの設定が必要) |
Lambdaのテスト実行の際は「# ユーザーを削除」とコメントのある部分をコメントアウトしてから削除対象のユーザーが意図通りかどうか確認してください。
テスト実行で確認できたらEventBridgeスケジューラーを設定し、Lambdaをトリガーするように設定すれば完成です。EventBridgeの設定については以下をご参照ください。
最後に
QuickSightの一定期間使われていないリーダーアカウントを検知し、削除する仕組みのご紹介でした。もしアカウントが多く、実行時間が15分をすぎてしまいそうな場合は上述したPythonスクリプトをGlue Python Shellで実行させるのも一つの回避策として有効です。
今回作成したCloudTrailログのテーブルやビューは、イベントの追跡・監査や、QuickSightのダッシュボードがどれくらい使われているかなどを見るのにも便利ですので、この機会に活用してみてください。