Amazon Athena for Apache SparkでApache Icebergフォーマットのテーブルを作成してみた

Amazon Athena for Apache Sparkで新しく対応したnon-Hiveテーブルフォーマットのうち、Apache Icebergのサンプルをノートブックから実行しました。
2023.07.07

データアナリティクス事業本部の鈴木です。

今日はクラスメソッドの創業記念日ですね。私はAmazon Athena for Apache Sparkの新機能を試してみたのでご紹介します。

先日、Amazon Athena for Apache SparkでもApache Iceberg・Apache Hudi・Delta Lakeの対応がアナウンスされました。

気になっていたのですが触れていなかったので、ガイド通りの操作ですが、どんなかんじなのか試してみました。

本記事の内容

以下の、Amazon Athena for Apache SparkのApache Icebergフォーマットに関するガイドに操作が記載されているので、実際にノートブックを作って試してみました。

やってみた

1. IAMロールの作成

ワークグループで指定するIAMロールを作成しました。Sparkアプリケーションはこのロールを使って処理を実行します。以下のテンプレートをCloudFormationからデプロイしました。

今回は、AthenaResultBucketNameで指定するS3バケットにテーブルの中身を書き出すようにしました。

AthenaWorkGroupNameはIAMロールを使うワークグループです。

ReadWriteTargetGlueDatabaseNameは操作の中で新しく作成するGlueデータベース名です。

DataBucketNameはガイド中では元データの読み出しはしないので今回は不要ですが、別のバケットから取得したデータを元にテーブルを作りたいときのため残しておきました。

AWSTemplateFormatVersion: "2010-09-09"
Description: Creating AWS Athena Spark Execution Role

Parameters:
  AthenaResultBucketName:
    Description: Backet Name for Athena Result Bucket.
    Type: String
  AthenaWorkGroupName:
    Description: Athena WorkGroup Name.
    Type: String
  ReadWriteTargetGlueDatabaseName:
    Description: Glue Database Name For Read.
    Type: String
  DataBucketName:
    Description: Backet Name Which Contains Data.
    Type: String


Resources:
  AWSAthenaSparkExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: athena.amazonaws.com
            Action: "sts:AssumeRole"
            Condition:
              StringEquals:
                "aws:SourceAccount": !Sub ${AWS::AccountId}
              ArnLike:
                "aws:SourceArn": !Sub "arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroupName}"

      Path: "/"
      Policies:
        - PolicyName: AWSAthenaSparkExecutionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action: [
                  "s3:PutObject",
                  "s3:ListBucket",
                  "s3:DeleteObject",
                  "s3:GetObject"
                ]
                Resource: [
                  !Sub "arn:aws:s3:::${AthenaResultBucketName}",
                  !Sub "arn:aws:s3:::${AthenaResultBucketName}/*"
                ]
              - Effect: "Allow"
                Action: [
                  "athena:GetWorkGroup",
                  "athena:TerminateSession",
                  "athena:GetSession",
                  "athena:GetSessionStatus",
                  "athena:ListSessions",
                  "athena:StartCalculationExecution",
                  "athena:GetCalculationExecutionCode",
                  "athena:StopCalculationExecution",
                  "athena:ListCalculationExecutions",
                  "athena:GetCalculationExecution",
                  "athena:GetCalculationExecutionStatus",
                  "athena:ListExecutors",
                  "athena:ExportNotebook",
                  "athena:UpdateNotebook"
                ]
                Resource: [
                  !Sub "arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroupName}"
                ]
              - Effect: "Allow"
                Action: [
                  "logs:CreateLogStream",
                  "logs:DescribeLogStreams",
                  "logs:CreateLogGroup",
                  "logs:PutLogEvents"
                ]
                Resource: [
                  !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws-athena:*",
                  !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws-athena*:log-stream:*" 
                ]
              - Effect: "Allow"
                Action: [
                  "logs:DescribeLogGroups"
                ]
                Resource: [
                  !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*"
                ]
              - Effect: "Allow"
                Action: [
                  "cloudwatch:PutMetricData"
                ]
                Resource: [
                  !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*"
                ]
                Condition:
                  StringEquals:
                    "cloudwatch:namespace": "AmazonAthenaForApacheSpark"
        - PolicyName: AWSAthenaSparkExecutionPolicyForData
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Sid: "AccessS3Buckets"
                Action: [
                  "s3:GetObject"
                ]
                Resource: [
                  !Sub "arn:aws:s3:::${DataBucketName}/*"
                ]
              - Effect: "Allow"
                Sid: "GlueReadDatabases"
                Action: [
                  "glue:GetDatabases"
                ]
                Resource: [
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:*"
                ]
              - Effect: "Allow"
                Sid: "GlueReadDatabase"
                Action: [
                  "glue:GetDatabase",
                  "glue:GetTable",
                  "glue:GetTables",
                  "glue:GetPartition",
                  "glue:GetPartitions"
                ]
                Resource: [
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ReadWriteTargetGlueDatabaseName}",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${ReadWriteTargetGlueDatabaseName}/*",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/default"
                ]
              - Effect: "Allow"
                Sid: "GlueCreateDatabase"
                Action: [
                  "glue:CreateDatabase"
                ]
                Resource: [
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ReadWriteTargetGlueDatabaseName}",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/default"
                ]
              - Effect: "Allow"
                Sid: "GlueDeleteDatabase"
                Action: [
                  "glue:DeleteDatabase"
                ]
                Resource: [
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ReadWriteTargetGlueDatabaseName}",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${ReadWriteTargetGlueDatabaseName}/*",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/default"
                ]
              - Effect: "Allow"
                Sid: "GlueCreateDeleteTable"
                Action: [
                  "glue:CreateTable",
                  "glue:DeleteTable",
                  "glue:UpdateTable"
                ]
                Resource: [
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ReadWriteTargetGlueDatabaseName}",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${ReadWriteTargetGlueDatabaseName}/*",
                ]

このIAMロールに類似したものは過去のブログでもご紹介しましたが、今回はテーブルにデータをINSERTしているので、glue:UpdateTableの許可を追加していることに注意してください。(ハイライト箇所)

2. ワークグループの作成

Athenaのコンソールより、ワークグループを作成しました。

  • 分析エンジンは Apache Sparkとしました。
  • IAMロールの設定で、先に作成したロールを指定しました。
  • 計算結果の設定で、AthenaResultBucketNameで指定したバケットを指定しました。

ワークグループの作成

3. ノートブックの作成と設定

ノートブックエディタから新しいノートブックを作成しました。

新しいノートブックの作成

Sessionメニューより、 Edit sessionを開きました。

Sessionメニュー

セッションの詳細の編集から、Apache Sparkのプロパティで追加のテーブル形式を選べるので、ここでApache Icebergを選びました。選択すると以下のように必要とされるプロパティが反映されたので保存しました。

Icebergの選択

4. テーブルの作成とデータ格納

ガイドに従って、PySparkを実行していきました。

まず、変数設定です。テーブルのデータ保存先バケット名は、検証ではIAMロールのAthenaResultBucketNameで指定したバケットにしました。自身の環境にあったものとしてください。

DB_NAME = "cm_nayuts_spark_iceberg"
TABLE_NAME = "iceberg_table"
TABLE_S3_LOCATION = "s3://テーブルのデータ保存先バケット名/iceberg/"

データベースを作成しました。

spark.sql("CREATE DATABASE {} LOCATION '{}'".format(DB_NAME, TABLE_S3_LOCATION))

Glueデータカタログをみると、データベースが作成できていました。TABLE_S3_LOCATIONLocation URIに対応しました。

作成されたデータベース

Apache Icebergフォーマットのテーブルを作成しました。

spark.sql("""
CREATE TABLE {}.{} (
language string,
users_count int
) USING ICEBERG
""".format(DB_NAME, TABLE_NAME))

iceberg_tableを確認すると、Table propertiestable_typeICEBERGになっていました。

テーブルプロパティ

レコードを挿入してSELECTすると、中身が確認できました。

spark.sql("""INSERT INTO {}.{} VALUES ('Golang', 3000)""".format(DB_NAME, TABLE_NAME))

spark.sql("SELECT * FROM {}.{}".format(DB_NAME, TABLE_NAME)).show()

検索結果1

Athena(エンジンバージョン3)からの検索も可能でした。

Athenaからの検索結果

逆に、Athena(エンジンバージョン3)からテーブルにデータを挿入して、Spark側で読めるか確認しました。

Athenaからのデータの挿入

以下のように確認できました。

検索結果2

補足

Athena側でもnon-Hiveテーブルフォーマットとして、Apache Iceberg・Apache Hudi・Delta Lakeが利用できることが、下記のユーザーガイドに記載されています。

Apache Icebergは記事執筆時点では互換性があるようでしたが、そのときのバージョンの都合もあると思うので、ご興味のある方は使用時に一度動作確認をすると良いと思います。

また、テーブルフォーマットの使い分けについては、AWS サービス別資料にて、大変参考になる以下のPDF資料が公開されていましたので合わせてご確認下さい。

最後に

今回は、Amazon Athena for Apache Sparkで利用できるnon-Hiveテーブルフォーマットの例として、Apache Icebergフォーマットを使う際の例をご紹介しました。

試した分には、Athenaエンジンバージョン3なら、Athena SQL側からも相互にテーブルがみられたので使いやすそうですね。

参考になりましたら幸いです。