MLワークフローを「AWS Step Functions」でサーバレスに管理する:Amazon SageMaker Advent Calendar 2018

概要

こんにちは、データインテグレーション部のyoshimです。
この記事は「クラスメソッド Amazon SageMaker Advent Calendar」の10日目の記事となります。

今回はSageMakerを含めたMLのJOBワークフローを「AWS Step Functions」を使ってサーバレスで管理してみます。「MLワークフローをサーバレスに管理することで何が嬉しいのか」という点については、個人的には下記の3点がメリットになるかと思います。

  • 運用が楽
  • JOBフローが可視化されるのでわかりやすい(今回利用するStepFunctionsを使うとJOBフローが可視化される)
  • JOB稼働時間のみの課金なので、コスト削減に繋がる

    色々と嬉しいことがありますね。今回はserverless-sagemaker-orchestrationにサンプルの構成図があったので、東京リージョンで構成してみました。
    MLワークフローの「全体の一部分のみしか自動化できていない」ものの、一部修正すれば既存のシステムとも組み合わせやすそうで、いい構成だと思います。

    参考:新発表 – AWS Step Functions が コンピュート、データベース、メッセージング、アナリティクス、機械学習 のサービスと統合
    参考:serverless-sagemaker-orchestration

    目次

    1.構成図

    今回実装する構成図は下記の通りです。
    引用:serverless-sagemaker-orchestration

    この構成で「線形回帰モデル」を「スケジュールを指定して」、「利用するデータを指定して」、「新しいモデルを学習・推論用エンドポイントとしてデプロイ(UPDATE)」する工程をサーバレスに管理してみようと思います。

    ただ、この構成図だけだと処理フローがよくわからなかったので、引用元を参考に処理フローをざっくり日本語訳してみました。
    1.「CloudWatch Events」がスケジュールされたタイミングで「Step Functions」の「state machine」を起動する
    2.「state machine」がS3をチェックし、学習に利用するデータを判別し、SageMakerの学習JOBを作成する
    3.「State machine」が学習JOBの終了まで待機する
    4.学習JOBが終了したら、「State machine」が最新のモデルをエンドポイントにデプロイする。
    5.「State machine」がデプロイ終了まで待機する
    6.「State machine」がSlackに学習・デプロイに関する情報を通知する
    引用:serverless-sagemaker-orchestration

    雰囲気は掴めましたが、まだもう少し詳細を把握したいところですね。続いて、各処理についてもう少し細かく見ていきます。

    2.JOBフローの詳細

    この構成では、各JOBフローを如何に捌くか、が重要です。
    なので、今回必要となる「State machine」についてもう少し詳細を見てみましょう。
    なお、ここでの説明もserverless-sagemaker-orchestration をざっくり日本語訳したものです。

    CheckData

    「S3に新しく追加されたデータをチェックし学習に利用するファイルを判別する」部分です。AWS Systems Manager パラメータストアを使って、対象のファイルを選別します。
    もし対象のデータが存在する場合はS3に「manifestファイル」をアップロードし、モデル学習時にこのファイルを参照します。
    もし対象のファイルが存在しない場合は、構成図の「NoNewData」に移動し処理を終了します。

    StartTrainingJob

    S3にアップロードされた「training manifest」を参照して、学習を開始します。
    学習を実行した後は次の「NotifySlack」に移ります。

    NotifySlack

    登録したSlackのチャンネルにmessageを通知します。通知した後は「NotifySlackBranch」に移ります。

    NotifySlackBranch

    学習・デプロイJOBのステータスを監視し、ステータスによって次に何を実行するかを判断します。 学習JOBが実行中なら「CheckStatusWait」、失敗したら「JobFailed」、成功したら「JobCompleted」に移動します。また、デプロイJOBが失敗したら「DeploymentFailed」、成功したら「DeploymentCompleted」に移ります。

    CheckStatusWait

    「指定した時間」ごとに「GetStatus」ステップに移ります。今回は60秒ごとに「GetStatus」ステップに移るようにしました。

    GetStatus

    学習・デプロイJOBのステータスを確認した後に「CheckStatusBranch」ステップに移ります

    CheckStatusBranch

    学習・デプロイJOBのステータスから、次に移動するステップを判断します。
    もし学習・デプロイJOBが「終了 or 失敗」した場合は「NotifySlack」ステップに移ります。 それ以外の場合は「CheckStatusWait」ステップに移ります

    JobCompleted

    学習JOBが終了したことを明示的にするためのステップです。「DeployModel」ステップに移ります

    DeployModel

    学習したモデルを推論用エンドポイントとしてデプロイします。
    もし既にエンドポイントが存在するようならUPDATE,存在しないようなら新しくエンドポイントを作成し,「NotifySlack」ステップに移ります。
    (このデプロイはダウンタイム無しで実行されます。)

    DeploymentCompleted

    デプロイJOBが終了したことを明示的にするためのステップです。「UpdatedParameters」ステップに移ります

    UpdateParameters

    「AWS Systems Manager パラメータストア」をUPDATEし、今回のJOBで利用したデータを記録します。 その後、「End」ステップに移ります。

    JobFailed

    学習JOBが失敗したことを示します

    DeploymentFailed

    デプロイJOBが失敗したことを示します

    3.処理の要点

    「2.JOBフローの詳細」でちょっと細かいところまで確認してしまったので、ここで、「1.構成図」にて記述した各処理ステップの要点を再確認し、フローの全体図をなんとなく把握したいと思います。
    今回の処理フローは下記の通りでした。
    下記のフローを「具体的にどこでどのように制御しているか」を要点を絞って紹介しようと思います。

    1.「CloudWatch Events」がスケジュールされたタイミングで「Step Functions」の「state machine」を起動する
    2.「state machine」がS3をチェックし、学習に利用するデータを判別し、SageMakerの学習JOBを作成する
    3.「State machine」が学習JOBの終了まで待機する
    4.学習JOBが終了したら、「State machine」が最新のモデルをエンドポイントにデプロイする
    5.「State machine」がデプロイ終了まで待機する
    6.「State machine」がSlackに学習・デプロイに関する情報を通知する

    3-1.「CloudWatch Events」がスケジュールされたタイミングで「Step Functions」の「state machine」を起動する

    今回新規に作成した「step functions」のステートマシーンが「Cloud watch events」に登録されているので、スケジュールを指定しての実行が可能です。 これを...

    このようにすると、毎日朝9時に実行できます。(GMTなので東京の9時間遅れ)

    時間指定ではなく、特定のイベントの後に実行させることもできます。
    学習用ファイルの準備(前処理)JOBの後にこのイベントを実行させる、というのが一番いい流れだと思います。

    3-2.「state machine」がS3をチェックし、学習に利用するデータを判別し、SageMakerの学習JOBを作成する

    S3に格納されているファイル、manifestファイル、「AWS Systems Manager パラメータストア」から、学習に利用するデータを絞り込みます。
    この絞り込む条件についても精査が必要なのですが、例えば「古いデータは現時点の社会を反映していない」という考えから「直近2年間分のデータのみを使う」、と絞り込みが可能です。
    一概に「全期間分のデータをとりあえず使った方がいい」とはならないので難しいところですね。

    3-3.「State machine」が学習JOBの終了まで待機する

    「AWS Step Functions」のWaitを使って待機しています。
    現在は60秒待機するようにしていますが、ここも修正可能です。

    3-4.学習JOBが終了したら、「State machine」が最新のモデルをエンドポイントにデプロイする

    今回のテンプレートファイルで作成したLambda関数の「deploy_model.py」の中で実行します。
    エンドポイントの「インスタンスタイプ」、「インスタンス数」等を変更したい時は、Lamdaの「環境変数」を変更することで修正可能です。

    3-5.「State machine」がデプロイ終了まで待機する

    「3-3.「State machine」が学習JOBの終了まで待機する」と同様です。

    3-6.「State machine」がSlackに学習・デプロイに関する情報を通知する

    JOBの実行結果をSlackに通知してくれます。

    これは今回のテンプレートファイルで作成したLambda関数の「notify_slack.py」の中で実行されています。
    デフォルトだと通知しないように作成されるので、もしSlackでの通知が必要なようなら、「ENABLED」、「CHANNEL」、「ACCESS_TOKEN」の3つの環境変数を設定しましょう。

    全体のフロー等、細かいことの説明は以上で終了です。
    続いて、早速東京リージョンに実装していきます。

    4.やってみる

    Running the Exampleを参考に、早速やってみようと思います。

    4-0.Slackのワークスペースを作成する

    JOBの実行結果をSlackに通知させたい場合は、Slack側の準備が必要です。もし、Slackへの通知が不要なら「4-0.Slackのワークスペースを作成する」は読み飛ばしてください。

    今回、私は既に作成していたワークスペースを利用しました。もしこれからワークスペースを作成する、という方はSlack ワークスペースを作成するの通りにワークスペースを作成しましょう。

    続いて、「app」を作成します。

    app名とSlackのワークスペース名を指定して、「Create app」をクリックします。

    credential情報が確認できます。外部に漏れないように厳重に管理しましょう。

    「OAuth & Permissions」をクリックします。

    「Select Permission Scopes」に「chat:write:bot」と記述すると、対象のappに権限を付与できるので、選択した後に「Save Changes」をクリックします。

    続いて、Appをワークスペースにインストールするために、「Install App」欄の「Install App to Workspace」をクリックします。

    このAppはワークスペースに通知を送るよ、という確認をした後に「Authorize」します。

    「OAuth Access Token」が発行されます。これも後ほど使うので控えておきましょう。

    また、通知先の「Slackチャンネル」も控えておいてください。私は「general」に送ることとします。
    以上で、Slackの準備は完了です。

    4-1.cloudformationで一気に環境構築

    ドキュメントには「コンソール画面上からcloudformationを使ってオレゴンリージョンに環境構築」するやり方と「手動で指定したリージョンに環境構築する」、2パターンのやり方を紹介していました。
    折角なので今回は「手動で東京リージョンに環境構築」してみようと思います。

    東京リージョンに手動で環境構築

    手動とはいえ、基本的には「cloud formationテンプレート」を利用します。

    まずは、東京リージョンに対象となるS3バケットを作成した後、ローカルPCの環境変数に下記をセットします。

    S3BUCKET=[REPLACE_WITH_BUCKET_TO_UPLOAD_TEMPLATE_ARTIFACTS_TO]
    REGION=[REPLACE_WITH_REGION_YOU_WISH_TO_DEPLOY_TO]
    STACKNAME=[REPLACE_WITH_DESIRED_STACK_NAME]
    SAGEMAKERROLE=[REPLACE_WITH_SAGEMAKER_EXECUTION_ROLE]
    ATOKEN=[REPLACE_WITH_OAUTH_ACCESS_TOKEN]
    CHANNEL=[REPLACE_WITH_SLACK_CHANNEL_NAME]
    

    続いて、テンプレートファイルをcloneしてきます。  

    git clone https://github.com/aws-samples/serverless-sagemaker-orchestration.git
    cd serverless-sagemaker-orchestration/cloudformation
    

    「continuous_sagemaker.serverless.yaml」がポツンと存在しているので、このファイルをaws cloudformation packageコマンドを使ってS3にアップロードします。

    hoge:cloudformation yoshim.dayo$ ls -l
    total 32
    -rw-r--r--  1 yoshim.dayo  staff  13850 12  5 19:09 continuous_sagemaker.serverless.yaml
    
    aws cloudformation package --region $REGION --s3-bucket $S3BUCKET --template continuous_sagemaker.serverless.yaml --output-template-file continuous_sagemaker.output.yaml
    

    すると、S3の指定したバケット,ローカルPCに新しくファイルが作成されるので、これらを元にスタックを作成できます。

    hoge:cloudformation yoshim.dayo$ ls -l
    -rw-r--r--  1 yoshim.dayo  staff  14041 12  5 19:15 continuous_sagemaker.output.yaml
    -rw-r--r--  1 yoshim.dayo  staff  13850 12  5 19:09 continuous_sagemaker.serverless.yaml
    
    

    下記のコマンドで、新しく作成されたファイルをもとにスタックを作成できます。

    aws cloudformation deploy --region $REGION --template-file continuous_sagemaker.output.yaml --stack-name $STACKNAME --capabilities CAPABILITY_NAMED_IAM --parameter-overrides SageMakerExecutionRole=$SAGEMAKERROLE SlackAccessToken=$ATOKEN SlackChannel=$CHANNEL
    

    私が実行した際は「S3バケットが既に存在している」とエラーが出ました。どうやら「continuous_sagemaker.output.yaml」でも先ほどと同名のS3バケットを作成しようとしていたようです。
    なので「continuous_sagemaker.output.yaml」の下記の部分をコメントアウトして、再実行したところうまくいきました。

    こんな感じにコメントアウト

      #S3Bucket:
        #Properties:
          #BucketName:
            #Ref: BucketName
        #Type: AWS::S3::Bucket
    

    4-2.学習の準備

    続いて、SageMakerノートブックインスタンスを立ち上げ、データセットをS3バケットにアップロードします。
    基本的にUploading Training Dataを参考に作業を進めていきます。

    まずは、東京リージョンのノートブックインスタンスに「serverless-sagemaker-orchestration.ipynb」をアップロードします。
    このファイルは、先ほどcloneしてきたリポジトリ内にあります。

    続いて、ノートブックインスタンス上で「serverless-sagemaker-orchestration.ipynb」を開きます。
    下記のS3バケットを先ほどから利用しているバケットに修正して、実行してサンプルデータをS3に配置します。

    num_days = 3 # number of days to split housing prices data into.
    model_name = 'LinearLearner-HomePrices' # If you modified the ModelPrefix CloudFormation template change this to the value you modified it to be.
    
    bucket = '<NAME OF YOUR BUCKET HERE>'  # Set this to the name of bucket created by CloudFormation template. Can be found in the output of the template.
    prefix = 'data/{}/train'.format(model_name)
    
    role = get_execution_role()
    region = boto3.Session().region_name
    
    boston = load_boston()
    target = boston.target
    data = [np.ndarray.tolist(row) for row in boston.data[:, :]]
    
    # Add target value as first column as expected by training algorithm
    training_set = [[row[0]] + row[1] for row in zip(target, data)]
    
    # Split data into seperate datasets for each day
    train_by_day = split_data_by_days(training_set, num_days)
    
    # Upload split datasets to S3
    for day in range(num_days):
        current_date = date.today() - timedelta(day)
        key = '{}.csv'.format(current_date)
        write_to_csv(key, train_by_day[day])
        s3_uri = 's3://{}/{}/{}'.format(bucket, prefix, key)
        print('Uploading {} to {}'.format(key, s3_uri))
        boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, key)).upload_file(key)
    

    このように、S3パスにサンプルデータが配置されます。

    4-3.手動実行して稼働確認

    続いて、手動でJOBを実行させてみます。
    もしエラーが出る場合は、cloudWatchのログから原因を調査する必要があります。
    私は下記の4点を修正しました。

  • 「Lambda関数にセットされているIAMポリシー」を修正して「今回利用するS3バケットにアクセスできるようにする」
  • トレーニングJOB名を「実行する度に変わる」ように修正。具体的には、「ServerlessSageMakerOrches-StartTrainingJobFunction」のLambda関数の「start_trainning_job.py」を修正。
  • 「Lamda」の「ServerlessSageMakerOrches-StartTrainingJobFunction」関数の「start_training_job.py」に東京リージョンのコンテナを追加
  • Slackへの通知をするように、環境変数の「ENABLED」、「CHANNEL」、「ACCESS_TOKEN」をLambdaの環境変数に設定
    CONTAINERS = {'us-west-2': '174872318107.dkr.ecr.us-west-2.amazonaws.com/linear-learner:latest',
                  'us-east-1': '382416733822.dkr.ecr.us-east-1.amazonaws.com/linear-learner:latest',
                  'us-east-2': '404615174143.dkr.ecr.us-east-2.amazonaws.com/linear-learner:latest',
                  'eu-west-1': '438346466558.dkr.ecr.eu-west-1.amazonaws.com/linear-learner:latest',
                  'ap-northeast-1': '351501993468.dkr.ecr.ap-northeast-1.amazonaws.com/linear-learner:latest'}
    

    コンテナは下記で調べました。

    import boto3
    from sagemaker.amazon.amazon_estimator import get_image_uri
    
    container = get_image_uri(boto3.Session().region_name, 'linear-learner')
    print(container)
    

    「Step Functions」のコンソール画面上からは、このようにJOBの過程も確認できます。

    ちょっとづつJOBが進んでいく...。今はモデルを学習して、1分ずつステータスを確認しているところですね。

    モデルの学習が終わって、デプロイしているようです。

    SageMakerの画面に移っても、当然デプロイしているところが確認できます。

    デプロイが終わるとこんな感じです。

    ちゃんと推論用エンドポイントがデプロイされていました。

    S3を確認してみたところ、トレーニングデータが格納されているパスに「manifest」ファイルが生成されていました。
    次回の学習時には、この「manifest」ファイルを参照し、「次の学習に利用するデータ」を判別します。

    このファイルは、「CheckData」ステップで「今回のJOBに利用したファイル」を記録して出力しているものです。
    このファイルの詳細については「2.JOBフローの詳細」をご参照ください。

    JOBの実行が完了するとAWS Systems Manager パラメータストアが更新され、「学習に利用したトレーニングファイルの最新日付」を値として保持します。
    次のJOB実行時は「CheckData」ステップで、この値とS3に実際に保存されているファイルを突合させて、学習に利用するファイルを判別し、その結果をmanifestファイルとして出力し、モデル学習時にそのmanifestファイルを参照する、という流れになります。

    また、Slackへの通知を確認してみると、下記のように通知が来ていることが確認できます。

    5.まとめ

    MLのワークフローを,一部とはいえサーバレスで実現することができました。
    運用の簡易化、フローの可視化、コスト削減、等色々とメリットが考えられるこの構成ですが、「あくまでも機械学習のJOBフローのほんの一部」しか管理できていない点にはご注意ください。
    ただ、それでもこの構成はMLワークフローの手間やコストを大いに削減してくれて、更に拡張性もあるいい構成だと思います。

    かなり長くなってしまいましたが、本エントリーの内容は以上になります。 最後までありがとうございました。