この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、クラスメソッドの岡です。
今回Step Functionsを使って簡単なETL処理を試す機会があったので実際に作ったものを公開します。
サーバーレスでETL処理、といえばAWS Glueが浮かぶかと思いますが、今回はGlueは使わず、LambdaのPythonランタイムでPandasを使ってS3のデータとDynamoDBのデータを結合するような処理を行ってみたいと思います。
ちなみに私はデータ分析に関する知識はほぼ皆無ですが、PythonライブラリPandasを使う事で簡単にデータ処理を行えました。
シナリオ
今回はIoTデバイスから送られてくる時系列データがS3に出力されている前提として、そのファイルとDynamoDBにあるデバイスのマスタデータと結合して分析データとして別のS3バケットに出力する、といったシナリオを想定しています。
構成
サンプルコード
今回はServerless Frameworkを使ってデプロイします。
コードはここのリポジトリにまとめてあります。
環境
- Serverless Framework
- Pipenv
- Pyenv
- Pandas
S3バケットとDynamoDBテーブルを作成
今回は2つのスタックに分割します。
- S3 & DynamoDB
- Lambda & Step Functions
service: serverless-etl-sample-resources
provider:
name: aws
region: ap-northeast-1
stage: ${opt:stage, self:custom.defaultStage}
stackName: ${self:custom.appName}-resources
custom:
appName: serverless-etl-sample
defaultStage: dev
resources:
Resources:
DeviceTable:
Type: "AWS::DynamoDB::Table"
Properties:
TableName: Devices
AttributeDefinitions:
- AttributeName: user_id
AttributeType: S
- AttributeName: device_id
AttributeType: S
KeySchema:
- AttributeName: user_id
KeyType: HASH
- AttributeName: device_id
KeyType: RANGE
BillingMode: PAY_PER_REQUEST
DevicesRawDataBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub devices-raw-data-${AWS::AccountId}-${AWS::Region}
DevicesDataAnalyticsBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub devices-data-analytics-${AWS::AccountId}-${AWS::Region}
Outputs:
DevicesRawDataBucketName:
Description: S3 Bucket for devices raw data
Value: !Ref DevicesRawDataBucket
DevicesDataAnalyticsBucketName:
Description: S3 Bucket for analytics data
Value: !Ref DevicesDataAnalyticsBucket
- DynamoDBテーブル: Devices
- S3バケット(デバイスの時系列データ): devices-raw-data-${AWS::AccountId}-${AWS::Region}
- S3バケット(分析用データ): devices-data-analytics-${AWS::AccountId}-${AWS::Region}
slsコマンドでデプロイします。
$ sls deploy --config serverless-resources.yml
Lambda & Step Functions を作成
service: serverless-etl-sample
useDotenv: true
plugins:
- serverless-python-requirements
- serverless-pseudo-parameters
- serverless-step-functions
provider:
name: aws
region: ap-northeast-1
stage: ${opt:stage, self:custom.defaultStage}
runtime: python3.8
stackName: ${self:custom.appName}
apiName: ${self:custom.appName}
lambdaHashingVersion: 20201221
deploymentBucket:
name: ${cf:${self:custom.appName}-resources.ServerlessDeploymentBucketName}
iam:
role:
statement:
- Effect: Allow
Action:
- dynamodb:PutItem
- dynamodb:DeleteItem
- dynamodb:GetItem
- dynamodb:Scan
- dynamodb:Query
- dynamodb:UpdateItem
Resource:
- arn:aws:dynamodb:#{AWS::Region}:#{AWS::AccountId}:table/Devices
- Effect: Allow
Action:
- s3:GetObject
- s3:headObject
- s3:ListBucket
- s3:PutObject
Resource:
- arn:aws:s3:::devices-raw-data-#{AWS::AccountId}-#{AWS::Region}
- arn:aws:s3:::devices-raw-data-#{AWS::AccountId}-#{AWS::Region}/*
- arn:aws:s3:::devices-data-analytics-#{AWS::AccountId}-#{AWS::Region}
- arn:aws:s3:::devices-data-analytics-#{AWS::AccountId}-#{AWS::Region}/*
environment:
LOG_LEVEL: DEBUG
POWERTOOLS_SERVICE_NAME: ${self:custom.appName}
DEVICES_TABLE_NAME: Devices
DEVICES_RAW_DATA_BUCKET_NAME: ${cf:${self:custom.appName}-resources.DevicesRawDataBucketName}
DEVICES_DATA_ANALYTICS_BUCKET_NAME: ${cf:${self:custom.appName}-resources.DevicesDataAnalyticsBucketName}
custom:
appName: serverless-etl-sample
defaultStage: dev
pythonRequirements:
dockerizePip: true
slim: true
usePipenv: true
layer: true
package:
individually: true
exclude:
- .git/**
- .venv/**
- tests/**
- README.md
- pyrightconfig.json
- package**
- scripts/**
- sample/**
- Pipfile**
- node_modules/**
functions:
LoadFile:
name: load_file
handler: src/handlers/load_file.handler
description: "ファイルロード"
layers:
- !Ref PythonRequirementsLambdaLayer
DataJoin:
name: data_join
handler: src/handlers/data_join.handler
description: "データ結合"
layers:
- !Ref PythonRequirementsLambdaLayer
stepFunctions:
stateMachines:
BatchStateMachine:
name: BatchStateMachine
definition:
Comment: Load the raw data file
StartAt: LoadFile
States:
LoadFile:
Type: Task
Resource: !GetAtt LoadFile.Arn
Next: FileExists
FileExists:
Type: Choice
Choices:
- Variable: "$.file_exist"
BooleanEquals: true
Next: DataJoin
- Variable: "$.file_exist"
BooleanEquals: false
Next: NoFile
Default: DataJoin
NoFile:
Type: Pass
End: true
DataJoin:
Type: Task
Resource: !GetAtt DataJoin.Arn
End: true
Step Functionsの定義
Step Functionsの定義にはslsのプラグイン Serverless Step Functions を利用しています。
各プロパティについてはプラグインのページを参照してください。
実際にデプロイされたフローは以下です。
LoadFile(Task) でS3にアクセスしてファイルの有無を確認、ファイルがあれば結合処理の DataJoin(Task) に、なければ NoFile(Pass) で処理を終了する、という流れです。 本来であればエラーが起きた際に通知処理を起動したり、エラーハンドリングのフローも含むかと思いますが、ここでは結合処理がメインのため省略して極力シンプルなフローにしています。
依存ライブラリをLayersでデプロイ
こちらもslsのプラグイン Serverless Python Requirements を使って依存ライブラリをLayersとしてデプロイしています。ライブラリ管理はPipenvを使っているので以下のように定義しています。
custom:
pythonRequirements:
usePipenv: true
layer: true
Pandasのデプロイに注意
今回はデータの結合処理にPandasを使っていますが、Linux以外の環境でパッケージングするとLambda実行時に以下のようなImport Errorが発生します。
Original error was: No module named 'numpy.core._multiarray_umath'
serverless-python-requirementsのdockerizePipを有効化すると、LambdaのDockerイメージでビルド処理を行ってくれて上記のエラーを回避できます。 ついでにslimオプションを有効化してサイズを削減します。
custom:
pythonRequirements:
dockerizePip: true
slim: true
【Lambda】 load_fileのコード
load_fileの中ではS3オブジェクトが存在するかどうか、のみを確認しています。オブジェクトがなければ、{'file_exist': False}を返却。オブジェクトがあれば後続のLambdaにバケット名・オブジェクトキーを渡します。
import os
import boto3
import botocore
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext
logger = Logger()
DEVICES_RAW_DATA_BUCKET_NAME = os.getenv(
'DEVICES_RAW_DATA_BUCKET_NAME')
@logger.inject_lambda_context(log_event=True)
def handler(event, context: LambdaContext):
logger.info(event)
s3 = boto3.resource('s3')
key = 'devices-raw-data.json'
# ファイルがあるか確認
try:
logger.debug(DEVICES_RAW_DATA_BUCKET_NAME)
s3.Object(DEVICES_RAW_DATA_BUCKET_NAME, key).load()
except botocore.exceptions.ClientError as e:
logger.warn(e)
logger.info('File Not Exist.')
return {'file_exist': False'}
logger.info('File Exist.')
return {
'file_exist': True,
'bucket_name': DEVICES_RAW_DATA_BUCKET_NAME,
's3_object_key': key}
Loggerの処理でaws_lambda_powertoolsを利用していますが、必須ではありません。 aws_lambda_powertoolsの使い方については以下の記事を参考にしてください。
【Lambda】 data_join
import json
import os
from datetime import datetime
import boto3
import botocore
import pandas as pd
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext
logger = Logger()
DEVICES_DATA_ANALYTICS_BUCKET_NAME = os.getenv(
'DEVICES_DATA_ANALYTICS_BUCKET_NAME')
@logger.inject_lambda_context(log_event=True)
def handler(event, context: LambdaContext):
logger.info(event)
# S3からデータを取得
s3 = boto3.resource('s3')
obj = s3.Bucket(event['bucket_name']).Object(event['s3_object_key'])
response = obj.get()
body = response['Body'].read().decode('utf-8')
devices_raw_data = json.loads(body)['data']
logger.debug(devices_raw_data)
# Dynamoからデータ取得
dynamodb = boto3.resource('dynamodb')
TABLE_NAME = os.getenv('DEVICES_TABLE_NAME')
table = dynamodb.Table(TABLE_NAME)
response = table.scan()
devices = response.get('Items', [])
logger.debug(devices)
# Pandasでデータ結合
df_devices_raw_data = pd.DataFrame(devices_raw_data)
df_devices = pd.DataFrame(devices)
devices_analysis_data = df_devices_raw_data.merge(df_devices)
logger.debug(devices_analysis_data)
# S3出力
csv_data = devices_analysis_data.to_csv(encoding='utf-8', index=None)
today = datetime.now().strftime('%Y%m%d')
key = f'devices_operation_history_data/{today}.csv'
obj = s3.Bucket(DEVICES_DATA_ANALYTICS_BUCKET_NAME).Object(key)
obj.put(Body=csv_data)
return {'result': 'Success.'}
ちょっと詰め込んだ感がありますが、S3とDynamoDBからそれぞれデータ取得、Pandasで結合処理、分析用バケットにCSV出力まで実行します。
Pandasでデータ結合
デバイスの時系列データに対してデバイスのマスタデータを左結合しています。 処理結果は以下のようになります。Pandas便利ですね。
>>> df_devices_raw_data = pd.DataFrame(devices_raw_data)
>>> df_devices = pd.DataFrame(devices)
>>> df_devices_raw_data
device_id timestamp power
0 device_A 1616506124 on
1 device_B 1616506165 on
2 device_A 1616506185 off
3 device_C 1616506197 off
>>> df_devices
device_id user_id type
0 device_A user_0001 TV
1 device_B user_0001 エアコン
2 device_C user_0002 TV
>>> df_devices_raw_data.merge(df_devices)
device_id timestamp power user_id type
0 device_A 1616506124 on user_0001 TV
1 device_A 1616506185 off user_0001 TV
2 device_B 1616506165 on user_0001 エアコン
3 device_C 1616506197 off user_0002 TV
結合結果をCSVで出力
CSVデータへの変換もPandasで実行しています。
csv_data = devices_analysis_data.to_csv(encoding='utf-8', index=None)
DataFrameのオブジェクトに対して、to_csv()を呼ぶだけでCSVデータへ変換できます。
S3への出力はboto3のResource APIを使っています。
obj = s3.Bucket(DEVICES_DATA_ANALYTICS_BUCKET_NAME).Object(key)
obj.put(Body=csv_data)
疎通確認
サンプルデータの投入
上記のSLSテンプレートのデプロイ後、AWS CLIが設定されている状態でサンプルコードのディレクトリで以下のスクリプトを実行してください。
$ python scripts/set_sample_data.py
Devicesテーブルにデバイスデータが投入されます。
次に以下の時系列サンプルデータをS3バケットにアップロードします。
{
"data": [
{"device_id": "device_u33nk7yy", "timestamp": "1616506124", "power": "on"},
{"device_id": "device_m6mww3ss", "timestamp": "1616506165", "power": "on"},
{"device_id": "device_u33nk7yy", "timestamp": "1616506185", "power": "off"},
{"device_id": "device_sd7ubc8s", "timestamp": "1616506197", "power": "off"}
]
}
$ aws s3 cp sample/devices-raw-data.json s3://devices-raw-data-999999999999-ap-northeast-1/
Step Functionsを実行
コンソールからでもOKですが、SLSのプラグインから実行する事も可能です。
$ sls invoke stepf --name BatchStateMachine
{
executionArn: 'arn:aws:states:ap-northeast-1:999999999999:execution:BatchStateMachine:83b464d9-e4ce-4172-8834-bc8340f5dc1e',
stateMachineArn: 'arn:aws:states:ap-northeast-1:999999999999:stateMachine:BatchStateMachine',
name: '83b464d9-e4ce-4172-8834-bc8340f5dc1e',
status: 'SUCCEEDED',
startDate: 2021-03-23T14:55:48.505Z,
stopDate: 2021-03-23T14:55:53.472Z,
input: '{}',
inputDetails: { included: true },
output: '{"result": "Success."}',
outputDetails: { included: true }
}
出力されたCSVファイルを確認
実際に出力されたCSVは以下のようになっていました。時系列データにデバイスのユーザー情報などが含められていたので想定通りです。
device_id,timestamp,power,user_id,type
device_u33nk7yy,1616506124,on,user_0001,TV
device_u33nk7yy,1616506185,off,user_0001,TV
device_m6mww3ss,1616506165,on,user_0001,エアコン
device_sd7ubc8s,1616506197,off,user_0002,TV