スナップショットの実データ量と経過日数を取得して削除優先度をスコアリングしてみた
こんにちは。クラウド事業本部の木村です。
EBSスナップショット、気づくと大量に溜まっていてコストになっていることはないでしょうか。手動で取得したスナップショットや、過去の運用で作成されたものが不要になっていてもそのまま残置されているケースは結構多いかと思います。
こうしたスナップショットはコストがかかり続けるので、全部確認し不要なものは削除するのが理想です。
ただ数百件を超えると1件ずつ確認していくのは現実的ではありませんし、小さなサイズのスナップショットの場合、必要かを検討する時間にかかる人的コストの方が削減効果より高くなってしまうケースもあるかと思います。
そこで今回はスナップショットに発生している実コストと経過期間を元に対応の優先順位をつけるPythonスクリプトをClaude Codeを利用して作ってみましたのでその内容を紹介させていただきます。
EBSスナップショットの課金の仕組み
EBSスナップショットは増分バックアップです。そのため初回のスナップショットはボリューム上の使用ブロック全体を保存しますが、2回目以降は前回から変更があったブロックだけを保存する形式です。
- ボリューム (100 GiBのボリューム)
- スナップショット A(初回): 使用ブロック全体を保存 → 例: 60 GiB 分
- スナップショット B(2回目): Snap A からの差分のみ保存 → 例: 5 GiB 分
- スナップショット C(3回目): Snap B からの差分のみ保存 → 例: 23 GiB 分
上記のようなボリュームのスナップショットの場合、課金対象となるのは60 + 5 + 23 = 88 GiB分になります。
上記の通り課金対象は初回スナップショット + 増分の分になるため、コンソール画面に表示されているフルスナップショットやボリュームサイズで記載されている数字を元に削減効果を測定すると過大評価になるため注意が必要です。
複数のスナップショットがある場合の削減効果
同一ボリュームに先ほどの例のように複数のスナップショットのチェーンがある場合、削除してもデータが隣接するスナップショットに課金対象が吸収されるため、位置によって削減効果が異なります。
- ボリューム (100 GiBのボリューム)
- スナップショット A(初回)のみ削除 → 削除してもスナップショット Bに課金対象が吸収される → 削減効果少
- スナップショット B(2回目)のみ削除→ 削除してもスナップショット Cに課金対象が吸収される → 削減効果少
- スナップショット C(3回目)のみ削除→ Cで追加された増分の削除につながる → 23 GiB 分の削除効果
チェーン内のスナップショットを効率的に削減したい場合は、末尾から順に削除するかチェーン全体を削除するのが効果的です。
厳密にはチェーンの先頭や中間を削除した場合でも削減効果は0ではないのですが、スクリプトで正確な算出を行うハードルが高いです。そのため今回のスクリプトでは保守的に0として算出しています。
実際のコスト削減効果の測定のため、処理の中でスナップショットを以下の4つに分類しています。
| チェーン位置 | 削減効果 |
|---|---|
| sole(単独) | コスト全額 |
| last(末尾) | 増分コスト分 |
| first(先頭) | 削減効果少(スクリプトでは0で計算) |
| middle(中間) | 削減効果少(スクリプトでは0で計算) |
スコアリングの設計
今回のスコアはティアベースで、実データ量に基づく月額推定削減額、取得からの経過日数、ソースボリュームが削除済みかどうかの3要素を元に算出しています。
スコア体系
3つの要素で10点満点になるようにしています。
- 実データ量に基づく月額推定削減額
削除した場合の月額削減推定額に基づいて、以下のスコアをつけています。
| 月額削減額 | スコア |
|---|---|
| 50 USD 以上 | 6 |
| 20 USD 以上 | 5 |
| 10 USD 以上 | 4 |
| 5 USD 以上 | 3 |
| 1 USD 以上 | 2 |
| 0.50 USD 以上 | 1 |
| 0.50 USD 未満 | 0 |
削除対応の目的がコスト削減であるため、他の項目よりも比重を重くしています。
- 取得からの経過日数
スナップショットの作成からの経過期間に基づいて、以下のスコアをつけています。
| 経過期間 | スコア |
|---|---|
| 3年以上 | 3 |
| 1年以上 | 2 |
| 半年以上 | 1 |
| 半年未満 | 0 |
- ソースボリュームが削除済み
元になるボリュームが削除されているスナップショットは、バックアップとして必要とされている可能性が低くなるため、+1点のボーナスを付与しています。
アクション区分
スコアに基づいて4段階のアクション区分を割り当てます。
| スコア | アクション |
|---|---|
| 8〜10 | 要対応 |
| 5〜7 | 確認推奨 |
| 1〜4 | 低優先 |
| 0 | 対象外 |
やってみた
実際に作成したスクリプトが以下になります。
"""
EBS Snapshot Priority Scorer
EBS Direct API (ListSnapshotBlocks / ListChangedBlocks) を使って
スナップショットの実データ量を取得し、ティアベーススコアリングで
削除優先順位を判定するスクリプト。
スコアリング:
コストティア(0-6) + 経過期間ティア(0-3) + 削除済みボーナス(0-1) = 0-10点
使い方:
python ebs_snapshot_scorer.py [--region REGION] [--output DIR]
"""
import argparse
import csv
import os
import random
import sys
import time
from collections import defaultdict
from datetime import datetime, timezone
import boto3
from botocore.exceptions import ClientError
# --- 定数 ---
BLOCK_SIZE_BYTES = 524288 # 512 KiB
BYTES_PER_GIB = 1024**3
DEFAULT_PRICE_PER_GB_MONTH = 0.05 # USD (ap-northeast-1 / us-east-1)
ARCHIVE_PRICE_PER_GB_MONTH = 0.0125 # USD
API_MAX_RESULTS = 10000
MAX_RETRIES = 5
BASE_BACKOFF_SEC = 1.0
# --- スコアリング閾値 ---
COST_TIERS = [
(50.0, 6),
(20.0, 5),
(10.0, 4),
(5.0, 3),
(1.0, 2),
(0.50, 1),
(0.0, 0),
]
AGE_TIERS = [
(1095, 3), # 3年
(365, 2), # 1年
(180, 1), # 半年
(0, 0),
]
def get_cost_tier(savings_usd):
"""月額削減額からコストティア(0-6)を返す"""
for threshold, score in COST_TIERS:
if savings_usd >= threshold:
return score
return 0
def get_age_tier(age_days):
"""経過日数から期間ティア(0-3)を返す"""
for threshold, score in AGE_TIERS:
if age_days >= threshold:
return score
return 0
# ============================================================
# Phase 1: スナップショット・ボリューム情報の収集
# ============================================================
def get_all_snapshots(ec2_client):
"""自アカウントの全スナップショットをページネーションで取得"""
snapshots = []
paginator = ec2_client.get_paginator("describe_snapshots")
for page in paginator.paginate(OwnerIds=["self"]):
snapshots.extend(page["Snapshots"])
if len(snapshots) % 200 == 0 and len(snapshots) > 0:
print(f" ... {len(snapshots)} 件取得済み")
return snapshots
def get_existing_volume_ids(ec2_client):
"""現存するボリュームIDのセットを返す"""
volume_ids = set()
paginator = ec2_client.get_paginator("describe_volumes")
for page in paginator.paginate():
for vol in page["Volumes"]:
volume_ids.add(vol["VolumeId"])
return volume_ids
def get_tag_value(tags, key):
"""タグリストから指定キーの値を取得"""
if not tags:
return ""
for tag in tags:
if tag.get("Key") == key:
return tag.get("Value", "")
return ""
# ============================================================
# Phase 1.5: AMI 参照検出
# ============================================================
def get_ami_snapshot_map(ec2_client):
"""自アカウントの全AMIを取得し、スナップショットID → AMI ID のマップを返す。
AMI参照中のスナップショットは:
- 削除前に deregister-image が必要
- クロスボリュームのブロック共有により、コストが過大評価される可能性がある
"""
snap_to_ami = {}
paginator = ec2_client.get_paginator("describe_images")
for page in paginator.paginate(Owners=["self"]):
for image in page["Images"]:
ami_id = image["ImageId"]
for bdm in image.get("BlockDeviceMappings", []):
ebs = bdm.get("Ebs", {})
snap_id = ebs.get("SnapshotId")
if snap_id:
snap_to_ami[snap_id] = ami_id
return snap_to_ami
# ============================================================
# Phase 2: スナップショットの分類
# ============================================================
def classify_snapshots(snapshots, existing_volume_ids, ami_snapshot_map=None):
"""
スナップショットを分類する。
chain_type:
- sole: そのボリュームの唯一のスナップショット
- first: チェーンの先頭
- middle: チェーンの中間
- last: チェーンの末尾
volume_deleted: ソースボリュームが存在しない場合 True
ami_snapshot_map: {snapshot_id: ami_id} のマップ。指定されていれば ami_referenced フラグを設定。
"""
if ami_snapshot_map is None:
ami_snapshot_map = {}
# ボリュームID別にグループ化
# vol-ffffffff はソースボリューム情報が失われた古いスナップショット。
# 実際は異なるボリューム由来のため、各スナップショットを独立扱いする。
UNKNOWN_VOLUME_IDS = {"vol-ffffffff", ""}
volume_groups = defaultdict(list)
for snap in snapshots:
vol_id = snap.get("VolumeId", "")
if vol_id in UNKNOWN_VOLUME_IDS:
volume_groups[f"{vol_id}_{snap['SnapshotId']}"].append(snap)
else:
volume_groups[vol_id].append(snap)
# 各グループ内を StartTime でソート
for vol_id in volume_groups:
volume_groups[vol_id].sort(key=lambda s: s["StartTime"])
classified = []
for vol_id, snaps in volume_groups.items():
volume_deleted = vol_id not in existing_volume_ids
is_sole = len(snaps) == 1
for i, snap in enumerate(snaps):
if is_sole:
chain_type = "sole"
elif i == 0:
chain_type = "first"
elif i == len(snaps) - 1:
chain_type = "last"
else:
chain_type = "middle"
prev_snapshot_id = None
if not is_sole and i > 0:
prev_snapshot_id = snaps[i - 1]["SnapshotId"]
snap_id = snap["SnapshotId"]
ami_id = ami_snapshot_map.get(snap_id)
classified.append(
{
"snapshot_id": snap_id,
"volume_id": vol_id,
"volume_size_gib": snap.get("VolumeSize", 0),
"start_time": snap["StartTime"],
"state": snap.get("State", ""),
"storage_tier": snap.get("StorageTier", "standard"),
"description": snap.get("Description", ""),
"name": get_tag_value(snap.get("Tags"), "Name"),
"encrypted": snap.get("Encrypted", False),
"chain_type": chain_type,
"volume_deleted": volume_deleted,
"chain_index": i,
"chain_total": len(snaps),
"prev_snapshot_id": prev_snapshot_id,
"ami_referenced": ami_id is not None,
"ami_id": ami_id or "",
}
)
return classified
# ============================================================
# Phase 3: 実データ量の計測とコスト算出
# ============================================================
def _call_with_retry(func, **kwargs):
"""指数バックオフ付きでAPI呼び出し"""
for attempt in range(MAX_RETRIES):
try:
return func(**kwargs)
except ClientError as e:
code = e.response["Error"]["Code"]
if code in ("RequestThrottledException", "ThrottlingException"):
wait = BASE_BACKOFF_SEC * (2**attempt) + random.uniform(0, 1)
time.sleep(wait)
continue
raise
raise RuntimeError(f"API呼び出しが {MAX_RETRIES} 回リトライ後も失敗しました")
def count_snapshot_blocks(ebs_client, snapshot_id):
"""ListSnapshotBlocks でスナップショットの全ブロック数をカウント
注意: この API は論理的な全ブロックを返す。AMI 参照やスナップショットコピー等で
別ボリューム由来のスナップショットとブロックを共有している場合、実際の課金額
(アカウント内でユニークなブロック分のみ)よりも大幅に過大評価することがある。
"""
block_count = 0
block_size = BLOCK_SIZE_BYTES
next_token = None
while True:
params = {"SnapshotId": snapshot_id, "MaxResults": API_MAX_RESULTS}
if next_token:
params["NextToken"] = next_token
resp = _call_with_retry(ebs_client.list_snapshot_blocks, **params)
block_count += len(resp.get("Blocks", []))
block_size = resp.get("BlockSize", BLOCK_SIZE_BYTES)
next_token = resp.get("NextToken")
if not next_token:
break
return block_count, block_size
def count_changed_blocks(ebs_client, first_snapshot_id, second_snapshot_id):
"""ListChangedBlocks で2つのスナップショット間の差分ブロック数をカウント"""
changed_count = 0
block_size = BLOCK_SIZE_BYTES
next_token = None
while True:
params = {
"FirstSnapshotId": first_snapshot_id,
"SecondSnapshotId": second_snapshot_id,
"MaxResults": API_MAX_RESULTS,
}
if next_token:
params["NextToken"] = next_token
resp = _call_with_retry(ebs_client.list_changed_blocks, **params)
changed_count += len(resp.get("ChangedBlocks", []))
block_size = resp.get("BlockSize", BLOCK_SIZE_BYTES)
next_token = resp.get("NextToken")
if not next_token:
break
return changed_count, block_size
def measure_costs(ebs_client, classified, price_per_gb):
"""
各スナップショットの実データ量と月額コストを計算する。
EBS Direct API で実測:
- sole / first → ListSnapshotBlocks(全ブロック数)
- middle / last → ListChangedBlocks(前のスナップショットとの差分)
アーカイブ済みスナップショット:
- EBS Direct API アクセス不可のため VolumeSize で概算
"""
now = datetime.now(timezone.utc)
total = len(classified)
api_success = 0
api_fallback = 0
error_codes = defaultdict(int)
for i, snap in enumerate(classified):
snap_id = snap["snapshot_id"]
age_days = (now - snap["start_time"]).days
snap["age_days"] = max(age_days, 1)
# アーカイブ済みは Direct API アクセス不可
is_archived = snap["storage_tier"] == "archive"
unit_price = ARCHIVE_PRICE_PER_GB_MONTH if is_archived else price_per_gb
if is_archived:
actual_gib = float(snap["volume_size_gib"])
snap["measurement"] = "archive_estimate"
else:
# EBS Direct API で実測
try:
if snap["prev_snapshot_id"]:
# チェーン内の2番目以降: 差分ブロック数
block_count, block_size = count_changed_blocks(
ebs_client, snap["prev_snapshot_id"], snap_id
)
snap["measurement"] = "changed_blocks"
else:
# sole / チェーン先頭: 全ブロック数
block_count, block_size = count_snapshot_blocks(ebs_client, snap_id)
snap["measurement"] = "all_blocks"
actual_gib = (block_count * block_size) / BYTES_PER_GIB
api_success += 1
except ClientError as e:
code = e.response["Error"]["Code"]
error_codes[code] += 1
api_fallback += 1
if api_fallback <= 3:
print(f" [WARN] {snap_id}: {code}")
actual_gib = float(snap["volume_size_gib"])
snap["measurement"] = "fallback_volume_size"
except Exception as e:
error_codes[type(e).__name__] += 1
api_fallback += 1
if api_fallback <= 3:
print(f" [WARN] {snap_id}: {type(e).__name__}: {e}")
actual_gib = float(snap["volume_size_gib"])
snap["measurement"] = "fallback_volume_size"
snap["actual_data_gib"] = round(actual_gib, 3)
snap["monthly_cost_usd"] = round(actual_gib * unit_price, 4)
snap["cumulative_cost_usd"] = round(
snap["monthly_cost_usd"] * (snap["age_days"] / 30), 2
)
# 削減推定額
if snap["volume_deleted"]:
# ソースボリューム削除済み → チェーン全削除が前提なので全額削減
snap["savings_usd"] = snap["monthly_cost_usd"]
elif snap["chain_type"] in ("sole", "last"):
snap["savings_usd"] = snap["monthly_cost_usd"]
else:
# first / middle: 削除してもデータが隣接スナップに吸収される
snap["savings_usd"] = 0.0
progress = i + 1
if progress % 10 == 0 or progress == total:
print(f" [{progress}/{total}] コスト計測中...")
# --- EBS Direct API エラー診断 ---
print(f"\n --- EBS Direct API 結果 ---")
print(f" 成功: {api_success} 件 / フォールバック: {api_fallback} 件")
if error_codes:
print(f" エラー内訳:")
for code, count in sorted(error_codes.items(), key=lambda x: -x[1]):
print(f" {code}: {count} 件")
if api_fallback > 0 and api_success == 0:
print()
print(" [診断] 全件フォールバックしています。以下を確認してください:")
if "AccessDeniedException" in error_codes:
print(" → IAMポリシーに ebs:ListSnapshotBlocks / ebs:ListChangedBlocks が含まれていますか?")
print(" → 暗号化スナップショットの場合 kms:Decrypt 権限も必要です")
return classified
# ============================================================
# Phase 4: スコアリング
# ============================================================
def calculate_scores(classified):
"""ティアベースのスコアリング
スコア = コストティア(0-6) + 経過期間ティア(0-3) + 削除済みボーナス(0-1) = 0-10点
コストティア:
>= 50 USD/月 → 6, >= 20 → 5, >= 10 → 4, >= 5 → 3,
>= 1 → 2, >= 0.50 → 1, < 0.50 → 0
経過期間ティア:
>= 3年(1095日) → 3, >= 1年(365日) → 2, >= 半年(180日) → 1, < 半年 → 0
削除済みボーナス:
ソースボリューム削除済み → +1, それ以外 → 0
"""
for snap in classified:
cost_score = get_cost_tier(snap["savings_usd"])
age_score = get_age_tier(snap["age_days"])
vol_deleted_bonus = 1 if snap["volume_deleted"] else 0
snap["cost_score"] = cost_score
snap["age_score"] = age_score
snap["vol_deleted_bonus"] = vol_deleted_bonus
snap["score"] = cost_score + age_score + vol_deleted_bonus
# スコア降順、同スコア内は savings_usd 降順でソート
classified.sort(key=lambda s: (s["score"], s["savings_usd"]), reverse=True)
return classified
def build_ranked_entries(scored):
"""volume_deleted チェーンをグループ化し、個別エントリと統合したランキングを返す。
- volume_deleted かつ 2件以上のチェーン → 1つのグループエントリ
スコア: cost_tier(チェーン合計月額) + age_tier(最終スナップの経過日数) + 1
- それ以外(sole(DEL)、exists個別)→ 個別エントリのまま
"""
# volume_deleted チェーンを特定(sole は除く)
del_chain_groups = defaultdict(list)
del_chain_snap_ids = set()
for snap in scored:
if snap["volume_deleted"] and snap["chain_type"] != "sole":
del_chain_groups[snap["volume_id"]].append(snap)
del_chain_snap_ids.add(snap["snapshot_id"])
# グループエントリを作成
group_entries = []
for vol_id, snaps in del_chain_groups.items():
snaps_sorted = sorted(snaps, key=lambda s: s["start_time"])
total_monthly = sum(s["monthly_cost_usd"] for s in snaps)
total_data_gib = sum(s["actual_data_gib"] for s in snaps)
last_snap = snaps_sorted[-1]
has_ami = any(s.get("ami_referenced") for s in snaps)
cost_score = get_cost_tier(total_monthly)
age_score = get_age_tier(last_snap["age_days"])
vol_deleted_bonus = 1
score = cost_score + age_score + vol_deleted_bonus
group_entries.append({
"entry_type": "chain_group",
"volume_id": vol_id,
"snapshot_count": len(snaps),
"snapshots": snaps_sorted,
"total_monthly_usd": round(total_monthly, 4),
"savings_usd": round(total_monthly, 4),
"total_data_gib": round(total_data_gib, 1),
"age_days": last_snap["age_days"],
"cost_score": cost_score,
"age_score": age_score,
"vol_deleted_bonus": vol_deleted_bonus,
"score": score,
"ami_referenced": has_ami,
})
# 個別エントリ(グループ化されたスナップショットを除外)
individual_entries = []
for snap in scored:
if snap["snapshot_id"] not in del_chain_snap_ids:
entry = dict(snap)
entry["entry_type"] = "individual"
individual_entries.append(entry)
# 統合してスコア降順 → savings降順でソート
all_entries = group_entries + individual_entries
all_entries.sort(key=lambda e: (e["score"], e["savings_usd"]), reverse=True)
return all_entries
# ============================================================
# Phase 5: レポート出力
# ============================================================
def score_label(score):
"""スコアからアクション区分ラベルを返す"""
if score >= 8:
return "要対応"
elif score >= 5:
return "確認推奨"
elif score >= 1:
return "低優先"
else:
return "対象外"
def _score_range(label):
"""アクション区分ラベルからスコア範囲文字列を返す"""
if label == "要対応":
return "8-10"
elif label == "確認推奨":
return "5-7"
elif label == "低優先":
return "1-4"
else:
return "0"
def _display_chain_type(snap):
"""表示用のチェーンタイプ文字列を返す"""
ct = snap["chain_type"]
if snap["volume_deleted"]:
return f"{ct}(DEL)"
return ct
def generate_markdown_report(scored, ranked_entries, args):
"""Markdown レポートを生成"""
now_str = datetime.now().strftime("%Y-%m-%d %H:%M")
lines = []
lines.append("# EBS Snapshot Priority Score Report")
lines.append("")
lines.append(f"- Generated: {now_str}")
lines.append(f"- Region: {args.region}")
lines.append(f"- Price: {args.price} USD/GiB-month")
lines.append("")
# --- スコアリング基準 ---
lines.append("## Scoring Criteria (0-10)")
lines.append("")
lines.append("### Cost Tier (0-6)")
lines.append("")
lines.append("| Threshold | Score |")
lines.append("|---|---|")
lines.append("| >= 50 USD/month | 6 |")
lines.append("| >= 20 USD/month | 5 |")
lines.append("| >= 10 USD/month | 4 |")
lines.append("| >= 5 USD/month | 3 |")
lines.append("| >= 1 USD/month | 2 |")
lines.append("| >= 0.50 USD/month | 1 |")
lines.append("| < 0.50 USD/month | 0 |")
lines.append("")
lines.append("### Age Tier (0-3)")
lines.append("")
lines.append("| Threshold | Score |")
lines.append("|---|---|")
lines.append("| >= 3 years (1095 days) | 3 |")
lines.append("| >= 1 year (365 days) | 2 |")
lines.append("| >= 6 months (180 days) | 1 |")
lines.append("| < 6 months | 0 |")
lines.append("")
lines.append("### Volume Deleted Bonus (0-1)")
lines.append("")
lines.append("| Condition | Score |")
lines.append("|---|---|")
lines.append("| Source volume deleted | +1 |")
lines.append("| Source volume exists | +0 |")
lines.append("")
lines.append("### Action Guide")
lines.append("")
lines.append("| Score | Action |")
lines.append("|---|---|")
lines.append("| 8-10 | 要対応: 削除可否を確認してください |")
lines.append("| 5-7 | 確認推奨: 次回メンテ時に確認をお願いします |")
lines.append("| 1-4 | 低優先: 一覧として共有のみ |")
lines.append("| 0 | 対象外: 対応不要 |")
lines.append("")
# --- サマリ ---
total_count = len(scored)
vol_deleted_count = sum(1 for s in scored if s["volume_deleted"])
vol_exists_count = total_count - vol_deleted_count
sole_count = sum(1 for s in scored if s["chain_type"] == "sole")
first_count = sum(1 for s in scored if s["chain_type"] == "first")
middle_count = sum(1 for s in scored if s["chain_type"] == "middle")
last_count = sum(1 for s in scored if s["chain_type"] == "last")
archived_count = sum(1 for s in scored if s["storage_tier"] == "archive")
ami_ref_count = sum(1 for s in scored if s.get("ami_referenced"))
total_monthly = sum(s["monthly_cost_usd"] for s in scored)
total_savings = sum(e["savings_usd"] for e in ranked_entries)
# スコア別集計
action_counts = defaultdict(lambda: {"count": 0, "savings": 0.0})
for e in ranked_entries:
label = score_label(e["score"])
action_counts[label]["count"] += 1
action_counts[label]["savings"] += e["savings_usd"]
lines.append("## Summary")
lines.append("")
lines.append("| Item | Value |")
lines.append("|---|---|")
lines.append(f"| Total Snapshots | {total_count} |")
lines.append(f"| Volume Deleted | {vol_deleted_count} |")
lines.append(f"| Volume Exists | {vol_exists_count} |")
lines.append(f"| Chain Type: sole | {sole_count} |")
lines.append(f"| Chain Type: first | {first_count} |")
lines.append(f"| Chain Type: middle | {middle_count} |")
lines.append(f"| Chain Type: last | {last_count} |")
if archived_count > 0:
lines.append(f"| Archived | {archived_count} |")
if ami_ref_count > 0:
lines.append(f"| AMI Referenced | {ami_ref_count} |")
lines.append(f"| Total Monthly Cost | {total_monthly:.2f} USD |")
lines.append(f"| Total Potential Monthly Savings | {total_savings:.2f} USD |")
lines.append("")
# --- スコア別集計 ---
lines.append("### By Action")
lines.append("")
lines.append("| Action | Count | Monthly Savings |")
lines.append("|---|---|---|")
for label in ["要対応", "確認推奨", "低優先", "対象外"]:
ac = action_counts[label]
lines.append(f"| {label} (Score {_score_range(label)}) | {ac['count']} | {ac['savings']:.2f} USD |")
lines.append("")
# --- 削除候補 Top N ---
top_n = args.top
top_list = ranked_entries[:top_n]
lines.append(f"## Top {top_n} Deletion Candidates")
lines.append("")
lines.append(
"| Rank | Score | Action | ID | Type | Age (days) | Data (GiB) | Savings/mo | Note |"
)
lines.append("|---|---|---|---|---|---|---|---|---|")
for rank, entry in enumerate(top_list, 1):
label = score_label(entry["score"])
score_detail = f"{entry['score']}({entry['cost_score']}+{entry['age_score']}+{entry['vol_deleted_bonus']})"
note = "AMI" if entry.get("ami_referenced") else ""
if entry["entry_type"] == "chain_group":
lines.append(
f"| {rank} | {score_detail} | {label} | {entry['volume_id']} | "
f"chain(DEL) {entry['snapshot_count']}snaps | {entry['age_days']} | "
f"{entry['total_data_gib']:.1f} | {entry['savings_usd']:.2f} USD | {note} |"
)
else:
meas = " *" if entry.get("measurement") == "fallback_volume_size" else ""
chain_display = _display_chain_type(entry)
lines.append(
f"| {rank} | {score_detail} | {label} | {entry['snapshot_id']} | "
f"{chain_display} | {entry['age_days']} | "
f"{entry['actual_data_gib']:.1f}{meas} | {entry['savings_usd']:.2f} USD | {note} |"
)
lines.append("")
if any(
e.get("measurement") == "fallback_volume_size"
for e in top_list
if e["entry_type"] == "individual"
):
lines.append("\\* VolumeSize ベースの概算値")
lines.append("")
lines.append("- **AMI**: AMI が参照中(削除前に `deregister-image` が必要)")
lines.append("- **(DEL)**: ソースボリューム削除済み")
lines.append("- **chain(DEL)**: ボリューム削除済みチェーン(全スナップショットの削除を推奨)")
lines.append("")
# --- chain(DEL) グループの内訳 ---
chain_groups_in_top = [e for e in top_list if e["entry_type"] == "chain_group"]
if chain_groups_in_top:
lines.append("### Volume Deleted Chain Details")
lines.append("")
for entry in chain_groups_in_top:
lines.append(
f"#### {entry['volume_id']} ({entry['snapshot_count']} snapshots, "
f"{entry['savings_usd']:.2f} USD/month)"
)
lines.append("")
lines.append("| # | SnapshotId | Type | Date | Data (GiB) | Monthly Cost | Note |")
lines.append("|---|---|---|---|---|---|---|")
for j, snap in enumerate(entry["snapshots"], 1):
date_str = snap["start_time"].strftime("%Y-%m-%d")
snap_note = "AMI" if snap.get("ami_referenced") else ""
lines.append(
f"| {j} | {snap['snapshot_id']} | {snap['chain_type']} | {date_str} | "
f"{snap['actual_data_gib']:.1f} | {snap['monthly_cost_usd']:.2f} USD | {snap_note} |"
)
lines.append("")
# --- 注意事項 ---
lines.append("## Notes")
lines.append("")
lines.append("- sole/last: 削除すると月額コスト分が削減されます")
lines.append("- first/middle: 削除してもデータが隣接スナップショットに吸収されるため、即時の削減効果は限定的です")
lines.append("- **ソースボリューム削除済み**: チェーン全削除が前提のため、全スナップショットの月額コスト全額を削減見込みとして計上しています")
lines.append("- EBS Direct API (`ListSnapshotBlocks` / `ListChangedBlocks`) で実データ量を計測しています")
lines.append("- **AMI参照スナップショット**: 削除するには先に `aws ec2 deregister-image` でAMIの登録解除が必要です")
lines.append("- **制限事項**: AMI参照やスナップショットコピー等によるクロスボリュームのブロック共有は検出できません。実際の課金額より過大評価になる場合があります。")
lines.append("")
return "\n".join(lines)
def generate_csv_report(ranked_entries, output_path):
"""CSV レポートを生成
chain_group エントリはグループ行 + 個別スナップショット行を出力する。
"""
fieldnames = [
"rank",
"entry_type",
"score",
"cost_score",
"age_score",
"vol_deleted_bonus",
"action",
"snapshot_id",
"volume_id",
"chain_type",
"volume_deleted",
"snapshot_count",
"age_days",
"volume_size_gib",
"actual_data_gib",
"measurement",
"monthly_cost_usd",
"savings_usd",
"cumulative_cost_usd",
"ami_referenced",
"ami_id",
"start_time",
"name",
"description",
"encrypted",
"storage_tier",
]
with open(output_path, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for rank, entry in enumerate(ranked_entries, 1):
if entry["entry_type"] == "chain_group":
# グループヘッダ行
writer.writerow({
"rank": rank,
"entry_type": "chain_group",
"score": entry["score"],
"cost_score": entry["cost_score"],
"age_score": entry["age_score"],
"vol_deleted_bonus": entry["vol_deleted_bonus"],
"action": score_label(entry["score"]),
"snapshot_id": "",
"volume_id": entry["volume_id"],
"chain_type": "chain(DEL)",
"volume_deleted": True,
"snapshot_count": entry["snapshot_count"],
"age_days": entry["age_days"],
"volume_size_gib": "",
"actual_data_gib": entry["total_data_gib"],
"measurement": "",
"monthly_cost_usd": entry["total_monthly_usd"],
"savings_usd": entry["savings_usd"],
"cumulative_cost_usd": "",
"ami_referenced": entry["ami_referenced"],
"ami_id": "",
"start_time": "",
"name": "",
"description": "",
"encrypted": "",
"storage_tier": "",
})
# チェーン内の個別スナップショット行
for snap in entry["snapshots"]:
writer.writerow({
"rank": rank,
"entry_type": "chain_member",
"score": "",
"cost_score": "",
"age_score": "",
"vol_deleted_bonus": "",
"action": "",
"snapshot_id": snap["snapshot_id"],
"volume_id": snap["volume_id"],
"chain_type": snap["chain_type"],
"volume_deleted": True,
"snapshot_count": "",
"age_days": snap["age_days"],
"volume_size_gib": snap["volume_size_gib"],
"actual_data_gib": snap["actual_data_gib"],
"measurement": snap["measurement"],
"monthly_cost_usd": snap["monthly_cost_usd"],
"savings_usd": snap["monthly_cost_usd"],
"cumulative_cost_usd": snap["cumulative_cost_usd"],
"ami_referenced": snap.get("ami_referenced", False),
"ami_id": snap.get("ami_id", ""),
"start_time": snap["start_time"].isoformat(),
"name": snap["name"],
"description": snap["description"],
"encrypted": snap["encrypted"],
"storage_tier": snap["storage_tier"],
})
else:
writer.writerow({
"rank": rank,
"entry_type": "individual",
"score": entry["score"],
"cost_score": entry["cost_score"],
"age_score": entry["age_score"],
"vol_deleted_bonus": entry["vol_deleted_bonus"],
"action": score_label(entry["score"]),
"snapshot_id": entry["snapshot_id"],
"volume_id": entry["volume_id"],
"chain_type": entry["chain_type"],
"volume_deleted": entry["volume_deleted"],
"snapshot_count": 1,
"age_days": entry["age_days"],
"volume_size_gib": entry["volume_size_gib"],
"actual_data_gib": entry["actual_data_gib"],
"measurement": entry["measurement"],
"monthly_cost_usd": entry["monthly_cost_usd"],
"savings_usd": entry["savings_usd"],
"cumulative_cost_usd": entry["cumulative_cost_usd"],
"ami_referenced": entry.get("ami_referenced", False),
"ami_id": entry.get("ami_id", ""),
"start_time": entry["start_time"].isoformat(),
"name": entry["name"],
"description": entry["description"],
"encrypted": entry["encrypted"],
"storage_tier": entry["storage_tier"],
})
def print_console_summary(scored, ranked_entries, top_n):
"""コンソールにサマリを表示"""
total = len(scored)
total_monthly = sum(s["monthly_cost_usd"] for s in scored)
total_savings = sum(e["savings_usd"] for e in ranked_entries)
print(f"\n{'='*80}")
print(f" サマリ")
print(f"{'='*80}")
print(f"スナップショット合計: {total} 件")
vol_del = sum(1 for s in scored if s["volume_deleted"])
vol_exists = total - vol_del
print(f" ボリューム削除済み: {vol_del} 件 / ボリューム存在: {vol_exists} 件")
sole = sum(1 for s in scored if s["chain_type"] == "sole")
first = sum(1 for s in scored if s["chain_type"] == "first")
middle = sum(1 for s in scored if s["chain_type"] == "middle")
last = sum(1 for s in scored if s["chain_type"] == "last")
print(f" sole: {sole} / first: {first} / middle: {middle} / last: {last}")
print(f"月額コスト合計: {total_monthly:.2f} USD")
print(f"削減見込み合計: {total_savings:.2f} USD")
# --- スコア別集計 ---
action_counts = defaultdict(lambda: {"count": 0, "savings": 0.0})
for e in ranked_entries:
label = score_label(e["score"])
action_counts[label]["count"] += 1
action_counts[label]["savings"] += e["savings_usd"]
print(f"\n--- アクション別集計 ---")
for label in ["要対応", "確認推奨", "低優先", "対象外"]:
ac = action_counts[label]
range_str = _score_range(label)
print(f" {label} (Score {range_str}): {ac['count']:>4} 件 {ac['savings']:>10.2f} USD/月")
# --- 削除候補 Top N ---
display_entries = ranked_entries[:top_n]
print(f"\n{'='*80}")
print(f" 削除候補 Top {min(top_n, len(ranked_entries))}")
print(f"{'='*80}")
header = f"{'Rank':>4} {'Score':>5} {'Detail':>9} {'Action':<6} {'ID':<24} {'Type':<12} {'Age':>5} {'Data(GiB)':>10} {'Savings':>10} {'Note':>4}"
print(header)
print("-" * len(header))
for rank, entry in enumerate(display_entries, 1):
detail = f"({entry['cost_score']}+{entry['age_score']}+{entry['vol_deleted_bonus']})"
label = score_label(entry["score"])
note = "AMI" if entry.get("ami_referenced") else ""
if entry["entry_type"] == "chain_group":
print(
f"{rank:>4} {entry['score']:>5} {detail:>9} {label:<6} {entry['volume_id']:<24} "
f"{'chain(DEL)':<12} {entry['age_days']:>5} {entry['total_data_gib']:>9.1f} "
f"{entry['savings_usd']:>9.2f}$ {note:>4}"
)
for snap in entry["snapshots"]:
date_str = snap["start_time"].strftime("%Y-%m-%d")
snap_note = "AMI" if snap.get("ami_referenced") else ""
print(
f" └ {snap['snapshot_id']} {snap['chain_type']:<7} {date_str} "
f"{snap['actual_data_gib']:>8.1f} GiB {snap['monthly_cost_usd']:>7.2f}$/mo {snap_note}"
)
else:
meas = "*" if entry.get("measurement") == "fallback_volume_size" else " "
chain_display = _display_chain_type(entry)
print(
f"{rank:>4} {entry['score']:>5} {detail:>9} {label:<6} {entry['snapshot_id']:<24} "
f"{chain_display:<12} {entry['age_days']:>5} {entry['actual_data_gib']:>9.1f}{meas} "
f"{entry['savings_usd']:>9.2f}$ {note:>4}"
)
if any(
e.get("measurement") == "fallback_volume_size"
for e in display_entries
if e["entry_type"] == "individual"
):
print(" * VolumeSize ベースの概算値")
print(" AMI=AMI参照あり(削除前に deregister-image 必要)")
print(" (DEL)=ソースボリューム削除済み")
print(" chain(DEL)=ボリューム削除済みチェーン(全スナップショットの削除を推奨)")
# ============================================================
# メイン
# ============================================================
def main():
parser = argparse.ArgumentParser(
description="EBS Snapshot Priority Scorer - ティアベーススコアリングによるスナップショット削除優先順位の判定"
)
parser.add_argument(
"--region",
default="ap-northeast-1",
help="リージョン(デフォルト: ap-northeast-1)",
)
parser.add_argument(
"--price",
type=float,
default=DEFAULT_PRICE_PER_GB_MONTH,
help=f"スナップショット単価 USD/GiB-month(デフォルト: {DEFAULT_PRICE_PER_GB_MONTH})",
)
parser.add_argument(
"--top",
type=int,
default=20,
help="上位N件を表示(デフォルト: 20)",
)
parser.add_argument(
"--output",
default=".",
help="レポート出力先ディレクトリ(デフォルト: カレント)",
)
parser.add_argument(
"--min-age-months",
type=int,
default=0,
help="直近Nヶ月以内に作成されたスナップショットを除外(デフォルト: 0=除外なし)",
)
parser.add_argument(
"--min-savings",
type=float,
default=0.0,
help="月額削減額がN USD未満のスナップショットをレポートから除外(デフォルト: 0=除外なし)",
)
args = parser.parse_args()
# --- セッション作成 ---
session = boto3.Session(region_name=args.region)
ec2_client = session.client("ec2")
ebs_client = session.client("ebs")
# --- Phase 1: 収集 ---
print("Phase 1: スナップショット情報を取得中...")
snapshots = get_all_snapshots(ec2_client)
print(f" 取得件数: {len(snapshots)}")
if not snapshots:
print("スナップショットが見つかりませんでした。")
sys.exit(0)
# --- 年齢フィルタ ---
if args.min_age_months > 0:
try:
from dateutil.relativedelta import relativedelta
cutoff = datetime.now(timezone.utc) - relativedelta(months=args.min_age_months)
except ImportError:
from datetime import timedelta
cutoff = datetime.now(timezone.utc) - timedelta(days=args.min_age_months * 30)
before = len(snapshots)
snapshots = [s for s in snapshots if s["StartTime"] < cutoff]
excluded = before - len(snapshots)
print(f" フィルタ: 直近 {args.min_age_months} ヶ月以内を除外 → {excluded} 件除外、{len(snapshots)} 件が対象")
if not snapshots:
print("フィルタ後のスナップショットが0件です。")
sys.exit(0)
print(" ボリューム情報を取得中...")
existing_volumes = get_existing_volume_ids(ec2_client)
print(f" 現存ボリューム: {len(existing_volumes)} 件")
# --- Phase 1.5: AMI 参照検出 ---
print("Phase 1.5: AMI参照を検出中...")
ami_snapshot_map = get_ami_snapshot_map(ec2_client)
ami_count = len(ami_snapshot_map)
print(f" AMI参照スナップショット: {ami_count} 件")
# --- Phase 2: 分類 ---
print("Phase 2: スナップショットを分類中...")
classified = classify_snapshots(snapshots, existing_volumes, ami_snapshot_map)
vol_del = sum(1 for s in classified if s["volume_deleted"])
vol_exists = len(classified) - vol_del
sole = sum(1 for s in classified if s["chain_type"] == "sole")
first = sum(1 for s in classified if s["chain_type"] == "first")
middle = sum(1 for s in classified if s["chain_type"] == "middle")
last = sum(1 for s in classified if s["chain_type"] == "last")
ami_ref_count = sum(1 for s in classified if s["ami_referenced"])
print(f" ボリューム削除済み: {vol_del} / ボリューム存在: {vol_exists}")
print(f" sole: {sole} / first: {first} / middle: {middle} / last: {last}")
if ami_ref_count > 0:
print(f" AMI参照あり: {ami_ref_count} 件(削除前に deregister-image が必要)")
# --- Phase 3: コスト計測 ---
print("Phase 3: コスト計測中(EBS Direct API)...")
classified = measure_costs(ebs_client, classified, args.price)
# --- Phase 4: スコアリング ---
print("Phase 4: スコアリング中...")
scored = calculate_scores(classified)
ranked_entries = build_ranked_entries(scored)
chain_groups = sum(1 for e in ranked_entries if e["entry_type"] == "chain_group")
if chain_groups:
print(f" ボリューム削除済みチェーン: {chain_groups} グループにまとめました")
# --- Savings フィルタ ---
if args.min_savings > 0:
before = len(ranked_entries)
ranked_entries = [e for e in ranked_entries if e["savings_usd"] >= args.min_savings]
excluded = before - len(ranked_entries)
print(f" フィルタ: Savings {args.min_savings} USD/月 未満を除外 → {excluded} 件除外、{len(ranked_entries)} 件が対象")
# --- Phase 5: レポート出力 ---
print("Phase 5: レポート出力中...")
os.makedirs(args.output, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
md_path = os.path.join(
args.output, f"ebs-snapshot-score-report_{timestamp}.md"
)
csv_path = os.path.join(
args.output, f"ebs-snapshot-score-report_{timestamp}.csv"
)
md_report = generate_markdown_report(scored, ranked_entries, args)
with open(md_path, "w", encoding="utf-8") as f:
f.write(md_report)
generate_csv_report(ranked_entries, csv_path)
print(f"\n Markdown: {md_path}")
print(f" CSV: {csv_path}")
print_console_summary(scored, ranked_entries, args.top)
if __name__ == "__main__":
main()
- 実行コマンド
python ebs_snapshot_scorer.py \
--region ap-northeast-1 \
--min-age-months 6 \
--min-savings 0.50 \
--top 20 \
--output ./results
オプションは以下のとおりです。
| オプション | デフォルト | 説明 |
|---|---|---|
| --region | ap-northeast-1 | リージョン |
| --price | 0.05 | スナップショット単価(USD/GiB-month) |
| --top | 20 | 上位N件を表示 |
| --min-age-months | 0(除外なし) | 指定値ヵ月以内に作成されたスナップショットを除外 |
| --min-savings | 0(除外なし) | 月額削減額が指定値未満のスナップショットを除外 |
| --output | カレントディレクトリ | レポート出力先 |
実行してみる
実行してみると以下のように実行結果を確認することができます。(結果は編集したダミーです)
================================================================================
サマリ
================================================================================
スナップショット合計: 312 件
ボリューム削除済み: 289 件 / ボリューム存在: 23 件
sole: 278 / first: 10 / middle: 14 / last: 10
月額コスト合計: 287.34 USD
削減見込み合計: 173.99 USD
--- アクション別集計 ---
要対応 (Score 8-10): 3 件 110.15 USD/月
確認推奨 (Score 5-7): 7 件 63.84 USD/月
低優先 (Score 1-4): 0 件 0.00 USD/月
対象外 (Score 0): 0 件 0.00 USD/月
================================================================================
削除候補 Top 10
================================================================================
Rank Score Detail Action ID Type Age Data(GiB) Savings Note
--------------------------------------------------------------------------------------------------
1 9 (6+2+1) 要対応 vol-0a1b2c3d4e5f67890 chain(DEL) 487 1700.8 85.04$ AMI
└ snap-0aa11bb22cc33dd01 first 2023-03-15 180.0 GiB 9.00$/mo AMI
└ snap-0aa11bb22cc33dd02 middle 2023-10-20 400.0 GiB 20.00$/mo AMI
└ snap-0aa11bb22cc33dd03 middle 2023-10-20 1.0 GiB 0.05$/mo AMI
└ snap-0aa11bb22cc33dd04 middle 2024-02-08 440.0 GiB 22.00$/mo AMI
└ snap-0aa11bb22cc33dd05 middle 2024-06-15 330.0 GiB 16.50$/mo AMI
└ snap-0aa11bb22cc33dd06 middle 2024-09-22 250.0 GiB 12.50$/mo AMI
└ snap-0aa11bb22cc33dd07 last 2025-01-28 99.8 GiB 4.99$/mo AMI
2 8 (4+3+1) 要対応 snap-01122334455667701 sole(DEL) 2187 265.4 13.27$
3 8 (4+3+1) 要対応 snap-01122334455667702 sole(DEL) 2187 236.8 11.84$
4 7 (5+2+0) 確認推奨 snap-01122334455667703 last 543 568.6 28.43$
5 7 (4+2+1) 確認推奨 snap-01122334455667704 sole(DEL) 738 234.2 11.71$ AMI
6 7 (3+3+1) 確認推奨 vol-0f1e2d3c4b5a09876 chain(DEL) 1350 175.0 8.75$ AMI
└ snap-0ff11ee22dd33cc01 first 2021-05-15 75.0 GiB 3.75$/mo AMI
└ snap-0ff11ee22dd33cc02 middle 2021-08-22 30.0 GiB 1.50$/mo
└ snap-0ff11ee22dd33cc03 middle 2022-04-10 20.0 GiB 1.00$/mo
└ snap-0ff11ee22dd33cc04 last 2022-09-18 50.0 GiB 2.50$/mo
7 6 (2+3+1) 確認推奨 snap-01122334455667705 sole(DEL) 2820 84.6 4.23$
8 6 (2+3+1) 確認推奨 snap-01122334455667706 sole(DEL) 1456 78.4 3.92$
9 6 (2+3+1) 確認推奨 snap-01122334455667707 sole(DEL) 1892 71.2 3.56$ AMI
10 6 (2+3+1) 確認推奨 snap-01122334455667708 sole(DEL) 1203 64.8 3.24$ AMI
AMI=AMI参照あり(削除前に deregister-image 必要)
(DEL)=ソースボリューム削除済み
chain(DEL)=ボリューム削除済みチェーン(全スナップショットの削除を推奨)
一部制約事項
AMI参照によるブロック共有については実際のコスト削減効果より過大に評価してしまうケースがあります。
AMIに紐づいているスナップショットは、複数ボリュームのスナップショット間でブロックを共有していることがあります。この場合ListSnapshotBlocksでは論理的な全ブロックを返すため、共有ブロックを二重にカウントしてしまいます。
レポートではAMI参照スナップショットに「AMI」マークを表示するようにしています。
まとめ
今回はティアベーススコアリングで削除優先順位をつけるスクリプトを作成してみました。
数が多いと棚卸しや確認を行うにも対応に人的コストがかかるので、対応が後回しになりがちです。
もちろん全て対応できるのがベストではあるのですが、まずは優先順位をつけて対応することで効率的にコストの削減ができますのでまず一部の対応から始めてみるのをお勧めします。
この記事がEBSスナップショットのコスト最適化の対応の一助になれば幸いです。
以上、クラウド事業本部の木村がお届けしました。
参考







