AWS Step FunctionsでLambdaを組み合わせたバッチ処理を作る

AWS re:Invent 2016で発表されたAWS Step Functionsは複数のLambda Functionを組み合わせて分散アプリケーションを作ることができるサービスです(Lambdaのほか、EC2、ECSでも可能)。
今回はバッチ処理の中でよくある一連の処理(ファイル読み込み、編集、DB登録)をStep Functionsで組み合わせてみます。

各Stepの概要

Step FunctionsではDSL(Amazon States Language)を使って状態遷移を実装します。DSLで指定できるStepは以下の通りです。

State 説明
Pass 何もしない。inputデータをoutputへそのまま渡す
Task 処理の実行単位
Choice 分岐
Wait 開始まで待つ、日時指定可能
Succeed 成功
Fail 失敗
Pararell 並列処理

Amazon States Languageの詳細

バッチ処理

それでは状態遷移を実装していきましょう。以下の処理を行います。

  1. CSVファイルを取得してS3バケットに保存
  2. ファイルが存在するかどうか判定。存在すれば編集処理へ、存在しなければ終了
  3. 上で保存されたCSVファイルを編集して別のS3バケットに保存
  4. 編集したCSVをRDSに保存、同時に処理対象が存在したことを管理者に通知

これらの状態を管理するために、Task, Choice, Pass, ParallelのStepを組み合わせてDSLを書いていくと、このようになりました。

ScreenShot-stepfn

Amazon States Language

{
  "Comment": "fetch csv and insert records",
  "StartAt": "Fetch_CSV",
  "States": {
    "Fetch_CSV": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:FetchCSV",
      "Next": "Record_Exists?"
    },
    "Record_Exists?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.response_cd",
          "StringEquals": "1",
          "Next": "Modify"
        },
        {
          "Variable": "$.response_cd",
          "StringEquals": "2",
          "Next": "No_File"
        }
      ],
      "Default": "Modify"
    },
    "No_File": {
      "Type": "Pass",
      "End": true
    },
    "Modify": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:ModifyCSV",
      "Next": "Insert"
    },
    "Insert": {
      "Type": "Parallel",
      "Next": "Insert_Finished",
      "Branches": [
        {
          "StartAt": "Insert_Records",
          "States": {
            "Insert_Records": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:InsertRecord",
              "End": true
            }
          }
        },
        {
          "StartAt": "Notify",
          "States": {
            "Notify": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:NotifySLK",
              "End": true
            }
          }
        }
      ]
    },
    "Insert_Finished": {
      "Type": "Pass",
      "End": true
    }
  }
}

各タスクの説明

それぞれのタスクの説明をします。
Lambda Functionはそれぞれの処理を実装すれば良いのですが、Step Functionsで状態を管理するための実装もあるのでコードも一応載せておきます。

1.CSVの取得

CSV取得処理では、ファイルが存在するかどうかでレスポンスコードを変えています。 後続のファイル存在有無判定でこのコードを使用します。

Amazon States Language

"Fetch_CSV": {
  "Type": "Task",
  "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:FetchCSV",
  "Next": "Record_Exists?"
},

Lambda Function

import requests
import boto3

def handler(event, context):
  s3 = boto3.resource('s3')
  response = requests.get('http://www.example.com/test.csv')

  # ファイルが存在しない場合はレスポンスコード:2 を返却
  if not (response.status_code == 200): return {u'response_cd': u'2'}

  obj = s3.Bucket('test-bucket').Object('test.csv')
  res = obj.put(
    Body="".join(response.text).encode('utf-8'),
    ContentEncoding='utf-8',
    ContentType='text/plane'
  )

  # ダウンロード処理を行った場合はレスポンスコード:1 を返却
  return {u'response_cd': u'1'}

2.ファイル存在有無の判定

判定は前処理のレスポンスコードを使用しています。 Choiceタスクの中でレスポンスコードを見て、分岐させています。

Amazon States Language

"Record_Exists?": {
  "Type": "Choice",
    "Choices": [
      {
        "Variable": "$.response_cd",
        "StringEquals": "1",
        "Next": "Modify"
      },
      {
        "Variable": "$.response_cd",
        "StringEquals": "2",
        "Next": "No_File"
      }
    ],
  "Default": "Modify"
},

3. ファイルが存在しない場合の終了処理

前の処理で処理対象のファイルが存在しなかった場合、バッチの実行を終了します。

Amazon States Language

"No_File": {
  "Type": "Pass",
  "End": true
},

4.ファイル編集処理,

ここでは、取得したCSVの中から必要なデータのみを抽出して別のS3バケットに保存します。
(編集処理の中身は特に関係ありません。)

Amazon States Language

"Modify": {
  "Type": "Task",
  "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:ModifyCSV",
  "Next": "Insert"
},

Lambda Function

import boto3

def handler(event, context):
  s3 = boto3.resource('s3')
  bucket = s3.Bucket('test-bucket')
  obj = s3.Object('test-bucket', 'test.csv')
  body = obj.get()['Body'].read().decode('utf-8')

  content = []
  for i, line in enumerate(body.split('\r\n')):
    if i < 2: continue
    content.append(','.join(line.split(',')[0:4]) + "\n")

  obj2 = s3.Bucket('test-bucket2').Object('modify.csv')
  obj2.put(
    Body="".join(content).encode('utf-8'),
    ContentEncoding='utf-8',
    ContentType='text/plane'
  )

5. CSVをRDSに保存、バッチ管理者へ処理をすることの通知処理

このバッチでは処理対象が存在した場合は管理者へ通知するという前提で処理を組み立てることにします。 CSVファイルの内容をRDSに保存する処理と管理者への通知は逐次処理で行う必要もないのでParallelタスクを使って並列実行します。 (db登録処理の内容は特に関係ありません。)

Amazon States Language

"Insert": {
  "Type": "Parallel",
  "Next": "Insert_Finished",
  "Branches": [
    {
      "StartAt": "Insert_Records",
      "States": {
        "Insert_Records": {
          "Type": "Task",
          "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:InsertRecord"\
,
          "End": true
        }
      }
    },
    {
      "StartAt": "Notify",
      "States": {
        "Notify": {
          "Type": "Task",
          "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:NotifySLK",
          "End": true
        }
      }
    }
  ]
},

Lambda Function

import boto3
import psycopg2

def handler(event, content):
  s3 = boto3.resource('s3')
  obj = s3.Object('test-bucket2', 'modify.csv')
  body = obj.get()['Body'].read().decode('utf-8')

  try:
    conn = psycopg2.connect(
      host="hostname",
      database="dbname",
      port="5432",
      user="username",
      password="password"
    )
    cur = conn.cursor()

    for i, line in enumerate(body.split('\n')):
      if i < 1: continue
      if not len(line.split(',')) == 4: continue
      date, usd, gbp, eur = [i for i in line.split(',')]
      cur.execute("insert into exchange values(%s, %s, %s,%s)", [date, usd, gbp, eur])

    conn.commit()
    cur.close()
    conn.close()
  except:
    print("error")

まとめ

AWS Step Functionsを使うとLambda Functionの実装で状態を持つことなく複数Lambdaを組み立てた処理を構築することができます。
それぞれのFunctionは状態を持たずに別のサービス(Step Functions)で状態を管理する仕組みは、書いていて関数型プログラミングで処理を組み立てているような印象を受けました。状態を持たないので保守性、可読性の高いコードを書くことが容易になります。
もちろんマネージドサービスを使うことによる、運用コストが下がることやスケールの容易さも魅力です。