AWS IAM Identity Centerの棚卸しスクリプトを書いてみる【Python,Boto3】

2022.10.31

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

以下アップデートから、 AWS IAM Identity Center (旧 AWS Single-Sign On) の IDストアに対してAPIから参照・更新ができるようになっています。

今回はこれらAPIを使って、IAM Identity Center の棚卸しスクリプトを書いてみます。

IDストア情報(ユーザー, グループ)および、許可セットや割り当て情報 を Markdownファイルとして出力するPythonスクリプトです。

更新履歴

  • 2023/01/19
    • PermissionSetArn情報を出力に記載
    • AWSアカウント一覧を出力に記載
    • Markdownテーブルの余分なスペースを削除

出力イメージ

以下のような Markdownファイル を生成します。

# AWS IAM Identity Center 棚卸し

- 取得開始時刻: {datetime}
- 実行アカウント: {account_name} ({account_id})

## IAM Identity Center 情報

- インスタンスARN: {instance_arn}
- インスタンスストアID: {identity_store_id}

## AWSアカウント一覧

{accounts(Markdownテーブル)}

## ユーザー一覧

{users(Markdownテーブル)}

## グループ一覧

{groups(Markdownテーブル)}

## ユーザーのグループ所属情報

{group_memberships(Markdownテーブル)}

## 許可セット一覧

{permission_sets(Markdownテーブル)}

## 割り当て一覧

{assignments(Markdownテーブル)}

全体(〜ユーザー一覧)

img

※ 目次(TOC)部分は wikiの自動生成機能で表示

グループ一覧

img

ユーザーのグループ所属一覧

img

許可セット一覧

img

割り当て一覧

img

作ったPythonスクリプト

前提

  • AWS API実行に boto3 を使っています
  • Markdownテーブル作成に tabulate を使っています ( 参考 )
  • [参考] スクリプト作成・テストに用いた実行環境は以下のとおりです
    • OS: macOS Monterey version 12.6
    • Python: 3.10.8
    • boto3: 1.24.93
    • tabulate: 0.9.0

Pythonスクリプト( inventory.py )

こちらのGist に上げています(後述の補足にも同じものを記載)。

Pythonスクリプトの実行について

事前に 管理アカウント上に対して AWS APIを実行できるようにします※。 例えば以下のようなコマンドが実行成功するか確かめてください。

  • aws sts get-caller-identity → 管理アカウントのアカウントIDであること
  • aws sso-admin list-instances → IDストアの情報が出てくること

※IAM Identity Centerを委任しているメンバーアカウント上では正常に動作しません。 管理アカウントへの割り当て情報を取得できないためです。

Pythonスクリプト( inventory.py ) をローカルで実行します。 Markdownファイル( inventory_{YYYY-mm-dd_HH-MM-SS}.md )が生成されます。

以下実行例です。

$ python3 ./inventory.py
# INFO:botocore.credentials:Found credentials in environment variables.
# INFO:root:[start] timestamp: 2022-10-28_13-40-24
# INFO:root:# getting sso instances...
# ...(略)...
# INFO:root:## getting Infra_PRD_NW, IAMReadOnlyAccess assignments...
# INFO:root:-> number of assignments: 113
# INFO:root:# generating report(inventory_2022-10-28_13-40-24.md)...
# INFO:root:[end]

$ ls ./inventory_*.md
# inventory_2022-10-28_13-40-24.md

$ head -n 16 ./inventory_2022-10-28_13-40-24.md
# # AWS IAM Identity Center 棚卸し
#  
# - 取得開始時刻: 2022-10-28_13-40-24
# - 実行アカウント: Payer-example (111111111111)
#  
# ## IAM Identity Center 情報
#  
# - インスタンスARN: arn:aws:sso:::instance/ssoins-example
# - インスタンスストアID: d-example
#  
# ## ユーザー一覧
#  
# | DisplayName | UserName         | UserId                   |
# |-------------|------------------|--------------------------|
# | EXAMPLE_AAA | aaa@example.com  | 1234567890-aaaa-example  |
# | EXAMPLE_BBB | bbb@example.com  | 1234567890-bbbb-example  |

考慮点など

  • 使っているアクションは以下のとおりです
    • sts:GetCallerIdentity - Boto3
    • organizations:ListAccounts - Boto3
    • identitystore:ListUsers - Boto3
    • identitystore:ListGroups - Boto3
    • identitystore:ListGroupMemberships - Boto3
    • sso:ListInstances - Boto3
    • sso:ListPermissionSets - Boto3
    • sso:ListAccountAssignments - Boto3
    • sso:DescribePermissionSet - Boto3
  • 「存在しない(削除した)ユーザー、グループ」に対する割り当て情報があった場合は #DELETED({principal_id}) を記載します (スクリプト作成時に詰まったポイント)
  • (私が試した範囲では起きませんでしたが) APIリクエストが最大レートを超える場合は別途エラーハンドリングを実装する必要があります

おわりに

AWS IAM Identity Center の棚卸しスクリプトを書いてみました。

現状はマネジメントコンソール上では「だれがどのアカウントにどの権限でアクセスできるか」情報は俯瞰しづらいです。こういった棚卸しスクリプトで定期的に確認するのも一つの手です。

参考

補足

Pythonスクリプト( inventory.py )

import boto3
import logging
import re
from tabulate import tabulate
from datetime import datetime
from operator import itemgetter
from itertools import product

logging.basicConfig(level=logging.INFO)

org_client = boto3.client('organizations')
idstore_client = boto3.client('identitystore')
ssoadmin_client = boto3.client('sso-admin')
sts_client = boto3.client('sts')

# テンプレート
TEMPLATE = """# AWS IAM Identity Center 棚卸し

- 取得開始時刻: {datetime}
- 実行アカウント: {account_name} ({account_id})

## IAM Identity Center 情報

- インスタンスARN: {instance_arn}
- インスタンスストアID: {identity_store_id}

## AWSアカウント一覧

{accounts}

## ユーザー一覧

{users}

## グループ一覧

{groups}

## ユーザーのグループ所属一覧

{group_memberships}

## 許可セット一覧

{permission_sets}

## 割り当て一覧

{assignments}
"""


def generate_report(datetime_now='', instance_arn='', identity_store_id='', account_name='', account_id='', accounts='', users='', groups='', group_memberships='', permission_sets='', assignments=''):
    text = TEMPLATE.format(
        datetime=datetime_now,
        instance_arn=instance_arn,
        identity_store_id=identity_store_id,
        account_name=account_name,
        account_id=account_id,
        accounts=accounts,
        users=users,
        groups=groups,
        group_memberships=group_memberships,
        permission_sets=permission_sets,
        assignments=assignments)
    text_fix = re.sub('\| +', '| ', re.sub(' +\|', ' |', text)) #テーブルの余分なスペースを削除
    file_path = f'inventory_{datetime_now}.md'
    with open(file_path, mode='w') as f:
        f.write(text_fix)


def boto3_accounts():
    accounts = []
    paginator = org_client.get_paginator('list_accounts')
    for page in paginator.paginate():
        accounts += page.get('Accounts')
    return accounts


def boto3_users(identity_store_id):
    users = []
    paginator = idstore_client.get_paginator('list_users')
    for page in paginator.paginate(IdentityStoreId=identity_store_id):
        users += page.get('Users')
    return users


def boto3_groups(identity_store_id):
    groups = []
    paginator = idstore_client.get_paginator('list_groups')
    for page in paginator.paginate(IdentityStoreId=identity_store_id):
        groups += page.get('Groups')
    return groups


def boto3_group_memberships(identity_store_id, group_id):
    memberships = []
    paginator = idstore_client.get_paginator(
        'list_group_memberships')
    for page in paginator.paginate(IdentityStoreId=identity_store_id, GroupId=group_id):
        memberships += page.get('GroupMemberships')
    return memberships


def boto3_permission_sets(instance_arn):
    permission_sets = []
    paginator = ssoadmin_client.get_paginator('list_permission_sets')
    for page in paginator.paginate(InstanceArn=instance_arn):
        permission_sets += page.get('PermissionSets')
    return permission_sets


def boto3_account_assignment(instance_arn, account_id, permission_set_arn):
    assignments = []
    paginator = ssoadmin_client.get_paginator('list_account_assignments')
    for page in paginator.paginate(InstanceArn=instance_arn, AccountId=account_id, PermissionSetArn=permission_set_arn):
        assignments += page.get('AccountAssignments')
    return assignments


def _principal_name(assignment, user_id_to_name, group_id_to_name):
    principal_type = assignment.get('PrincipalType')
    principal_id = assignment.get('PrincipalId')
    if principal_type == 'USER' and user_id_to_name.get(principal_id):
        return user_id_to_name.get(principal_id)
    elif principal_type == 'GROUP' and group_id_to_name.get(principal_id):
        return group_id_to_name.get(principal_id)
    else:
        return f'#DELETED({principal_id})'


def main():
    # 実行開始時刻の取得
    datetime_now = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    logging.info(f'[start] timestamp: {datetime_now}')

    # (事前準備) SSOインスタンス情報の取得
    logging.info('# getting sso instances...')
    instances = ssoadmin_client.list_instances().get('Instances')
    identity_store_id = instances[0].get('IdentityStoreId')
    instance_arn = instances[0].get('InstanceArn')
    logging.info(f'-> instance store id: {identity_store_id}')
    logging.info(f'-> instance arn: {instance_arn}')

    # アカウントID, アカウント名の取得
    logging.info('# getting accounts...')
    accounts = boto3_accounts()
    account_id_to_name = dict([
        (a.get('Id'), a.get('Name')) for a in accounts
    ])
    accounts_tabulate = tabulate(
        sorted([(a.get('Name'), a.get('Id'))
               for a in accounts], key=itemgetter(0)),
        headers=['AccountName', 'AccountId'],
        tablefmt='github'
    )
    logging.info(f'-> number of accounts: {len(account_id_to_name)}')

    # 処理を実行しているアカウント情報の取得
    exec_account_id = sts_client.get_caller_identity().get('Account')
    exec_account_name = account_id_to_name.get(exec_account_id)
    logging.info(f'-> exec account: {exec_account_name}({exec_account_id})')

    # ユーザー一覧の取得
    logging.info('# getting sso users...')
    users = boto3_users(identity_store_id)
    user_id_to_name = dict([
        (u.get('UserId'), u.get('DisplayName')) for u in users
    ])
    users_tabulate = tabulate(
        sorted([(u.get('DisplayName'), u.get('UserName'), u.get('UserId'))
               for u in users], key=itemgetter(0,1)),
        headers=['DisplayName', 'UserName', 'UserId'],
        tablefmt='github'
    )
    logging.info(f'-> number of users: {len(user_id_to_name)}')

    # グループ一覧の取得
    logging.info('# getting sso groups...')
    groups = boto3_groups(identity_store_id)
    group_id_to_name = dict(
        [(g.get('GroupId'), g.get('DisplayName')) for g in groups])
    groups_tabulate = tabulate(
        sorted(
            [(g.get('DisplayName'), g.get('Description'), g.get('GroupId'))
             for g in groups],
            key=itemgetter(0)
        ),
        headers=['DisplayName', 'Description', 'GroupId'],
        tablefmt='github'
    )
    logging.info(f'-> number of groups: {len(group_id_to_name)}')

    # ユーザーのグループ所属一覧の取得
    logging.info('# getting all group memberships...')
    all_memberships = []
    for group_id, group_name in group_id_to_name.items():
        memberships = boto3_group_memberships(identity_store_id, group_id)
        all_memberships += [
            (group_name, user_id_to_name.get(m.get('MemberId').get('UserId')))
            for m in memberships
        ]
    all_memberships_tabulate = tabulate(
        sorted(all_memberships, key=itemgetter(0, 1)),
        headers=['GroupName', 'UserName'],
        tablefmt='github'
    )
    logging.info(f'-> number of memberships: {len(all_memberships)}')

    # 許可セット一覧の取得
    logging.info('# getting permission sets...')
    permission_set_arns = boto3_permission_sets(instance_arn)
    permission_sets = []
    permission_set_arn_to_name = {}
    for arn in permission_set_arns:
        permission_set = ssoadmin_client.describe_permission_set(
            InstanceArn=instance_arn, PermissionSetArn=arn).get('PermissionSet')
        permission_sets.append(permission_set)
        permission_set_arn_to_name[arn] = permission_set.get('Name')
    permission_sets_tabulate = tabulate(
        sorted(
            [(ps.get('Name'), ps.get('Description'), ps.get('PermissionSetArn'))
             for ps in permission_sets],
            key=itemgetter(0)
        ),
        headers=['PermissionSetName', 'Description', 'PermissionSetArn'],
        tablefmt='github'
    )
    logging.info(
        f'-> number of permission sets: {len(permission_set_arn_to_name)}')

    # 割り当て一覧の取得 (アカウント名 x プリンシパル名 x プリンシパル種別 x 許可セット名)
    logging.info('# getting all assignments...')
    all_assignments = []
    for account_id, permission_set_arn in product(account_id_to_name.keys(), permission_set_arns):
        account_name = account_id_to_name.get(account_id)
        permission_set_name = permission_set_arn_to_name.get(
            permission_set_arn)
        logging.info(
            f'## getting {account_name}, {permission_set_name} assignments...')
        assignments = boto3_account_assignment(
            instance_arn, account_id, permission_set_arn)
        for a in assignments:
            principal_type = a.get('PrincipalType')
            principal_name = _principal_name(
                a, user_id_to_name, group_id_to_name)
            all_assignments.append(
                (account_name, principal_type, principal_name, permission_set_name))
    all_assignments_tabulate = tabulate(
        sorted(all_assignments, key=itemgetter(0, 1, 2)),
        headers=['AccountName', 'PrincipalType',
                 'PrincipalName', 'PermissionSetName'],
        tablefmt='github'
    )
    logging.info(f'-> number of assignments: {len(all_assignments)}')

    # 出力
    logging.info(f'# generating report(inventory_{datetime_now}.md)...')
    generate_report(
        datetime_now=datetime_now,
        instance_arn=instance_arn,
        identity_store_id=identity_store_id,
        account_name=exec_account_name,
        account_id=exec_account_id,
        accounts=accounts_tabulate,
        users=users_tabulate,
        groups=groups_tabulate,
        group_memberships=all_memberships_tabulate,
        permission_sets=permission_sets_tabulate,
        assignments=all_assignments_tabulate
    )
    logging.info('[end]')


main()