この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、小澤です。
Hadoopは大規模データに対するバッチ処理を想定しているため、処理に時間がかかることがあります。 EMRではプログラムからステップ実行を行うと、処理が登録されたのち完了を待たずにすぐに制御を返してくれます。
ステップ実行の組み合わせのみで解決するのであればいいのですが、 処理完了後のデータをEMR外から使いたいなどで、処理が終わったのを検知してから次の処理を実行したいことがあります。
boto3からステップ実行
では、まずboto3からステップ実行の追加をしてみます。 今回はならんらかのHiveスクリプトを実行しているものとします。
import boto3
client = boto3.client('emr', 'ap-northeast-1')
cluster_id = <Your EMR Cluster ID>
steps = client.add_job_flow_steps(
JobFlowId=cluster_id,
Steps = [
{
'Name': 'Hive_job',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'hive-script',
'--run-hive-script',
'--args',
'-f',
's3://path/to/hive/script',
'-d',
'param1=value1',
'-d',
'param2=value2',
]
}
},
]
)
内容としては、管理コンソールから確認できるステップ実行のコマンドをそのまま与えた感じになります。 「ActionOnFailure」のみは管理コンソールだと日本語になっているので、対応関係を把握しておく必要があります。
選択できるものは
- TERMINATE_JOB_FLOW
- TERMINATE_CLUSTER
- CANCEL_AND_WAIT
- CONTINUE
となります。 また、Steps引数は配列形式でうけとるので複数のステップを一度に追加することもできます。
ステップの実行状況確認
ステップの実行を行った際の戻り値から、実行の状況を確認することができます。 戻り値は、追加したステップのIDのリストになっているので、1つ目のステップのIDは
step_id = steps['StepIds'][0]
のように取得することができます。 このステップIDを指定して、実行の状況を確認することができます。 戻り値から、Steps -> Status -> State と掘っていくことで現在のステータスを取得できます。
step_info = client.list_steps(ClusterId=cluster_id, StepIds=[step_id])
# 追加した1つ目のステップIDを取得
step_status = step_info['Steps'][0]['Status']['State']
この値は
- PENDING
- CANCEL_PENDING
- RUNNING
- COMPLETED
- CANCELLED
- FAILED
- INTERRUPTED
のいずれかとなります。
ステップが終了するまで待機
ここまでくればあとは簡単です。
ステータスの値が取得出たのであとは完了するまで待ちましょう
import time
step_info = client.list_steps(ClusterId=cluster_id, StepIds=[step_id])
step_status = step_info['Steps'][0]['Status']['State']
while step_stutus in ['PENDING', 'RUNNING']:
step_info = client.list_steps(ClusterId=cluster_id, StepIds=[step_id])
step_status = step_info['Steps'][0]['Status']['State']
time.sleep(60)
一定間隔でポーリングして、保留や実行中の状態であることを検知しています。 ここでは成功/失敗やキャンセルを考慮していませんが、これで完了するまで待つという処理が実現できます。
また、今回は1つのステップしか追加していない前提で配列のインデックスを指定していますが、 複数登録している場合はそれぞれで確認してやることとで同様の処理が実現できます。
終わりに
今回はboto3を使ってEMRでステップを追加した後のその結果を利用して後続の処理を行いたい場合の方法を解説しました。 実は、ステップ実行を行わずに直接処理を実行するなどすれば、完了後に後続の処理というのは比較的簡単に行えますが、 roleは与えられているがHiveServer2のポートは解放されていないなどで必要になる場面もあるかと思います。