MLワークフローを「AWS Step Functions」でサーバレスに管理する:Amazon SageMaker Advent Calendar 2018
概要
こんにちは、データインテグレーション部のyoshimです。
この記事は「クラスメソッド Amazon SageMaker Advent Calendar」の10日目の記事となります。
今回はSageMakerを含めたMLのJOBワークフローを「AWS Step Functions」を使ってサーバレスで管理してみます。「MLワークフローをサーバレスに管理することで何が嬉しいのか」という点については、個人的には下記の3点がメリットになるかと思います。
色々と嬉しいことがありますね。今回はserverless-sagemaker-orchestrationにサンプルの構成図があったので、東京リージョンで構成してみました。
MLワークフローの「全体の一部分のみしか自動化できていない」ものの、一部修正すれば既存のシステムとも組み合わせやすそうで、いい構成だと思います。
参考:新発表 – AWS Step Functions が コンピュート、データベース、メッセージング、アナリティクス、機械学習 のサービスと統合
参考:serverless-sagemaker-orchestration
目次
1.構成図
今回実装する構成図は下記の通りです。
引用:serverless-sagemaker-orchestration
この構成で「線形回帰モデル」を「スケジュールを指定して」、「利用するデータを指定して」、「新しいモデルを学習・推論用エンドポイントとしてデプロイ(UPDATE)」する工程をサーバレスに管理してみようと思います。
ただ、この構成図だけだと処理フローがよくわからなかったので、引用元を参考に処理フローをざっくり日本語訳してみました。
1.「CloudWatch Events」がスケジュールされたタイミングで「Step Functions」の「state machine」を起動する
2.「state machine」がS3をチェックし、学習に利用するデータを判別し、SageMakerの学習JOBを作成する
3.「State machine」が学習JOBの終了まで待機する
4.学習JOBが終了したら、「State machine」が最新のモデルをエンドポイントにデプロイする。
5.「State machine」がデプロイ終了まで待機する
6.「State machine」がSlackに学習・デプロイに関する情報を通知する
引用:serverless-sagemaker-orchestration
雰囲気は掴めましたが、まだもう少し詳細を把握したいところですね。続いて、各処理についてもう少し細かく見ていきます。
2.JOBフローの詳細
この構成では、各JOBフローを如何に捌くか、が重要です。
なので、今回必要となる「State machine」についてもう少し詳細を見てみましょう。
なお、ここでの説明もserverless-sagemaker-orchestration をざっくり日本語訳したものです。
CheckData
「S3に新しく追加されたデータをチェックし学習に利用するファイルを判別する」部分です。AWS Systems Manager パラメータストアを使って、対象のファイルを選別します。
もし対象のデータが存在する場合はS3に「manifestファイル」をアップロードし、モデル学習時にこのファイルを参照します。
もし対象のファイルが存在しない場合は、構成図の「NoNewData」に移動し処理を終了します。
StartTrainingJob
S3にアップロードされた「training manifest」を参照して、学習を開始します。
学習を実行した後は次の「NotifySlack」に移ります。
NotifySlack
登録したSlackのチャンネルにmessageを通知します。通知した後は「NotifySlackBranch」に移ります。
NotifySlackBranch
学習・デプロイJOBのステータスを監視し、ステータスによって次に何を実行するかを判断します。 学習JOBが実行中なら「CheckStatusWait」、失敗したら「JobFailed」、成功したら「JobCompleted」に移動します。また、デプロイJOBが失敗したら「DeploymentFailed」、成功したら「DeploymentCompleted」に移ります。
CheckStatusWait
「指定した時間」ごとに「GetStatus」ステップに移ります。今回は60秒ごとに「GetStatus」ステップに移るようにしました。
GetStatus
学習・デプロイJOBのステータスを確認した後に「CheckStatusBranch」ステップに移ります
CheckStatusBranch
学習・デプロイJOBのステータスから、次に移動するステップを判断します。
もし学習・デプロイJOBが「終了 or 失敗」した場合は「NotifySlack」ステップに移ります。
それ以外の場合は「CheckStatusWait」ステップに移ります
JobCompleted
学習JOBが終了したことを明示的にするためのステップです。「DeployModel」ステップに移ります
DeployModel
学習したモデルを推論用エンドポイントとしてデプロイします。
もし既にエンドポイントが存在するようならUPDATE,存在しないようなら新しくエンドポイントを作成し,「NotifySlack」ステップに移ります。
(このデプロイはダウンタイム無しで実行されます。)
DeploymentCompleted
デプロイJOBが終了したことを明示的にするためのステップです。「UpdatedParameters」ステップに移ります
UpdateParameters
「AWS Systems Manager パラメータストア」をUPDATEし、今回のJOBで利用したデータを記録します。 その後、「End」ステップに移ります。
JobFailed
学習JOBが失敗したことを示します
DeploymentFailed
デプロイJOBが失敗したことを示します
3.処理の要点
「2.JOBフローの詳細」でちょっと細かいところまで確認してしまったので、ここで、「1.構成図」にて記述した各処理ステップの要点を再確認し、フローの全体図をなんとなく把握したいと思います。
今回の処理フローは下記の通りでした。
下記のフローを「具体的にどこでどのように制御しているか」を要点を絞って紹介しようと思います。
1.「CloudWatch Events」がスケジュールされたタイミングで「Step Functions」の「state machine」を起動する
2.「state machine」がS3をチェックし、学習に利用するデータを判別し、SageMakerの学習JOBを作成する
3.「State machine」が学習JOBの終了まで待機する
4.学習JOBが終了したら、「State machine」が最新のモデルをエンドポイントにデプロイする
5.「State machine」がデプロイ終了まで待機する
6.「State machine」がSlackに学習・デプロイに関する情報を通知する
3-1.「CloudWatch Events」がスケジュールされたタイミングで「Step Functions」の「state machine」を起動する
今回新規に作成した「step functions」のステートマシーンが「Cloud watch events」に登録されているので、スケジュールを指定しての実行が可能です。 これを...
このようにすると、毎日朝9時に実行できます。(GMTなので東京の9時間遅れ)
時間指定ではなく、特定のイベントの後に実行させることもできます。
学習用ファイルの準備(前処理)JOBの後にこのイベントを実行させる、というのが一番いい流れだと思います。
3-2.「state machine」がS3をチェックし、学習に利用するデータを判別し、SageMakerの学習JOBを作成する
S3に格納されているファイル、manifestファイル、「AWS Systems Manager パラメータストア」から、学習に利用するデータを絞り込みます。
この絞り込む条件についても精査が必要なのですが、例えば「古いデータは現時点の社会を反映していない」という考えから「直近2年間分のデータのみを使う」、と絞り込みが可能です。
一概に「全期間分のデータをとりあえず使った方がいい」とはならないので難しいところですね。
3-3.「State machine」が学習JOBの終了まで待機する
「AWS Step Functions」のWaitを使って待機しています。
現在は60秒待機するようにしていますが、ここも修正可能です。
3-4.学習JOBが終了したら、「State machine」が最新のモデルをエンドポイントにデプロイする
今回のテンプレートファイルで作成したLambda関数の「deploy_model.py」の中で実行します。
エンドポイントの「インスタンスタイプ」、「インスタンス数」等を変更したい時は、Lamdaの「環境変数」を変更することで修正可能です。
3-5.「State machine」がデプロイ終了まで待機する
「3-3.「State machine」が学習JOBの終了まで待機する」と同様です。
3-6.「State machine」がSlackに学習・デプロイに関する情報を通知する
JOBの実行結果をSlackに通知してくれます。
これは今回のテンプレートファイルで作成したLambda関数の「notify_slack.py」の中で実行されています。
デフォルトだと通知しないように作成されるので、もしSlackでの通知が必要なようなら、「ENABLED」、「CHANNEL」、「ACCESS_TOKEN」の3つの環境変数を設定しましょう。
全体のフロー等、細かいことの説明は以上で終了です。
続いて、早速東京リージョンに実装していきます。
4.やってみる
Running the Exampleを参考に、早速やってみようと思います。
4-0.Slackのワークスペースを作成する
JOBの実行結果をSlackに通知させたい場合は、Slack側の準備が必要です。もし、Slackへの通知が不要なら「4-0.Slackのワークスペースを作成する」は読み飛ばしてください。
今回、私は既に作成していたワークスペースを利用しました。もしこれからワークスペースを作成する、という方はSlack ワークスペースを作成するの通りにワークスペースを作成しましょう。
続いて、「app」を作成します。
app名とSlackのワークスペース名を指定して、「Create app」をクリックします。
credential情報が確認できます。外部に漏れないように厳重に管理しましょう。
「OAuth & Permissions」をクリックします。
「Select Permission Scopes」に「chat:write:bot」と記述すると、対象のappに権限を付与できるので、選択した後に「Save Changes」をクリックします。
続いて、Appをワークスペースにインストールするために、「Install App」欄の「Install App to Workspace」をクリックします。
このAppはワークスペースに通知を送るよ、という確認をした後に「Authorize」します。
「OAuth Access Token」が発行されます。これも後ほど使うので控えておきましょう。
また、通知先の「Slackチャンネル」も控えておいてください。私は「general」に送ることとします。
以上で、Slackの準備は完了です。
4-1.cloudformationで一気に環境構築
ドキュメントには「コンソール画面上からcloudformationを使ってオレゴンリージョンに環境構築」するやり方と「手動で指定したリージョンに環境構築する」、2パターンのやり方を紹介していました。
折角なので今回は「手動で東京リージョンに環境構築」してみようと思います。
東京リージョンに手動で環境構築
手動とはいえ、基本的には「cloud formationテンプレート」を利用します。
まずは、東京リージョンに対象となるS3バケットを作成した後、ローカルPCの環境変数に下記をセットします。
S3BUCKET=[REPLACE_WITH_BUCKET_TO_UPLOAD_TEMPLATE_ARTIFACTS_TO] REGION=[REPLACE_WITH_REGION_YOU_WISH_TO_DEPLOY_TO] STACKNAME=[REPLACE_WITH_DESIRED_STACK_NAME] SAGEMAKERROLE=[REPLACE_WITH_SAGEMAKER_EXECUTION_ROLE] ATOKEN=[REPLACE_WITH_OAUTH_ACCESS_TOKEN] CHANNEL=[REPLACE_WITH_SLACK_CHANNEL_NAME]
続いて、テンプレートファイルをcloneしてきます。
git clone https://github.com/aws-samples/serverless-sagemaker-orchestration.git cd serverless-sagemaker-orchestration/cloudformation
「continuous_sagemaker.serverless.yaml」がポツンと存在しているので、このファイルをaws cloudformation packageコマンドを使ってS3にアップロードします。
hoge:cloudformation yoshim.dayo$ ls -l total 32 -rw-r--r-- 1 yoshim.dayo staff 13850 12 5 19:09 continuous_sagemaker.serverless.yaml
aws cloudformation package --region $REGION --s3-bucket $S3BUCKET --template continuous_sagemaker.serverless.yaml --output-template-file continuous_sagemaker.output.yaml
すると、S3の指定したバケット,ローカルPCに新しくファイルが作成されるので、これらを元にスタックを作成できます。
hoge:cloudformation yoshim.dayo$ ls -l -rw-r--r-- 1 yoshim.dayo staff 14041 12 5 19:15 continuous_sagemaker.output.yaml -rw-r--r-- 1 yoshim.dayo staff 13850 12 5 19:09 continuous_sagemaker.serverless.yaml
下記のコマンドで、新しく作成されたファイルをもとにスタックを作成できます。
aws cloudformation deploy --region $REGION --template-file continuous_sagemaker.output.yaml --stack-name $STACKNAME --capabilities CAPABILITY_NAMED_IAM --parameter-overrides SageMakerExecutionRole=$SAGEMAKERROLE SlackAccessToken=$ATOKEN SlackChannel=$CHANNEL
私が実行した際は「S3バケットが既に存在している」とエラーが出ました。どうやら「continuous_sagemaker.output.yaml」でも先ほどと同名のS3バケットを作成しようとしていたようです。
なので「continuous_sagemaker.output.yaml」の下記の部分をコメントアウトして、再実行したところうまくいきました。
こんな感じにコメントアウト
#S3Bucket: #Properties: #BucketName: #Ref: BucketName #Type: AWS::S3::Bucket
4-2.学習の準備
続いて、SageMakerノートブックインスタンスを立ち上げ、データセットをS3バケットにアップロードします。
基本的にUploading Training Dataを参考に作業を進めていきます。
まずは、東京リージョンのノートブックインスタンスに「serverless-sagemaker-orchestration.ipynb」をアップロードします。
このファイルは、先ほどcloneしてきたリポジトリ内にあります。
続いて、ノートブックインスタンス上で「serverless-sagemaker-orchestration.ipynb」を開きます。
下記のS3バケットを先ほどから利用しているバケットに修正して、実行してサンプルデータをS3に配置します。
num_days = 3 # number of days to split housing prices data into. model_name = 'LinearLearner-HomePrices' # If you modified the ModelPrefix CloudFormation template change this to the value you modified it to be. bucket = '<NAME OF YOUR BUCKET HERE>' # Set this to the name of bucket created by CloudFormation template. Can be found in the output of the template. prefix = 'data/{}/train'.format(model_name) role = get_execution_role() region = boto3.Session().region_name boston = load_boston() target = boston.target data = [np.ndarray.tolist(row) for row in boston.data[:, :]] # Add target value as first column as expected by training algorithm training_set = [[row[0]] + row[1] for row in zip(target, data)] # Split data into seperate datasets for each day train_by_day = split_data_by_days(training_set, num_days) # Upload split datasets to S3 for day in range(num_days): current_date = date.today() - timedelta(day) key = '{}.csv'.format(current_date) write_to_csv(key, train_by_day[day]) s3_uri = 's3://{}/{}/{}'.format(bucket, prefix, key) print('Uploading {} to {}'.format(key, s3_uri)) boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, key)).upload_file(key)
このように、S3パスにサンプルデータが配置されます。
4-3.手動実行して稼働確認
続いて、手動でJOBを実行させてみます。
もしエラーが出る場合は、cloudWatchのログから原因を調査する必要があります。
私は下記の4点を修正しました。
CONTAINERS = {'us-west-2': '174872318107.dkr.ecr.us-west-2.amazonaws.com/linear-learner:latest', 'us-east-1': '382416733822.dkr.ecr.us-east-1.amazonaws.com/linear-learner:latest', 'us-east-2': '404615174143.dkr.ecr.us-east-2.amazonaws.com/linear-learner:latest', 'eu-west-1': '438346466558.dkr.ecr.eu-west-1.amazonaws.com/linear-learner:latest', 'ap-northeast-1': '351501993468.dkr.ecr.ap-northeast-1.amazonaws.com/linear-learner:latest'}
コンテナは下記で調べました。
import boto3 from sagemaker.amazon.amazon_estimator import get_image_uri container = get_image_uri(boto3.Session().region_name, 'linear-learner') print(container)
「Step Functions」のコンソール画面上からは、このようにJOBの過程も確認できます。
ちょっとづつJOBが進んでいく...。今はモデルを学習して、1分ずつステータスを確認しているところですね。
モデルの学習が終わって、デプロイしているようです。
SageMakerの画面に移っても、当然デプロイしているところが確認できます。
デプロイが終わるとこんな感じです。
ちゃんと推論用エンドポイントがデプロイされていました。
S3を確認してみたところ、トレーニングデータが格納されているパスに「manifest」ファイルが生成されていました。
次回の学習時には、この「manifest」ファイルを参照し、「次の学習に利用するデータ」を判別します。
このファイルは、「CheckData」ステップで「今回のJOBに利用したファイル」を記録して出力しているものです。
このファイルの詳細については「2.JOBフローの詳細」をご参照ください。
JOBの実行が完了するとAWS Systems Manager パラメータストアが更新され、「学習に利用したトレーニングファイルの最新日付」を値として保持します。
次のJOB実行時は「CheckData」ステップで、この値とS3に実際に保存されているファイルを突合させて、学習に利用するファイルを判別し、その結果をmanifestファイルとして出力し、モデル学習時にそのmanifestファイルを参照する、という流れになります。
また、Slackへの通知を確認してみると、下記のように通知が来ていることが確認できます。
5.まとめ
MLのワークフローを,一部とはいえサーバレスで実現することができました。
運用の簡易化、フローの可視化、コスト削減、等色々とメリットが考えられるこの構成ですが、「あくまでも機械学習のJOBフローのほんの一部」しか管理できていない点にはご注意ください。
ただ、それでもこの構成はMLワークフローの手間やコストを大いに削減してくれて、更に拡張性もあるいい構成だと思います。
かなり長くなってしまいましたが、本エントリーの内容は以上になります。 最後までありがとうございました。