データアナリティクス事業本部の鈴木です。
Step FunctionsとLambdaからAthenaに検索を実行するためのクラウド構成を作りたいことがあり、AWS CDK v2で1発で作成できると便利なので実装をしてみました。
よくある構成だと思ったので検証した内容をブログにまとめました。
やりたかったこと
Step FunctionsのステートマシンからLambda関数を呼び出し、Athenaに検索を実行する仕組みを作ることが目標です。
Step FunctionsからAthenaへ検索を実行する構成は、大きく以下の2つがあると思いますが、今回はNo.1の例をご紹介します。
# | 方法 | メリット | デメリット |
---|---|---|---|
1 | ステートマシンからLambdaなどのコンピュート系サービスを呼び出してAthenaに検索を実行させる。 | ステートマシンからコンピュート系サービスにパラメータを渡すことで、一つの実装で処理内容を切り替えることができる。例えば読み込むSQLファイルのパスを変えるなど。 | コンピュート系サービスに配置する実装が必要になる。 |
2 | ステートマシンから直接AthenaのAPIを実行する。 | コンピュート系サービスに配置する実装が不要になる。 | ステートマシンの定義に直接SQL文を定義するので、ステートマシンとSQL文が1:1になってしまい、SQL文の分だけ数が増えてしまう。 |
今回はLambda関数を間に挟むことで、Lambda関数に渡すパラメータを変えて実行するSQL文を切り替えたり加工したりしたいという意図があります。
特にStep FunctionsとLambdaは、これまではServelress Frameworkで作成することが多かったのですが、AWS CDK v2で作成したく、作り方を試してみました。
準備
作成する構成
以下のような構成を作成します。特にStep FunctionsとLambdaはAWS CDK v2で作成したく、この記事では特にその部分のコードをご紹介します。
Step FuncitonsからLambda関数を呼び出します。Lambda関数は事前にS3にアップロードしてあるSQLファイルを読み込み、Athenaで実行します。
前提
以下は既に作成されていることとしました。
- SQLファイルを格納するS3バケット
- Athenaから検索するデータが格納されているS3バケットおよびデータ
- Athenaから検索に利用するGlueテーブル
- Athenaでの検索に使うワークグループおよび検索結果を格納するS3バケット
今回は、Step FunctionsとLambdaの箇所について中心に説明しますが、上記のリソースは設定で切り替えるようにするため、その準備自体は省略します。一部工夫が必要だった点については、記事内で触れます。
環境
- AWS CDK:
2.73.0
やってみる
1. AWS CDK v2用のコードの準備
以下のようにして、プロジェクトを作成しました。
# プロジェクト用のディレクトリ作成
mkdir cdk_lambda
# プロジェクトの初期化
cd cdk_lambda
cdk init sample-app --language typescript
# Lambda関数用のコードを格納するディレクトリ
mkdir function.py
以下のように、CdkLambdaStack
クラスを作成しました。
lib/cdk_lambda-stack.ts
import { Stack, StackProps, CfnOutput, Aws } from 'aws-cdk-lib';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import {
aws_iam as iam,
aws_lambda as lambda,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
export class CdkLambdaStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
const LambdaLogsPolicy = new iam.ManagedPolicy(this, 'LambdaLogsPolicy', {
// CloudWatchLogsへのログの配信用ポリシー
managedPolicyName: 'LambdaLogsPolicy',
description: 'Lambda logs policy',
statements: [
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['logs:CreateLogGroup', 'logs:CreateLogStream', 'logs:PutLogEvents'],
resources: ['*'],
}),
],
});
const LambdaGetSourceDataPolicy = new iam.ManagedPolicy(this, 'LambdaGetSourceDataPolicy', {
// AthenaでSQLファイルおよびソースとなるデータを取得するためのポリシー
managedPolicyName: 'LambdaGetSourceDataPolicy',
description: 'Lambda get source data policy',
statements: [
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['s3:GetObject'],
resources: [
`arn:aws:s3:::${this.node.tryGetContext("SQL_BUCKET_NAME")}/*`,
`arn:aws:s3:::${this.node.tryGetContext("DATA_BUCKET_NAME")}/*`,
],
}),
],
});
const lambdaRole = new iam.Role(this, 'lambdaRole', {
roleName: 'lambdaRole',
assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),
});
// 実運用する際は、権限を絞る。
lambdaRole.addManagedPolicy(iam.ManagedPolicy.fromAwsManagedPolicyName('AmazonAthenaFullAccess'));
lambdaRole.addManagedPolicy(LambdaLogsPolicy);
lambdaRole.addManagedPolicy(LambdaGetSourceDataPolicy);
const sampleLambda = new lambda.Function(this, 'cm-nayuts-sample-lambda', {
runtime: lambda.Runtime.PYTHON_3_9,
handler: 'function.lambda_handler',
code: lambda.Code.fromAsset('lambda_function'),
role: lambdaRole
});
sampleLambda.addEnvironment('SQL_BUCKET_NAME', this.node.tryGetContext("SQL_BUCKET_NAME"));
sampleLambda.addEnvironment('SQL_OBJECT_KEY', this.node.tryGetContext("SQL_OBJECT_KEY"));
sampleLambda.addEnvironment('DATABASE_NAME', this.node.tryGetContext("DATABASE_NAME"));
sampleLambda.addEnvironment('WORKGROUP', this.node.tryGetContext("WORKGROUP"));
const stateMachine = new sfn.StateMachine(this, 'MyStateMachine', {
definition: new tasks.LambdaInvoke(this, "MyLambdaTask", {
lambdaFunction: sampleLambda
}).next(new sfn.Succeed(this, "GreetedWorld"))
});
}
}
ポイントは以下です。
- Athenaを実行する際のポリシーは
AmazonAthenaFullAccess
を指定しました。実運用する場合にさらに厳密に権限を絞りたいときは別途ポリシーの準備が必要です。 - Lambda関数ハンドラーは、次の節で記載する
lambda_function
ディレクトリのfunction.py
ファイルに定義したlambda_handler
関数を指定しました。 - Lambda関数で使う環境に関する値は、環境変数として渡すようにしました。
特に3点目は以下の5つを、cdk.json
のcontext
にキーバリューを定義しておき、それを参照する形にしました。
# | キー | バリューの説明 |
---|---|---|
1 | SQL_BUCKET_NAME | Athenaで実行するSQLを記載したファイルをアップロードしておくS3バケット名 |
2 | SQL_OBJECT_KEY | S3バケットにアップロードした、Athenaで実行するSQLを記載したファイルのオブジェクトキー |
3 | DATA_BUCKET_NAME | Athenaで検索するGlueテーブルが参照するS3バケット名 |
4 | DATABASE_NAME | Athenaで検索するGlueテーブルがあるGlueデータベース名 |
5 | WORKGROUP | Athenaで検索に使うワークグループ名 |
2. Lambda関数用のコードの準備
Lambda関数用に、以下のPythonスクリプトを用意しました。
lambda_function/function.py
import os
import json
import boto3
s3 = boto3.resource('s3')
athena = boto3.client('athena', region_name='ap-northeast-1')
SQL_BUCKET_NAME = os.environ['SQL_BUCKET_NAME']
SQL_OBJECT_KEY = os.environ['SQL_OBJECT_KEY']
DATABASE_NAME = os.environ['DATABASE_NAME']
WORKGROUP = os.environ['WORKGROUP']
def lambda_handler(event, context):
bucket = s3.Bucket(SQL_BUCKET_NAME)
obj = bucket.Object(SQL_OBJECT_KEY)
workgroup = WORKGROUP
response = obj.get()
body = response['Body'].read()
sql = body.decode('utf-8')
result = athena.start_query_execution(
QueryString=sql,
QueryExecutionContext={
'Database': DATABASE_NAME
},
WorkGroup=workgroup
)
return result
ポイントは以下になります。
- Athenaで実行するSQLファイルの場所、Athenaのデータベースおよびワークグループは環境変数から取得できるようにしました。
- クエリ結果の保存先は、ワークグループで指定した箇所としました。今回はLambda関数が使うサービスロールに
AmazonAthenaFullAccess
をアタッチしていますが、このポリシーでarn:aws:s3:::aws-athena-query-results-*
へのアクセス許可がされています。Athenaの結果の保存先はアカウントごとにこのような名称のバケットにすることが多いので、ワークグループの粒度で指定されたものを使うという想定でこのようにしました。
3. ほかのリソースの準備
以下について補足程度ですがどのようにしておいたか記載します。
- SQLファイルのS3バケットへの配置
- SQLで検索するテーブル
SQLで検索するテーブル
AWS Glue Data Quality(プレビュー) をAWS Glueコンソールから試してみよう!で以前作成したiris_quality_check
テーブルを使いました。今回は単に任意のテーブルからデータを検索できればそれでいいので、詳細については触れませんが、ご興味があれば記事をご参照ください。
SQLファイルのS3バケットへの配置
以下のように、SQLファイルを配置するS3バケットにアップロードしておきました。
SQL文は、先に記載したiris_quality_check
テーブルからSELECTするものです。
sample.sql
select *
from iris_quality_check
4. スタックのデプロイ
1. AWS CDK v2用のコードの準備でご紹介したプロジェクトは、以下のようにプロジェクトのルートからCDKのコマンドでデプロイしました。
cdk deploy --profile プロファイル名
--profile
オプションについては、AWS CDK Toolkit (cdk command) - AWS Cloud Development Kit (AWS CDK) v2のSpecifying Region and other configurationをご確認ください。
5. ステートマシンの動作確認
作成したステートマシンを実行し、正常に終了するか確認しました。
実行を開始
から、デフォルトの設定で実行してみました。
以下のようにステートマシンの実行成功が確認できました。
Athenaの実行履歴からも、SQLが成功していることが確認できました。
6. 後片付け
CDKから作成したリソースについては、以下のコマンドで削除できます。
cdk destroy --profile プロファイル名
手動で作成したリソースについては、同様に手動で削除しました。今回はS3およびGlueテーブルのため、手動で簡単に削除が可能です。
補足
Athenaのワークグループについて
LambdaからAthenaに検索をかける際、データソースとなるS3バケットへのアクセス権限だけではなく、検索結果を入れるS3バケットへのアクセス権限が必要になります。この権限および格納先指定の設定で少し悩んだので補足として記載します。
LambdaにアタッチするIAMロールに、今回はAmazonAthenaFullAccess
ポリシーをアタッチしました。このポリシーではarn:aws:s3:::aws-athena-query-results-*
へのアクセス許可がされています。
Lambdaから実行したSQL文の結果は、boto3からstart_query_execution
APIにリクエストする際に、ワークグループ名を指定することで間接的に保存先を指定しました。
ワークグループは今回の検証のため、以下のように新しく作成しており、その設定の中で、aws-athena-query-results-
から始まるバケットを指定しました。
最後に
Step FunctionsとLambdaからAthenaに検索を実行するためのクラウド構成について、AWS CDK v2によるコード例をご紹介しました。
今回はシンプルな構成ですが、読み込むSQLファイルをステートマシンから渡されたパラメータをもとに切り替えるなどすれば、1セットのステートマシンとLambda関数で様々な処理が実行できそうです。
参考になりましたら幸いです。