S3 Event Trigerから共通State Machine(Step Function)を呼び出す方法を検証する

今回のブログではETLの前処理を想定し、IFごとにS3パスに置かれたデータに対して、特定の共通処理を実行する仕組みを検証しました。

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部のnkhrです。今回のブログでは、ETLの前処理を想定し、IFごとのS3ファイルに対して、共通の処理を実行するための構成(前処理自体は行っていません)を検証します。

具体的には、S3 Event Triger → Step Function(特定ファイル時にGlue呼び出し)→Glue Jobでどのような情報を取得して、前処理に渡せるかを検証します。今回はCloud Trailのイベント(S3:PutObject)をEventBridgeで検知し、EventBridgeからStep Functionを起動します。

EventBridgeやStep FunctionなどAWSサービスの仕様について記載している箇所は、2022/01時点の仕様に基づいて記載しています。

S3 Bucketとサンプルデータ作成

S3バケットは、以下の構成とします。upload_completedファイルはデータのアップロード完了後、最後にアップロードされる完了マークファイルとします。この完了マークファイルのアップロードを検知して、Glue Jobを実行します。

  • if_001/
    • yyyy=2022/mm=01/dd=03/
      • accesslog1.jsonl
      • accesslog2.jsonl
      • upload_completed  ※ファイルアップロード完了マークファイル
    • yyyy=2022/mm=01/dd=04/
      • accesslog1.jsonl
      • upload_completed

accesslogのファイル内容は以下のようなファイルとします。(1ファイルの行数は6行)

EventBridgeへのイベント通知

CloudTrailイベントの通知設定

S3 EventをEvent Bridgeで取得するために、CloudTrailでS3 Eventを記録します。CloudTrailの画面から新しいCloudTrailを作成します。2021/12以降は、S3 Event(Object Level)を直接Event Bridgeで取得できるため、証跡の取得以外の目的でCloudTrailを利用する必要はありません。

CloudTrailでS3PutObjectを取得するため、Data Eventの記録を設定します。Data EventはCloud TrailのEvent Hisotryからは確認できないため、事前確認したい場合は、S3バケットのファイルを直接確認するか、Cloud Watch Logsに出力して確認します。

S3から直接イベント通知する設定(追記)

S3バケットのPropertiesから、Event Bridgeのイベント通知をONにすることで、CloudTrailなしにEventを送信できます。CloudTailでの証跡記録が必要ない場合は、こちらの方法が利用できます。ONに変更後にEventBridge側でイベントが取得できるようになるまでに、5分程度はかかるようです。(公式サイト:Using the S3 console参照

Step Functionのサンプル作成

Step Functionのサンプルを作成します。後程Glue Jobの作成と起動を設定しますが、このタイミングでは何も行わないState Machineを作成します。

State Machine名を「s3event-common」とします。Roleは「Create new role」から自動で作成します。自動作成を選択すると「StepFunctions-s3event-common-role-[uuid]」という名称で作成されます。このRoleには、デフォルトでXRAYに関する以下の4つのPermission(2022/01時点)が設定されています。

  • xray:PutTraceSegments
  • xray:PutTelemetryRecords
  • xray:GetSamplingRules
  • xray:GetSamplingTargets

EventBridgeでRule作成

Amazon EventBridgeの[Create Rule]からS3 PutObjectイベント用のRuleを作成し、イベント発生時にStateMachineを起動します。

Event Pattern

特定S3バケットのPutObjectイベントを検知するEvent Patternを設定します。EventPatternはワイルドカードやSuffix一致が使えないため(2022/01時点)、すべてのPutObjectイベントを検知します。後続のStateMachineで、完了マークファイル「upload_completed」を判別します。

  • CloudTrailからのEventを取得する場合

  • S3から直接EventBridgeにイベントを取得する場合(追記)

  • detail_type: ["Object Created"]には、PutObject以外にも以下のActionイベントが含まれます。
    • PutObject
    • POST Object
    • CopyObject
    • CompleteMultipartUpload

Event Target

イベントターゲットには、作成したState Machineを指定します。Event Target用のRoleを自動作成とした場合、対象のターゲット(今回はState Machine)を起動するためのRoleが自動で作成されます。

その他の設定

その他として、以下の設定が可能です。今回は全てデフォルトにしました。

  • EventBus選択
    • Eventを受け取るバスをカスタム作成した場合は、選択可能。EventBusごとにResource Policyを指定できます。デフォルトでは「default」EventBus(Policy設定なし)があります。
    • 選択したEventBusのRuleを適用するかのEnable/Disableも設定可能です。
  • Retry Policy設定
    • 最大24HまでRetryが可能です。Retryの設定は以下の2つを定義できます。
    • Maximum age of event (HH:mm)
      • 処理できなかったイベントを保存する期間を指定。デフォルトは24H。
    • Retry attempts
      • エラー発生時にリトライする最大回数を指定。デフォルトは185回。
  • Dead-letter queue設定
    • 処理できなかったイベントをSQSに送信するかの設定。下記の3パターンを指定可能。
      • None(デフォルト)
      • 同じアカウントの既存のSQSに送信(ドロップダウンリストからSQSを指定)
      • 他のアカウントのSQSに送信(SQSのARNを指定、Resource base policyが必要)

Step FunctionからGlueジョブ呼び出し設定

ここまでの設定で、対象バケットにファイルをアップロードすると、StateMachineが起動することを確認できます。

次に、StateMachineの設定を変更し、Glueジョブを呼び出します。StateMachineへのInputとして、CloudTrailで取得されたEventのJSONが渡されるため、その一部(下記)をGlue Jobに渡します。

CloudTrailからのEvent JSONと、S3から直接Event Bridgeに送られるJSONファイルは、内容が異なります。

初回の投稿ブログでは、CloudTrailのEvent JSONを利用してStateMachine、Glue Jobを実行していました。現バージョンのブログでは、S3から直接Event Bridgeに送られるJSONファイルを使って、StateMachineとGlue Jobを実行するようにコードを修正しています。

  • S3 to EventBridge Jsonファイルの場合(追加)
    • $.account:AWSアカウント番号
    • $.region:Region
    • $.time : 実行時刻
    • $.detail.bucket.name: BucketName
    • $.detail.object.key: オブジェクトKey名
  • CloudTrail Event Jsonファイルの場合
    • $.account:AWSアカウント番号
    • $.region:Region
    • $.time : 実行時刻
    • $.detail.userIdentity.arn :実行User/RoleのARN(S3からの直接Event送信ではUser/Roleは取得不可)
    • $.detail.resources[0].type:Type
    • $.detail.resources[0].ARN:アップデートKeyのARN(このARNにBucket名とオブジェクトKey名が含まれる)

Glue Jobの作成

Glue Jobでは以下を作成します。

  • [事前準備] Glue JobからS3やCloudWatch LogsにアクセスできるRoleを作成(※通常は以下のPolicyの中で必要なリソースとActionに絞る)
    • AmazonS3FullAccess
    • AWSGlueServiceRole
  • Glue Jobを作成
    • Job Name: test_job (Step Functionから起動するために利用)
    • Max Concurency:3
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    args = getResolvedOptions(sys.argv, ["JOB_NAME",
    "accountid",
    "region",
    "bucketname",
    "object_key"])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)
    
    # print arguments
    print(f"accountid: {args['accountid']}")
    print(f"region: {args['region']}")
    print(f"bucketname: {args['bucketname']}")
    print(f"object_key: {args['object_key']}")
    
    bucket = args['bucketname']
    target_dir = args['object_key'].replace("upload_completed","")
    
    # upload_completedのファイルのパスにあるすべてのファイルを取得
    # upload_completedのみ除く
    df = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": False},
    connection_type="s3",
    format="json",
    connection_options={
    "paths": [f"s3://{bucket}/{target_dir}"],
    'exclusions': f"[\"s3://{bucket}/{target_dir}/upload_completed\"]",
    "recurse": True
    }
    )
    
    # ファイル行数を表示
    print('Count: {0}'.format(df.count()))
    # ファイル内のスキーマを表示
    df.printSchema()
    job.commit()

Step FunctionからGlue Jobの起動設定

  • Glue Job実行Policyの追加
    • 最初に作成したStep FunctionのRole「StepFunctions-s3event-common-role-[uuid]」に以下のPolicyを追加

  • Passにしていた部分をGlue Jobの起動に置き換え

{
  "Comment": "A description of my state machine",
  "StartAt": "Choice",
  "States": {
    "Choice": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.detail.object.key",
          "StringMatches": "*/upload_completed",
          "Next": "Glue StartJobRun"
        }
      ],
      "Default": "Pass"
    },
    "Pass": {
      "Type": "Pass",
      "End": true
    },
    "Glue StartJobRun": {
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "test_job",
        "Arguments": {
          "--accountid.$": "$.account",
          "--region.$": "$.region",
          "--time.$": "$.time",
          "--bucketname.$": "$.detail.bucket.name",
          "--object_key.$": "$.detail.object.key"
        }
      },
      "End": true,
      "ResultPath": "$.result"
    }
  }
}

今回の実行では、Standardワークフロー(状態遷移数で課金)で実行しましたが、この構成の場合はファイル数分の遷移が発生するため、Expressワークフロー(リクエスト数とメモリ利用時間)の方が適しているかもしれません。

動作確認

実行手順

以下の手順で実行し、GlueからCloudWatch Logsに出力されたログを確認する。

  1. 以下の2つのフォルダをバケット配下に作成
    • if_001/yyyy=2022/mm=01/dd=03/
    • if_001/yyyy=2022/mm=01/dd=04/

  2. 以下の2ファイルをアップロード

    • if_001/yyyy=2022/mm=01/dd=03/accesslog1.jsonl
    • if_001/yyyy=2022/mm=01/dd=03/accesslog2.jsonl

  3. 完了マークファイルをアップロード

    • if_001/yyyy=2022/mm=01/dd=03/upload_completed

  4. 以下の1ファイルをアップロード

    • if_001/yyyy=2022/mm=01/dd=04/accesslog1.jsonl
  5. 完了マークファイルをアップロード
    • if_001/yyyy=2022/mm=01/dd=04/upload_completed

完了確認

Glue jobが2回実行され、それぞれの実行ログの結果として取得した行数が1回目のジョブは12行、2回目のジョブは6行となっていることを確認する。

<Glue Job1結果>

<Glue Job2結果>

まとめ

今回は、S3の完了マークファイルのアップロードを検知し、State Machine⇒Glueジョブ呼び出す仕組みを検証しました。

イベント駆動の仕組みは、一度作成すれば、同じような処理が発生した場合に自動処理できるため、上手く仕組み化できれば有用だと思います。ただし、実際にイベント駆動でデータ加工を行う場合は、イベントのロストや再送などに対して、ジョブ実行の冪等性担保や、イベントが実行されていない場合の検知の仕組み(どこでチェックするかはケースバイケース)が必要になります。

以上、nkhrでした。