AWS GlueでS3に保存したJSONをRDSへ保存してみた
こんにちは。CX事業本部Delivery部のakkyです。
IoT CoreへMQTT経由で流れてきたデータはそのままデータベースに保存することが多いですが、マスターとしてJSON形式のままS3に保存しておくと、それを後からAthena等から利用したり、またデータベースのバックアップとして使うことができて便利です。
そこで、AWS Glueを使ってS3からRDSへのETLを実験したのですが、簡単にできると思って挑戦してみたものの案外情報がなく、だいぶハマってしまいました。今回はその備忘録です。ご参考になれば幸いです。
なお、今回とは逆の、RDS→S3の手順は、以下の記事をご覧ください。
ハマったポイントまとめ
- 最初にRDSへのコネクタの作成とテーブルのクロールが必要
- RDSへの接続にSecrets Managerは使えないので、パスワードを直接入力する必要がある
事前準備
VPCとIAMの設定
RDSへ接続する場合はVPC内での実行になるため、セキュリティグループの設定と、保存先のS3へアクセスできるようにS3のゲートウェイVPCエンドポイントを設定しておく必要があります。また、Glueが引き受けるIAMロールも設定しておきます。詳しくは、以下の記事をご覧ください。
S3に保存されているデータ
S3には専用のバケットに/year=/month=/day=/hour=/*.json
のファイル名で、次のようなファイルが入っています。
{"timeInSeconds": 1673488530, "offsetInNanos": 544990, "voltage": 103.4, "frequency": 49.93}
RDSの設定
保存先のRDS(MySQL)へテーブルを作っておきます。今回は次のようにしました。
CREATE TABLE acinfotokyo ( time timestamp PRIMARY KEY, voltage double, frequency double )
作業手順
コンソールから行う場合は、次の手順が必要です。
- RDSコネクタの設定
- クローラーによるRDSテーブルのクロール(データベースカタログのテーブルの作成)
- ジョブの作成
- ジョブの実行
コネクタの設定
AWS Glue StudioからConnectorを開き、Create Connectionをクリックします。
ここで必要事項を入力すればよいのですが、ハマりポイントがあります。 Credential typeをsecretにすると、ジョブが実行できないのです!このすぐ次に行うテーブルのクロールは正常に実行できるので、ハマりました。
Secrets Managerを利用している場合でもUsername and passwordを選択し、面倒ですがユーザー名とパスワードをコピペしてください。
この問題に関する情報はrePostに投稿されています。
https://repost.aws/questions/QU8wqr-zM9R5SbTgWbx0wSEQ/aws-glue-connection-with-secrets-manager-issue[.e
具体的には、Jobの実行時にAn error occurred while calling ***.getCatalogSink. None.get
というエラーが表示されました。
なお、今回の手順とは異なりますが、コネクタからジョブを作成しようとすると、ビジュアルエディタからは接続できない旨が表示されます。しかし実際にはビジュアルエディタからコードを書かなくてもきちんとジョブを作れます。
クローラーの設定
Data CatalogのCrawlersからクローラを作成します。ここも画面の指示通りに行えばよいのですが、記載されている注意事項にあるようにMySQLを使っている場合には、Include pathには/%
とだけ入力します。
設定出来たらクローラーをRunします。しばらく経つとData catalogのTablesにDBから取ってきたテーブル定義が反映されているはずです。
なお、現行のコンソールからクローラーを作成するとエラーが発生する場合がありました。その場合は、旧コンソールCrawlers (legacy)から作成すると成功するかもしれません。
ジョブの作成
AWS Glue StudioからJobsを選択して新規ジョブを作成します。SourceにS3、TargetにMySQLを選択してCreateをクリックすると、自動的にテンプレートが生成されます。
まずS3のノードを設定します。
Data source properties - S3のタブを選んでから、バケット名を選びます。Data formatはJSONにします。Output schemaは自動的に認識されました。
次にMySQLのノードを設定します。
DatabaseとTable(それぞれ、Data catalogのデータベースとテーブルのこと)から、先ほどクローラーで生成した項目を選択します。
今回はS3に保存されているJSONとMySQLのテーブル定義が微妙に異なる(ナノ秒を別フィールドに入れている)ので、データ変換を行います。
ActionからSQL Queryノードを追加して、次のようなクエリを書きました。
select timeinseconds+offsetinnanos/1000000 as time, voltage, frequency from myDataSource
同様にActionからTo timestampノードを追加して、timeフィールドをtimestamp型に変換しました。
最終的には、次のようなノード構成になりました。
このJobsをSaveしてRunすると、S3に保存されていたデータがMySQLのテーブルに保存できました!
まとめ
GlueはAthenaを使うときにデータカタログの概念が出てきて、なんだこれ?と思ったきち放置していたのですが、今回ようやくきちんと触ってみました。
S3やAthenaで使うETLの手段として紹介されることが多く、RDSで使う方法があまり出てこなかったのでハマってしましました。
今回は動かすまでに少し時間がかかりましたが、サーバレスで便利に使えるサービスなので、実際の案件でも使ってみたいと思っています。