SageMakerの前処理をGlue開発エンドポイントで実行させる:Amazon SageMaker Advent Calendar 2018

SageMakerの前処理をGlue開発エンドポイントで実行してみよう
2018.12.09

概要

こんにちは、データインテグレーション部のyoshimです。
この記事は「クラスメソッド Amazon SageMaker Advent Calendar」の9日目の記事となります。

今回はSageMakerの前処理を「AWS Glueの開発エンドポイント」で実行させてみます。
下記を参考に進めていきます。
Managing Notebooks
Working with Notebooks on the AWS Glue Console
AWS Glue コンソールでの開発エンドポイントの操作

以前に、「SageMaker-EMR-Glue-S3」と接続させて前処理をする構成についてのエントリーをご紹介したのですが、それと少し似ています。今回はEMRを使わずに「Glue開発エンドポイントのサーバーレスApache Spark環境」を利用することとなります。

目次

1.最初に

SageMakerを利用していて「データの前処理をどこで実行するか」、という点について悩んだことはないでしょうか?例えば、大きな組織でSageMakerを利用していて「ユーザーごとにノートブックインスタンスを用意」していた場合で考えてみましょう。
「前処理のためにマシーンパワーが必要だから、全員分のインスタンスをスケールアップする」というのはコスト的に厳しい一方、とはいえ前処理に単純に時間がかかるとそれはそれで業務が滞る、といったことになるかと思います。

今回ご紹介する構成では、SageMakerノートブックインスタンスからGlue開発エンドポイントにアクセスすることで、上記のように「処理の重い前処理をGlue開発エンドポイント」にて処理させることができます。
(でかいデータをSageMakerノートブックインスタンスに持ってきて処理、とかしていた部分が楽になるのでどちらかと言うと「データ探索の手間が省ける」といったメリットの方がでかいかもしれません)

詳細については下記をご参照ください
AWS Glue が Amazon SageMaker ノートブックの開発エンドポイントへの接続サポートを開始

2.Glueの準備

まずは「Glue側」の準備をします。
下記の5点を対応する必要があります。

  • IAMロールの設定(Glue開発エンドポイントからS3、SageMakerインスタンス、Cloudwatch等にアクセスできるように)
  • データベースの作成(Glueデータカタログのデータベース)
  • クローラの作成(Glueデータカタログの更新)
  • Glue開発エンドポイントの作成(今回メインで稼働するインスタンス)
  • SageMakerノートブックインスタンスの作成(Glue開発エンドポイントと接続できるようにしたインスタンス)

2-1.IAMロールの設定

利用するS3バケットにアクセスできるように設定しましょう。
今回は「AmazonS3FullAccess」、「AWSGlueServiceRole」の2つのポリシーをアタッチして先に進みます。

・今回用意したIAMロール

2-2.データベースの作成

Glueデータカタログの「データベース」を作成します。
後ほどSageMakerノートブックインスタンスからアクセスする際は、このデータベースを指定することになります。

作成手順については、まずコンソール画面上から「Glue」の画面に移動し、「データベース」、「データベースの追加」をクリックします。

今回は「legislators」と指定しました。

2-3.クローラの作成

続いて、クローラの作成です。クローラの役割はソースデータが更新された場合に、「Glueデータベース」を更新することです。
「クローラ」から「クローラの追加」をクリックします。

続いて、クローラの名前を「crawler_legislators」等の適当なものに設定し、

データの格納先を指定します。今回はAWSがサンプルとして用意しているデータセットを利用するために「s3://awsglue-datasets/examples/us-legislators/all」と指定します。

今回は上記のデータセットのみを利用するので、ここは「いいえ」にして「次へ」をクリックします。

「2-1.IAMロールの設定」で用意したIAMロールを利用します。このIAMロールは「AmazonS3FullAccess」、「AWSGlueServiceRole」ポリシーをアタッチしているだけのロールです。

今回は、データソースの変更もなく一回実行できればいいので、「オンデマンドで実行」を指定します。

先ほど作成した「legislators」データベースを出力先として指定します。

最後に、諸々の設定に間違いがないかを確認し、問題なければ「完了」をクリックしてクローラを作成します。

クローラの作成が完了したら、クローラを実行しましょう。

下記のように、jsonファイルが認識されてテーブルとして情報が取得できています。

2-4.Glue開発エンドポイントの作成

続いて、実行環境となる「Glue開発エンドポイント」を作成します。

「開発エンドポイント」から「エンドポイントの追加」をクリックし、

開発エンドポイント名、IAMロールを指定します。
IAMロールは先ほどクローラに設定したものと同じものを利用しています。

今回はS3のデータのみを参照するので、「ネットワーキング情報をスキップ」として先に進みます。

今回はチュートリアルとして、とりあえず実行してみることを目的としているので、SSHキーは特に追加せずに「次へ」をクリックします

最後に、全体を確認して問題がなければ「完了」を実行します。

「プロビジョニングのステータス」が「READY」になったらエンドポイントの準備完了です。エンドポイントの準備ができるまで数分かかるので、ゆっくり待ちましょう。
ちなみに、料金は「0.44$/DAU*時間」(執筆時点)とのことなので、今回私が用意したエンドポイントの場合だと1時間で2.2$くらいかかるみたいです。

引用:Glue開発エンドポイントの料金について

2-5.SageMakerノートブックインスタンスの作成

最後に「2-4.Glue開発エンドポイントの作成」で作成したGlue開発エンドポイントと接続できるSageMakerノートブックインスタンスを作成します。
このインスタンスは「Glue開発エンドポイント」と同じリージョンに作成する必要があります。

まずは「Notebooks」から「ノートブックサーバーの作成」をクリックします。

続いて、ノートブックインスタンスの情報等について指定します。
SageMakerノートブックインスタンス立ち上げに利用するIAMロールについては、ここで新規作成しました。

ノートブックインスタンスがGlue開発エンドポイントと紐づいた状態で作成されます。

ちなみに、当然ですがこのインスタンスは「SageMaker」のコンソール画面上からも確認できます。

また、「ライフサイクル設定」を見てみると、新しく設定が追加されていました。
この「ライフサイクル設定」でノートブックインスタンスが起動するたびに、設定した「Glue開発エンドポイント」に接続しているようです。

ノートブックインスタンスが作成できたら、早速ノートブックインスタンスを開いてみましょう。
これで実行環境の準備は完了です。

3.SageMakerノートブックインスタンスでの処理

では、実際にSageMakerノートブックインスタンスから前処理を実行していきます。
やることは、「S3に保存しているjsonファイル」を「Glue開発エンドポイントで処理」させ、その結果を「S3に出力する」というものです。
ここまでと同じ手順で進めている場合は「Joining, Filtering, and Loading Relational Data with AWS Glue.ipynb」というファイルが最初からロードされているかと思いますが、こちらのスクリプトに記述されている内容をベースに解説を続けていこうと思います。
本スクリプト内の処理内容はコード例: データの結合と関係付けにも紹介されています。

3-1.必要なモジュールのインポート

まずは必要なモジュールをインポートします。
awsglue,pysparkモジュールをメインに使います。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

3-2.データの確認

続いて、データの中身を確認してみましょう。
Glueのダイナミックフレームはほとんど触ったことが無かったので少し困惑しましたが、下記2点の説明がわかりやすいのでお勧めです。

まずは指定したテーブルの基本情報をprintSchema関数を使って調べます。
対象データの基本的な定義について調べるこの操作は、利用頻度が高そうです。

orgs = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="organizations_json")
print "Count: ", orgs.count()
orgs.printSchema()
Count:  13
root
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- classification: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- seats: int
|-- type: string

また、下記のようなコマンドで実際にどんな値が入っているのか、を簡単に目視で確認することもできます。

orgs.toDF().show()
+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+-----+-----------+
|         identifiers|         other_names|                  id|classification|                name|               links|               image|seats|       type|
+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+-----+-----------+
|[[wikidata,Q36829...|[[en,multilingual...|            party/al|         party|                  AL|                null|                null| null|       null|
| [[wikidata,Q29552]]|[[zh,multilingual...|      party/democrat|         party|            Democrat|[[website,http://...|https://upload.wi...| null|       null|
|[[wikidata,Q65407...|[[en,multilingual...|party/democrat-li...|         party|    Democrat-Liberal|[[website,http://...|                null| null|       null|
| [[wikidata,Q11701]]|                null|d56acebe-8fdc-47b...|   legislature|House of Represen...|                null|                null|  435|lower house|
|[[wikidata,Q327591]]|[[zh-hans,multili...|   party/independent|         party|         Independent|                null|                null| null|       null|
|[[wikidata,Q10765...|[[de,multilingual...|party/new_progres...|         party|     New Progressive|[[website,http://...|https://upload.wi...| null|       null|
|[[wikidata,Q199319]]|[[es,multilingual...|party/popular_dem...|         party|    Popular Democrat|[[website,http://...|                null| null|       null|
| [[wikidata,Q29468]]|[[zh,multilingual...|    party/republican|         party|          Republican|[[website,http://...|https://upload.wi...| null|       null|
|[[wikidata,Q51630...|[[en,multilingual...|party/republican-...|         party|Republican-Conser...|[[website,http://...|                null| null|       null|
| [[wikidata,Q29552]]|[[zh,multilingual...|      party/democrat|         party|            Democrat|[[website,http://...|https://upload.wi...| null|       null|
|[[wikidata,Q327591]]|[[zh-hans,multili...|   party/independent|         party|         Independent|                null|                null| null|       null|
| [[wikidata,Q29468]]|[[zh,multilingual...|    party/republican|         party|          Republican|[[website,http://...|https://upload.wi...| null|       null|
| [[wikidata,Q66096]]|                null|8fa6c3d2-71dc-478...|   legislature|              Senate|                null|                null|  100|upper house|
+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+-----+-----------+

3-3.前処理

データの確認の次は前処理に移ります。
今回は「不要なフィールドの削除」、「フィールド名の変更」、「複数のデータフレームをJOIN」といった処理をします。

# フィールド名の変更と不要なフィールドの削除
orgs = orgs.drop_fields(['other_names','identifiers']).rename_field('id', 'org_id').rename_field('name', 'org_name')

# JOIN(サブクエリ的処理させて、最後に不要(重複した)フィールドを削除している)
l_history = Join.apply(orgs,
                       Join.apply(persons, memberships, 'id', 'person_id'),
                       'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])

3-4.処理結果をS3に出力

「l_history」データフレームを「parquet形式」のファイルとしてS3に出力します。
出力方法もいくつかありますが、それぞれユースケースに合わせて利用しましょう。

write_dynamic_frame.from_options()

glueContext.write_dynamic_frame.from_options(frame = l_history,
              connection_type = "s3",
              connection_options = {"path": "s3://bucket/your_dir"},
              format = "parquet")

.toDF().repartition(1).write.parquet()

s_history = l_history.toDF().repartition(1)
s_history.write.parquet('s3://bucket/your_dir')

.toDF().write.parquet()

l_history.toDF().write.parquet('s3://bucket/your_dir', partitionBy=['org_name'])

S3に「parquet形式」でファイル出力まで完了したら、後は学習するだけですね。

3-5.Relational DBに書き込む(おまけ)

今回は試していないのでコードの紹介のみにとどめますが、データフレームをDBに書き込むこともできるようです。
以下はサンプルコードですが、もし実際に書き込む場合はGlueの「データベースへの接続設定」をする必要があります。

from_jdbc_conf

for df_name in dfc.keys():
    m_df = dfc.select(df_name)
    print "Writing to Redshift table: ", df_name
    glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df,
                                                   catalog_connection = "redshift3",
                                                   connection_options = {"dbtable": df_name, "database": "testdb"},
                                                   redshift_tmp_dir = "s3://glue-sample-target/temp-dir/")

4.まとめ

今回はSageMakerの前処理を実行する環境として、「AWS Glueの開発エンドポイント」を指定してみました。
前処理にマシーンパワーが欲しい場合でも、「ユーザーごとにノートブックインスタンスを用意しているため、コスト的に厳しい」といった場合もあるかと思いますが、そのような場合には今回のように「Glue開発エンドポイント」を「共通の処理環境」にすることで、費用対効果よく処理能力を向上できるかもしれません。
また、Glueを参照しているので、分析者が「S3からデータを持ってきて...」とかをする手間が省け、開発の手間が削減できます。

ただ、今回の構成では「画像や動画」等のデータは処理できないので、それらのデータを処理する場合は異なる方法を選択する必要があります。

本エントリーの紹介内容は以上です。
最後までご覧いただきありがとうございました。