Amazon SWF WorkflowWorkerのクセを読み解く

Amazon SWF

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、せーのです。今日はSWFにてFlowFrameworkを使用する際に私がハマったポイントをお話します。

要件

今回何をやりたいのか、というのをまずご説明します。バッチ処理が複数のサーバーにわかれていまして、その処理をbat1, bat2, bat3と順番に流したい。bat1が終わってからbat2がスタートする、といった感じで管理をしたいわけです。AWSサービスをこの要件に当てはめると幾つかサービスが考えられますが、今回は柔軟性を考慮してSWFを使ってみたいと思います。

swf-batch01

このようにSWFでバッチの流れを管理することで、例えばbat2の処理がすごく時間がかかる場合にはbat2をスケールアウトしてbat2-1〜bat2-100、のように100台くらい並列で並べることで一気に時間短縮になります。またbat2の100台のバッチ処理が全て終了した時点でbat3をスタートする、3回リトライしてダメな場合は捨てる、等の拡張が最小限のコード変更で可能になります。

経緯

SWFをそのまま使用するのはプロセス管理等が大変なのでAWSから提供されていますFlowFrameworkを使用します。JavaとRubyがあるのですが今回はJavaで実装したいと思います。FlowFrameworkではDeciderに当たる部分を[Workflow]クラス、Workerに当たる部分を[Activity]クラスを実装(implements)したクラスを書くことで実現させます。今回書きましたWorkflowとActivityはそれぞれこのようになります。

package com.amazonaws.services.simpleworkflow.flow.examples.modelbatch;

import com.amazonaws.services.simpleworkflow.flow.annotations.Asynchronous;
import com.amazonaws.services.simpleworkflow.flow.core.Promise;
import com.amazonaws.services.simpleworkflow.flow.core.AndPromise;
import com.amazonaws.services.simpleworkflow.flow.core.TryCatch;

/**
 * Implementation of the hello world workflow
 */
public class BatchWorkflowImpl implements BatchWorkflow{

    Batch01ActivitiesClient client01 = new Batch01ActivitiesClientImpl();
    Batch02ActivitiesClient client02 = new Batch02ActivitiesClientImpl();
    Batch03ActivitiesClient client03 = new Batch03ActivitiesClientImpl();

    BatchResult pubData;
    boolean retExit = false;
    @Override
    public void runBatch(BatchResult data) {
        final BatchResult argData = data;
        pubData = data;

        TryCatch f = new TryCatch() {
            @Override
            protected void doTry() throws Throwable {
                System.out.println("batch start");
                
                System.out.println("bat01 start");
                Promise<BatchResult> bat01 = client01.execBatch(argData);
                
                checkprocessResults(bat01);

                System.out.println("bat02 start");
                Promise<BatchResult> bat02 = client02.execBatch(bat01);
                
                checkprocessResults(bat02);

                System.out.println("bat03 start");
                Promise<BatchResult> bat03 = client03.execBatch(bat02);
                
                checkprocessResults(bat03);

                closingBatch(bat03);
            
            }
             
            @Override
            protected void doCatch(Throwable arg0) throws Throwable {
                System.out.println(arg0.toString());
            }
        };
 
    }

    @Asynchronous
    void checkprocessResults(Promise<BatchResult> promise){
        System.out.println("batch check start");
        checkprocessResults(promise.get());      
    }

    @Asynchronous
    void checkprocessResults(BatchResult promise){
        if (promise.ret == 99){
            System.out.println("System error.");
            retExit = true;
        }
        if (promise.ret == 9){
            System.out.println("activity check error.");
            retExit = true;
        }
    }

     @Asynchronous
    void closingBatch(Promise<BatchResult> data){
        System.out.println("バッチ終了");
    }
   
}
package com.amazonaws.services.simpleworkflow.flow.examples.modelbatch;

/**
 * Implementation of the hello world activities
 */
public class Batch01ActivitiesImpl implements Batch01Activities {
    
	@Override
	public BatchResult execBatch(BatchResult data) {
		System.out.println("batch01-Activity");
		String filename = "バッチのパス";
		int ret = 99;
		try {
		      
		      Process proc = Runtime.getRuntime().exec(filename);
		      System.out.println("実行中");
		      ret = proc.waitFor();
		      System.out.println("実行終了。戻り値:" + ret);
		    } catch( Exception e ) {
		      System.out.println(e);
		    }
		data.ret = ret;
		return data;	
	}

}

package com.amazonaws.services.simpleworkflow.flow.examples.modelbatch;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.flow.WorkflowWorker;
import com.amazonaws.services.simpleworkflow.flow.examples.common.ConfigHelper;

public class BatchResult {

    public int ret;
    public String executeDate;
    
}

ではこの処理を流してみましょう。処理をスタートするStarterの部分には[WorkflowExecution]クラスを実装したものを使ってworkflowを叩きます。実際に流してみると予想と違う動きをしました。

Workflowのログ

batch start
bat01 start
bat02 start
bat03 start
batch start
bat01 start
batch check start
bat02 start
bat03 start
batch start
bat01 start
batch check start
bat02 start
batch check start
bat03 start
batch start
bat01 start
batch check start
bat02 start
batch check start
bat03 start
batch check start
バッチ終了

Activity01のログ

batch01-Activity
実行中
実行終了。戻り値:0

Workflowの実装クラスは1回しか叩いていないのですがWorkflowの開始を通知するログが何回も吐かれています。他にも後処理を書いていた部分も複数回通っているようです。では何回もバッチ処理が走っているのか、というとActivityのログを見る限りバッチ自体は一回しか通っていないようです。これはどういうことでしょう。

原因

SWFは「メソッド」ではなく「タスク」で動く

SWFではDeciderは管理役、指示役であり、実際の作業はActivityが行います。Activityでの処理が終わると処理がDeciderに戻ってきますので、Activityでの結果を戻り値に含めておくことでDeciderは前に起きた処理の結果を踏まえて次の処理を決定することができます。

swf-batch02

上に書いたコードをそのまま素直に読むとclient01.execBatch(argData);でアクティビティが呼ばれ、その戻り値がPromise bat01に入る、というイメージですが、実際のSWFの動き方は違います。ドキュメントを読んでみましょう。

The worker class runs a loop to poll for decision tasks in the specified task list. When a decision task is received, it creates an instance of the workflow implementation and calls the @Execute method to process the task.

(workerクラスは規定されたタスクリストで[decision task]をポーリングし続けます。[decision task]を受け取ったらworkflowの実装インスタンスを作成し、@Executeメソッドを呼び出してタスクを処理します。)

ん?でしじょんたすく?ってなんでしょう。
SWFはタスクによる管理をしていまして、DeciderからActivityを呼び出した時は直接クラスを叩かれるわけではなく、Activity Tasklitの中にActivity Taskをキューイングすることで実現します(ScheduleActivityTaskDecisionAttributes)。SWFのタスクはSQSと違い必ず「1回限り」であり、重複がないことが保証されています
Activityでの処理が終了するとActivityはSWFに対して「終わったよ」という通知を送り(RespondActivityTaskCompleted)SWFはその通知を受け取ってDecision TasklistにDecision Taskを登録します(DecisionTaskScheduledEventAttributes)。DeciderはそのDecision Taskをポーリングしていて、Decision Taskを受け取ったら@Executeメソッドを実行します。つまりActivityが終了するごとにDecision Taskが登録されるので、Activityの回数分@Executeメソッドが実行されることになるのです。図に書くとこのようになります。

swf-batch03

しかしここでひとつの疑問が浮かんできます。Activityの回数分@Executeメソッドが実行されるのであれば、同じ回数分Activityの呼び出し部分も通っているはずなので、バッチ処理も複数回走るはずです。ですがActivityのログを見る限り1回しか通っていないようにみえます。これはどうしてなのでしょうか。実はこれはFlowFrameworkのあるメカニズムが関係しているのです。

Activityの動きはFlowFrameworkで制御する

FlowFrameworkのメカニズムとはなんでしょう。ドキュメントを見てみましょう。

Because activities can be long-running, it's undesirable to have the workflow simply block until it completes. Instead, the AWS Flow Framework manages workflow execution by using a replay mechanism, which relies on the workflow history maintained by Amazon SWF to execute the workflow in episodes.

Each episode replays the workflow logic in a way that executes each activity only once, and ensures that activities and asynchronous methods don't execute until their Promise objects are ready.

(Activityが長時間処理を行う可能性があるため、Activityが終了するまでWorkflowがシンプルにブロックを持つことは望ましくありません。その代わりAWS Flow FrameworkはWorkflowの実行処理を[replayメカニズム]によって管理します。[replayメカニズム]は全てのエピソードをSWFが最後まで実行できるようにWorkflowの実行履歴に依存しています。

各エピソードはそれぞれのActivityが一回のみ実行するようにWorkflowロジックを繰り返し、ActivityとAsynchronousメソッドはPromiseオブジェクトがreadyになるまで実行されないことを保証します。)

http://docs.aws.amazon.com/ja_jp/amazonswf/latest/awsflowguide/awsflow-basics-distributed-execution.html

いろいろな用語が出てきて少し複雑になってきました。ひとつずつ読み解いていきましょう。
FlowFrameworkは[replayメカニズム]というコンセプトにそって分散処理を行います。Workflowクラスを何回もReplay(繰り返し実行)することで、一連の処理が終わるまでWorkflowにプロセスが残り続けることを回避しているわけですね。「エピソード」とはWorkflowクラスの@Executeメソッドが1回通る度に処理される一連の流れを指します。では「実行履歴に依存する」とはどういうことでしょう。SWFの処理の流れをReplayメカニズムを元に図にするとこのようになります。

swf-batch04

@Excecuteメソッドが通る際には「他のActivityの終了に依存しない」Activityが実行されます。他のActivityの終了に依存する、というのは具体的に言うとActivityの呼び出し時に引数として他のActivityのPromiseオブジェクトが記述されている場合を指します。そして1回Activityが実行されるとそれがWorkflowの実行履歴に記録され、Decision Taskに引き継がれます。その状態で@Executeメソッドを再び通るときにはFlowFrameworkがWorkflowの実行履歴をチェックして一度呼び出されたActivityはスキップします。つまり今回で言えばWorkflowは「初回(bat1が実行)」「2回目(bat2が実行)」「3回目(bat3が実行)」という3エピソードに分けられ、例えば2回目のエピソードの際にはbat1は実行履歴で処理終了が確認できるのでスキップ、bat3はbat2の終了に依存しているためスキップされ、bat2のみ実行されることになります。

Asynchronousメソッドは複数回通る可能性がある

ここでもう一つ、陥りがちなポイントを押さえておきましょう。

Asynchronous workflow methods are often used much like activities, because the method defers execution until all input Promise objects are ready. However, the replay mechanism handles asynchronous methods differently than activities.

  • Replay does not guarantee that an asynchronous method will execute only once. It defers execution on an asynchronous method until its input Promise objects are ready, but it then executes that method for all subsequent episodes.
  • When an asynchronous method completes, it does not start a new episode.

(Asyncronous WorkflowメソッドはPromiseオブジェクトがreadyになるまで実行が遅れるところからActivityと同じように扱いますが、ReplayメカニズムはAsynchronousメソッドとActivityに対しては異なるハンドリングをします。

  • ReplayメカニズムAsynchronousメソッドが一回のみ実行されることを保証するものではありません。AsynchronousメソッドはPromiseオブジェクトがreadyになるまでは実行が遅れますが、それ以降は後続のエピソード全てで実行されます。
  • Asynchronousメソッドが終了すると新しいエピソードはスタートしません。

http://docs.aws.amazon.com/ja_jp/amazonswf/latest/awsflowguide/awsflow-basics-distributed-execution.html

このようにAsynchronousメソッドはActivityと違い、引数となるPromiseオブジェクトがreadyになった後は@Executeメソッドが通る度に実行されますPromiseオブジェクトはActivityが終了するとreadyになるので、今回の例で言えば2回目のエピソードが実行される際にはbat1のPromiseオブジェクトが引数になっているAsynchronousメソッドも実行される、ということになります。

対応

ここまでのSWF、そしてFlow Frameworkの動きを踏まえて実装のポイントをまとめてみます。

  • Workflowクラスは繰り返し実行される前提で短時間で終わるように実装する
  • Activityクラスには一度だけ実行されるもの、処理に長時間かかるものを実装する
  • Asynchronousメソッドは何回も通るもの、という前提で実装する

このポイントを押さえて上のクラスを書きなおしてみます。

package com.amazonaws.services.simpleworkflow.flow.examples.modelbatch;

import com.amazonaws.services.simpleworkflow.flow.annotations.Asynchronous;
import com.amazonaws.services.simpleworkflow.flow.core.Promise;
import com.amazonaws.services.simpleworkflow.flow.core.AndPromise;
import com.amazonaws.services.simpleworkflow.flow.core.TryCatch;

/**
 * Implementation of the hello world workflow
 */
public class BatchWorkflowImpl implements BatchWorkflow{

    Batch01ActivitiesClient client01 = new Batch01ActivitiesClientImpl();
    Batch02ActivitiesClient client02 = new Batch02ActivitiesClientImpl();
    Batch03ActivitiesClient client03 = new Batch03ActivitiesClientImpl();

    PreActivitiesClient pre = new PreActivitiesClientImpl();

    BatchResult pubData;
    boolean retExit = false;
    @Override
    public void runBatch(BatchResult data) {
        final BatchResult argData = data;
        pubData = data;

        TryCatch f = new TryCatch() {
            @Override
            protected void doTry() throws Throwable {
                System.out.println("Workflow start(Episode start)");
                
                Promise<String> prebat01 = pre.bat01treatment();
                Promise<BatchResult> bat01 = client01.execBatch(prebat01);
                
                checkprocessResults(bat01);
                
                Promise<String> prebat02 = pre.bat02treatment(bat01);
                Promise<BatchResult> bat02 = client02.execBatch(prebat02);
                
                checkprocessResults(bat02);

                Promise<String> prebat03 = pre.bat03treatment(bat02);                
                Promise<BatchResult> bat03 = client03.execBatch(prebat03);
                
                checkprocessResults(bat03);

                closingBatch(bat03);
            
            }
             
            @Override
            protected void doCatch(Throwable arg0) throws Throwable {
                System.out.println(arg0.toString());
            }
        };
 
    }

    @Asynchronous
    Private Promise<BatchResult> checkprocessResults(Promise<BatchResult> promise){
        System.out.println("batch ");
        checkprocessResults(promise.get());
        return Promise.asPromise(promise);
    }

    @Asynchronous
    void checkprocessResults(BatchResult promise){
        if (promise.ret == 99){
            System.out.println("System error.");
        }
        if (promise.ret == 9){
            System.out.println("activity check error.");
        }
    }

     @Asynchronous
    void closingBatch(Promise<BatchResult> data){
        System.out.println("バッチ終了");
    }
   
}

このようになりました。チェック用メソッドはasPromiseメソッドで継続させることでチェックが終わるまで次にいかないような仕組みを増やしました。

package com.amazonaws.services.simpleworkflow.flow.examples.modelbatch;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.flow.WorkflowWorker;
import com.amazonaws.services.simpleworkflow.flow.examples.common.ConfigHelper;

public class BatchResult {

    public int ret;
    public String executeDate;
    public String activityName;
    
}
package com.amazonaws.services.simpleworkflow.flow.examples.modelbatch;

/**
 * Implementation of the hello world activities
 */
public class Batch01ActivitiesImpl implements Batch01Activities {
    
	@Override
	public BatchResult execBatch(BatchResult data) {
		System.out.println("batch01-Activity");
		String filename = "/app/textmodel/store.marketdata/start.sh " + data.dt1 + " " + data.dt2;
		//String filename = "/Users/Tsuyoshi/dev/mtec/prj-mtec/swf/testbatch/test.sh";
		int ret = 99;
		String activityName = "bat01";
		data.activityName = activityName;
		try {
			  System.out.println("bat01 start");
		      Process proc = Runtime.getRuntime().exec(filename);
		      System.out.println("実行中");
		      ret = proc.waitFor();
		      System.out.println("実行終了。戻り値:" + ret);
		    } catch( Exception e ) {
		      System.out.println(e);
		    }
		data.ret = ret;
		return data;	
	}

}
public class PreActivitiesImpl implements PreActivities {

    public boolean bat01treatment(){
        System.out.println("pretreatment bat01");

        return true;
    }

    public boolean bat02treatment(){
        System.out.println("pretreatment bat02");

        return true;
    }

    public boolean bat03treatment(){
        System.out.println("pretreatment bat03");

        return true;
    }
}

これで要件通りの動きをするようになりました。

まとめ

いかがでしたでしょうか。SWFはもうクセがすごい(千鳥風)ですが、本質を捉えてしまうと好きな処理が好きなように動くすごい武器に変わります。一緒に勉強していきましょう。

参考サイト