データアナリティクス事業本部のueharaです。
今回は、ECSとStep Functionsでdbtを動かし、Redshiftへデータを連携するデータパイプラインを構築してみたいと思います。
はじめに
今回作成するアーキテクチャは、AWSビッグデータブログのこちらの記事で紹介されている以下のアーキテクチャです。
dbtを含むDockerイメージをECRで管理し、それをStep FunctionsからトリガーされるECSタスクとしてFargateで実行します。
その他、Redshiftへの接続情報はSecrets Managerで管理し、dbtの実行により作成されるドキュメントはS3に保存する形になります。
前提
今回は、VPCやサブネットが準備されており、そこでRedshift Serverlessが既に動いていることを想定します。(RedshiftはServerlessでなくても問題ありません)
また、今回はECSをプライベートサブネットで起動するので、以下のVPCエンドポイントが既に構築され、適切にセキュリティグループが設定されていることととします。
サービス | エンドポイントの種類 |
---|---|
com.amazonaws.ap-northeast-1.s3 | Gateway |
com.amazonaws.ap-northeast-1.secretsmanager | Interface |
com.amazonaws.ap-northeast-1.ecr.dkr | Interface |
com.amazonaws.ap-northeast-1.ecr.api | Interface |
com.amazonaws.ap-northeast-1.logs | Interface |
com.amazonaws.ap-northeast-1.monitoring | Interface |
CloudFormationテンプレートの準備
ここでは、適用する順番でCloudFormationテンプレートを記載します。
Secrets Manager
Secrets Managerのテンプレートは以下の通りです。
01_Secrets_Manager.yml
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::SecretsManager-2020-07-23
Parameters:
RedshiftSecretName:
Type: String
Default: "my-redshift-secret"
RedshiftHostName:
Type: String
RedshiftDBName:
Type: String
RedshiftUserName:
Type: String
RedshiftUserPassword:
Type: String
NoEcho: true
Resources:
# Secrets Manager
DbtRedshiftSecret:
Type: 'AWS::SecretsManager::Secret'
Properties:
Name: !Ref RedshiftSecretName
Description: for dbt elt
SecretString: !Sub
- '{"engine":"redshift","username":"${username}","password":"${passwd}","host":"${host}","dbClusterIdentifier":"${dbname}"}'
- username: !Ref RedshiftUserName
passwd: !Ref RedshiftUserPassword
host: !Ref RedshiftHostName
dbname: !Ref RedshiftDBName
Outputs:
DbtRedshiftSecret:
Value: !Ref DbtRedshiftSecret
Description: secret for dbt elt
Export:
Name: DbtRedshiftSecret
RedshiftSecretName:
Value: !Ref RedshiftSecretName
Description: redshift secret name
Export:
Name: RedshiftSecretName
パラメーターは以下の通りです。
Parameter | 設定値 | 例 |
---|---|---|
RedshiftSecretName | シークレット名 | my-redshift-secret |
RedshiftHostName | Redshiftのホスト名 | (クラスタ名).(AccountID).ap-northeast-1.redshift-serverless.amazonaws.com |
RedshiftDBName | DB名 | dev |
RedshiftUserName | ユーザ名 | dbt_user |
RedshiftUserPassword | パスワード | dbt_Pass_#123 |
ECR
ECRのテンプレートは以下の通りです。
02_ECR.yml
AWSTemplateFormatVersion: "2010-09-09"
Description: "Create ECR"
Resources:
# ECR
DbtEcr:
Type: "AWS::ECR::Repository"
Properties:
RepositoryName: "my-dbt-ecr"
Outputs:
DbtEcrRepoUri:
Value: !Sub "${AWS::AccountId}.dkr.ecr.${AWS::Region}.amazonaws.com/my-dbt-ecr"
Export:
Name: DbtEcrRepoUri
ここではレポジトリ名を決め打ちでmy-dbt-ecr
としています。
ECS
ECSのテンプレートは以下の通りです。
03_ECS.yml
AWSTemplateFormatVersion: "2010-09-09"
Description: "Create ECS"
Parameters:
MyVpcId:
Type: String
Resources:
# ECS Cluster
DbtEcsCluster:
Type: "AWS::ECS::Cluster"
Properties:
ClusterName: "my-dbt-ecs-cls"
# ECS LogGroup
DbtEcsLogGroup:
Type: "AWS::Logs::LogGroup"
Properties:
LogGroupName: "/ecs/logs/my-dbt-ecs-lg"
# ECS TaskDefinition
DbtEcsTaskDefinition:
Type: "AWS::ECS::TaskDefinition"
Properties:
Cpu: 256
ExecutionRoleArn: !Ref DbtEcsTaskExecutionRole
TaskRoleArn: !Ref DbtEcsTaskRole
Family: "my-dbt-ecs-task"
Memory: 512
NetworkMode: awsvpc
RequiresCompatibilities:
- FARGATE
ContainerDefinitions:
- Name: "my-dbt-container"
Image: !ImportValue DbtEcrRepoUri
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-group: !Ref DbtEcsLogGroup
awslogs-region: !Ref "AWS::Region"
awslogs-stream-prefix: "ecs"
MemoryReservation: 128
Environment:
- Name: secret_name
Value: !ImportValue RedshiftSecretName
- Name: region_name
Value: !Ref "AWS::Region"
# IAM Role
DbtEcsTaskExecutionRole:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: ecs-tasks.amazonaws.com
Action: sts:AssumeRole
Path: "/"
Policies:
- PolicyName: ECSTaskExecutionPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- "logs:CreateLogStream"
- "logs:PutLogEvents"
- "logs:CreateLogGroup"
- "ecr:GetAuthorizationToken"
- "ecr:BatchCheckLayerAvailability"
- "ecr:GetDownloadUrlForLayer"
- "ecr:BatchGetImage"
Resource: '*'
DbtEcsTaskRole:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: ecs-tasks.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- "arn:aws:iam::aws:policy/AmazonS3FullAccess"
- "arn:aws:iam::aws:policy/AmazonRedshiftDataFullAccess"
Path: "/"
Policies:
- PolicyName: ECSTaskPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- "secretsmanager:GetSecretValue"
- "secretsmanager:DescribeSecret"
Resource: '*'
# SG
DbtEcsSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupName: "my-dbt-ecs-sg"
GroupDescription: Security Group for ECS Task
VpcId: !Ref MyVpcId
Tags:
- Key: "Name"
Value: "my-dbt-ecs-sg"
Outputs:
DbtEcsClusterArn:
Description: "ECS Cluster ARN"
Value: !GetAtt DbtEcsCluster.Arn
Export:
Name: DbtEcsClusterArn
DbtEcsLogGroup:
Description: "LogGroup"
Value: !Ref DbtEcsLogGroup
Export:
Name: DbtEcsLogGroup
DbtEcsTaskExecutionRole:
Description: "Role for ECS Exec"
Value: !Ref DbtEcsTaskExecutionRole
Export:
Name: DbtEcsTaskExecutionRole
DbtEcsTaskExecutionRoleArn:
Description: "Role ARN"
Value: !GetAtt DbtEcsTaskExecutionRole.Arn
Export:
Name: DbtEcsTaskExecutionRoleArn
DbtEcsTaskRole:
Description: "Role for ECS"
Value: !Ref DbtEcsTaskRole
Export:
Name: DbtEcsTaskRole
DbtEcsTaskRoleArn:
Description: "Role ARN"
Value: !GetAtt DbtEcsTaskRole.Arn
Export:
Name: DbtEcsTaskRoleArn
DbtEcsTaskDefinition:
Description: "ARN of the ECS Task Definition"
Value: !Ref DbtEcsTaskDefinition
Export:
Name: DbtEcsTaskDefinition
DbtEcsSecurityGroup:
Description: "SG for ECS"
Value: !Ref DbtEcsSecurityGroup
Export:
Name: DbtEcsSecurityGroup
ここではクラスタ名をmy-dbt-ecs-cls
、ロググループ名を/ecs/logs/my-dbt-ecs-lg
、タスク定義をmy-dbt-ecs-task
で決め打ちしています。
タスク定義の環境変数にはSecret ManagerのURIと、リージョンを設定しています。(ECSタスク実行時に認証情報を取得するのに必要なため)
パラメーターは以下の通りです。
Parameter | 設定値 | 例 |
---|---|---|
MyVpcId | VpcId | vpc-xxxxx |
Step Functions
今回はStep FunctionsもCloudFormationで作成してみます。
テンプレートは以下の通りです。
04_StepFunctions.yml
AWSTemplateFormatVersion: "2010-09-09"
Description: Step Functions to run ECS task
Parameters:
MyVpcId:
Type: String
MyPrivateSubnet1:
Type: String
MyPrivateSubnet2:
Type: String
MyPrivateSubnet3:
Type: String
Resources:
DbtSfn:
Type: "AWS::StepFunctions::StateMachine"
Properties:
StateMachineName: "my-run-task-sf"
DefinitionString: !Sub
- |-
{
"Comment": "State Machine to run a dbt ECS task",
"StartAt": "RunEcsFargateTask",
"States": {
"RunEcsFargateTask": {
"Type": "Task",
"Resource": "arn:aws:states:::ecs:runTask.sync",
"Parameters": {
"Cluster": "${DbtEcsClusterArn}",
"LaunchType": "FARGATE",
"TaskDefinition": "${DbtEcsTaskDefinition}",
"NetworkConfiguration": {
"AwsvpcConfiguration": {
"Subnets": [
"${MyPrivateSubnet1}",
"${MyPrivateSubnet2}",
"${MyPrivateSubnet3}"
],
"SecurityGroups": [
"${SecurityGroup}"
],
"AssignPublicIp": "DISABLED"
}
}
},
"End": true
}
}
}
- DbtEcsClusterArn: !ImportValue DbtEcsClusterArn
DbtEcsTaskDefinition: !ImportValue DbtEcsTaskDefinition
SecurityGroup: !ImportValue DbtEcsSecurityGroup
MyPrivateSubnet1: !Ref MyPrivateSubnet1
MyPrivateSubnet2: !Ref MyPrivateSubnet2
MyPrivateSubnet3: !Ref MyPrivateSubnet3
RoleArn: !GetAtt DbtSfnExecutionRole.Arn
DbtSfnExecutionRole:
Type: 'AWS::IAM::Role'
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service: states.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: my-state-machine-policy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- ecs:RunTask
Resource:
- !ImportValue DbtEcsTaskDefinition
- Effect: Allow
Action:
- iam:PassRole
Resource:
- !ImportValue DbtEcsTaskExecutionRoleArn
- !ImportValue DbtEcsTaskRoleArn
- Effect: Allow
Action:
- logs:CreateLogStream
- logs:PutLogEvents
Resource: "*"
- Effect: Allow
Action:
- ecs:StopTask
- ecs:DescribeTasks
Resource: "*"
- Effect: "Allow"
Action:
- "events:PutTargets"
- "events:PutRule"
- "events:DescribeRule"
Resource: "*"
Outputs:
DbtSfn:
Description: "StepFunctions to run ECS task"
Value: !Ref DbtSfn
Export:
Name: DbtSfn
ステートマシン名はmy-run-task-sf
で決め打ちしています。
Redshift ServelessはVPC内の3つのAZに配置された3つ以上のサブネットが必要であり、今回は3つのサブネットを利用した環境を想定しているため、パラメーターの設定も以下の通りとしています。
Parameter | 設定値 | 例 |
---|---|---|
MyVpcId | VpcId | vpc-xxxxx |
MyPrivateSubnet1 | プライベートサブネット1 | subnet-xxxxx |
MyPrivateSubnet2 | プライベートサブネット2 | subnet-xxxxx |
MyPrivateSubnet3 | プライベートサブネット3 | subnet-xxxxx |
デプロイ
CloudFormationの準備ができたので、デプロイをします。
CloudFormationの適用
上記で準備したCloudFormationを順番に適用して下さい。
プロジェクトの作成
インフラが用意できたら、Dockerイメージを作成します。
まず、適当なフォルダを作成し、dbt init
によりプロジェクトを作成します。
なお、ここではあくまで プロジェクトファイルを用意したいだけなので、対話形式で聞かれる接続情報等は全てダミーでOK です。(作成されるprofiles.ymlは後で置き換えます)
$ dbt init dbt_src --profiles-dir .
... # 接続情報等聞かれるがダミーでOK
実行が完了すると以下のようにプロジェクトフォルダが作成されているかと思います。logs
はいらないので削除します。
.
├ dbt_src
└ logs # いらないので削除
次に、dbt_src
内のprofiles.yml
を以下に書き換えます。
profiles.yml
dbt_src:
target: dev
outputs:
dev:
type: redshift
host: "{{ env_var('DBT_REDSHIFT_HOST') }}"
user: "{{ env_var('DBT_REDSHIFT_USER') }}"
password: "{{ env_var('DBT_REDSHIFT_PASSWORD') }}"
port: 5439
dbname: dev
schema: public
threads: 4
keepalives_idle: 240
connect_timeout: 10
sslmode: require
追加ファイルの作成
次に、dbt_src
フォルダにexport_redshift_connection.py
を作成します。
export_redshift_connection.py
import boto3
from botocore.exceptions import ClientError
import json
import os
secret_name = os.environ.get("secret_name")
region_name = os.environ.get("region_name")
# Create a Secrets Manager client
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
try:
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
except ClientError as e:
# For a list of exceptions thrown, see
# https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
raise e
# Decrypts secret using the associated KMS key.
secret = get_secret_value_response['SecretString']
db_secret = json.loads(secret)
dbhost = db_secret['host']
dbname = db_secret['dbClusterIdentifier']
username = db_secret['username']
password = db_secret['password']
host = dbhost.split(":")[0]
with open(".redshift_credentials",'w') as file:
file.write(f"export DBT_REDSHIFT_HOST={host}")
file.write("\n")
file.write(f"export DBT_REDSHIFT_USER={username}")
file.write("\n")
file.write(f"export DBT_REDSHIFT_DB={dbname}")
file.write("\n")
file.write(f"export DBT_REDSHIFT_PASSWORD='{password}'")
file.close()
これはSecret ManagerからRedshiftの接続情報を取得するスクリプトで、AWS公式サンプルのこちらをそのまま利用させて頂いてます。
スクリプト上部で環境変数からsecret_name
とregion_name
を取得していますが、こちらが先のECSのCloudFormationテンプレートで設定した環境変数の値になります。
同様に、dbt_src
フォルダrun_dbt.sh
を作成し、以下を記載します。
run_dbt.sh
#!/bin/bash
set -e
echo "Running python script to retreive redshift connection params"
python3 ./export_redshift_connection.py
echo "Exporting Redshift credentials as environment variables to be used by dbt"
. ./.redshift_credentials
echo "Running DBT commands"
echo "Run all model files holding business logic"
dbt run --profiles-dir . --project-dir .
echo "Generate documentation files"
dbt docs generate --profiles-dir . --project-dir .
echo ""
echo "Copying dbt documentation files for hosting"
aws s3 cp --recursive --exclude="*" --include="*.json" --include="*.html" target/ s3://<バケット名>/dbt_poc_test/
こちらにはECSタスク起動時に実行されるスクリプトを記載しています。
流れとしては「Redshift接続情報取得 → dbt run
実行 → dbt docs generate
実行 → ドキュメントをS3にコピー」となっています。
※最後のコピー先のS3バケット名はご自身の環境に置き換えて下さい。
dbt init
をした際にデフォルトでサンプルファイルが入っているので、今回はそちらの動きを確認します。
ここまでできたらフォルダは以下のようになっているかと思います。
.
└ dbt_src
├ ... (その他プロジェクトファイル)
├ profiles.yml
├ export_redshift_connection.py
└ run_dbt.sh
Dockerイメージの作成、Push
各種ファイルが用意できたので、Dockerイメージを作成します。
dbt_src
フォルダと同じ階層にDockerfileを作成します。
.
├ dbt_src
└ Dockerfile
Dockerfileは次の通り記載をします。
Dockerfile
FROM python:3
ADD dbt_src /dbt_src
RUN pip install -U pip
# Install DBT libraries
RUN pip install --no-cache-dir dbt-core
RUN pip install --no-cache-dir dbt-redshift
RUN pip install --no-cache-dir boto3
RUN pip install --no-cache-dir awscli
WORKDIR /dbt_src
RUN chmod -R 755 .
ENTRYPOINT [ "/bin/sh", "-c" ]
CMD ["./run_dbt.sh"]
記載内容を簡単に説明すると、まずベースイメージはPython3です。
ADD
を使用して、先に作成したdbt_src
フォルダをコンテナ内の/dbt_src
にコピーしています。
その後、pipによる必要モジュールのインストールや、Workディレクトリの設定、権限設定をしています。
最終的にCMD
にrun_dbt.sh
を指定し、コンテナが起動したときにスクリプトが走るようにしています。
これで準備は完了したので、ビルドをPushをしたいと思います。
AWSマネジメントコンソールからECRにアクセスし、リポジトリからmy-dbt-ecr
を開くと「プッシュコマンド表示」とあるので、ここからビルドとPush方法を確認することができます。
私は明示的にAWSで使用するアカウントのプロファイルを設定したいのと、M1 Mac上でのビルドであったため以下手順で実行しました。
# プロファイルを明示的に指定
$ aws ecr get-login-password --region ap-northeast-1 --profile (AWSプロファイル) | docker login --username AWS --password-stdin (AccountID).dkr.ecr.ap-northeast-1.amazonaws.com
# アーキテクチャを明示的に指定
$ docker build -t my-dbt-ecr . --platform=linux/amd64
$ docker tag my-dbt-ecr:latest (AccountID).dkr.ecr.ap-northeast-1.amazonaws.com/my-dbt-ecr:latest
$ docker push (AccountID).dkr.ecr.ap-northeast-1.amazonaws.com/my-dbt-ecr:latest
Pushができたら、マネジメントコンソールからアップロードできているか確認して下さい。
実行確認
最後に、Step Functionsを実行し動作確認を行います。
AWSマネジメントコンソールのStep Functionsから作成したmy-run-task-sf
にアクセスします。
「実行を開始」ボタンから実行を開始し、RunEcsFargateTaskが成功したら処理が正常に実行できています。
Redshiftのクエリエディタv2を用いて、サンプルデータがRedshiftにロードできていることが確認できます。
S3バケットにも、ドキュメントがアップロードできていることが確認できました。
最後に
今回は、ECSとStep Functionsでdbtを動かし、Redshiftへデータを連携するデータパイプラインを構築してみました。
参考になりましたら幸いです。