Amazon Athena for Apache SparkでS3バケット上のデータを可視化してみた
データアナリティクス事業本部の鈴木です。
Amazon Athena for Apache SparkのAthenaノートブックで、自分で用意したS3バケット上のデータを可視化してみたので、検証内容を共有します。
re:Invent2022にて発表されたAmazon Athenaの機能で、Jupyter Notebookと互換性があるAthenaノートブックをインターフェースに、Apache Sparkを使ってインタラクティブにデータの分析を行うことができるというものです。
検証で確認したかったポイント
今回は、Amazon Athena for Apache Sparkを使って以下のことをどうできるか確認してみました。
- Amazon Athena for Apache SparkからS3バケット上の自分のデータにアクセスするためのIAMポリシーの設定方法
- Athenaノートブックから可視化する方法
可視化する方法については、SeabornとPlotlyを試してみました。
Athenaノートブックの実行環境にプリインストールされているPythonライブラリについては、開発者ガイドのプリインストールされているPythonライブラリのリストから確認できますが、可視化のための代表的なライブラリであるmatplotlib
・seaborn
・plotly
がインストールされていることが分かります。
matplotlib
を単体で使う例としては、サンプルノートブックにも記載がありますし、発表時に執筆された以下のブログにも取り上げられているので、今回はそれを除いたseaborn
およびplotly
を試してみました。
データの準備
今回は下記リンクにて公開されている、UCI Machine Learning RepositoryのIris Data Setを使用しました。
- https://archive.ics.uci.edu/ml/datasets/iris
データをダウンロードし、iris.data
をS3バケットにアップロードしておきます。
レコードの数は150で、ヘッダーなしのCSVファイルになります。別途用意したAthenaテーブルで検索すると以下のようになっています。
※『AWS Glue Data Quality(プレビュー) をAWS Glueコンソールから試してみよう!』に掲載した結果です。
やってみる
1. IAMポリシーの設定方法
今回は以下のIAMロール用のCloudFormationテンプレートを作成してみました。
これは、Apache Sparkエンジンを利用するためワークグループを作成する際に、サンプルノートブックを実行するための許可が付与されたサービスロールを新規作成できますが、それをもとに自分のデータやGlueデータカタログにアクセスするための権限を定義したものになります。
AWSTemplateFormatVersion: "2010-09-09" Description: Creating AWS Athena Spark Execution Role Parameters: AthenaResultBucketName: Description: Backet Name for Athena Result Bucket. Type: String DataBucketName: Description: Backet Name Which Contains Data. Type: String AthenaWorkGroupName: Description: Athena WorkGroup Name. Type: String ReadWriteTargetGlueDatabaseName: Description: Glue Database Name For Read. Type: String Resources: AWSAthenaSparkExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: athena.amazonaws.com Action: "sts:AssumeRole" Condition: StringEquals: "aws:SourceAccount": !Sub ${AWS::AccountId} ArnLike: "aws:SourceArn": !Sub "arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroupName}" Path: "/" Policies: - PolicyName: AWSAthenaSparkExecutionPolicy PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: [ "s3:PutObject", "s3:ListBucket", "s3:DeleteObject", "s3:GetObject" ] Resource: [ !Sub "arn:aws:s3:::${AthenaResultBucketName}", !Sub "arn:aws:s3:::${AthenaResultBucketName}/*" ] - Effect: "Allow" Action: [ "athena:GetWorkGroup", "athena:TerminateSession", "athena:GetSession", "athena:GetSessionStatus", "athena:ListSessions", "athena:StartCalculationExecution", "athena:GetCalculationExecutionCode", "athena:StopCalculationExecution", "athena:ListCalculationExecutions", "athena:GetCalculationExecution", "athena:GetCalculationExecutionStatus", "athena:ListExecutors", "athena:ExportNotebook", "athena:UpdateNotebook" ] Resource: [ !Sub "arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroupName}" ] - Effect: "Allow" Action: [ "logs:CreateLogStream", "logs:DescribeLogStreams", "logs:CreateLogGroup", "logs:PutLogEvents" ] Resource: [ !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws-athena:*", !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws-athena*:log-stream:*" ] - Effect: "Allow" Action: [ "logs:DescribeLogGroups" ] Resource: [ !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*" ] - Effect: "Allow" Action: [ "cloudwatch:PutMetricData" ] Resource: [ !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*" ] Condition: StringEquals: "cloudwatch:namespace": "AmazonAthenaForApacheSpark" - PolicyName: AWSAthenaSparkExecutionPolicyForData PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Sid: "AccessS3Buckets" Action: [ "s3:GetObject" ] Resource: [ !Sub "arn:aws:s3:::${DataBucketName}/*" ] - Effect: "Allow" Sid: "GlueReadDatabases" Action: [ "glue:GetDatabases" ] Resource: [ !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:*" ] - Effect: "Allow" Sid: "GlueReadDatabase" Action: [ "glue:GetDatabase", "glue:GetTable", "glue:GetTables", "glue:GetPartition", "glue:GetPartitions" ] Resource: [ !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog", !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ReadWriteTargetGlueDatabaseName}", !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${ReadWriteTargetGlueDatabaseName}/*", !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/default" ] - Effect: "Allow" Sid: "GlueCreateDatabase" Action: [ "glue:CreateDatabase" ] Resource: [ !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog", !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ReadWriteTargetGlueDatabaseName}", !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/default" ] - Effect: "Allow" Sid: "GlueDeleteDatabase" Action: [ "glue:DeleteDatabase" ] Resource: [ !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog", !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ReadWriteTargetGlueDatabaseName}", !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${ReadWriteTargetGlueDatabaseName}/*", !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/default" ] - Effect: "Allow" Sid: "GlueCreateDeleteTable" Action: [ "glue:CreateTable", "glue:DeleteTable" ] Resource: [ !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog", !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${ReadWriteTargetGlueDatabaseName}", !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${ReadWriteTargetGlueDatabaseName}/*", ]
このテンプレートでは以下のことができます。
No | パラメータ | 設定するもの | 備考 |
---|---|---|---|
1 | AthenaResultBucketName | ワークグループで設定する計算結果の場所に記載するS3バケット名 | |
2 | DataBucketName | Apache Sparkから利用したいデータのあるS3バケット名 | |
3 | AthenaWorkGroupName | IAMロールを利用するワークグループ名 | |
4 | ReadWriteTargetGlueDatabaseName | Apache Sparkからテーブルを操作したいGlueデータベース名 | できる操作の内容については定義を確認ください。 |
Amazon Athena for Apache Sparkで利用するため、CloudFormationからIAMロールを作成し、Athenaのワークグループから選択しておきます
2. Athenaノートブックから可視化する方法
ノートブックの作成
まずノートブックを新規作成しました。ノートブックエクスプローラー
から、ノートブックを作成
をクリックします。
ノートブック名を入力し、必要があればオプションのセッションパラメータを設定します。作成
を押すとノートブックが作成されます。
これにより、ノートブックエディタでノートブックが開き、セッションが開始されます。
ライブラリの読み込み
まず、必要なライブラリを読み込みます。今回はSeabornとPlotlyによる可視化をしてみようと思うので、必要なものを読み込みました。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType import matplotlib.pyplot as plt import numpy as np import pandas as pd import plotly.express as px import seaborn as sns
データの読み込み
ノートブックからS3のデータを読み込みます。
今回はヘッダーなしのCSVファイルを読み込みたいので、以下のように読み込みました。
csv_schema = StructType([ StructField("sepal_length", FloatType()), StructField("sepal_width", FloatType()), StructField("petal_length", FloatType()), StructField("petal_width", FloatType()), StructField("class", StringType()) ]) file_name = "s3://データを配置したバケット名/iris-data/iris.data" iris_df = (spark.read.format("csv") .option("header", "false") .schema(csv_schema) .load(file_name)) print("Read csv file" + " complete")
読み込んだデータとスキーマは以下のように確認できます。
iris_df.show() iris_df.printSchema()
データの集計
読み込んだデータを格納したDataFrameに対して、Spark SQLで集計してみます。
iris_df.createOrReplaceTempView("iris") sqlDF = spark.sql("SELECT AVG(sepal_length) AS sepal_length_avg, class from iris group by class") sqlDF.show()
ちなみにマジックコマンドでも実行が可能です。以下のように150レコード分ちゃんと読み込めていることが分かります。
%%sql SELECT count(*) AS cnt from iris
PandasのDataFrameへの変換
to_pandas
でPandasのDataFrameへ変換することができます。
これでより使い慣れた方の多いPandasのDataFrameで処理ができるようになります。
df = sqlDF.toPandas()
ちなみに、to_pandas
実行時に一つのノードにデータが集約されることが想定されるため、集計したデータフレームに対してto_pandas
を実行するとよいでしょう。
Seabornでの可視化
まずSeabornでの可視化です。
Seabornでの可視化方法は、Using magic commandsのUsing the matplotlib and seaborn libraries togetherのセクションに記載がありました。
Seabornはライブラリの使い方通りに使用すればよいですが、図を描画する際には以下のように%matplot
マジックコマンドを実行する必要があります。matplotlibとseabornを一緒に使うときの例は以下です。
sns.barplot(data=df, x="class", y="sepal_length_avg") %matplot plt
棒グラフが描画されました。ほかのグラフについてもSeabornのドキュメントを参考に表示していくことができます。
Plotlyでの可視化
次にPlotlyでの可視化です。
Plotlyでの可視化方法は、Using magic commandsの%plotlyのセクションに記載がありました。
Plotlyの方は、図の描画に%plotly
マジックコマンドを実行する必要があります。
以下のように実行して、棒グラフを描画することができました。
ほかのグラフについてもPlotlyのドキュメントを参考に表示していくことができます。
最後に
今回は、Amazon Athena for Apache SparkでS3バケット上の自分のデータを読み込み、SeabornおよびPlotlyで可視化を試してみました。
グラフの表示のマジックコマンドについては、Athenaノートブック上で描画するためにドキュメントを読む必要がありますが、一度分かってしまえばJupyter Notebookと同じようにグラフを描き、データの傾向を探っていくことができます。
ぜひAthenaノートブックを使って、データをさまざまな角度から分析してみてください。