AWS Step Functionsでタスクをローカルマシンに実装する

2016.12.06

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

AWS Step Functionsは各Stepで実行するタスクを任意のマシン上で実行することができます。このためLambdaとは異なりタイムアウトを気にする必要は無くなります。

今回は公式のチュートリアルを参考にJava + Spring Boot、東京リージョンで実装してみました。以下、フレームワークやリージョンによるコードの変更も踏まえて書きたいと思います。

実装について

以下の手順で実装していきます。

  1. アクティビティの作成
  2. ステートマシンの作成
  3. (ワーカーとなる)タスクの実装
  4. 実行

アクティビティの作成

マネージドコンソールよりStep Functionsを選択し、画面左より「Tasks」を選択して「Create new activity」ボタンを押下します。

step_function_create_new_activitiy

「New Activity」欄に任意の名前を入力して「Create Activity」ボタンを押下します。押下後、作成したアクティビティのarnをメモしておきます。(後ほど使用します)

step_functions_create_activity

ステートマシンの作成

画面左より「Dashboard」を選択して「Create a State Machine」ボタンを押下します。

step_functions_create_new_state_machine

「Give a name to your state machine」欄に任意の名前を入力します。

step_functions_give_a_name

「Code」欄に以下のjsonを入力します。「Resource」の値を先ほど作成したアクティビティのarnに置き換えて下さい。

{
  "Comment": "An example using a Task state.",
  "StartAt": "getGreeting",
  "Version": "1.0",
  "TimeoutSeconds": 300,
  "States": 
  {
    "getGreeting": {
      "Type": "Task",
      "Resource": "arn:aws:states:eu_central-1:123456789012:activity:get-greeting",
      "End": true
    }
  }
}

「Preview」を押下するとジョブフローが表示されます。「Create State Machine」ボタンを押下します。

step_functions_create_state_machine

「IAM role for your state machine executions」というダイアログが表示されるので、デフォルトの「StatesExecutionRole-ap-northeast-1」を選択して「OK」を押下します。

(ワーカーとなる)タスクの実装

タスクを実装します。今回はSpring Bootを使用したのですが、そのプロジェクトのbuild.gradleは以下のようになりました。

buildscript {
	ext {
		springBootVersion = '1.4.2.RELEASE'
	}
	repositories {
		mavenCentral()
	}
	dependencies {
		classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
	}
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'

jar {
	baseName = 'stepFunctionsSample'
	version = '0.0.1-SNAPSHOT'
}
sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
	mavenCentral()
}


dependencies {
	compile('org.springframework.boot:spring-boot-starter')
	compile group: 'com.amazonaws', name: 'aws-java-sdk', version: '1.11.63'
	testCompile('org.springframework.boot:spring-boot-starter-test')
}

次にmain()を持つApplicationクラスです。

package jp.classmethod;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.stepfunctions.AWSStepFunctionsClient;
import com.amazonaws.services.stepfunctions.model.GetActivityTaskRequest;
import com.amazonaws.services.stepfunctions.model.GetActivityTaskResult;
import com.amazonaws.services.stepfunctions.model.SendTaskFailureRequest;
import com.amazonaws.services.stepfunctions.model.SendTaskSuccessRequest;
import com.amazonaws.util.json.Jackson;
import com.fasterxml.jackson.databind.JsonNode;

@SpringBootApplication
public class StepFunctionsSampleApplication {
	private final String ACCESS_KEY = "アクセスキー";
	private final String SECRET_KEY = "Secretアクセスキー";
	private final String ACTIVITY_ARN = "アクティビティのarn";
	
	public static void main(String[] args) {
		try (ConfigurableApplicationContext ctx = SpringApplication.run(StepFunctionsSampleApplication.class, args)) {
        	StepFunctionsSampleApplication app = ctx.getBean(StepFunctionsSampleApplication.class);
            app.run(args);
        } catch (Exception e) {
            e.printStackTrace();
        }
	}
	
	public void run(String... args) throws Exception {
		System.out.println("Worker start.");
        AWSStepFunctionsClient client = new AWSStepFunctionsClient(
                new BasicAWSCredentials(ACCESS_KEY, SECRET_KEY),
                new ClientConfiguration());
        Region region = Region.getRegion(Regions.AP_NORTHEAST_1);
        client.setRegion(region);
 
        String greetingResult;
        while (true) {
            GetActivityTaskResult getActivityTaskResult =
                    client.getActivityTask(
                            new GetActivityTaskRequest().withActivityArn(
                                    ACTIVITY_ARN));
 
            if (getActivityTaskResult != null) {
                try {
                    JsonNode json = Jackson.jsonNodeOf(getActivityTaskResult.getInput());
                    greetingResult = getGreeting(json.get("who").textValue());
                    client.sendTaskSuccess(
                            new SendTaskSuccessRequest().withOutput(
                                    greetingResult).withTaskToken(getActivityTaskResult.getTaskToken()));
                    System.out.println("Worker sendTaskSuccess.");
                } catch (Exception e) {
                    client.sendTaskFailure(new SendTaskFailureRequest().withTaskToken(
                            getActivityTaskResult.getTaskToken()));
                }
            } else {
                Thread.sleep(1000);
            }
        }
    }
	
	private String getGreeting(String who) throws Exception {
        return "{\"Hello\": \"" + who + "\"}";
    }
 
}

20〜22行目で定数を定義しています。ACCESS_KEY・SECRET_KEYにはそれぞれAWSのキーの値を、ACTIVITY_ARNには先ほど作成したアクティビティのarnを定義してください。また東京リージョンで実行したので38〜39行目でリージョンを指定しています。

実行

ステートマシンの実行、タスク(Java)の実行、ステートマシンの確認という順で実行します。

ステートマシンの実行

画面左より「Dashboard」を選択して「New Execution」を押下します。

step_functions_new_execution

タスクに渡す値として以下のjsonを入力して「Start Execution」を押下します。

{
    "who" : "AWS Step Functions"
}

step_functions_start_execution

タスク(Java)の実行

先ほど実装したJavaを実行します。

ステートマシンの確認

マネージドコンソールより実行したステートマシンをクリックします。右側の「Execution Details」にある「Input」タブを選択すると、タスクに渡したjsonが表示されます。

step_functions_execution_input2

その隣の「Output」タブを選択すると、タスクが返却した値が表示されます。

step_functions_execution_output

最後に実行したステートマシンを終了します。

参考サイト

Tutorial: An Activity State Machine