AWS Glue が DynamoDBのテーブルのクロールやETLジョブをサポートしたので試してみました

2018.07.11

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

AWS Glue アップデートを確認したことろ、DynamoDBのテーブルのクロールとETLジョブをデータソースに指定できるようになったようです。どこまでサポートしたのか早速試してみました。

サポート内容

DynamoDBのテーブルのクロールをサポート

AWS GlueのクローラがAmazon DynamoDBのテーブルをメタデータを抽出し、AWS Glue データカタログに追加できるようになりました。

DynamoDBのテーブルがデータソースに利用可能

AWS Glue データカタログに登録したDyamoDBのテーブルは、Glue ETLジョブを作成してDynamoDBテーブルのデータをAmazon S3やAmazon Redshiftなどのサービスに読み込み、変換し、ロードして、分析に利用することもできるようになりました。

準備

IAMロールにDyamamoDBのマネージドポリシーを追加

新機能を試す前に、クローラやETLジョブ作成時に指定するIAMロールに対して、新しくサポートしたDyamamoDBのマネージドポリシーを追加してください。設定を忘れると、権限不足が原因のエラーが発生します。

今回のサポートは、DynamoDBのテーブルのメタ情報とテーブルデータの読み込みのみなので、AmazonDynamoDBReadOnlyAccessマネージドロールを追加します。

DynamoDBのテーブルの用意

DynamoDBにposdataというテーブルを作成して、100件のレコードを作成しました。

移行準備が、整いましたので引き続き試してみます。

DynamoDBのテーブルのクロールを試してみる

クローラを作成します。Choose a Data Storeは新たに追加されたDynamoDBを選択します。

Choose an IAM roleで指定するIAMロールには、DyamamoDBのマネージドポリシーを追加したものを指定してください。既存のIAMロールに追加済みであればそのIAMロールを指定します。

今回、クローラに設定した内容は以下のとおりです。

クロールした結果、AWS Glueデータカタログに登録されたテーブルは以下のとおりです。ClassificationがDynamoDB、Locationがarn:aws:dynamodb:ap-northeast-1:xxxxxxxxxx:table/posdataと設定されています。

DynamoDBのテーブルからjson出力するETLジョブを試してみる

DynamoDBのテーブルからjson出力する、つまりエクスポートするような動作になるETLジョブを作成します。また、ETLジョブならではの動作として、DynamoDBのRangekeyであるsales_dateをキーにカラム名ありパーティションで出力します。

Job Properties の IAM roleで指定するIAMロールには、DyamamoDBのマネージドポリシーを追加したものを指定してください。既存のIAMロールに追加済みであればそのIAMロールを指定します。

Choose your data sourcesでは、先程クローラでAWS Glueデータカタログに登録したdymamo_posdataテーブルを選択します。

Choose your data targetsでは、DataStoreにS3、Formatにjsonを指定します。

今回、ETLジョブに設定した内容は以下のとおりです。

すると、以下のETLジョブのソースが生成されます。ソースを見る限りS3データレイクを入力する場合と代わりません。sales_dateをキーにカラム名ありパーティションで出力しまので、L.xxの行の代わりにL.xxの行を追加しています。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "dymamo_posdata", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "dymamo_posdata", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("shop_id", "string", "shop_id", "string"), ("total", "string", "total", "string"), ("quantity", "string", "quantity", "string"), ("item_id", "string", "item_id", "string"), ("sales_date", "date", "sales_date", "date"), ("price", "string", "price", "string"), ("item_name", "string", "item_name", "string"), ("id", "string", "id", "string"), ("shop_name", "string", "shop_name", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("shop_id", "string", "shop_id", "string"), ("total", "string", "total", "string"), ("quantity", "string", "quantity", "string"), ("item_id", "string", "item_id", "string"), ("sales_date", "date", "sales_date", "date"), ("price", "string", "price", "string"), ("item_name", "string", "item_name", "string"), ("id", "string", "id", "string"), ("shop_name", "string", "shop_name", "string")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://cm-mybucket/tmp"}, format = "json", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
#datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://cm-mybucket/tmp"}, format = "json", transformation_ctx = "datasink2")
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://cm-mybucket/tmp", "partitionKeys": ["sales_date"]}, format = "json", transformation_ctx = "datasink2")
job.commit()

ターゲットに指定した出力ファイルは以下のとおりです。sales_dateフォルダの下にファイルが出力されています。

$ aws s3 ls s3://cm-mybucket/tmp/ --recursive
2018-07-06 13:26:54 0 tmp/
2018-07-11 15:53:58 860 tmp/sales_date=2018-07-01/run-1531292031980-part-r-00001
2018-07-11 15:53:58 857 tmp/sales_date=2018-07-01/run-1531292031980-part-r-00011
2018-07-11 15:53:58 860 tmp/sales_date=2018-07-02/run-1531292031980-part-r-00002
2018-07-11 15:53:58 857 tmp/sales_date=2018-07-02/run-1531292031980-part-r-00012
2018-07-11 15:53:58 860 tmp/sales_date=2018-07-03/run-1531292031980-part-r-00003
2018-07-11 15:53:58 857 tmp/sales_date=2018-07-03/run-1531292031980-part-r-00013
2018-07-11 15:53:58 860 tmp/sales_date=2018-07-04/run-1531292031980-part-r-00004
2018-07-11 15:53:58 857 tmp/sales_date=2018-07-04/run-1531292031980-part-r-00014
2018-07-11 15:53:58 860 tmp/sales_date=2018-07-05/run-1531292031980-part-r-00005
2018-07-11 15:53:58 857 tmp/sales_date=2018-07-05/run-1531292031980-part-r-00015
2018-07-11 15:53:58 860 tmp/sales_date=2018-07-06/run-1531292031980-part-r-00006
2018-07-11 15:53:58 857 tmp/sales_date=2018-07-06/run-1531292031980-part-r-00016
2018-07-11 15:53:58 860 tmp/sales_date=2018-07-07/run-1531292031980-part-r-00007
2018-07-11 15:53:59 857 tmp/sales_date=2018-07-07/run-1531292031980-part-r-00017
2018-07-11 15:53:58 860 tmp/sales_date=2018-07-08/run-1531292031980-part-r-00008
2018-07-11 15:53:59 857 tmp/sales_date=2018-07-08/run-1531292031980-part-r-00018
2018-07-11 15:53:58 860 tmp/sales_date=2018-07-09/run-1531292031980-part-r-00009
2018-07-11 15:53:59 857 tmp/sales_date=2018-07-09/run-1531292031980-part-r-00019
2018-07-11 15:53:58 857 tmp/sales_date=2018-07-10/run-1531292031980-part-r-00000
2018-07-11 15:53:58 860 tmp/sales_date=2018-07-10/run-1531292031980-part-r-00010

$ aws s3 ls s3://cm-mybucket/tmp/sales_date=2018-07-01/
2018-07-11 15:53:58 860 run-1531292031980-part-r-00001
2018-07-11 15:53:58 857 run-1531292031980-part-r-00011

$ aws s3 cp s3://cm-mybucket/tmp/sales_date=2018-07-01/run-1531292031980-part-r-00001 -
{"shop_id":"1001","total":"400","quantity":"1","item_id":"35940008","price":"400","item_name":"単3乾電池(8本)","id":"00000008","shop_name":"秋葉原駅前店"}
{"shop_id":"1001","total":"100","quantity":"1","item_id":"35940002","price":"100","item_name":"単3乾電池(2本)","id":"00000002","shop_name":"秋葉原駅前店"}
{"shop_id":"1001","total":"500","quantity":"1","item_id":"35940010","price":"500","item_name":"単3乾電池(10本)","id":"00000010","shop_name":"秋葉原駅前店"}
{"shop_id":"1001","total":"200","quantity":"1","item_id":"35940004","price":"200","item_name":"単3乾電池(4本)","id":"00000004","shop_name":"秋葉原駅前店"}
{"shop_id":"1001","total":"300","quantity":"1","item_id":"35940006","price":"300","item_name":"単3乾電池(6本)","id":"00000006","shop_name":"秋葉原駅前店"}

$ aws s3 cp s3://cm-mybucket/tmp/sales_date=2018-07-01/run-1531292031980-part-r-00011 -
{"shop_id":"1001","total":"350","quantity":"1","item_id":"35940007","price":"350","item_name":"単3乾電池(7本)","id":"00000007","shop_name":"秋葉原駅前店"}
{"shop_id":"1001","total":"250","quantity":"1","item_id":"35940005","price":"250","item_name":"単3乾電池(5本)","id":"00000005","shop_name":"秋葉原駅前店"}
{"shop_id":"1001","total":"50","quantity":"1","item_id":"35940001","price":"50","item_name":"単3乾電池(1本)","id":"00000001","shop_name":"秋葉原駅前店"}
{"shop_id":"1001","total":"150","quantity":"1","item_id":"35940003","price":"150","item_name":"単3乾電池(3本)","id":"00000003","shop_name":"秋葉原駅前店"}
{"shop_id":"1001","total":"450","quantity":"1","item_id":"35940009","price":"450","item_name":"単3乾電池(9本)","id":"00000009","shop_name":"秋葉原駅前店"}

なお、ソースデータとターゲットデータ共にDynamoDBに指定した場合、Map the source columns to target columnsにおいて、ETLジョブ作成時に指定するIAMロールに対して、マネージドポリシーDyamamoDBFullAccessを指定しましたが、以下のエラーが出力されました。変更したのはデータターゲットだけなのですが、、、(2018/07/11時点)

(上記の画像は読めないので、テキストも掲載します。)

{"service":"AWSGlue","statusCode":400,"errorCode":"InvalidInputException","requestId":"cba009ee-84da-11e8-b8d4-e17b4c484dc6","errorMessage":"No enum constant com.amazonaws.services.glue.FileFormat.DYNAMODB","type":"AwsServiceError"}

補足:DynamoDBのデータソースのReserved Capacity Unit(RCU)のチューニング

DynamoDBは、単位時間に読み取ることができるキャパシティが設定されており、AWS Glueのデータ取得もそのキャパシティの範囲で取得する必要があります。よって、AWS Glueのデータ取得したために他のシステムが読み取りできなくなるということを避けることを考慮したほうが良いでしょう。DynamoDBの利用のピークにAWS Glueの読み取りが加わる場合はキャパシティのチューニングを検討してください。

使用するReserved Capacity Unit(RCU)は、dynamodb.throughput.read.percentオプションで指定できます。この値は、0.1から1.5の範囲で指定可能で、デフォルトでは0.5に設定されています。

詳細は、以下のマニュアルを参照してください。

最後に

DynamoDBのクロールやETLジョブはこれまでの利用方法とほとんど変わりませんが、Reserved Capacity Unit(RCU)のチューニングについては検討してください。本日の検証で御覧頂いたとおり、レコード数がそれほど多くなければ、Glueを使ってDynamoDBのデータをS3に自由なフォーマットでエクスポートするのもありまもしれません。

サーバレスアナリティクスは、耐久性があり、高度にスケーラブルでメンテナンスが不要であるビッグデータストレージやデータベースを用いて、システムをデザインしたいと考えるはずです。AWS Glueのデータカタログにあらゆるデータを統合される傾向があり、その流れの中でDynamoDBもサポートされたことは嬉しい限りです。