Serverless Frameworkで構築するStep Functions & AWS Glue Jobを用いたワークフロー

ServerlessFrameworkで作るStep Functions + Glueでつくるワークフロー
2020.02.18

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

好物はインフラとフロントエンドのかじわらゆたかです。 タイトルだけでなんかお腹いっぱい感ありますが、説明したいことを詰め込んだらこの様になりました。

Serverless Frameworkのプラグインを用いたStep Functionsの構築ブログは坂巻が書いたエントリがあるのでそちらを参考にしてください。

Serverless Frameworkで構築するStep Functions

今回のエントリーはこのStep Functionsの中でGlue Jobを動かしてみたといった内容になります。

今回の検証で使ったServerlessFrameworkとStepfunctionsのプラグインのバージョンは下記になります。

  • serverless : 1.63.0
  • serverless-step-functions : 2.17.1

Serverless Framework(AWS Cloudformation) で AWS Glue Jobを使うには

Serverless FrameworkやプラグインでGlueの対応があるわけではないので、Serverless.yml のresources 内にCloudformationの記法で記載していくことになります。

CloudFormationでGlueのジョブを定義する際には以下のようになります。

Type: Sparkの場合

ConvertParquetJob:
  Type: AWS::Glue::Job
  Properties:
    Role: !GetAtt ETLJobRole.Arn
    Name: convert-parquet
    Command:
      Name: glueetl
      ScriptLocation: !Sub "s3://${env:GlueScriptBucket}/script/convert_parquet.py"
    ExecutionProperty:
      MaxConcurrentRuns: 50
    MaxRetries: 0

Type: Pythonの場合

PrepareFileJob:
  Type: AWS::Glue::Job
  Properties:
    Role: !GetAtt ETLJobRole.Arn
    Name: prepare-file
    Command:
      Name: pythonshell
      ScriptLocation: !Sub "s3://${env:GlueScriptBucket}/script/prepare_file.py"
      PythonVersion: "3"
    ExecutionProperty:
      MaxConcurrentRuns: 50
    MaxRetries: 0

ドキュメントとしては、CloudFormationのGlue Jobの箇所そのままになります。

AWS::Glue::Job - AWS CloudFormation

記法として変わっているのはCommnadの箇所のNameでSparkかPythonShellの違いを記載しています。 glueetl と書くとSparkJob、pythonshell と書くとPythonのジョブとなります。

あと、上にも書いたとおりGlue JobはServerless Frameworkのサポートを受けているわけではないので、 ScriptLocationへのファイルの配置はServerless Frameworkのデプロイコマンドとは別にアップロードを行う必要があります。

この記事では、中の実装についての解説は行いません Glue Jobの実装についての解説は石川や大高の以下の記事を参考にしてみてください。

AWS Glue のジョブタイプ『Python Shell』が Python 3.6と互換性のあるスクリプトをサポートしました

GlueのSparkジョブでTSVからParquetへ変換してみた

ServerlessFramework のStepFunctions内でGlue Jobを呼び出す。

StepFunctions内でGlue Jobを呼び出す方法については下記のエントリを参考ください

[アップデート]Step FunctionsでDynamoDB + Glue を試してみた #reinvent

JSONですでに書いているStateMachineがあるのであれば、 JSONをYamlに変換するツールで変換する事が可能です。

上記のブログエントリのGlue Jobを呼び出している箇所をJsonからYamlに変換すると以下のようになります。

---
StartAt: PutItem
States:
  PutItem:
    Type: Task
    Resource: arn:aws:states:::dynamodb:putItem
    Parameters:
      TableName: test_glue_db
      Item:
        id:
          S: '1'
    ResultPath: "$.DynamoDB"
    Next: Glue StartJobRun
  Glue StartJobRun:
    Type: Task
    Resource: arn:aws:states:::glue:startJobRun.sync
    Parameters:
      JobName: test_job
    End: true

今回はDynamoDBに入れるのではなく、Python ShellのジョブからSparkのジョブを呼び出すようにしたいので以下のようになりました。

StartAt: PrepareFile
States:
  PrepareFile:
    Type: Task
    Resource: arn:aws:states:::glue:startJobRun.sync
    Parameters:
      JobName: prepare-file
    Next: Glue StartJobRun
  Glue StartJobRun:
    Type: Task
    Resource: arn:aws:states:::glue:startJobRun.sync
    Parameters:
      JobName: prepare-file
    End: true

StepFunctionsのServerless.ymlとして以下のようになります。

service: serverless-glue-stepfunctions

provider:
  name: aws
  runtime: nodejs12.x
  region: ap-northeast-1

plugins:
  - serverless-step-functions

functions:
  Function1:
    handler: handler.hello

stepFunctions:
  stateMachines:
    serverless-framework-stepfunctions-glue:
      name: sls-stepfunctions-glue
      definition:
        StartAt: PrepareFile
        States:
          PrepareFile:
            Type: Task
            Resource: arn:aws:states:::glue:startJobRun.sync
            Parameters:
              JobName: prepare-file
            Next: Glue StartJobRun
          Glue StartJobRun:
            Type: Task
            Resource: arn:aws:states:::glue:startJobRun.sync
            Parameters:
              JobName: convert-parquet
            End: true

resources:
  Resources:
    ETLJobRole:
      Type: AWS::IAM::Role
      Properties:
        AssumeRolePolicyDocument:
          Version: "2012-10-17"
          Statement:
            - Action:
                - sts:AssumeRole
              Effect: Allow
              Principal:
                Service:
                  - "glue.amazonaws.com"
        Policies:
          - PolicyName: "root"
            PolicyDocument:
              Version: "2012-10-17"
              Statement:
                - Action: "*"
                  Effect: Allow
                  Resource: "*"
    PrepareFileJob:
      Type: AWS::Glue::Job
      Properties:
        Role: !GetAtt ETLJobRole.Arn
        Name: prepare-file
        Command:
          Name: pythonshell
          ScriptLocation: !Sub "s3://${env:GlueScriptBucket}/script/prepare_file.py"
          PythonVersion: "3"
        ExecutionProperty:
          MaxConcurrentRuns: 50
        MaxRetries: 0
    ConvertParquetJob:
      Type: AWS::Glue::Job
      Properties:
        Role: !GetAtt ETLJobRole.Arn
        Name: convert-parquet
        Command:
          Name: glueetl
          ScriptLocation: !Sub "s3://${env:GlueScriptBucket}/script/convert_parquet.py"
        ExecutionProperty:
          MaxConcurrentRuns: 50
        MaxRetries: 0

あとは、ServerlessFramework のStepFunctions内でGlue Jobを呼び出す。の箇所で書いたとおり、GlueのJobのScriptはS3に配置する必要がありますので、以下のようなデプロイ用のスクリプトを用いて配置を行うと良いかと思います。

export GlueScriptBucket="YourBucketName"
aws s3 cp ./script/ s3://${GlueScriptBucket}/script/ 
sls deploy

結論

ちょっと変則的案方法ではありますがこのような方法をとることで、ServerlessFramework 上でGlueのジョブの定義とStepfunctions の呼び出しを行うことが可能となります。