AppSyncのクエリからStep Functionsのステートマシンを起動して時間のかかる処理を非同期に実行する

非同期アーキテクチャの強力な味方
2020.04.05

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

AppSync勉強中@大阪の岩田です。

AppSyncではDynamoDBやLambda等のAWSサービスをデータソースとして指定することが可能ですが、データソースにHTTPエンドポイントを選択してAppSyncからの呼び出し時に適切なSIGv4の署名を付与することで、実質全てのAWSサービスと連携することが可能です。

以下のAWSブログで紹介されているAppSyncからStep Functionsのステートマシンを起動する処理を試してみたので手順をご紹介します。

Invoke AWS services directly from AWS AppSync

なお今回は勉強のために手作業多めでやりましたが、SAMテンプレート等含んだソースコード一式が以下のリポジトリで公開されています。

https://github.com/aws-samples/aws-appsync-long-query

作るもの

ざっくり以下のような環境を作成します。

※画像は前述のAWSブログから引用

GraphQLのオペレーションとしてsearchというクエリを定義。このクエリはAppSyncの制限時間である30秒以内に実行が完了しないような重い処理を想定しています。searchクエリの内部処理としてStep Functionsのステートマシンを利用して諸々の処理を行い、処理完了後にLambdaからpublishResultというミューテーションを実行、このミューテーションをトリガーにonSearchResultをサブスクライブしているクライアントにsearchクエリの結果が通知されるというアーキテクチャです。

APIの作成

それでは実際にAPIを作成していきましょう

IAMロールの作成

まずAppSyncからStep Functionsのステートマシンを実行するためにIAMロールを作成します。以下の画像のように

  • Step Functionsのステートマシンを実行する権限
  • AppSyncからロールを引き受けるための信頼関係

を設定します。

データソースの作成

続いてAppSyncの環境を設定していきます。新しいAPIを作成したあと、データソースを作成します。

タイプNoneのデータソースを作成

後ほどリゾルバーを作成する際に利用するためにデータソースタイプ:なしでダミーのデータソースを作成します。

タイプHTTPのデータソースを作成

続いてStepFunctions用にタイプ:HTTPのデータソースを作成します。マネコンからだとエンドポイントのURL以外の項目が指定できないので、ここはCLIから作成します。CLI用のインプットとして以下のJSONファイルを用意します。

http_ds.json

{
    "endpoint": "https://states.ap-northeast-1.amazonaws.com/",
    "authorizationConfig": {
        "authorizationType": "AWS_IAM",
        "awsIamConfig": {
            "signingRegion": "ap-northeast-1",
            "signingServiceName": "states"
        }
    }
}

ポイントはauthorizationConfig以下の設定です。この設定を入れておくことで、データソースに紐付けたIAMロールの認証情報を使用して

  • 東京リージョンの
  • Step Functions向けに

SIGv4の署名が自動作成されて、Authorizationヘッダにセットされるようになります。

JSONファイルが用意できたら以下のコマンドでデータソースを作成します。--service-role-arn には先程作成したIAMロールのARNを指定します。

$ aws appsync create-data-source --api-id <AppSyncのAPI ID> --name StepFunctionHttpDataSource --type HTTP --http-config file://http_ds.json  --service-role-arn <先程作成したIAMロールのARN>

以下のようなレスポンスが返ってくれば成功です。

{
    "dataSource": {
        "dataSourceArn": "arn:aws:appsync:ap-northeast-1:123456789012:apis/<AppSyncのAPI ID>/datasources/StepFunctionHttpDataSource",
        "name": "StepFunctionHttpDataSource",
        "type": "HTTP",
        "serviceRoleArn": "<先程作成したIAMロールのARN>",
        "httpConfig": {
            "endpoint": "https://states.ap-northeast-1.amazonaws.com/",
            "authorizationConfig": {
                "authorizationType": "AWS_IAM",
                "awsIamConfig": {
                    "signingRegion": "ap-northeast-1",
                    "signingServiceName": "states"
                }
            }
        }
    }
}

GraphQLスキーマの作成

続いてGraphQLのスキーマを作成します。AppSyncのスキーマページに以下のコードを貼り付けます。

type Result {
  id: ID!
  status: ResultStatus!
  listings: [String]
}

enum ResultStatus {
  PENDING
  COMPLETE
  ERROR
}

input ResultInput {
  id: ID!
  status: ResultStatus!
  listings: [String]!
}

type Query {
  # called by client to initiate long running search
  search(text: String!): Result
}

type Mutation {
  # called by backend when search is complete
  publishResult(result: ResultInput): Result
}

type Subscription {
  onSearchResult(id: ID!): Result
    @aws_subscribe(mutations: [ "publishResult" ])
}

schema {
  query: Query
  mutation: Mutation
  subscription: Subscription
}

注意事項として前述のAWSブログ及びGitHubのコードではサブスクリプション設定が

type Subscription {
  onSearchResult(id: ID!): [Result]
    @aws_subscribe(mutations: [ "publishResult" ])
}

となっていますが、このままだと型が一致せずにエラーになるので

type Subscription {
  onSearchResult(id: ID!): Result
    @aws_subscribe(mutations: [ "publishResult" ])
}

に変更しています。

リゾルバの設定

スキーマが登録できたらリゾルバーを定義していきます。

ミューテーションpublishResultのリゾルバーを設定

まずpublishResultのリゾルバーです。

データソースには先程作成したダミーのデータソースを選択し、リクエストマッピングテンプレートに以下通り入力します。

{
  "version": "2017-02-28",
  "payload": {
    "id": "${ctx.arguments.result.id}",
    "status": "${ctx.arguments.result.status}",
    "listings": "${ctx.arguments.result.listings}"
  }
}

レスポンスマッピングテンプレートは以下の通りです。

$util.toJson($ctx.arguments.result)

クエリsearchのリゾルバーを設定

続いてクエリsearchのリゾルバーです。データソースに先程作成したStep Functions用のデータソースを選択し、リクエストマッピングテンプレートを以下の通り設定します。

$util.qr($ctx.stash.put("executionId", $util.autoId()))

{
  "version": "2018-05-29",
  "method": "POST",
  "resourcePath": "/",
  "params": {
    "headers": {
      "content-type": "application/x-amz-json-1.0",
      "x-amz-target":"AWSStepFunctions.StartExecution"
    },
    "body": {
      "stateMachineArn": "arn:aws:states:<REGION>:<AWSアカウントID>:stateMachine:<後ほど作成するステートマシンの名前>",
      "input": "{ \"name\": \"$ctx.stash.executionId\" }"
    }
  }
}

ポイントはheadersでHTTPヘッダーを設定している部分です。ここでStep Functionsのステートマシンを起動するように指定し、bodyの部分でステートマシンのARNと入力値を設定します。

レスポンスマッピングテンプレートは以下の通りです。

{
  "id": "${ctx.stash.executionId}",
  "status": "PENDING"
}

Lambdaの準備

AppSyncの設定ができたので、Step Functionsから呼び出すLambdaを用意します。

Lambda レイヤーの作成

今回用意するコードはaxiosを利用するので、事前にaxios用のレイヤーを作成しておきしょう。ローカル環境で以下のコードを実行し、レイヤーを作成します。

$ mkdir nodejs && cd nodejs
$ npm install axios@0.18.1
$ cd ..
$ zip -r nodejs.zip  nodejs
$ aws lambda publish-layer-version --layer-name nodejs-axios --zip-file fileb://nodejs.zip --compatible-runtimes nodejs12.x

Lambda Functionの作成

AppSyncの設定ができたので、Step Functionsから呼び出すLambdaを用意します。Node.js 12.xを選択し、以下のコードを貼り付けます。

// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0

////
const axios = require('axios');

////
const PublishResultMutation = `mutation PublishResult(
    $id: ID!,
    $status: ResultStatus!,
    $listings: [String]!
  ) {
    publishResult(result: {id: $id, status: $status, listings: $listings}) {
      id
      status
      listings
    }
  }`;

/**
 * Executes mutation on AppSync API.
 */
const executeMutation = async(id) => {
  const mutation = {
    query: PublishResultMutation,
    operationName: 'PublishResult',
    variables: {
      id: id,
      status: 'COMPLETE',
      listings: [ "foo", "bar" ]
    },
  };

  try {
    let response = await axios({
      method: 'POST',
      url: process.env.APPSYNC_ENDPOINT,
      data: JSON.stringify(mutation),
      headers: {
        'Content-Type': 'application/json',
        'x-api-key': process.env.APPSYNC_API_KEY,
      }
    });
    console.log(response.data);
  } catch (error) {
    console.error(`[ERROR] ${error.response.status} - ${error.response.data}`);
    throw error;
  }
};

/**
 * Main handler function.
 */
exports.handler = async(event) => {
  await executeMutation(event.name);
  return { message: `finished` };
}

コードを貼り付けたら先程作成したレイヤーを紐付けます。

続いて環境変数を以下のように設定します。

  • APPSYNC_API_KEY: <AppSyncのAPIキー>
  • APPSYNC_ENDPOINT: <AppSyncのエンドポイント>

Step Fuctionsのステートマシン作成

最後にStep Fuctionsのステートマシンを作成します。先程AppSyncのリクエストマッピングテンプレートで指定したのと同じ名前でステートマシンを作成します。

{
  "Comment": "Wait and return a result",
  "StartAt": "Wait For One Minute",
  "States": {
    "Wait For One Minute": {
      "Type": "Wait",
      "Seconds": 60,
      "Next": "Return Result"
    },
    "Return Result": {
      "Type": "Task",
      "Resource": "<先程作成したLambdaのARN>",
      "ResultPath": "$",
      "TimeoutSeconds": 10,
      "Catch": [{
        "ErrorEquals": [ "States.ALL" ],
        "ResultPath": "$.error",
        "Next": "HandleError"
      }],
      "End": true
    },
    "HandleError": {
      "Type": "Fail",
      "Cause": "$.error"
    }
  }
}

この例ではaws-appsync-long-queryという名前で作成しました。内容的には1分間Waitしてから指定されたLambdaを起動するだけの処理です。1分間Waitすることで時間のかかる処理をシミュレートしています。

これで準備完了です!

やってみる

準備ができたので実際に試してみます。まずAppSyncのコンソールから以下のクエリを実行します。

query Search {
  search(text: "123") {
    id
    status
  }
}

実行すると以下のようなレスポンスが返却されるので、idをコピーします。

{
  "data": {
    "search": {
      "id": "a0466741-6816-4cf8-b19c-c74c7cb738c8",
      "status": "PENDING"
    }
  }
}

先程コピーしたidを指定してサブスクライブを実行します。

subscription {
  onSearchResult(id: "<先程コピーしたID>") {
   status
    listings
  }
}

1分ほど経過するとStep Functionsで処理した結果が自動で表示されます。

無事成功です!

まとめ

AppSyncとStep Functionsを連携する方法についてご紹介しました。サーバーレスアーキテクチャやマイクロサービスアーキテクチャを採用したシステムでは複数のサービスを連携して非同期に処理を実行することが多くなると思います。AppSyncのリゾルバーとしてStep Functionsを使うというのは非常な強力な選択肢になるのではないでしょうか?