Step Functionsを利用してS3からRedshiftへデータをロードしてみた

2022.10.11

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、八木です。

最近のデータ分析基盤は、データソースのデータをそのまま保存するデータレイクと、データレイクのデータを加工して分析を行うDWH(データウェアハウス)を利用することが多いです。
このような構成の場合、データの取得から分析までの大まかな流れは以下のようになります。

  1. データソースからデータレイクへデータを取得
  2. データレイクからDWHへデータをロード
  3. DWHで分析

AWSの場合、データソースはRDSやDynamoDBなどのデータベース、データレイクはS3、DWHはRedshiftといったサービスがあります。
また、上記のようなワークフローの管理には Step Functions や Amazon Managed Workflows for Apache Airflow(MWAA)を利用します。

今回の記事では、Step Functions(ワークフロー管理)を利用して、上記フローの「2」の部分である、S3(データレイク)からRedshift(DWH)へデータをロードする方法をご紹介します。

S3からRedshiftへデータをロードする方法

まず、Redshiftへのデータ取り込み方法を決めます。

S3からRedshiftへデータをロードする方法は多数あり、一番シンプルなものはRedshiftのCOPYコマンドです。このコマンドは、S3に保存された構造化データをRedshiftのテーブルにそのままロードします。以下のように、S3のオブジェクトパスと、RedshiftにアタッチしたIAMロールを指定することで、簡単に実行できます。

COPY table_name FROM 's3://<your-bucket-name>/some_file.csv' 
credentials 'aws_iam_role=arn:aws:iam::<aws-account-id>:role/<role-name>'
FORMAT CSV;

S3のデータをSQLで加工してから取り込みたい場合はRedshift Spectrumを利用します。Redshift Spectrumを利用するには、S3をデータソースとした外部テーブルを作成し、クエリを実行します。
作成した外部テーブルでSELECTした結果をRedshiftのデーブルにINSERTすることで、S3のデータをSQLで整形してRedshiftに取り込むことができます。

INSERT INTO redshift_table (SELECT * FROM s3_external_schema.source_table);

その他、Redshift以外のサービスを使うアプローチでは、Glueジョブを利用したETLでデータを書き込むことができます。

今回はシンプルなCOPYコマンドを使用して、データを取り込んでいきます。

Step FunctionsからRedshiftにクエリを実行する方法

通常、Redshiftでクエリを実行する場合は、一般的なDBと同様にコネクションを貼る必要があります。
しかしStep Functionsからは直接Redshiftにコネクションを貼ることはできません。そこで役立つのがRedshift Data APIです。

Redshift Data APIは、HTTPエンドポイントを通して、クエリを実行する機能です。Redshiftのコネクションは必要ありません。また、エンドポイントはAWSのエンドポイントであるため、VPC外からクエリの実行が可能です。

今回はこのData APIを使用して、Step Functionsからクエリを実行します。


引用:ETL orchestration using the Amazon Redshift Data API and AWS Step Functions with AWS SDK integration

やってみた

では、実際にワークフローを作ってみます。

今回作成する主なリソースは以下です。

  • S3バケット(データレイク)
  • Redshift Serverless(DWH)
  • Step Functions(ワークフロー管理)

DWHにはRedshift Serverlessを利用しますが、プロビジョンドのRedshiftクラスターを使用することもできます。その場合にはIAMの権限が一部変わってくるため、ご注意ください。

S3バケットの作成

まず、データレイクとなるS3バケットを作成します。 リージョンはどこでも良いですが、データ転送料金及びレイテンシを小さくするため、この後作成するRedshiftと同じリージョンにすると良いでしょう。
他はデフォルト設定のままにします。

作成したバケットに、サンプルデータをアップロードしておきます。

users.csv

1, yamada, 35
2, mori, 26
3, tanaka, 67

Redshift Serverlessの作成

続いてDWHとなるRedshift Serverlessを作成します。
こちらは以下の記事で紹介されているCloudFormationを利用して作成します。

AWSTemplateFormatVersion: 2010-09-09
Parameters:
  WorkgroupName: 
    Type: String
    Default: default-wg
  BaseCapacity: 
    Type: Number
    Default: 32
  EnhancedVpcRouting: 
    Type: String
    Default: false
  NamespaceName: 
    Type: String
    Default: default-ns
  PubliclyAccessible: 
    Type: String
    Default: false
  SecurityGroupIds: 
    Type: CommaDelimitedList
    Default: sg-abcd1234
  SubnetIds: 
    Type: CommaDelimitedList
    Default: subnet-aaaa1234,subnet-bbbb1234,subnet-cccc1234
  NamespaceName: 
    Type: String
    Default: default-ns
  AdminUsername:
    Type: String
    Default: awsuser
  AdminUserPassword:
    Type: String
    MinLength: 8
    MaxLength: 41
    AllowedPattern: "[a-zA-Z0-9]*"
    ConstraintDescription: must contain only alphanumeric characters.
  DbName: 
    Type: String
    Default: dev
  KmsKeyId: 
    Type: String
    Default: 11111111-2222-3333-4444-555555555555

Resources:
  RedshiftServerlessWorkGroup:
    Type: AWS::RedshiftServerless::Workgroup
    Properties: 
      WorkgroupName: !Ref WorkgroupName
      BaseCapacity: !Ref BaseCapacity
      EnhancedVpcRouting: !Ref EnhancedVpcRouting
      NamespaceName: !Ref RedshiftServerlessNamespace
      PubliclyAccessible: !Ref PubliclyAccessible
      SecurityGroupIds: !Ref SecurityGroupIds
      SubnetIds: !Ref SubnetIds
      ConfigParameters: 
        - ParameterKey: "search_path"
          ParameterValue: "$user"
  RedshiftServerlessNamespace:
    Type: AWS::RedshiftServerless::Namespace
    Properties: 
      NamespaceName: !Ref NamespaceName
      AdminUsername: !Ref AdminUsername
      AdminUserPassword: !Ref AdminUserPassword
      KmsKeyId: !Ref KmsKeyId
      DbName: !Ref DbName
      IamRoles: 
      - Fn::GetAtt:
        - RedshiftServerlessRole
        - Arn

  RedshiftServerlessRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "AmazonRedshiftServerlessRole-${NamespaceName}"
      Path: "/"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: 
                - redshift-serverless.amazonaws.com
                - redshift.amazonaws.com
                - sagemaker.amazonaws.com
            Action: "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonRedshiftAllCommandsFullAccess
        - !Ref AmazonRedshiftCommandsAccessPolicy
  AmazonRedshiftCommandsAccessPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      ManagedPolicyName: !Sub "AmazonRedshiftCommandsAccessPolicy-${NamespaceName}"
      Description: Policy allowing access to S3
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Action:
          - s3:GetObject
          - s3:GetBucketAcl
          - s3:GetBucketCors
          - s3:GetEncryptionConfiguration
          - s3:GetBucketLocation
          - s3:ListBucket
          - s3:ListAllMyBuckets
          - s3:ListMultipartUploadParts
          - s3:ListBucketMultipartUploads
          - s3:PutObject
          - s3:PutBucketAcl
          - s3:PutBucketCors
          - s3:DeleteObject
          - s3:AbortMultipartUpload
          - s3:CreateBucket
          Resource: arn:aws:s3:::*

次に作成したRedshift内に、S3からロードしたデータを保存するテーブルを作成します。

マネジメントコンソールのRedshift Serverlessの画面からクエリエディタv2を開き、ユーザ名とパスワードを指定して、作成したデータベースに接続します。


以下のコマンドを実行して、テーブルを作成します。

CREATE TABLE public.user
(
    id INTEGER NOT NULL,
    name VARCHAR(22) NOT NULL,
    age INTEGER NOT NULL
);

これでデータレイク及びDWHの環境が作成できました。

Step Functionsステートマシンの作成

今回のコア部分である、ステートマシンを作成していきます。
このステートマシンでは、Data APIをつかってCOPYコマンドを実行し、S3からRedshiftにデータをロードします。

Step Functionsのステートマシンの作成で、「コードでワークフローを記述」を選択し、以下のASL(Amazon States Language)を定義に入力します。

{
  "Comment": "A description of my state machine",
  "StartAt": "ExecuteStatement",
  "States": {
    "ExecuteStatement": {
      "Type": "Task",
      "Parameters": {
        "WorkgroupName": "default-wg",
        "Database": "dev",
        "Sql": "COPY public.user FROM 's3://<bucket-name>/users.csv' credentials 'aws_iam_role=<redshift-serverless-role-arn>' FORMAT CSV;"
      },
      "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
      "Next": "Wait"
    },
    "Wait": {
      "Type": "Wait",
      "Next": "DescribeStatement",
      "Seconds": 5
    },
    "DescribeStatement": {
      "Type": "Task",
      "Parameters": {
        "Id.$": "$.Id"
      },
      "Resource": "arn:aws:states:::aws-sdk:redshiftdata:describeStatement",
      "Next": "Choice"
    },
    "Choice": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.Status",
          "StringEquals": "FINISHED",
          "Next": "Success"
        },
        {
          "Or": [
            {
              "Variable": "$.Status",
              "StringEquals": "SUBMITTED"
            },
            {
              "Variable": "$.Status",
              "StringEquals": "STARTED"
            },
            {
              "Variable": "$.Status",
              "StringEquals": "PICKED"
            }
          ],
          "Next": "Wait"
        }
      ],
      "Default": "Fail"
    },
    "Success": {
      "Type": "Succeed"
    },
    "Fail": {
      "Type": "Fail"
    }
  }
}

バケット名、RedshiftのIAMロール名は、作成したリソースのものに変更してください。

アクセス許可は「新しいロールの作成」を選択して、ステートマシンを作成します。

ここで作成したステートマシンの解説です。

まず、Redshift Data APIのExecuteStatementでCOPYコマンドを実行します。

COPY user FROM 's3://<bucket-name>/users.csv' 
credentials 'aws_iam_role=<redshift-iam-role-arn>'
FORMAT CSV;

この処理は非同期処理なので、クエリの完了まで待つには自前でポーリングを行う必要があります。

クエリの実行後5秒待ち、DescribeStatementでクエリの実行状況を確認します。
クエリが完了($.Status == "FINISHED")していれば、ステートマシンを終了します。
クエリが完了していない($.Status == "SUBMITTED" or $.Status == "STARTED" or $.Status == "PICKED")場合は再び5秒待ち、DescribeStatementでクエリを確認します。
クエリがエラーを発生させていた場合は、失敗としてステートマシンを終了します。

ステートメントで同期実行が対応されていれば、今回のようにワークフローでポーリング処理を入れる必要はありません。しかし今回使用したExecuteStatementはSDK統合であり、同期実行がサポートされていなかったため、自前でポーリング処理を行なっています。

以上が処理の流れです。

最後に、作成したステートマシンがRedshiftでコマンドを実行できるように権限を与えます。
必要な権限は「Redshift Data APIを利用するIAM権限」と「Redshift内のユーザがテーブルに書き込みを行う権限」です。

まず「Redshift Data APIを利用するIAM権限」を付与します。
ステートマシンにアタッチされているIAMロールに、以下のインラインポリシーを追加します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "redshift-data:DescribeStatement",
                "redshift-data:ExecuteStatement",
                "redshift-serverless:GetCredentials"
            ],
            "Resource": "*"
        }
    ]
}

次は「Redshiftユーザがテーブルに書き込みを行う権限」です。
再度クエリエディタv2にログインし、以下のSQLを実行します。

CREATE USER "IAMR:<step-functions-iam-role-name>" PASSWORD DISABLE;

GRANT INSERT
ON public.user
TO "IAMR:<state-machine-iam-role-name>";

``にはステートマシンにアタッチされているIAMロール名を指定します。ARNではないので注意してください。

以上で全ての設定が完了しました。

動作確認

では実際にステートマシンを実行して、S3からRedshiftにデータをロードできることを確認してみます。

作成したステートマシンを手動で実行します。

数秒で実行完了しました。

クエリエディタでテーブルを確認してみると、問題なくデータがロードされていました。

実行イベント履歴を見てみると、クエリ時間が短かったため、1度のポーリングで終了していました。

複数回ポーリングが行われる場面も見たいので、待機時間を5秒から1秒に変更して再度実行してみます。


1度目のDescribeStatementでは処理が終わっておらず、2度目で処理が完了しました。

本番運用の際の注意点

今回はアイディアに注目するため、ステートマシンできるだけシンプルに構成しました。
実際に業務利用する場合は考慮が必要な点があります。

まずエラーハンドリング、リトライ、及び通知の実装をしましょう。上記のステートマシンでは、クエリの実行や結果の取得操作でエラーが発生した場合、管理者が気づくことができません。Catch処理やRetry処理を入れ、適切にハンドリングしましょう。また、ステートマシン自体がエラー終了する可能性もあるため、EventBridgeでのエラー検出もご検討ください。

また、Redshiftにクエリが集中している場合、クエリが完了するまで長時間かかる可能性があります。Step Functions標準ワークフローは状態遷移の回数で課金されるため、クエリが長時間かかる場合、ポーリング処理のループ回数が多くなり利用料金が増えてしまいます。ループ回数を減らすには、ポーリング間隔(Waitの時間)を長くします。Redshiftの利用状況及びクエリの実行時間を考慮して、ポーリング間隔を決定すると良いでしょう。加えて、ステートマシン自体にタイムアウトを設定することも有効です。

最後に

今回はワークフロー管理にStep Functionsを利用する際の、Redshiftへのデータ取り込み方法を紹介しました。
実装内容はCOPYコマンドを利用しましたが、Redshift Spectrumなどを利用することも可能です。

また、データソースからデータレイクへのデータ取得もフローに組み込むことで、処理フローをより簡潔にできるでしょう。
ぜひStep Functionsによるワークフロー管理をご検討ください。

以上、データアナリティクス事業本部コンサルティングチームの八木(@goat0613)でした!

参考リンク

ETL orchestration using the Amazon Redshift Data API and AWS Step Functions with AWS SDK integration
ステートマシン構造
Amazon Redshift Data API
Data APIを使用した Amazon Redshift Serverless への接続
IAM 認証を使用したデータベースユーザー認証情報の生成
AWS Step Functionsの料金