SageMakerの前処理をGlue開発エンドポイントで実行させる:Amazon SageMaker Advent Calendar 2018

概要

こんにちは、データインテグレーション部のyoshimです。
この記事は「クラスメソッド Amazon SageMaker Advent Calendar」の9日目の記事となります。

今回はSageMakerの前処理を「AWS Glueの開発エンドポイント」で実行させてみます。
下記を参考に進めていきます。
Managing Notebooks
Working with Notebooks on the AWS Glue Console
AWS Glue コンソールでの開発エンドポイントの操作

以前に、「SageMaker-EMR-Glue-S3」と接続させて前処理をする構成についてのエントリーをご紹介したのですが、それと少し似ています。今回はEMRを使わずに「Glue開発エンドポイントのサーバーレスApache Spark環境」を利用することとなります。

目次

1.最初に

SageMakerを利用していて「データの前処理をどこで実行するか」、という点について悩んだことはないでしょうか?例えば、大きな組織でSageMakerを利用していて「ユーザーごとにノートブックインスタンスを用意」していた場合で考えてみましょう。
「前処理のためにマシーンパワーが必要だから、全員分のインスタンスをスケールアップする」というのはコスト的に厳しい一方、とはいえ前処理に単純に時間がかかるとそれはそれで業務が滞る、といったことになるかと思います。

今回ご紹介する構成では、SageMakerノートブックインスタンスからGlue開発エンドポイントにアクセスすることで、上記のように「処理の重い前処理をGlue開発エンドポイント」にて処理させることができます。
(でかいデータをSageMakerノートブックインスタンスに持ってきて処理、とかしていた部分が楽になるのでどちらかと言うと「データ探索の手間が省ける」といったメリットの方がでかいかもしれません)

詳細については下記をご参照ください
AWS Glue が Amazon SageMaker ノートブックの開発エンドポイントへの接続サポートを開始

2.Glueの準備

まずは「Glue側」の準備をします。
下記の5点を対応する必要があります。

  • IAMロールの設定(Glue開発エンドポイントからS3、SageMakerインスタンス、Cloudwatch等にアクセスできるように)
  • データベースの作成(Glueデータカタログのデータベース)
  • クローラの作成(Glueデータカタログの更新)
  • Glue開発エンドポイントの作成(今回メインで稼働するインスタンス)
  • SageMakerノートブックインスタンスの作成(Glue開発エンドポイントと接続できるようにしたインスタンス)

2-1.IAMロールの設定

利用するS3バケットにアクセスできるように設定しましょう。
今回は「AmazonS3FullAccess」、「AWSGlueServiceRole」の2つのポリシーをアタッチして先に進みます。

・今回用意したIAMロール

2-2.データベースの作成

Glueデータカタログの「データベース」を作成します。
後ほどSageMakerノートブックインスタンスからアクセスする際は、このデータベースを指定することになります。

作成手順については、まずコンソール画面上から「Glue」の画面に移動し、「データベース」、「データベースの追加」をクリックします。

今回は「legislators」と指定しました。

2-3.クローラの作成

続いて、クローラの作成です。クローラの役割はソースデータが更新された場合に、「Glueデータベース」を更新することです。
「クローラ」から「クローラの追加」をクリックします。

続いて、クローラの名前を「crawler_legislators」等の適当なものに設定し、

データの格納先を指定します。今回はAWSがサンプルとして用意しているデータセットを利用するために「s3://awsglue-datasets/examples/us-legislators/all」と指定します。

今回は上記のデータセットのみを利用するので、ここは「いいえ」にして「次へ」をクリックします。

「2-1.IAMロールの設定」で用意したIAMロールを利用します。このIAMロールは「AmazonS3FullAccess」、「AWSGlueServiceRole」ポリシーをアタッチしているだけのロールです。

今回は、データソースの変更もなく一回実行できればいいので、「オンデマンドで実行」を指定します。

先ほど作成した「legislators」データベースを出力先として指定します。

最後に、諸々の設定に間違いがないかを確認し、問題なければ「完了」をクリックしてクローラを作成します。

クローラの作成が完了したら、クローラを実行しましょう。

下記のように、jsonファイルが認識されてテーブルとして情報が取得できています。

2-4.Glue開発エンドポイントの作成

続いて、実行環境となる「Glue開発エンドポイント」を作成します。

「開発エンドポイント」から「エンドポイントの追加」をクリックし、

開発エンドポイント名、IAMロールを指定します。
IAMロールは先ほどクローラに設定したものと同じものを利用しています。

今回はS3のデータのみを参照するので、「ネットワーキング情報をスキップ」として先に進みます。

今回はチュートリアルとして、とりあえず実行してみることを目的としているので、SSHキーは特に追加せずに「次へ」をクリックします

最後に、全体を確認して問題がなければ「完了」を実行します。

「プロビジョニングのステータス」が「READY」になったらエンドポイントの準備完了です。エンドポイントの準備ができるまで数分かかるので、ゆっくり待ちましょう。
ちなみに、料金は「0.44$/DAU*時間」(執筆時点)とのことなので、今回私が用意したエンドポイントの場合だと1時間で2.2$くらいかかるみたいです。

引用:Glue開発エンドポイントの料金について

2-5.SageMakerノートブックインスタンスの作成

最後に「2-4.Glue開発エンドポイントの作成」で作成したGlue開発エンドポイントと接続できるSageMakerノートブックインスタンスを作成します。
このインスタンスは「Glue開発エンドポイント」と同じリージョンに作成する必要があります。

まずは「Notebooks」から「ノートブックサーバーの作成」をクリックします。

続いて、ノートブックインスタンスの情報等について指定します。
SageMakerノートブックインスタンス立ち上げに利用するIAMロールについては、ここで新規作成しました。

ノートブックインスタンスがGlue開発エンドポイントと紐づいた状態で作成されます。

ちなみに、当然ですがこのインスタンスは「SageMaker」のコンソール画面上からも確認できます。

また、「ライフサイクル設定」を見てみると、新しく設定が追加されていました。
この「ライフサイクル設定」でノートブックインスタンスが起動するたびに、設定した「Glue開発エンドポイント」に接続しているようです。

ノートブックインスタンスが作成できたら、早速ノートブックインスタンスを開いてみましょう。
これで実行環境の準備は完了です。

3.SageMakerノートブックインスタンスでの処理

では、実際にSageMakerノートブックインスタンスから前処理を実行していきます。
やることは、「S3に保存しているjsonファイル」を「Glue開発エンドポイントで処理」させ、その結果を「S3に出力する」というものです。
ここまでと同じ手順で進めている場合は「Joining, Filtering, and Loading Relational Data with AWS Glue.ipynb」というファイルが最初からロードされているかと思いますが、こちらのスクリプトに記述されている内容をベースに解説を続けていこうと思います。
本スクリプト内の処理内容はコード例: データの結合と関係付けにも紹介されています。

3-1.必要なモジュールのインポート

まずは必要なモジュールをインポートします。
awsglue,pysparkモジュールをメインに使います。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

3-2.データの確認

続いて、データの中身を確認してみましょう。
Glueのダイナミックフレームはほとんど触ったことが無かったので少し困惑しましたが、下記2点の説明がわかりやすいのでお勧めです。

まずは指定したテーブルの基本情報をprintSchema関数を使って調べます。
対象データの基本的な定義について調べるこの操作は、利用頻度が高そうです。

orgs = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="organizations_json")
print "Count: ", orgs.count()
orgs.printSchema()
Count:  13
root
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- classification: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- seats: int
|-- type: string

また、下記のようなコマンドで実際にどんな値が入っているのか、を簡単に目視で確認することもできます。

orgs.toDF().show()
+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+-----+-----------+
|         identifiers|         other_names|                  id|classification|                name|               links|               image|seats|       type|
+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+-----+-----------+
|[[wikidata,Q36829...|[[en,multilingual...|            party/al|         party|                  AL|                null|                null| null|       null|
| [[wikidata,Q29552]]|[[zh,multilingual...|      party/democrat|         party|            Democrat|[[website,http://...|https://upload.wi...| null|       null|
|[[wikidata,Q65407...|[[en,multilingual...|party/democrat-li...|         party|    Democrat-Liberal|[[website,http://...|                null| null|       null|
| [[wikidata,Q11701]]|                null|d56acebe-8fdc-47b...|   legislature|House of Represen...|                null|                null|  435|lower house|
|[[wikidata,Q327591]]|[[zh-hans,multili...|   party/independent|         party|         Independent|                null|                null| null|       null|
|[[wikidata,Q10765...|[[de,multilingual...|party/new_progres...|         party|     New Progressive|[[website,http://...|https://upload.wi...| null|       null|
|[[wikidata,Q199319]]|[[es,multilingual...|party/popular_dem...|         party|    Popular Democrat|[[website,http://...|                null| null|       null|
| [[wikidata,Q29468]]|[[zh,multilingual...|    party/republican|         party|          Republican|[[website,http://...|https://upload.wi...| null|       null|
|[[wikidata,Q51630...|[[en,multilingual...|party/republican-...|         party|Republican-Conser...|[[website,http://...|                null| null|       null|
| [[wikidata,Q29552]]|[[zh,multilingual...|      party/democrat|         party|            Democrat|[[website,http://...|https://upload.wi...| null|       null|
|[[wikidata,Q327591]]|[[zh-hans,multili...|   party/independent|         party|         Independent|                null|                null| null|       null|
| [[wikidata,Q29468]]|[[zh,multilingual...|    party/republican|         party|          Republican|[[website,http://...|https://upload.wi...| null|       null|
| [[wikidata,Q66096]]|                null|8fa6c3d2-71dc-478...|   legislature|              Senate|                null|                null|  100|upper house|
+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+-----+-----------+

3-3.前処理

データの確認の次は前処理に移ります。
今回は「不要なフィールドの削除」、「フィールド名の変更」、「複数のデータフレームをJOIN」といった処理をします。

# フィールド名の変更と不要なフィールドの削除
orgs = orgs.drop_fields(['other_names','identifiers']).rename_field('id', 'org_id').rename_field('name', 'org_name')

# JOIN(サブクエリ的処理させて、最後に不要(重複した)フィールドを削除している)
l_history = Join.apply(orgs,
                       Join.apply(persons, memberships, 'id', 'person_id'),
                       'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])

3-4.処理結果をS3に出力

「l_history」データフレームを「parquet形式」のファイルとしてS3に出力します。
出力方法もいくつかありますが、それぞれユースケースに合わせて利用しましょう。

write_dynamic_frame.from_options()

glueContext.write_dynamic_frame.from_options(frame = l_history,
              connection_type = "s3",
              connection_options = {"path": "s3://bucket/your_dir"},
              format = "parquet")

.toDF().repartition(1).write.parquet()

s_history = l_history.toDF().repartition(1)
s_history.write.parquet('s3://bucket/your_dir')

.toDF().write.parquet()

l_history.toDF().write.parquet('s3://bucket/your_dir', partitionBy=['org_name'])

S3に「parquet形式」でファイル出力まで完了したら、後は学習するだけですね。

3-5.Relational DBに書き込む(おまけ)

今回は試していないのでコードの紹介のみにとどめますが、データフレームをDBに書き込むこともできるようです。
以下はサンプルコードですが、もし実際に書き込む場合はGlueの「データベースへの接続設定」をする必要があります。

from_jdbc_conf

for df_name in dfc.keys():
    m_df = dfc.select(df_name)
    print "Writing to Redshift table: ", df_name
    glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df,
                                                   catalog_connection = "redshift3",
                                                   connection_options = {"dbtable": df_name, "database": "testdb"},
                                                   redshift_tmp_dir = "s3://glue-sample-target/temp-dir/")

4.まとめ

今回はSageMakerの前処理を実行する環境として、「AWS Glueの開発エンドポイント」を指定してみました。
前処理にマシーンパワーが欲しい場合でも、「ユーザーごとにノートブックインスタンスを用意しているため、コスト的に厳しい」といった場合もあるかと思いますが、そのような場合には今回のように「Glue開発エンドポイント」を「共通の処理環境」にすることで、費用対効果よく処理能力を向上できるかもしれません。
また、Glueを参照しているので、分析者が「S3からデータを持ってきて...」とかをする手間が省け、開発の手間が削減できます。

ただ、今回の構成では「画像や動画」等のデータは処理できないので、それらのデータを処理する場合は異なる方法を選択する必要があります。

本エントリーの紹介内容は以上です。
最後までご覧いただきありがとうございました。