Step FunctionsとPandasを使ってサーバーレスETL入門

Pandasたのしい
2021.03.23

こんにちは、クラスメソッドの岡です。

今回Step Functionsを使って簡単なETL処理を試す機会があったので実際に作ったものを公開します。

サーバーレスでETL処理、といえばAWS Glueが浮かぶかと思いますが、今回はGlueは使わず、LambdaのPythonランタイムでPandasを使ってS3のデータとDynamoDBのデータを結合するような処理を行ってみたいと思います。

ちなみに私はデータ分析に関する知識はほぼ皆無ですが、PythonライブラリPandasを使う事で簡単にデータ処理を行えました。

シナリオ

今回はIoTデバイスから送られてくる時系列データがS3に出力されている前提として、そのファイルとDynamoDBにあるデバイスのマスタデータと結合して分析データとして別のS3バケットに出力する、といったシナリオを想定しています。

構成

サンプルコード

今回はServerless Frameworkを使ってデプロイします。

コードはここのリポジトリにまとめてあります。

環境

  • Serverless Framework
  • Pipenv
  • Pyenv
  • Pandas

S3バケットとDynamoDBテーブルを作成

今回は2つのスタックに分割します。

  • S3 & DynamoDB
  • Lambda & Step Functions
service: serverless-etl-sample-resources

provider:
  name: aws
  region: ap-northeast-1
  stage: ${opt:stage, self:custom.defaultStage}
  stackName: ${self:custom.appName}-resources
custom:
  appName: serverless-etl-sample
  defaultStage: dev

resources:
  Resources:
    DeviceTable:
      Type: "AWS::DynamoDB::Table"
      Properties:
        TableName: Devices
        AttributeDefinitions:
          - AttributeName: user_id
            AttributeType: S
          - AttributeName: device_id
            AttributeType: S
        KeySchema:
          - AttributeName: user_id
            KeyType: HASH
          - AttributeName: device_id
            KeyType: RANGE
        BillingMode: PAY_PER_REQUEST
    DevicesRawDataBucket:
      Type: AWS::S3::Bucket
      Properties:
        BucketName: !Sub devices-raw-data-${AWS::AccountId}-${AWS::Region}
    DevicesDataAnalyticsBucket:
      Type: AWS::S3::Bucket
      Properties:
        BucketName: !Sub devices-data-analytics-${AWS::AccountId}-${AWS::Region}
  Outputs:
    DevicesRawDataBucketName:
      Description: S3 Bucket for devices raw data
      Value: !Ref DevicesRawDataBucket
    DevicesDataAnalyticsBucketName:
      Description: S3 Bucket for analytics data
      Value: !Ref DevicesDataAnalyticsBucket
  • DynamoDBテーブル: Devices
  • S3バケット(デバイスの時系列データ): devices-raw-data-${AWS::AccountId}-${AWS::Region}
  • S3バケット(分析用データ): devices-data-analytics-${AWS::AccountId}-${AWS::Region}

slsコマンドでデプロイします。

$ sls deploy --config serverless-resources.yml

Lambda & Step Functions を作成

service: serverless-etl-sample

useDotenv: true

plugins:
  - serverless-python-requirements
  - serverless-pseudo-parameters
  - serverless-step-functions
provider:
  name: aws
  region: ap-northeast-1
  stage: ${opt:stage, self:custom.defaultStage}
  runtime: python3.8
  stackName: ${self:custom.appName}
  apiName: ${self:custom.appName}
  lambdaHashingVersion: 20201221
  deploymentBucket:
    name: ${cf:${self:custom.appName}-resources.ServerlessDeploymentBucketName}
  iam:
    role:
      statement:
        - Effect: Allow
          Action:
            - dynamodb:PutItem
            - dynamodb:DeleteItem
            - dynamodb:GetItem
            - dynamodb:Scan
            - dynamodb:Query
            - dynamodb:UpdateItem
          Resource:
            - arn:aws:dynamodb:#{AWS::Region}:#{AWS::AccountId}:table/Devices
        - Effect: Allow
          Action:
            - s3:GetObject
            - s3:headObject
            - s3:ListBucket
            - s3:PutObject
          Resource:
            - arn:aws:s3:::devices-raw-data-#{AWS::AccountId}-#{AWS::Region}
            - arn:aws:s3:::devices-raw-data-#{AWS::AccountId}-#{AWS::Region}/*
            - arn:aws:s3:::devices-data-analytics-#{AWS::AccountId}-#{AWS::Region}
            - arn:aws:s3:::devices-data-analytics-#{AWS::AccountId}-#{AWS::Region}/*
  environment:
    LOG_LEVEL: DEBUG
    POWERTOOLS_SERVICE_NAME: ${self:custom.appName}
    DEVICES_TABLE_NAME: Devices
    DEVICES_RAW_DATA_BUCKET_NAME: ${cf:${self:custom.appName}-resources.DevicesRawDataBucketName}
    DEVICES_DATA_ANALYTICS_BUCKET_NAME: ${cf:${self:custom.appName}-resources.DevicesDataAnalyticsBucketName}

custom:
  appName: serverless-etl-sample
  defaultStage: dev
  pythonRequirements:
    dockerizePip: true
    slim: true
    usePipenv: true
    layer: true

package:
  individually: true
  exclude:
    - .git/**
    - .venv/**
    - tests/**
    - README.md
    - pyrightconfig.json
    - package**
    - scripts/**
    - sample/**
    - Pipfile**
    - node_modules/**

functions:
  LoadFile:
    name: load_file
    handler: src/handlers/load_file.handler
    description: "ファイルロード"
    layers:
      - !Ref PythonRequirementsLambdaLayer
  DataJoin:
    name: data_join
    handler: src/handlers/data_join.handler
    description: "データ結合"
    layers:
      - !Ref PythonRequirementsLambdaLayer

stepFunctions:
  stateMachines:
    BatchStateMachine:
      name: BatchStateMachine
      definition:
        Comment: Load the raw data file
        StartAt: LoadFile
        States:
          LoadFile:
            Type: Task
            Resource: !GetAtt LoadFile.Arn
            Next: FileExists
          FileExists:
            Type: Choice
            Choices:
              - Variable: "$.file_exist"
                BooleanEquals: true
                Next: DataJoin
              - Variable: "$.file_exist"
                BooleanEquals: false
                Next: NoFile
            Default: DataJoin
          NoFile:
            Type: Pass
            End: true
          DataJoin:
            Type: Task
            Resource: !GetAtt DataJoin.Arn
            End: true

Step Functionsの定義

Step Functionsの定義にはslsのプラグイン Serverless Step Functions を利用しています。

各プロパティについてはプラグインのページを参照してください。

実際にデプロイされたフローは以下です。

LoadFile(Task) でS3にアクセスしてファイルの有無を確認、ファイルがあれば結合処理の DataJoin(Task) に、なければ NoFile(Pass) で処理を終了する、という流れです。 本来であればエラーが起きた際に通知処理を起動したり、エラーハンドリングのフローも含むかと思いますが、ここでは結合処理がメインのため省略して極力シンプルなフローにしています。

依存ライブラリをLayersでデプロイ

こちらもslsのプラグイン Serverless Python Requirements を使って依存ライブラリをLayersとしてデプロイしています。ライブラリ管理はPipenvを使っているので以下のように定義しています。

custom:
  pythonRequirements:
    usePipenv: true
    layer: true

Pandasのデプロイに注意

今回はデータの結合処理にPandasを使っていますが、Linux以外の環境でパッケージングするとLambda実行時に以下のようなImport Errorが発生します。

Original error was: No module named 'numpy.core._multiarray_umath'

serverless-python-requirementsのdockerizePipを有効化すると、LambdaのDockerイメージでビルド処理を行ってくれて上記のエラーを回避できます。 ついでにslimオプションを有効化してサイズを削減します。

custom:
  pythonRequirements:
    dockerizePip: true
    slim: true

【Lambda】 load_fileのコード

load_fileの中ではS3オブジェクトが存在するかどうか、のみを確認しています。オブジェクトがなければ、{'file_exist': False}を返却。オブジェクトがあれば後続のLambdaにバケット名・オブジェクトキーを渡します。

import os
import boto3
import botocore
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext
logger = Logger()
DEVICES_RAW_DATA_BUCKET_NAME = os.getenv(
    'DEVICES_RAW_DATA_BUCKET_NAME')


@logger.inject_lambda_context(log_event=True)
def handler(event, context: LambdaContext):
    logger.info(event)

    s3 = boto3.resource('s3')
    key = 'devices-raw-data.json'

    # ファイルがあるか確認
    try:
        logger.debug(DEVICES_RAW_DATA_BUCKET_NAME)
        s3.Object(DEVICES_RAW_DATA_BUCKET_NAME, key).load()
    except botocore.exceptions.ClientError as e:
        logger.warn(e)
        logger.info('File Not Exist.')
        
        return {'file_exist': False'}

    logger.info('File Exist.')
    
    return {
        'file_exist': True,
        'bucket_name': DEVICES_RAW_DATA_BUCKET_NAME,
        's3_object_key': key}

Loggerの処理でaws_lambda_powertoolsを利用していますが、必須ではありません。 aws_lambda_powertoolsの使い方については以下の記事を参考にしてください。

【Lambda】 data_join

import json
import os
from datetime import datetime
import boto3
import botocore
import pandas as pd
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext
logger = Logger()
DEVICES_DATA_ANALYTICS_BUCKET_NAME = os.getenv(
    'DEVICES_DATA_ANALYTICS_BUCKET_NAME')


@logger.inject_lambda_context(log_event=True)
def handler(event, context: LambdaContext):
    logger.info(event)

    # S3からデータを取得
    s3 = boto3.resource('s3')
    obj = s3.Bucket(event['bucket_name']).Object(event['s3_object_key'])
    response = obj.get()
    body = response['Body'].read().decode('utf-8')
    devices_raw_data = json.loads(body)['data']
    logger.debug(devices_raw_data)

    # Dynamoからデータ取得
    dynamodb = boto3.resource('dynamodb')
    TABLE_NAME = os.getenv('DEVICES_TABLE_NAME')
    table = dynamodb.Table(TABLE_NAME)
    response = table.scan()
    devices = response.get('Items', [])
    logger.debug(devices)

    # Pandasでデータ結合
    df_devices_raw_data = pd.DataFrame(devices_raw_data)
    df_devices = pd.DataFrame(devices)
    devices_analysis_data = df_devices_raw_data.merge(df_devices)
    logger.debug(devices_analysis_data)

    # S3出力
    csv_data = devices_analysis_data.to_csv(encoding='utf-8', index=None)

    today = datetime.now().strftime('%Y%m%d')
    key = f'devices_operation_history_data/{today}.csv'
    obj = s3.Bucket(DEVICES_DATA_ANALYTICS_BUCKET_NAME).Object(key)
    obj.put(Body=csv_data)
    return {'result': 'Success.'}

ちょっと詰め込んだ感がありますが、S3とDynamoDBからそれぞれデータ取得、Pandasで結合処理、分析用バケットにCSV出力まで実行します。

Pandasでデータ結合

デバイスの時系列データに対してデバイスのマスタデータを左結合しています。 処理結果は以下のようになります。Pandas便利ですね。

>>> df_devices_raw_data = pd.DataFrame(devices_raw_data)
>>> df_devices = pd.DataFrame(devices)
>>> df_devices_raw_data
   device_id  timestamp     power
0  device_A  1616506124    on
1  device_B  1616506165    on
2  device_A  1616506185   off
3  device_C  1616506197   off

>>> df_devices
   device_id  user_id    type
0  device_A  user_0001    TV
1  device_B  user_0001  エアコン
2  device_C  user_0002    TV

>>> df_devices_raw_data.merge(df_devices)
   device_id  timestamp  power   user_id   type
0  device_A  1616506124    on  user_0001    TV
1  device_A  1616506185   off  user_0001    TV
2  device_B  1616506165    on  user_0001  エアコン
3  device_C  1616506197   off  user_0002    TV

結合結果をCSVで出力

CSVデータへの変換もPandasで実行しています。

csv_data = devices_analysis_data.to_csv(encoding='utf-8', index=None)

DataFrameのオブジェクトに対して、to_csv()を呼ぶだけでCSVデータへ変換できます。

S3への出力はboto3のResource APIを使っています。

    obj = s3.Bucket(DEVICES_DATA_ANALYTICS_BUCKET_NAME).Object(key)
    obj.put(Body=csv_data)

疎通確認

サンプルデータの投入

上記のSLSテンプレートのデプロイ後、AWS CLIが設定されている状態でサンプルコードのディレクトリで以下のスクリプトを実行してください。

$ python scripts/set_sample_data.py

Devicesテーブルにデバイスデータが投入されます。

次に以下の時系列サンプルデータをS3バケットにアップロードします。

{
    "data": [
        {"device_id": "device_u33nk7yy", "timestamp": "1616506124", "power": "on"},
        {"device_id": "device_m6mww3ss", "timestamp": "1616506165", "power": "on"},
        {"device_id": "device_u33nk7yy", "timestamp": "1616506185", "power": "off"},
        {"device_id": "device_sd7ubc8s", "timestamp": "1616506197", "power": "off"}
    ]
}
$ aws s3 cp sample/devices-raw-data.json s3://devices-raw-data-999999999999-ap-northeast-1/

Step Functionsを実行

コンソールからでもOKですが、SLSのプラグインから実行する事も可能です。

$ sls invoke stepf --name BatchStateMachine

{
  executionArn: 'arn:aws:states:ap-northeast-1:999999999999:execution:BatchStateMachine:83b464d9-e4ce-4172-8834-bc8340f5dc1e',
  stateMachineArn: 'arn:aws:states:ap-northeast-1:999999999999:stateMachine:BatchStateMachine',
  name: '83b464d9-e4ce-4172-8834-bc8340f5dc1e',
  status: 'SUCCEEDED',
  startDate: 2021-03-23T14:55:48.505Z,
  stopDate: 2021-03-23T14:55:53.472Z,
  input: '{}',
  inputDetails: { included: true },
  output: '{"result": "Success."}',
  outputDetails: { included: true }
}

出力されたCSVファイルを確認

実際に出力されたCSVは以下のようになっていました。時系列データにデバイスのユーザー情報などが含められていたので想定通りです。

device_id,timestamp,power,user_id,type
device_u33nk7yy,1616506124,on,user_0001,TV
device_u33nk7yy,1616506185,off,user_0001,TV
device_m6mww3ss,1616506165,on,user_0001,エアコン
device_sd7ubc8s,1616506197,off,user_0002,TV