【レポート】ABD215 – Serverless Data Prep with AWS Glue #reinvent
はじめに
re:Invent 2017のワークショップセッション「abd215 - Serverless Data Prep with AWS Glue」 についてご紹介します。
セッション概要
In this session, you learn how to set up a crawler to automatically discover your data and build your AWS Glue Data Catalog. You then auto-generate an AWS Glue ETL script, download it, and interactively edit it using a Zeppelin notebook, connected to an AWS Glue development endpoint. After that, you upload this script to Amazon S3, reuse it across multiple jobs, and add trigger conditions to run the jobs. The resulting datasets automatically get registered in the AWS Glue Data Catalog and you can then query these new datasets from Amazon EMR and Amazon Athena. Prerequisites: Knowledge of Python and familiarity with big data applications is preferred but not required. Attendees must bring their own laptops.
セッション資料
Workshop
こちらの資料をみながら進めていきます。
AWS Glue の為のIAMのパーミッションをセットアップ
既に以下のアクセス権がある場合はスキップしてください。
- IAMコンソールにアクセスし、「ユーザー」を選択します。 次に、ユーザー名を選択します。
- [アクセス権限の追加]ボタンをクリックします。
- 「AWSによる管理」のリストから、次のものを付与します。
- AWSGlueConsoleFullAccess
- CloudWatchLogsReadOnlyAccess
- AWSCloudFormationReadOnlyAccess
AWS Glueのデフォルトサービスロールをセットアップ
- IAMコンソールから[ロール]をクリックし、新しいロールを作成します
- サービスのリストから「Glue」を選択し、「次のステップ:アクセス権限」ボタンをクリックします。
-
ポリシーのリストから、以下を付与し、「次のステップ:確認」ボタンをクリックします。
- AWSGlueServiceRole
- AWSGlueServiceNotebookRole
- AmazonS3FullAccess
4.ロール名にAWSGlueServiceRoleなどの名前を付け、ロールの作成をクリックします
AWS Glue Notebookのロールをセットアップ
- IAMコンソールから[ロール]をクリックし、新しいロールを作成します。
- サービスのリストからEC2を選択し、最初のEC2ユースケースを選択して、「次のステップ:アクセス権限」をクリックします。
-
ポリシーのリストから、以下を付与し、「次のステップ:確認」ボタンをクリックします。
- AWSGlueServiceNotebookRole
- AmazonS3FullAccess
4.ロール名にAWSGlueServiceNotebookRoleなどの名前を付け、ロールの作成をクリックします
適切なセキュリティグループを作成する
- EC2コンソールから、[セキュリティグループ]を選択します
- [セキュリティグループの作成]ボタンをクリックします。
- セキュリティグループにAWSGlueNotebookSecurityGroupなどの名前を付けます。 それをデフォルトのVPCに割り当てます(またはVPCを選択します)
- [インバウンド]タブの[ルールの追加]をクリックし以下のように設定します。
-
[作成]をクリックします
AWS Glue Developer Endpoint を作成します。
- Glueコンソールから、左側のDev Endpointsを選択します。
- [Add endpoint]ボタンをクリックします。
-
エンドポイントに名前を付け(10文字以下でなければなりません)、前のセクションで作成したIAMの役割を割り当てます。 Script libraries and job parameters (optional)をドロップし、DPUを10に変更します。
-
Networkingは、デフォルト設定のままにしてネットワーク情報をスキップします
-
[Create SSH Key]をクリックします。 秘密鍵はブラウザによって自動的にダウンロードされます。 保存して[Next]をクリックします
-
[Finish]をクリックして、Dev Endpointの作成を開始します。 これには数分かかります。
新しいDev EndpointにNotebookを起動します。
Provisioning status
がREADY
になってから進めます。- Dev Endpointの隣にあるチェックボックスをオンにして、[Action]ボタンをクリックし、ドロップダウンから[create Notebook server]を選択します。
-
この画面で、必要なフィールドに記入していきます。
- CloudFormation stack が作成されますので任意の名前を入力します。
- IAMロールは、以前作成した
AWSGlueServiceNotebookRole
ロールを設定します。 - EC2鍵ペアはすでに作成された鍵に設定するか、新しい鍵を作成する必要があります。(SSHアクセスのため)
- Dev Endpoint作成時に保存したSSH秘密鍵をアップロードする
- Notebookにユーザー名とパスワードを設定。 後でログインする必要があります。
- 作成したNotebook保存するためのS3パスを設定。
- public subnetを選択、前のセクションでセキュリティグループを割り当てたのと同じVPCにあることを確認します。
- 前のセクションで作成した
AWSGlueNotebookSecurityGroup
セキュリティグループを選択します。
-
Finishをクリックします。
Notebooks
Glue コンソール
- Dev Endpointsを選択
- 作成したEndpointをクリック
- Notebook Server URLをクリックして、Zeppelin Notebookを開きます。
- 先ほど設定したユーザー名とパスワードでログインします。
- 以下、ファイルをダウンロードしてImport Noteからインポートします。
- Introduction to AWS Glue DynamicFrame API
- Web Page Scraping and Machine Learning
- Book Reviews Graph Analysis
Notebookにログインして3つのモジュールを実施することができます。
Workshop Notebooks
それぞれのモジュールをみていきます。
1. Introduction to AWS Glue DynamicFrame API
このモジュールでは、AWS Glue APIを使用してデータの読み取り、書き込み、操作を行う方法について説明します。 このインタラクティブNotebookに書かれているすべてのコードは、 AWS Glue ETLエンジンを使用し、有効なETLスクリプトにコピーすることができます。
Step 1
Glue transformation functions と the AWS Boto3 SDKをインポート
from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * import boto3 # sc = SparkContext() # we will not reload the Spark context because our Zeppelin notebook already created one for us glueContext = GlueContext(sc) spark = glueContext.spark_session
Step 2
NYC taxi datasetを自動検出するためにGlueクローラーをセットアップします。
- AWS Glueコンソールを開く
- Crawler -> Add Crawler
- データソースとしてS3を選択し、別のアカウントでパスを指定します。 S3パスとしてs3://workshop-public/taxi/を貼り付けます。
- 追加のデータソースを追加しない。
- IAM Roleは先ほど作ったAWSGlueServiceRoleを選びます。
- FrequencyはRun On Demandを選択してください。
- defaultデーターベースを選択、もしくは作成、tableのプレフィックスを
table_
などにします。 - Finishを押します。
-
作成したクローラーを実行します。
クローラが完了したら、defaultデータベースにtable_taxiという名前のテーブルがあるはずです。
Step 3
Glue Data Catalogを使用してTaxiデータを読み取るために、Glue DynamicFrame APIを使用できるようになりました。 次に、テーブルスキーマを参照としてprintします。 Glueクローラーで定義したもののデータベース名とテーブル名を置き換えてください
taxis = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "table_taxi", transformation_ctx = "taxis") taxis.printSchema()
total_amountが$ 0を除外する
Filter APIは、Trueと評価される式をパラメータとして受け取り、その結果、これらのレコードを保持します。
下の最初の行では、Glue DynamicFrameをSpark DataFrameに変換してSpark APIを確認しています。
df = taxis.toDF() df = df.filter('total_amount > 0')
次に、データを見るためにSQLを使用したいので、DataFrameを一時的なビューとして登録する必要があります
df.createOrReplaceTempView('rides')
Step 4
SQLを使用してデータをselect
してみます。 Zeppelinのグラフを使ってデータをグラフ化してみます。
select date_format(dropoff_datetime, 'yyyy-MM-dd') as dt, sum(trip_distance), sum(total_amount) from rides group by date_format(dropoff_datetime, 'yyyy-MM-dd') order by dt limit 10
2. Web Page Scraping and Machine Learning
二つ目のモジュールでは、AWSフォーラムの投稿を収集し、それらを整理して構造化する簡単なWebクローラを作成します。 machine leanringのためにデータをさらにPre処理します。 その後、データを用いて簡単なWord2VecモデルとLDAクラスタリングモデルを構築します。
Step 1
必要なモジュールのパッケージをロードします。 Requests、BeautifulSoup、Numpyを含む(SparkMLに必要) 以下では、必要なモジュールをインポートします
sc.addPyFile('s3://workshop-public/packages/web_parsing_modules.zip')
from concurrent.futures import ThreadPoolExecutor, as_completed from collections import OrderedDict from bs4 import BeautifulSoup import urlparse as url import requests import numpy import json import sys import re
Step 2
最初のセルには、解析したいAWSフォーラムのパスを保持するための変数が含まれます。 2番目のセルには、フォーラムの投稿を繰り返し処理したり、relavent情報を抽出するためのヘルパー関数
AWSFORUMS = 'https://forums.aws.amazon.com/' ATHENA_FORUM = 'forum.jspa?forumID=242&start=0'
def listPosts(items): posts = [] for item in items: post = {} # thread subject line and link i = item.find('a', id='jive-thread-0') post['subject'] = i.string post['link'] = AWSFORUMS + i['href'] # give our post a unique id for tracking purposes path = url.urlparse(post['link']) post['id'] = url.parse_qs(path.query)['threadID'][0] # thread view and reply count i = item.find('td', 'jive-view-count').string.split() if len(i) == 0: post['views'] = 0 post['replies'] = 0 else: post['views'] = i[0] post['replies'] = i[2] posts.append(post) return posts def getPostContent(post): data = requests.get(post['link']) html = BeautifulSoup(data.text, 'html.parser') post['date'] = html.select('div.jive-message-info font')[0].string body = [] messages = html.select('div.jive-message-body') if len(messages) > 0: for msg in messages: body.append(list(msg.div.stripped_strings)) post['body'] = [val for sublist in body for val in sublist] # flatten the nested list of lists return post
Step 3
このセルでは、フォーラムURLへの複数の同時GET要求を実行するための単純な thread poolをセットアップします。 ページを取得したら、それをBeautifulSoupで解析し、CSSセレクタを使って必要なビットを抽出します。 最後に、作成した後の各ポストをJSONでエンコードして、SparkのDataFrameという表形式に簡単に変換できます
pool = ThreadPoolExecutor(2) forum = requests.get(AWSFORUMS + ATHENA_FORUM) html = BeautifulSoup(forum.text, 'html.parser') items = html.select('.jive-thread-list > .jive-table tbody > tr') posts = listPosts(items) futures = [pool.submit(getPostContent, post) for post in posts] json_posts = [] for r in as_completed(futures): try: json_posts.append(json.dumps(r.result())) except Exception, err: sys.stderr.write('*** ERROR: %sn' % str(err)) rdd = sc.parallelize(json_posts) df = spark.read.json(rdd)
Step 4
次に、追加のデータ準備に使用する追加のモジュールをインポートします。 post textから不要な文字をさらに削除するための単純なPython lambdaを作成する
from pyspark.ml.feature import Tokenizer, StopWordsRemover from pyspark.sql.types import StringType, ArrayType, DoubleType from pyspark.sql.functions import udf
次のセルは、2つのことを準備します。
- 文字、数字、およびスペースのみを保持する正規表現を作成します。
- データセットの各行からポストボディを指定するユーザー定義関数を登録し、文字列の複数の配列を1つの文の配列に結合します
次に、3つのアクションを呼び出します。
- Sparkを使用してデータセットを取得し、より良い並列処理のためにノードのクラスタ全体に分散させます
- データセットをJSONとして解析し、DataFrameと呼ばれるタブルラ表現を作成します
- 宣言されたUDFを使用して、消去されたpost bodyで構成される文章と呼ばれる新しい列を追加します
最後の行は不完全で、正しいコードを追加する必要があることに注意してください。 ヒント、上記の#3をもう一度読んでください。
removeExp = re.compile(r'[^\w\d\s]|\n|\r') cleanUdf = udf(lambda s: re.sub(removeExp, '', ' '.join(s)), StringType()) rdd = sc.parallelize(json_posts) df = spark.read.json(rdd) df = df.withColumn('sentences', cleanUdf(df.body))
Printing out the [schema](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.printSchema) of our DataFrame lets us see the column names and types ## Print out the schema
Step 5
次のセルは、Spark MLlibライブラリの2つの関数を使用して、post textを最初にトークン化(単語に分割)し、2番目にstep wordsを削除します。 この手順は、テキスト処理のワークフローで最も一般的です。
tokenizer = Tokenizer(inputCol="sentences", outputCol="words") wordsData = tokenizer.transform(df) remover = StopWordsRemover(inputCol="words", outputCol="filteredWords") filteredData = remover.transform(wordsData)
Step 6
次のセルでは、SparkMLのWord2Vec実装を使用して、投稿のテキストに基づいて単語埋め込みを生成します。 Word2Vecは、単語に基づく大きなベクトル空間にマップされた単語の数値表現(ベクトル配列、数字の配列)を作成する興味深いアルゴリズムです。 文書のコーパスから識別される類似点。 findSynonyms関数を使用して単語の類似性を検索する場合、アルゴリズムはベクトル空間全体を検索します。 残りのコーパスとの関係に基づいて最も類似している単語に一致させることができます。
モデルを訓練するためのテキストのサブセットは非常に小さいため、結果はあまり良くないことに留意することが重要です。 しかし、使い方を理解する価値はまだあります。
from pyspark.ml.feature import Word2Vec word2Vec = Word2Vec(vectorSize=5, seed=42, inputCol="filteredWords", outputCol="word_vec") wv_model = word2Vec.fit(filteredData) wv_model.findSynonyms('query', 2).show()
Step 7
Latent Dirichlet allocation (LDA) は、機能の点でクラスタリングと似たアルゴリズムであり、テキスト中の単語の関係に基づいてテキスト中の数多くのトピックを識別するために使用されます。
最初に、文書ごとに単語数のベクトルを作成するCountVectorizer(フォーラムポスト)を実行し、LDAアルゴリズムに入力します。 モデルにデータをフィットさせ(トレーニング)、同じデータに変換します。 ここでも、訓練するデータはあまりなく、同じデータを訓練して予測することは望ましくありません。
最後に、結果のDataFrameを取得し、一時的なビューを作成して、SQLを使用して分析します。
from pyspark.ml.feature import CountVectorizer from pyspark.ml.clustering import LDA cv = CountVectorizer().setInputCol('filteredWords').setOutputCol('features').setVocabSize(500).setBinary(True) counted_data = cv.fit(filteredData).transform(filteredData) lda = LDA().setK(3).setMaxIter(10).setOptimizer('em') lda_model = lda.fit(counted_data) lda_topics_df = lda_model.transform(counted_data).select('subject', 'topicDistribution') lda_topics_df.createOrReplaceTempView('topics') # The topic weights are in a vector format so we need to convert them. We'll register a UDF to do that with SQL spark.udf.register('toArrayUdf', (lambda r: r.toArray().tolist()), ArrayType(DoubleType())) # A UDF to extract the index in the topics array for which the value is the largest. This identifys which topic the post is most closely realted to. def maxUdf(items): l = items.toArray().tolist() return l.index(max(l)) spark.udf.register('toMaxUdf', maxUdf, IntegerType())
下の表から、LDAモデルは各投稿に対して3つのクラスタを作成し、各クラスタでの投稿の影響を特定しました。
%sql with dataset as (select subject, toArrayUdf(topicDistribution) as weights from topics) select subject, weights[0] as k1, weights[1] as k2, weights[2] as k3 from dataset
エラーが出てしまいましたが内容だけ記載しました。
ウェイトの最大値に基づいてトピックに最も関連する投稿の総数に関連するトピックの分布を視覚化できます。
%sql select subject, toMaxUdf(topicDistribution) as topic from topics
3. Book Reviews Graph Analysis
このモジュールでは、2153471ユーザー、1143092会場、1021970チェックイン、27098490社のソーシャルコネクション、および2809581件のユーザーが会場に割り当てたUMN / Sarwat Foursquare Datasetのデータを使用します。 すべてがFoursquareアプリケーションからパブリックAPIを通じて抽出されます。 すべてのユーザー情報は匿名化されています。つまり、ユーザーのジオロケーションも匿名化されています。 データを読み、それを掃除し、GraphFramesを使ってグラフを作成し、検索し、接続を見つける方法を学びます。
- users.dat:各ユーザーが一意のIDと、ユーザーのホームタウンの場所を表す地理空間の場所(緯度と経度)を持つようなユーザーのセットで構成されます。
- venues.dat:各会場が一意のIDと地理空間位置(lattudeと経度)を持つような一連の会場(レストランなど)で構成されます。
- checkins.dat:会場のユーザーのチェックイン(訪問)をマークします。 各チェックインには、一意のID、ユーザーID、会場IDがあります。
- socialgraph.dat:ユーザー間に存在するソーシャルグラフのエッジ(接続)を含みます。 各ソーシャルコネクションは、2つのユニークなid(first_user_idとsecond_user_id)で表される2人のユーザ(フレンド)から構成されます。
- ratings.dat:ユーザーが特定の会場をどのくらい好きであるかを定量化する暗黙の評価で構成されます。
Step 1
このステップでは、MVPリポジトリから外部モジュールをロードするためのZeppelinの機能を使用しています。 このセルを最初に実行し、他のセルを実行する前に完了させる必要があります。 完了したら、次のセルを実行し、他のセルを実行する前に完了させることもできます。
これらのセルを最初に実行する必要がある理由は、Spark Contextに外部モジュールをロードし、プログラムで使用する前にそれを実行する必要があるためです
%spark.dep z.load("graphframes:graphframes:0.5.0-spark2.1-s_2.11")
## There is one small issue with the GraphFrame library in that it doesn't properly expose its Python module for use in PySpark. ## To fix that I had to extract the Python files from the JAR and make them available as a separate module. sc.addPyFile('s3://workshop-public/packages/graphframes.zip')
import re from graphframe import * from pyspark.sql.functions import *
Step 2
このステップでは、すべてのデータファイルを読み込みます。 read.csvやread.jsonなどのconvinience関数は使用していないことに注意してください。 その理由は、これらのファイルが非常に機械に適したフォーマットではないからです。 textFile APIを使用すると、テキストファイルを読み込み、改行文字で区切られていると仮定して、すべての行に新しい行を自動的に作成します。
users = sc.textFile('s3://workshop-public/4square/users.dat') checkins = sc.textFile('s3://workshop-public/4square/checkins.dat') ratings = sc.textFile('s3://workshop-public/4square/ratings.dat') venues = sc.textFile('s3://workshop-public/4square/venues.dat') connections = sc.textFile('s3://workshop-public/4square/socialgraph.dat')
Step 3
大部分のデータ準備が行われる場所です。設定するには、データから不要な文字を削除するために使用される正規表現パターンを作成します。
次に、Spark APIを使用してデータファイルを一度に1つずつ繰り返します。操作は次のとおりです。
- 各行にマップし、テキストを| (パイプ)デリミタ
- 分割された各行にマップし、正規表現を使用して不要な文字を削除する
- 結果セットをその中のデータを持つ行だけにフィルタする
データを繰り返し処理して消去すると、そのデータをSpark DataFrameに変換し、プロセス内で列名を割り当てることができます。
GraphFramesを操作するために、元のファイルの名前から少し変更しました。すべてのDataFramesのid列はユーザーIDを参照します。 接続DataFrameはソーシャルグラフを保持し、起点ユーザIDと、ユーザ間の再接続するターゲットユーザIDをリストします。 GraphFramesは、これらの2つのカラムが起点ユーザIDの名前srcと、ターゲットユーザIDのためのdstであることを要求します。
removeExp = re.compile(r'-?[a-z\+\s\_]+|-{2,}') u = users \ .map(lambda r: r.split('|')) \ .map(lambda r: map(lambda i: re.sub(removeExp, '', i), r)) \ .filter(lambda r: r[0] != '') df_users = spark.createDataFrame(u, ['id', 'user_lat', 'user_lon']) c = checkins \ <COMPLETE THE FUNCTION> df_checkins = spark.createDataFrame(c, ['checkinid', 'id', 'venueid', 'checkin_lat', 'checkin_lon', 'created']) r = ratings \ <COMPLETE THE FUNCTION> df_ratings = spark.createDataFrame(r, ['id', 'venueid', 'score']) v = venues \ <COMPLETE THE FUNCTION> df_venues = spark.createDataFrame(v, ['venueid', 'venue_lat', 'venue_lon']) con = connections \ <COMPLETE THE FUNCTION> df_connections = spark.createDataFrame(con, ['src', 'dst'])
Step 4
複数のファイルが関連しているので、包括的なDataFrameを作成するためにそれらを結合する必要があります。 最初に会場とレーティングに参加すると、DataFrameとチェックインデータが結合されます。 冗長な列を削除する必要があることに注意してください。
venues_ratings = df_venues.join(df_ratings, df_ratings.venueid == df_venues.venueid) \ .drop(df_ratings.venueid) venues_ratings = venues_ratings.selectExpr(""" CASE WHEN score<=2.0 THEN 'poor' WHEN score>=3.0 AND score<=4.0 THEN 'good' WHEN score>4.0 THEN 'great' END as rating """, "*") venues_ratings.printSchema() venues_ratings.show(5)
Step 5
次に、データからGraphFrameを作成します。 GraphFrameコンストラクタは、頂点とエッジをパラメータとして取ります。 頂点はグラフ内のノードであり、エッジはこれらのノード間の接続またはパスです。
次のセルでは、頂点と端を見て、これが何を意味するのかを感じ取ることができます。
5つのエッジと5つの頂点のサンプルを印刷して、どのように見えるかを確認します。 ヒント:グラフオブジェクトには、データフレームであるエッジとverticesの要素があります。 DataFrameの値をどのように表示するか覚えていますか?
vertices = venues_ratings.drop('venue_lat').drop('venue_lon') graph = GraphFrame(vertices ,df_connections) ## Show the edges and vertices
Step 6
モチーフ検索は、グラフ内のパターンを見つける方法です。 GraphFrameのモチーフAPIは、ドメイン固有言語を使用して構造クエリーを表現します。 APIのドキュメントhere
例えば、以下のモチーフ検索オペレーションは、両方の方向のエッジによって接続される任意の2つの頂点(ノード) aおよびbを検索することである。 私はあなたの友人のようなもので、あなたは私の友人です。 検索パターンで使用される名前は単なるラベルであり、GraphFrameの実際のデータを表していないことに注意してください。
モチーフ検索はリソース集中型操作であることに注意してください。 Glue開発者のエンドポイントは、通常は大容量でプロビジョニングされていないため、この作業には少し時間がかかることが予想されます。
## Due to the fact that graph operations are resource intensive and the dev endpoint is configured for data exploration and not heavy operations, ## we'll take our dataset and split it into a large sample and a smaller sample so we can show how these graph operations work. verx_lg, verx_sm = venues_ratings.randomSplit([0.8, 0.2], seed=12345)
motifs = graph.find("(a)-[e]->(b); (b)-[e2]->(a)") motifs.show(20, truncate=False)
PageRank
PageRankは、接続ノードの相対的重要度を測定する目的でグラフの各エッジに数値ウェイトを割り当てるためのリンク解析アルゴリズムです。
GraphFrames APIはパーソナライズされていないパーソナライズされたPageRankをサポートしています。 特定のユーザーIDに対してPageRankを実行する方法を理解できますか?
g = GraphFrame(df_users, df_connections) results = g.pageRank(resetProbability=0.15, maxIter=5) results.vertices.select("id", "pagerank").show(10) results.edges.select("src", "dst", "weight").show(10)
特定のユーザーIDでPageRankを実行するコードを完成させてください。 最初に、ユーザーIDがデータフレーム内にあることを確認する必要があります。 hint
g = GraphFrame(df_users, df_connections) ## Complete the code
Label Propagation Algorithm (LPA)
グラフ上でLPAを実行して、コミュニティ検出を行います。
この操作はリソースを大量に消費するため、開発者のエンドポイントで時間がかかることに注意してください。
g = GraphFrame(df_users, df_connections) ## Complete the code to do LPA on variable g result = g.labelPropagation(maxIter=5) result.select("id", "label").show()
おわりに
re:Inventを機会にAWS Glueに触れてみました。AWS Glueに興味があるかたは是非試してみてください。途中からエラーが発生してしまい全ての実行結果をお見せできませんでしたが内容について記載いたしました。
AWS Glueについては弊社ブログGlue – 特集カテゴリー – | Developers.IOもご参照ください。