Athena で S3 と MySQL を JOIN する

本記事では、CDK を使い、Amazon Athena で S3 と MySQL を JOIN する構成をご紹介します。 S3 と他のデータソースを組み合わせた分析の参考になれば幸いです。
2024.05.10

目的

クラスメソッドタイランドの清水です。
本記事では Amazon Athena を使って、S3 と MySQL を JOIN するクエリを発行します。
大まかに以下のような構成になります。

前提条件・知識

  • AWS アカウントを作成済み
  • IAM Role, Policy, Cloud9 の環境を作成できる権限がある
  • 使いたいAWS アカウントのリージョンで cdk bootstrap コマンドを実行済み
  • aws cli の使い方

手順

環境構築

Cloud9 を使って CDK をデプロイしたり、必要なコマンドを実行するための環境を構築します。
もし手元に環境があればこの工程はスキップしても問題ありません。

まずは Cloud9 の環境が使う EC2 インスタンスにアタッチするロールを作成します。
ロールには以下のポリシーを関連付けます。
⚠️ 最小権限ではありません。実際のプロジェクトで使うときは、最小権限を設定したほうが良いです。

  • AWSCloud9SSMInstanceProfile: Session Manager で Cloud9 の環境に接続するためです。
  • AmazonS3FullAccess: S3 にオブジェクトをアップロード、削除するためです。
  • 以下の Inline Policy:cdk のデプロイに必要な権限、CloudFormation の Stack を Describe する権限です。
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sts:AssumeRole"
            ],
            "Resource": [
                "arn:aws:iam::*:role/cdk-*"
            ]
        },
        {
            "Sid": "CfnDescribe",
            "Effect": "Allow",
            "Action": [
                "cloudformation:DescribeStacks"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

そして、Cloud9 の環境を作成し、EC2 インスタンスにロールをアタッチします。

ロールのアタッチは以下の記事を参考にしていただければと思います。

CDK で全リソースを作成する

まずは以下のコマンドで、本ブログ用の Git リポジトリ https://github.com/yuta-cmth/blog-athena-join-s3-and-mysql.git をクローンし、CDK コマンドでリソースを作成します。

git clone https://github.com/yuta-cmth/blog-athena-join-s3-and-mysql.git
cd blog-athena-join-s3-and-mysql
npm i
cdk deploy --require-approve never

以下のような出力が確認できたら成功です。

✨ Total time: 550.1s

成功すると以下のようなリソースが作成されます。

冒頭の構成図より複雑になっていますが、これには大きく2つの理由があります。

  1. Lambda の MySQL connector が VPC によるプライベートネットワークを使っているため
  2. MySQL の認証情報が Secrets Manager に保存されているため

ひとまず MySQL と S3 を JOIN することを優先にして、解説を最後に回します。
また、この時点で RDS の DB は以下の SQL で初期化されています。

CREATE TABLE IF NOT EXISTS `blog_athena_join_s3_mysql`.`master_dimension` (
  `key` VARCHAR(256) NOT NULL,
  `name` VARCHAR(256) NOT NULL,
  PRIMARY KEY (`key`));

INSERT INTO `blog_athena_join_s3_mysql`.`master_dimension` (`key`, `name`) 
VALUES ('tokyo', '東京'), ('osaka', '大阪'), ('nagoya', '名古屋'), ('fukuoka', '福岡');

つまり、RDS MySQL のテーブル blog_athena_join_s3_mysql.master_dimension の中身は以下のような状態になっています。
key にはローマ字の都道府県名、name には漢字の都道府県名が入っています。

key name
fukuoka 福岡
nagoya 名古屋
osaka 大阪
tokyo 東京

CDK の中で DB を初期化する点についても後ほど触れます。

S3 にサンプルデータをアップロードする

続いて、以下のコマンドで S3 にサンプルのデータを入れます。

bucket_name=$(aws cloudformation describe-stacks --stack-name BlogAthenaJoinS3AndMysqlStack --output text --query 'Stacks[0].Outputs[?OutputKey==`BucketName`].OutputValue')
aws s3 cp ./s3_test_data/data "s3://${bucket_name}/data" --recursive

これで CloudFormation で作成した S3 バケット名を取得し、そのバケットに以下の CSV ファイルをアップロードしました。
※4都道府県の2000年から2004年の年間の平均気温と湿度のデータです。私が適当に作ったので全く正確ではないです。

year,city,temperature,humidity
2000,tokyo,23,51
2000,osaka,25,51
2000,nagoya,20,58
2000,fukuoka,21,79
2001,tokyo,25,48
2001,osaka,20,69
2001,nagoya,23,41
2001,fukuoka,25,54
2002,tokyo,20,63
2002,osaka,24,78
2002,nagoya,23,57
2002,fukuoka,22,71
2003,tokyo,20,42
2003,osaka,24,74
2003,nagoya,24,55
2003,fukuoka,25,57
2004,tokyo,20,73
2004,osaka,20,54
2004,nagoya,22,75
2004,fukuoka,25,79

列 city にローマ字の都道府県名が入っており、前に書いた MySQL のテーブル blog_athena_join_s3_mysql.master_dimension の列 key と一致します。
あとで Athena で city と key を紐づけて CSV と MySQL を JOIN します。

Glue のテーブルを作成

Athena で S3 のデータを取得するために、Glue のテーブルを作成します。
マネジメントコンソールで Athena > Query editor > Saved queries を開き、Workgroup で blog_athena_work_group を選択し、 blog_athena_saved_query_create の id をクリックします。
Saved queries に表示されているクエリは CDK のこの部分で作成しています。

すると以下のように Athena のエディターに CREATE 文が表示されるので、Run をクリックします。
うまくいくとエディターの左ペインにテーブル weather_data を追加されます。

Athena で S3 と MySQL を JOIN する

先ほどと同じようにマネジメントコンソールで Athena > Query editor > Saved queries を開き、Workgroup で blog_athena_work_group を選択し、 blog_athena_saved_query_select の id をクリックします。

すると、Athena のエディターが開き、以下のクエリが表示されます。

select year,
	m.name,
	temperature,
	humidity
from "blog_athena_join_s3_mysql_db"."weather_data" as w
	INNER JOIN "lambda:athena-mysql-connector"."blog_athena_join_s3_mysql"."master_dimension" as m ON w.city = m.key
order by year, m.name;

"blog_athena_join_s3_mysql_db"."weather_data" は S3 にアップロードした CSV を参照し、"lambda:athena-mysql-connector"."blog_athena_join_s3_mysql"."master_dimension" は MySQL にアクセスするための Federated Query の Lambda 関数を参照します。
標準的な SQL と同じく、INNER JOIN ~ ON で S3 と MySQL を JOIN すると、以下のように tokyo, osaka, nagoya, fukuoka が漢字になりました。

以上で Athena で S3 と MySQL を JOIN する構築が完了しました。

ポイント

今回の CDK で作成された以下のリソースのポイントをおさらいします。

ポイント① Athena は Federated Query の Lambda を使って MySQL にアクセスする

まずは Federated Query とは何か、またその特徴について簡単にまとめます。
Athena で S3 以外のデータソース(MySQL, DynamoDB など)にアクセスする場合は、Federated Query を使う必要があります。
Federated Query を使う場合、Athena とデータソースの間に Lambda が入ります。
Lambda の役割は Athena のクエリとデータソースへのクエリをうまく変換することです。
よく使われるデータソースにアクセスするための Lambda はだいたい AWS が提供してくれています。
今回は MySQL にアクセスしたいので、AthenaMySQLConnector を使いました。

さて、ここから大事なポイントに入ります。
AWS の Federated Query のドキュメント Amazon Athena 横串検索の使用に以下のような記述があります。

Secrets Manager – AWS Secrets Manager で Athena 横串検索機能を使用するには、Secrets Manager に Amazon VPC プライベートエンドポイントを設定する必要があります。詳細については、「AWS Secrets Manager ユーザーガイド」の「Secrets Manager VPC プライベートエンドポイントを作成する」を参照してください。

今回の構築でも Secrets Manager に MySQL の認証情報を入れているので、 Secrets Manager VPC プライベートエンドポイントを作成する必要があります。

プライベートエンドポイントを含む、Athena -> MySQL へのアクセスまでのイメージは以下のシーケンス図のような感じです。
Lambda → Secrets Manager の矢印の上の紫色の盾マークがプライベートエンドポイントを示しています。

VPC プライベートエンドポイントを作っているのは ソースコードのこの部分になります。

    vpc.addInterfaceEndpoint("SecretsManagerEndpoint", {
      service: ec2.InterfaceVpcEndpointAwsService.SECRETS_MANAGER,
    });

ポイント② RDS の DB を初期化

これは構成図と関係ない部分ですが、書いておかないと分かりにくいポイントかと思います。

通常 CDK で RDS の DB を作成すると、DB の中身は空っぽです。
後で CREATE TABLEINSERT INTO でデータを入れてもいいのですが、手順が増えるので CDK でこれらを実行しています。

ソースコードはこの部分です。

仕組みを簡単に説明すると、 CDK でリソースを立ち上げる中で Lambda で SDK を実行することができるので、RDS DB を作成したあとに Lambda(JavaScript) で以下の SQL を実行してデータを初期化しています。

DROP TABLE IF EXISTS `blog_athena_join_s3_mysql`.`master_dimension`;

CREATE TABLE IF NOT EXISTS `blog_athena_join_s3_mysql`.`master_dimension` (
  `key` VARCHAR(256) NOT NULL,
  `name` VARCHAR(256) NOT NULL,
  PRIMARY KEY (`key`));

INSERT INTO `blog_athena_join_s3_mysql`.`master_dimension` (`key`, `name`) 
VALUES ('tokyo', '東京'), ('osaka', '大阪'), ('nagoya', '名古屋'), ('fukuoka', '福岡');

これで CDK の中で DB を初期化できます。

AWS の公式ブログ "Use AWS CDK to initialize Amazon RDS instances"[1] を参考にしています。

ポイント③ CDK で Athena の Federated Query の Lambda を作成する

CDK の L2,L3 Constructs は用意されていないので、L1 Construct を使う必要があります。

TypeScript でも型付けされていない部分多くややこしかったので、AWS 公式 Workshop "Amazon Athena Workshop"[2] の Hands on Labs > Getting Started > Self Paced Labs > CloudFormation Templateで紹介されていた CloudFormation のテンプレート Athena-Federation-Workshop.yaml (クリックすると自動で yaml のダウンロードが始まります)を参考にして設定しました。

CDK のソースコードはこちらになります。

    const mysqlConnectorLambdaName = "athena-mysql-connector";
    // MySQL connector used in Athena federated query
    const mysqlConnector = new serverless.CfnApplication(
      this,
      "BlogAthenaJoinS3MySQLConnector",
      {
        location: {
          applicationId:
            "arn:aws:serverlessrepo:us-east-1:292517598671:applications/AthenaMySQLConnector",
          semanticVersion: "2024.15.2",
        },
        parameters: {
          LambdaFunctionName: mysqlConnectorLambdaName,
          DefaultConnectionString: `mysql://jdbc:mysql://${database.dbInstanceEndpointAddress}:${database.dbInstanceEndpointPort}/blog_athena_join_s3_mysql?\${${dbSecret.secretName}}`,
          SecretNamePrefix: "BlogAthenaJoinS3MySQL",
          SpillBucket: bucket.bucketName,
          SpillPrefix: "athena-mysql-connector-spill",
          LambdaTimeout: "30",
          LambdaMemory: "3008",
          DisableSpillEncryption: "false",
          SecurityGroupIds: mysqlConnectorSecurityGroup.securityGroupId,
          SubnetIds: vpc.isolatedSubnets
            .map((subnet) => subnet.subnetId)
            .join(","),
        },
      }
    );

リソースの削除

以下のコマンドで CDK で作成したリソースを削除します。

aws s3 rm "s3://${bucket_name}" --recursive
cdk destroy --force

次に、Cloud9 の環境と、Cloud9 のインスタンスにアタッチした IAM Role を削除すれば完了です。

最後に

本記事では Athena で S3 と MySQL を JOIN する構成を紹介しました。
コンセプトはシンプルですが、実際には VPC プライベートエンドポイントを作ったり、Secrets Manager を使ったりするので、これらに慣れていないと躓くかもしれません。
Athena を使ったデータ分析の参考になれば幸いです。

参考

[1] Use AWS CDK to initialize Amazon RDS instances. (2021, November 18). Amazon Web Services. https://aws.amazon.com/blogs/infrastructure-and-automation/use-aws-cdk-to-initialize-amazon-rds-instances/
[2] Amazon Athena Workshop. (n.d.). Retrieved April 22, 2024, from https://catalog.us-east-1.prod.workshops.aws/workshops/9981f1a1-abdc-49b5-8387-cb01d238bb78/en-US/20-howtostart/201-self-paced/2013-cloudformation