
Redshiftに対してCopyコマンドを発行してみた(JSON取り込み) | Luigi Advent Calendar 2016 #19
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』19日目の内容となります。
今回はRedshiftへのJSONの取り込みを行なってみたいと思います。
先日18日目はRedshiftのCopyに用いるManifestを生成してみた。でした。
下準備
取り込みファイルとしては、前日に生成したmanifestをJSONファイルとして取り込んでみたいと思います、
と思ったのですが、RedshiftのCopyコマンドのJSON取り込みは配列1件1件ごとをレコードにするといった取り込みは行なえません。
そのため、jq等を用いてファイルの変換を実施する必要があります。
{
  "entries": [
    {
      "url": "s3://cm-kajiwara-redshift-load/100_199/000102.json",
      "mandatory": true
    },
(省略)
    {
      "url": "s3://cm-kajiwara-redshift-load/100_199/000198.json",
      "mandatory": true
    }
  ]
}
上記のjsonファイルに対して、jqを用いてentriesの中身のみを取り出した形にしたいと思います。
$ cat ./only3manifest| jq '.entries[]' > strip3Manifest.json
$ cat ./strip3Manifest.json
{
    "url": "s3://cm-kajiwara-redshift-load/100_199/000102.json",
    "mandatory": true
}
(省略)
{
    "url": "s3://cm-kajiwara-redshift-load/100_199/000198.json",
    "mandatory": true
}
セミコロン等もないので、問題なくRedshiftで扱うことができそうです。
次に上記のjsonを取り込む際に用いるjsonpathの設定ファイルですが、こちらはシンプルな内容になります。
{
    "jsonpaths": [
        "$['url']"
    ]
}
既に上記の2つのファイルはS3にアップロードされているものとします。
最後にテーブルとしては以下のようなテーブルを用意します。
create table manifest (url varchar(1024));
Redshiftに対してCopyコマンドを発行してみた(JSON取り込み)
基本はluigiのモジュール側で実装されているので、必要なパラメータを渡すのみで済みます。
import luigi
import luigi.contrib.redshift
class copyJsonPath(luigi.contrib.redshift.S3CopyJSONToTable):
    s3FilePath = luigi.Parameter()
    param = {}
    param["host"] = "[Redshift Host]"
    param["database"] = "[Redshift database]"
    param["password"] = "[Redshift Password]"
    param["user"] = "[Redshift User]"
    param["table"] = "顧客情報"
    param["s3_load_path"] = "s3://cm-kajiwara-redshift-load/{}
    param["jsonpath"] = "s3://cm-kajiwara-redshift-load/jsonpath.json"
    param["copy_options"] = "region AS 'ap-northeast-1' DELIMITER AS '\t' TIMEFORMAT AS 'auto' "
    param["copy_json_options"] = ""
    param["aws_access_key_id"] = "["aws_access_key_id"]"
    param["aws_secret_access_key"] = "["aws_secret_access_key"]"
    @property
    def host(self):
        return self.param["host"]
    @property
    def database(self):
        return self.param["database"]
    @property
    def user(self):
        return self.param["user"]
    @property
    def password(self):
        return self.param["password"]
    @property
    def table(self):
        return self.param["table"]
    def s3_load_path(self):
        return self.param["s3_load_path"].format(self.s3FilePath)
    @property
    def jsonpath(self):
        return self.param["jsonpath"]
    @property
    def aws_access_key_id(self):
        return self.param["aws_access_key_id"]
    @property
    def aws_secret_access_key(self):
        return self.param["aws_secret_access_key"]
    @property
    def copy_json_options(self):
        return self.param["copy_json_options"]
    @property
    def copy_options(self):
        return self.param["copy_options"]
実行結果
$ python ./useRedshift.py --local-scheduler copyJsonPath --s3FilePath strip3Manifest.json
DEBUG: Checking if copyJsonPath(s3FilePath=strip3Manifest.json) is complete
INFO: Informed scheduler that task   copyJsonPath_strip3Manifest_j_8bec9a68a0   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 36672] Worker Worker(salt=996099161, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=36672) running   copyJsonPath(s3FilePath=strip3Manifest.json)
INFO: Inserting file: s3://cm-kajiwara-redshift-load/strip3Manifest.json
INFO: Executing post copy queries
INFO: [pid 36672] Worker Worker(salt=996099161, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=36672) done      copyJsonPath(s3FilePath=strip3Manifest.json)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   copyJsonPath_strip3Manifest_j_8bec9a68a0   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=996099161, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=36672) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 copyJsonPath(s3FilePath=strip3Manifest.json)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
| url | 
|---|
| s3://cm-kajiwara-redshift-load/100_199/000102.json | 
| s3://cm-kajiwara-redshift-load/100_199/000111.json | 
| s3://cm-kajiwara-redshift-load/100_199/000120.json | 
| s3://cm-kajiwara-redshift-load/100_199/000130.json | 
| s3://cm-kajiwara-redshift-load/100_199/000134.json | 
| s3://cm-kajiwara-redshift-load/100_199/000138.json | 
| s3://cm-kajiwara-redshift-load/100_199/000144.json | 
| s3://cm-kajiwara-redshift-load/100_199/000156.json | 
| s3://cm-kajiwara-redshift-load/100_199/000165.json | 
| s3://cm-kajiwara-redshift-load/100_199/000174.json | 
| s3://cm-kajiwara-redshift-load/100_199/000186.json | 
| s3://cm-kajiwara-redshift-load/100_199/000195.json | 
| s3://cm-kajiwara-redshift-load/100_199/000108.json | 
| s3://cm-kajiwara-redshift-load/100_199/000117.json | 
| s3://cm-kajiwara-redshift-load/100_199/000129.json | 
| s3://cm-kajiwara-redshift-load/100_199/000133.json | 
| s3://cm-kajiwara-redshift-load/100_199/000137.json | 
| s3://cm-kajiwara-redshift-load/100_199/000143.json | 
| s3://cm-kajiwara-redshift-load/100_199/000153.json | 
| s3://cm-kajiwara-redshift-load/100_199/000163.json | 
| s3://cm-kajiwara-redshift-load/100_199/000173.json | 
| s3://cm-kajiwara-redshift-load/100_199/000183.json | 
| s3://cm-kajiwara-redshift-load/100_199/000193.json | 
| s3://cm-kajiwara-redshift-load/100_199/000105.json | 
| s3://cm-kajiwara-redshift-load/100_199/000114.json | 
| s3://cm-kajiwara-redshift-load/100_199/000126.json | 
| s3://cm-kajiwara-redshift-load/100_199/000132.json | 
| s3://cm-kajiwara-redshift-load/100_199/000136.json | 
| s3://cm-kajiwara-redshift-load/100_199/000141.json | 
| s3://cm-kajiwara-redshift-load/100_199/000150.json | 
| s3://cm-kajiwara-redshift-load/100_199/000162.json | 
| s3://cm-kajiwara-redshift-load/100_199/000171.json | 
| s3://cm-kajiwara-redshift-load/100_199/000180.json | 
| s3://cm-kajiwara-redshift-load/100_199/000192.json | 
| s3://cm-kajiwara-redshift-load/100_199/000103.json | 
| s3://cm-kajiwara-redshift-load/100_199/000113.json | 
| s3://cm-kajiwara-redshift-load/100_199/000123.json | 
| s3://cm-kajiwara-redshift-load/100_199/000131.json | 
| s3://cm-kajiwara-redshift-load/100_199/000135.json | 
| s3://cm-kajiwara-redshift-load/100_199/000139.json | 
| s3://cm-kajiwara-redshift-load/100_199/000147.json | 
| s3://cm-kajiwara-redshift-load/100_199/000159.json | 
| s3://cm-kajiwara-redshift-load/100_199/000168.json | 
| s3://cm-kajiwara-redshift-load/100_199/000177.json | 
| s3://cm-kajiwara-redshift-load/100_199/000189.json | 
| s3://cm-kajiwara-redshift-load/100_199/000198.json | 
まとめ
JSONの取り込み自体も実装自体はLuigi側でされているので、
実際に行う際は、必要なパラメータを与えるだけで取り込めることが確認できました。
明日は、LuigiのRedshiftモジュールをIAM Role取り込み対応にしてみたいと思います。













