Redshiftに対してCopyコマンドを発行してみた(JSON取り込み) | Luigi Advent Calendar 2016 #19
はじめに
好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』19日目の内容となります。
今回はRedshiftへのJSONの取り込みを行なってみたいと思います。
先日18日目はRedshiftのCopyに用いるManifestを生成してみた。でした。
下準備
取り込みファイルとしては、前日に生成したmanifestをJSONファイルとして取り込んでみたいと思います、
と思ったのですが、RedshiftのCopyコマンドのJSON取り込みは配列1件1件ごとをレコードにするといった取り込みは行なえません。
そのため、jq等を用いてファイルの変換を実施する必要があります。
{ "entries": [ { "url": "s3://cm-kajiwara-redshift-load/100_199/000102.json", "mandatory": true }, (省略) { "url": "s3://cm-kajiwara-redshift-load/100_199/000198.json", "mandatory": true } ] }
上記のjsonファイルに対して、jqを用いてentriesの中身のみを取り出した形にしたいと思います。
$ cat ./only3manifest| jq '.entries[]' > strip3Manifest.json $ cat ./strip3Manifest.json { "url": "s3://cm-kajiwara-redshift-load/100_199/000102.json", "mandatory": true } (省略) { "url": "s3://cm-kajiwara-redshift-load/100_199/000198.json", "mandatory": true }
セミコロン等もないので、問題なくRedshiftで扱うことができそうです。
次に上記のjsonを取り込む際に用いるjsonpathの設定ファイルですが、こちらはシンプルな内容になります。
{ "jsonpaths": [ "$['url']" ] }
既に上記の2つのファイルはS3にアップロードされているものとします。
最後にテーブルとしては以下のようなテーブルを用意します。
create table manifest (url varchar(1024));
Redshiftに対してCopyコマンドを発行してみた(JSON取り込み)
基本はluigiのモジュール側で実装されているので、必要なパラメータを渡すのみで済みます。
import luigi import luigi.contrib.redshift class copyJsonPath(luigi.contrib.redshift.S3CopyJSONToTable): s3FilePath = luigi.Parameter() param = {} param["host"] = "[Redshift Host]" param["database"] = "[Redshift database]" param["password"] = "[Redshift Password]" param["user"] = "[Redshift User]" param["table"] = "顧客情報" param["s3_load_path"] = "s3://cm-kajiwara-redshift-load/{} param["jsonpath"] = "s3://cm-kajiwara-redshift-load/jsonpath.json" param["copy_options"] = "region AS 'ap-northeast-1' DELIMITER AS '\t' TIMEFORMAT AS 'auto' " param["copy_json_options"] = "" param["aws_access_key_id"] = "["aws_access_key_id"]" param["aws_secret_access_key"] = "["aws_secret_access_key"]" @property def host(self): return self.param["host"] @property def database(self): return self.param["database"] @property def user(self): return self.param["user"] @property def password(self): return self.param["password"] @property def table(self): return self.param["table"] def s3_load_path(self): return self.param["s3_load_path"].format(self.s3FilePath) @property def jsonpath(self): return self.param["jsonpath"] @property def aws_access_key_id(self): return self.param["aws_access_key_id"] @property def aws_secret_access_key(self): return self.param["aws_secret_access_key"] @property def copy_json_options(self): return self.param["copy_json_options"] @property def copy_options(self): return self.param["copy_options"]
実行結果
$ python ./useRedshift.py --local-scheduler copyJsonPath --s3FilePath strip3Manifest.json DEBUG: Checking if copyJsonPath(s3FilePath=strip3Manifest.json) is complete INFO: Informed scheduler that task copyJsonPath_strip3Manifest_j_8bec9a68a0 has status PENDING INFO: Done scheduling tasks INFO: Running Worker with 1 processes DEBUG: Asking scheduler for work... DEBUG: Pending tasks: 1 INFO: [pid 36672] Worker Worker(salt=996099161, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=36672) running copyJsonPath(s3FilePath=strip3Manifest.json) INFO: Inserting file: s3://cm-kajiwara-redshift-load/strip3Manifest.json INFO: Executing post copy queries INFO: [pid 36672] Worker Worker(salt=996099161, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=36672) done copyJsonPath(s3FilePath=strip3Manifest.json) DEBUG: 1 running tasks, waiting for next task to finish INFO: Informed scheduler that task copyJsonPath_strip3Manifest_j_8bec9a68a0 has status DONE DEBUG: Asking scheduler for work... DEBUG: Done DEBUG: There are no more tasks to run at this time INFO: Worker Worker(salt=996099161, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=36672) was stopped. Shutting down Keep-Alive thread INFO: ===== Luigi Execution Summary ===== Scheduled 1 tasks of which: * 1 ran successfully: - 1 copyJsonPath(s3FilePath=strip3Manifest.json) This progress looks :) because there were no failed tasks or missing external dependencies ===== Luigi Execution Summary =====
url |
---|
s3://cm-kajiwara-redshift-load/100_199/000102.json |
s3://cm-kajiwara-redshift-load/100_199/000111.json |
s3://cm-kajiwara-redshift-load/100_199/000120.json |
s3://cm-kajiwara-redshift-load/100_199/000130.json |
s3://cm-kajiwara-redshift-load/100_199/000134.json |
s3://cm-kajiwara-redshift-load/100_199/000138.json |
s3://cm-kajiwara-redshift-load/100_199/000144.json |
s3://cm-kajiwara-redshift-load/100_199/000156.json |
s3://cm-kajiwara-redshift-load/100_199/000165.json |
s3://cm-kajiwara-redshift-load/100_199/000174.json |
s3://cm-kajiwara-redshift-load/100_199/000186.json |
s3://cm-kajiwara-redshift-load/100_199/000195.json |
s3://cm-kajiwara-redshift-load/100_199/000108.json |
s3://cm-kajiwara-redshift-load/100_199/000117.json |
s3://cm-kajiwara-redshift-load/100_199/000129.json |
s3://cm-kajiwara-redshift-load/100_199/000133.json |
s3://cm-kajiwara-redshift-load/100_199/000137.json |
s3://cm-kajiwara-redshift-load/100_199/000143.json |
s3://cm-kajiwara-redshift-load/100_199/000153.json |
s3://cm-kajiwara-redshift-load/100_199/000163.json |
s3://cm-kajiwara-redshift-load/100_199/000173.json |
s3://cm-kajiwara-redshift-load/100_199/000183.json |
s3://cm-kajiwara-redshift-load/100_199/000193.json |
s3://cm-kajiwara-redshift-load/100_199/000105.json |
s3://cm-kajiwara-redshift-load/100_199/000114.json |
s3://cm-kajiwara-redshift-load/100_199/000126.json |
s3://cm-kajiwara-redshift-load/100_199/000132.json |
s3://cm-kajiwara-redshift-load/100_199/000136.json |
s3://cm-kajiwara-redshift-load/100_199/000141.json |
s3://cm-kajiwara-redshift-load/100_199/000150.json |
s3://cm-kajiwara-redshift-load/100_199/000162.json |
s3://cm-kajiwara-redshift-load/100_199/000171.json |
s3://cm-kajiwara-redshift-load/100_199/000180.json |
s3://cm-kajiwara-redshift-load/100_199/000192.json |
s3://cm-kajiwara-redshift-load/100_199/000103.json |
s3://cm-kajiwara-redshift-load/100_199/000113.json |
s3://cm-kajiwara-redshift-load/100_199/000123.json |
s3://cm-kajiwara-redshift-load/100_199/000131.json |
s3://cm-kajiwara-redshift-load/100_199/000135.json |
s3://cm-kajiwara-redshift-load/100_199/000139.json |
s3://cm-kajiwara-redshift-load/100_199/000147.json |
s3://cm-kajiwara-redshift-load/100_199/000159.json |
s3://cm-kajiwara-redshift-load/100_199/000168.json |
s3://cm-kajiwara-redshift-load/100_199/000177.json |
s3://cm-kajiwara-redshift-load/100_199/000189.json |
s3://cm-kajiwara-redshift-load/100_199/000198.json |
まとめ
JSONの取り込み自体も実装自体はLuigi側でされているので、
実際に行う際は、必要なパラメータを与えるだけで取り込めることが確認できました。
明日は、LuigiのRedshiftモジュールをIAM Role取り込み対応にしてみたいと思います。