Amazon Athena for Apache SparkでS3バケット上のデータを可視化してみた

2023.03.27

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部の鈴木です。

Amazon Athena for Apache SparkのAthenaノートブックで、自分で用意したS3バケット上のデータを可視化してみたので、検証内容を共有します。

re:Invent2022にて発表されたAmazon Athenaの機能で、Jupyter Notebookと互換性があるAthenaノートブックをインターフェースに、Apache Sparkを使ってインタラクティブにデータの分析を行うことができるというものです。

検証で確認したかったポイント

今回は、Amazon Athena for Apache Sparkを使って以下のことをどうできるか確認してみました。

  1. Amazon Athena for Apache SparkからS3バケット上の自分のデータにアクセスするためのIAMポリシーの設定方法
  2. Athenaノートブックから可視化する方法

可視化する方法については、SeabornとPlotlyを試してみました。

Athenaノートブックの実行環境にプリインストールされているPythonライブラリについては、開発者ガイドのプリインストールされているPythonライブラリのリストから確認できますが、可視化のための代表的なライブラリであるmatplotlibseabornplotlyがインストールされていることが分かります。

matplotlibを単体で使う例としては、サンプルノートブックにも記載がありますし、発表時に執筆された以下のブログにも取り上げられているので、今回はそれを除いたseabornおよびplotlyを試してみました。

データの準備

今回は下記リンクにて公開されている、UCI Machine Learning RepositoryのIris Data Setを使用しました。

  • https://archive.ics.uci.edu/ml/datasets/iris

データをダウンロードし、iris.dataをS3バケットにアップロードしておきます。

レコードの数は150で、ヘッダーなしのCSVファイルになります。別途用意したAthenaテーブルで検索すると以下のようになっています。

Athenaでの検索結果

※『AWS Glue Data Quality(プレビュー) をAWS Glueコンソールから試してみよう!』に掲載した結果です。

やってみる

1. IAMポリシーの設定方法

今回は以下のIAMロール用のCloudFormationテンプレートを作成してみました。

これは、Apache Sparkエンジンを利用するためワークグループを作成する際に、サンプルノートブックを実行するための許可が付与されたサービスロールを新規作成できますが、それをもとに自分のデータやGlueデータカタログにアクセスするための権限を定義したものになります。

AWSAthenaSparkExecutionRole.yml

AWSTemplateFormatVersion: "2010-09-09"
Description: Creating AWS Athena Spark Execution Role

Parameters:
  AthenaResultBucketName:
    Description: Backet Name for Athena Result Bucket.
    Type: String
  DataBucketName:
    Description: Backet Name Which Contains Data.
    Type: String
  AthenaWorkGroupName:
    Description: Athena WorkGroup Name.
    Type: String
  ReadWriteTargetGlueDatabaseName:
    Description: Glue Database Name For Read.
    Type: String


Resources:
  AWSAthenaSparkExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: athena.amazonaws.com
            Action: "sts:AssumeRole"
            Condition:
              StringEquals:
                "aws:SourceAccount": !Sub ${AWS::AccountId}
              ArnLike:
                "aws:SourceArn": !Sub "arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroupName}"

      Path: "/"
      Policies:
        - PolicyName: AWSAthenaSparkExecutionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action: [
                  "s3:PutObject",
                  "s3:ListBucket",
                  "s3:DeleteObject",
                  "s3:GetObject"
                ]
                Resource: [
                  !Sub "arn:aws:s3:::${AthenaResultBucketName}",
                  !Sub "arn:aws:s3:::${AthenaResultBucketName}/*"
                ]
              - Effect: "Allow"
                Action: [
                  "athena:GetWorkGroup",
                  "athena:TerminateSession",
                  "athena:GetSession",
                  "athena:GetSessionStatus",
                  "athena:ListSessions",
                  "athena:StartCalculationExecution",
                  "athena:GetCalculationExecutionCode",
                  "athena:StopCalculationExecution",
                  "athena:ListCalculationExecutions",
                  "athena:GetCalculationExecution",
                  "athena:GetCalculationExecutionStatus",
                  "athena:ListExecutors",
                  "athena:ExportNotebook",
                  "athena:UpdateNotebook"
                ]
                Resource: [
                  !Sub "arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroupName}"
                ]
              - Effect: "Allow"
                Action: [
                  "logs:CreateLogStream",
                  "logs:DescribeLogStreams",
                  "logs:CreateLogGroup",
                  "logs:PutLogEvents"
                ]
                Resource: [
                  !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws-athena:*",
                  !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws-athena*:log-stream:*" 
                ]
              - Effect: "Allow"
                Action: [
                  "logs:DescribeLogGroups"
                ]
                Resource: [
                  !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*"
                ]
              - Effect: "Allow"
                Action: [
                  "cloudwatch:PutMetricData"
                ]
                Resource: [
                  !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*"
                ]
                Condition:
                  StringEquals:
                    "cloudwatch:namespace": "AmazonAthenaForApacheSpark"
        - PolicyName: AWSAthenaSparkExecutionPolicyForData
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Sid: "AccessS3Buckets"
                Action: [
                  "s3:GetObject"
                ]
                Resource: [
                  !Sub "arn:aws:s3:::${DataBucketName}/*"
                ]
              - Effect: "Allow"
                Sid: "GlueReadDatabases"
                Action: [
                  "glue:GetDatabases"
                ]
                Resource: [
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:*"
                ]
              - Effect: "Allow"
                Sid: "GlueReadDatabase"
                Action: [
                  "glue:GetDatabase",
                  "glue:GetTable",
                  "glue:GetTables",
                  "glue:GetPartition",
                  "glue:GetPartitions"
                ]
                Resource: [
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ReadWriteTargetGlueDatabaseName}",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${ReadWriteTargetGlueDatabaseName}/*",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/default"
                ]
              - Effect: "Allow"
                Sid: "GlueCreateDatabase"
                Action: [
                  "glue:CreateDatabase"
                ]
                Resource: [
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ReadWriteTargetGlueDatabaseName}",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/default"
                ]
              - Effect: "Allow"
                Sid: "GlueDeleteDatabase"
                Action: [
                  "glue:DeleteDatabase"
                ]
                Resource: [
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ReadWriteTargetGlueDatabaseName}",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${ReadWriteTargetGlueDatabaseName}/*",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/default"
                ]
              - Effect: "Allow"
                Sid: "GlueCreateDeleteTable"
                Action: [
                  "glue:CreateTable",
                  "glue:DeleteTable"
                ]
                Resource: [
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ReadWriteTargetGlueDatabaseName}",
                  !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${ReadWriteTargetGlueDatabaseName}/*",
                ]

このテンプレートでは以下のことができます。

No パラメータ 設定するもの 備考
1 AthenaResultBucketName ワークグループで設定する計算結果の場所に記載するS3バケット名
2 DataBucketName Apache Sparkから利用したいデータのあるS3バケット名
3 AthenaWorkGroupName IAMロールを利用するワークグループ名
4 ReadWriteTargetGlueDatabaseName Apache Sparkからテーブルを操作したいGlueデータベース名 できる操作の内容については定義を確認ください。

Amazon Athena for Apache Sparkで利用するため、CloudFormationからIAMロールを作成し、Athenaのワークグループから選択しておきます

2. Athenaノートブックから可視化する方法

ノートブックの作成

まずノートブックを新規作成しました。ノートブックエクスプローラーから、ノートブックを作成をクリックします。

ノートブックの作成

ノートブック名を入力し、必要があればオプションのセッションパラメータを設定します。作成を押すとノートブックが作成されます。

ノートブックを作成画面

これにより、ノートブックエディタでノートブックが開き、セッションが開始されます。

ライブラリの読み込み

まず、必要なライブラリを読み込みます。今回はSeabornとPlotlyによる可視化をしてみようと思うので、必要なものを読み込みました。

ライブラリのインポート

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import seaborn as sns

データの読み込み

ノートブックからS3のデータを読み込みます。

今回はヘッダーなしのCSVファイルを読み込みたいので、以下のように読み込みました。

csv_schema = StructType([
    StructField("sepal_length", FloatType()),
    StructField("sepal_width", FloatType()),
    StructField("petal_length", FloatType()),
    StructField("petal_width", FloatType()),
    StructField("class", StringType())
])

file_name = "s3://データを配置したバケット名/iris-data/iris.data"

iris_df = (spark.read.format("csv")
     .option("header", "false")
     .schema(csv_schema)
     .load(file_name))

print("Read csv file" + " complete")

データの読み込み

読み込んだデータとスキーマは以下のように確認できます。

iris_df.show()

iris_df.printSchema()

読み込んだデータの確認

データの集計

読み込んだデータを格納したDataFrameに対して、Spark SQLで集計してみます。

iris_df.createOrReplaceTempView("iris")
sqlDF = spark.sql("SELECT AVG(sepal_length) AS sepal_length_avg, class from iris group by class")

sqlDF.show()

データの集計

ちなみにマジックコマンドでも実行が可能です。以下のように150レコード分ちゃんと読み込めていることが分かります。

%%sql
SELECT count(*) AS cnt from iris

レコード数の確認

PandasのDataFrameへの変換

to_pandasでPandasのDataFrameへ変換することができます。

これでより使い慣れた方の多いPandasのDataFrameで処理ができるようになります。

df = sqlDF.toPandas()

PandasのDataFrameへの変換

ちなみに、to_pandas実行時に一つのノードにデータが集約されることが想定されるため、集計したデータフレームに対してto_pandasを実行するとよいでしょう。

Seabornでの可視化

まずSeabornでの可視化です。

Seabornでの可視化方法は、Using magic commandsUsing the matplotlib and seaborn libraries togetherのセクションに記載がありました。

Seabornはライブラリの使い方通りに使用すればよいですが、図を描画する際には以下のように%matplotマジックコマンドを実行する必要があります。matplotlibとseabornを一緒に使うときの例は以下です。

sns.barplot(data=df, x="class", y="sepal_length_avg")
%matplot plt

Seabornでの可視化

棒グラフが描画されました。ほかのグラフについてもSeabornのドキュメントを参考に表示していくことができます。

Plotlyでの可視化

次にPlotlyでの可視化です。

Plotlyでの可視化方法は、Using magic commands%plotlyのセクションに記載がありました。

Plotlyの方は、図の描画に%plotlyマジックコマンドを実行する必要があります。

以下のように実行して、棒グラフを描画することができました。

Plotlyでの可視化

ほかのグラフについてもPlotlyのドキュメントを参考に表示していくことができます。

最後に

今回は、Amazon Athena for Apache SparkでS3バケット上の自分のデータを読み込み、SeabornおよびPlotlyで可視化を試してみました。

グラフの表示のマジックコマンドについては、Athenaノートブック上で描画するためにドキュメントを読む必要がありますが、一度分かってしまえばJupyter Notebookと同じようにグラフを描き、データの傾向を探っていくことができます。

ぜひAthenaノートブックを使って、データをさまざまな角度から分析してみてください。