[Boto3 Adv-Cal DAY4]AWS Transfer for SFTPサーバ起動を対話式で実行するロジックに挑戦してみた

boto3 で楽しむ AWS PythonLife 一人AdventCalendarです。

boto3のドキュメントを通して、サービス別にどういった事が出来るのかを理解したり、管理コンソールを通さずにTerminalだけで完結できるように検証していくことが目的になります。

4日目はAWS Transfer for SFTPサーバ起動を対話ベースでの作成を試してみました。

目次

boto3を通してAWS Transfer for SFTPで出来ること

ドキュメントは下記リンク先です。

ざっくりと以下のことができます。

  • SFTPサーバの操作(作成・削除・停止・開始)
  • ユーザの操作(作成・削除)
  • SSH公開鍵の操作(追加・削除)

今回の操作

対話ベースでパラメータを指定して、インスタンスを作成します。

  1. Identity Providerの指定
  2. インスタンスの作成
  3. IAM Roleの指定
  4. ユーザ名の指定
  5. 公開鍵の指定

実行

途中で失敗した場合、サーバとユーザのうち作成されたものがあったら削除します。

% python main.py
Input Profile name [default]>> advent_calendar

Select provider type >>
[0] SERVICE_MANAGED
[1] API_GATEWAY
>> 0

Select ARN
[0] arn:aws:iam::XXXXXXXXXXXX:role/XXXXXXXXXXXXXX
>> 0

Input user name >>test

Input key_path [~/.ssh/id_rsa.pub]>>

main.py

import boto3
import subprocess
import argparse
import os
import re

class TransferWizard:
    _client_name = 'transfer'
    _session = None

    def __init__(self, profile_name):
        self._session = boto3.Session(profile_name=profile_name)

    @property
    def session(self):
        return self._session

    def get_client(self, client_name=None):
        if not client_name:
            client_name = self._client_name
        return self.session.client(client_name)

    @property
    def client_name(self):
        return self._client_name

    def get_iam_arn_list(self):
        res = self.get_client('iam').list_roles()
        return [role['Arn'] for role in res['Roles']]

    @classmethod
    def get_identity_provider_type(cls):
        return ["SERVICE_MANAGED", "API_GATEWAY"]

    def create_server(self, **kwargs):
        return self.get_client().create_server(**kwargs)

    def create_user(self, **kwargs):
        return self.get_client().create_user(**kwargs)

    def set_public_key(self, server_id, user_name, key_body=None):
        default_public_key = '~/.ssh/id_rsa.pub'
        if not key_body:
            file_path = input('\nInput key_path [{}]>>'.format(default_public_key))
            if len(file_path) == 0:
                file_path = default_public_key
            if re.match(r'^~', file_path):
                file_path = os.path.expanduser(file_path)
            hundle = open(file_path, "r")
            key_body = hundle.read()
        params = {'ServerId': server_id, 'SshPublicKeyBody': key_body, 'UserName': user_name}
        return self.get_client().import_ssh_public_key(**params)

    @classmethod
    def prompt_user_name(cls):
        user_name = None
        while True:
            user_name = input('\nInput user name >>')
            if user_name and len(user_name) != 0:
                break
        return user_name

    @classmethod
    def prompt_identity_provider(cls):
        provider_types = cls.get_identity_provider_type()
        provider_type = provider_types[0]
        while True:
            print('\nSelect provider type >>')
            for _type in provider_types:
                print('[{}] {}'.format(provider_types.index(_type), _type))
            index = input('>> ')
            if len(index) != 0 and int(index) < len(provider_types):
                provider_type = provider_types[int(index)]
                break
        return provider_type

    def prompt_iam_arn(self):
        arn = None
        arns = self.get_iam_arn_list()
        while True:
            print('\nSelect ARN')
            for arn in arns:
                print('[{}] {}'.format(arns.index(arn), arn))
            index = input('>> ')
            if len(index) != 0 and int(index) < len(arns):
                arn = arns[int(index)]
                break
        return arn

    @staticmethod
    def prompt():
        params = {}
        default_profile_name = 'default'
        profile_name = input('Input Profile name [{}]>> '.format(default_profile_name))
        if len(profile_name) == 0:
            profile_name = default_profile_name
        transfer = TransferWizard(profile_name)
        server = None
        user = None
        try:       
            server_params = {}
            server_params['IdentityProviderType'] = transfer.prompt_identity_provider()
            server = transfer.create_server(**server_params)

            user_params = {}
            user_params['Role'] = transfer.prompt_iam_arn()
            user_params['ServerId'] = server['ServerId']
            user_params['UserName'] = transfer.prompt_user_name()
            user = transfer.create_user(**user_params)

            transfer.set_public_key(server_id=server['ServerId'], user_name=user['UserName'])
        except:
            print('Catch Error.')
            if user:
                print('Delete user: {}'.format(user['UserName']))
                transfer.get_client().delete_user(ServerId=server['ServerId'], UserName=user['UserName'])
                print('Delete server: {}'.format(server['ServerId']))
                transfer.get_client().delete_server(ServerId=server['ServerId'])

if __name__ == '__main__':
    TransferWizard.prompt()

IdentityProviderType

API_GATEWAYを指定すると、必要なパラメータが変わってきます。

Bucket指定

今回は指定していませんが、create_user()HomeDirectoryにて引数指定することで可能になります。

まとめ

操作は割とシンプルに行えました。一括ユーザ追加操作等には特に効果的だと思います。