データアナリティクス事業本部の鈴木です。
今日はクラスメソッドの創業記念日ですね。私はAmazon Athena for Apache Sparkの新機能を試してみたのでご紹介します。
先日、Amazon Athena for Apache SparkでもApache Iceberg・Apache Hudi・Delta Lakeの対応がアナウンスされました。
気になっていたのですが触れていなかったので、ガイド通りの操作ですが、どんなかんじなのか試してみました。
本記事の内容
以下の、Amazon Athena for Apache SparkのApache Icebergフォーマットに関するガイドに操作が記載されているので、実際にノートブックを作って試してみました。
やってみた
1. IAMロールの作成
ワークグループで指定するIAMロールを作成しました。Sparkアプリケーションはこのロールを使って処理を実行します。以下のテンプレートをCloudFormationからデプロイしました。
今回は、AthenaResultBucketName
で指定するS3バケットにテーブルの中身を書き出すようにしました。
AthenaWorkGroupName
はIAMロールを使うワークグループです。
ReadWriteTargetGlueDatabaseName
は操作の中で新しく作成するGlueデータベース名です。
DataBucketName
はガイド中では元データの読み出しはしないので今回は不要ですが、別のバケットから取得したデータを元にテーブルを作りたいときのため残しておきました。
AWSTemplateFormatVersion: "2010-09-09"
Description: Creating AWS Athena Spark Execution Role
Parameters:
AthenaResultBucketName:
Description: Backet Name for Athena Result Bucket.
Type: String
AthenaWorkGroupName:
Description: Athena WorkGroup Name.
Type: String
ReadWriteTargetGlueDatabaseName:
Description: Glue Database Name For Read.
Type: String
DataBucketName:
Description: Backet Name Which Contains Data.
Type: String
Resources:
AWSAthenaSparkExecutionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service: athena.amazonaws.com
Action: "sts:AssumeRole"
Condition:
StringEquals:
"aws:SourceAccount": !Sub ${AWS::AccountId}
ArnLike:
"aws:SourceArn": !Sub "arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroupName}"
Path: "/"
Policies:
- PolicyName: AWSAthenaSparkExecutionPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Action: [
"s3:PutObject",
"s3:ListBucket",
"s3:DeleteObject",
"s3:GetObject"
]
Resource: [
!Sub "arn:aws:s3:::${AthenaResultBucketName}",
!Sub "arn:aws:s3:::${AthenaResultBucketName}/*"
]
- Effect: "Allow"
Action: [
"athena:GetWorkGroup",
"athena:TerminateSession",
"athena:GetSession",
"athena:GetSessionStatus",
"athena:ListSessions",
"athena:StartCalculationExecution",
"athena:GetCalculationExecutionCode",
"athena:StopCalculationExecution",
"athena:ListCalculationExecutions",
"athena:GetCalculationExecution",
"athena:GetCalculationExecutionStatus",
"athena:ListExecutors",
"athena:ExportNotebook",
"athena:UpdateNotebook"
]
Resource: [
!Sub "arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroupName}"
]
- Effect: "Allow"
Action: [
"logs:CreateLogStream",
"logs:DescribeLogStreams",
"logs:CreateLogGroup",
"logs:PutLogEvents"
]
Resource: [
!Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws-athena:*",
!Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws-athena*:log-stream:*"
]
- Effect: "Allow"
Action: [
"logs:DescribeLogGroups"
]
Resource: [
!Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*"
]
- Effect: "Allow"
Action: [
"cloudwatch:PutMetricData"
]
Resource: [
!Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*"
]
Condition:
StringEquals:
"cloudwatch:namespace": "AmazonAthenaForApacheSpark"
- PolicyName: AWSAthenaSparkExecutionPolicyForData
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Sid: "AccessS3Buckets"
Action: [
"s3:GetObject"
]
Resource: [
!Sub "arn:aws:s3:::${DataBucketName}/*"
]
- Effect: "Allow"
Sid: "GlueReadDatabases"
Action: [
"glue:GetDatabases"
]
Resource: [
!Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:*"
]
- Effect: "Allow"
Sid: "GlueReadDatabase"
Action: [
"glue:GetDatabase",
"glue:GetTable",
"glue:GetTables",
"glue:GetPartition",
"glue:GetPartitions"
]
Resource: [
!Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog",
!Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ReadWriteTargetGlueDatabaseName}",
!Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${ReadWriteTargetGlueDatabaseName}/*",
!Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/default"
]
- Effect: "Allow"
Sid: "GlueCreateDatabase"
Action: [
"glue:CreateDatabase"
]
Resource: [
!Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog",
!Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ReadWriteTargetGlueDatabaseName}",
!Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/default"
]
- Effect: "Allow"
Sid: "GlueDeleteDatabase"
Action: [
"glue:DeleteDatabase"
]
Resource: [
!Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog",
!Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ReadWriteTargetGlueDatabaseName}",
!Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${ReadWriteTargetGlueDatabaseName}/*",
!Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/default"
]
- Effect: "Allow"
Sid: "GlueCreateDeleteTable"
Action: [
"glue:CreateTable",
"glue:DeleteTable",
"glue:UpdateTable"
]
Resource: [
!Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog",
!Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ReadWriteTargetGlueDatabaseName}",
!Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${ReadWriteTargetGlueDatabaseName}/*",
]
このIAMロールに類似したものは過去のブログでもご紹介しましたが、今回はテーブルにデータをINSERTしているので、glue:UpdateTable
の許可を追加していることに注意してください。(ハイライト箇所)
2. ワークグループの作成
Athenaのコンソールより、ワークグループを作成しました。
- 分析エンジンは Apache Sparkとしました。
- IAMロールの設定で、先に作成したロールを指定しました。
- 計算結果の設定で、
AthenaResultBucketName
で指定したバケットを指定しました。
3. ノートブックの作成と設定
ノートブックエディタから新しいノートブックを作成しました。
Session
メニューより、 Edit session
を開きました。
セッションの詳細の編集
から、Apache Sparkのプロパティ
で追加のテーブル形式を選べるので、ここでApache Iceberg
を選びました。選択すると以下のように必要とされるプロパティが反映されたので保存しました。
4. テーブルの作成とデータ格納
ガイドに従って、PySparkを実行していきました。
まず、変数設定です。テーブルのデータ保存先バケット名
は、検証ではIAMロールのAthenaResultBucketName
で指定したバケットにしました。自身の環境にあったものとしてください。
DB_NAME = "cm_nayuts_spark_iceberg"
TABLE_NAME = "iceberg_table"
TABLE_S3_LOCATION = "s3://テーブルのデータ保存先バケット名/iceberg/"
データベースを作成しました。
spark.sql("CREATE DATABASE {} LOCATION '{}'".format(DB_NAME, TABLE_S3_LOCATION))
Glueデータカタログをみると、データベースが作成できていました。TABLE_S3_LOCATION
はLocation URI
に対応しました。
Apache Icebergフォーマットのテーブルを作成しました。
spark.sql("""
CREATE TABLE {}.{} (
language string,
users_count int
) USING ICEBERG
""".format(DB_NAME, TABLE_NAME))
iceberg_table
を確認すると、Table properties
でtable_type
がICEBERG
になっていました。
レコードを挿入してSELECTすると、中身が確認できました。
spark.sql("""INSERT INTO {}.{} VALUES ('Golang', 3000)""".format(DB_NAME, TABLE_NAME))
spark.sql("SELECT * FROM {}.{}".format(DB_NAME, TABLE_NAME)).show()
Athena(エンジンバージョン3)からの検索も可能でした。
逆に、Athena(エンジンバージョン3)からテーブルにデータを挿入して、Spark側で読めるか確認しました。
以下のように確認できました。
補足
Athena側でもnon-Hiveテーブルフォーマットとして、Apache Iceberg・Apache Hudi・Delta Lakeが利用できることが、下記のユーザーガイドに記載されています。
Apache Icebergは記事執筆時点では互換性があるようでしたが、そのときのバージョンの都合もあると思うので、ご興味のある方は使用時に一度動作確認をすると良いと思います。
また、テーブルフォーマットの使い分けについては、AWS サービス別資料にて、大変参考になる以下のPDF資料が公開されていましたので合わせてご確認下さい。
最後に
今回は、Amazon Athena for Apache Sparkで利用できるnon-Hiveテーブルフォーマットの例として、Apache Icebergフォーマットを使う際の例をご紹介しました。
試した分には、Athenaエンジンバージョン3なら、Athena SQL側からも相互にテーブルがみられたので使いやすそうですね。
参考になりましたら幸いです。