Redshiftに対してCopyコマンドを発行してみた(JSON取り込み) | Luigi Advent Calendar 2016 #19

2016.12.19

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』19日目の内容となります。
今回はRedshiftへのJSONの取り込みを行なってみたいと思います。

先日18日目はRedshiftのCopyに用いるManifestを生成してみた。でした。

下準備

取り込みファイルとしては、前日に生成したmanifestをJSONファイルとして取り込んでみたいと思います、
と思ったのですが、RedshiftのCopyコマンドのJSON取り込みは配列1件1件ごとをレコードにするといった取り込みは行なえません。
そのため、jq等を用いてファイルの変換を実施する必要があります。

only3manifest

{
  "entries": [
    {
      "url": "s3://cm-kajiwara-redshift-load/100_199/000102.json",
      "mandatory": true
    },
(省略)
    {
      "url": "s3://cm-kajiwara-redshift-load/100_199/000198.json",
      "mandatory": true
    }
  ]
}

上記のjsonファイルに対して、jqを用いてentriesの中身のみを取り出した形にしたいと思います。

$ cat ./only3manifest| jq '.entries[]' > strip3Manifest.json
$ cat ./strip3Manifest.json
{
    "url": "s3://cm-kajiwara-redshift-load/100_199/000102.json",
    "mandatory": true
}
(省略)
{
    "url": "s3://cm-kajiwara-redshift-load/100_199/000198.json",
    "mandatory": true
}

セミコロン等もないので、問題なくRedshiftで扱うことができそうです。

次に上記のjsonを取り込む際に用いるjsonpathの設定ファイルですが、こちらはシンプルな内容になります。

jsonpath.json

{
    "jsonpaths": [
        "$['url']"
    ]
}

既に上記の2つのファイルはS3にアップロードされているものとします。
最後にテーブルとしては以下のようなテーブルを用意します。

create table manifest (url varchar(1024));

Redshiftに対してCopyコマンドを発行してみた(JSON取り込み)

基本はluigiのモジュール側で実装されているので、必要なパラメータを渡すのみで済みます。

userPython.py

import luigi
import luigi.contrib.redshift

class copyJsonPath(luigi.contrib.redshift.S3CopyJSONToTable):

    s3FilePath = luigi.Parameter()
    param = {}
    param["host"] = "[Redshift Host]"
    param["database"] = "[Redshift database]"
    param["password"] = "[Redshift Password]"
    param["user"] = "[Redshift User]"
    param["table"] = "顧客情報"
    param["s3_load_path"] = "s3://cm-kajiwara-redshift-load/{}
    param["jsonpath"] = "s3://cm-kajiwara-redshift-load/jsonpath.json"
    param["copy_options"] = "region AS 'ap-northeast-1' DELIMITER AS '\t' TIMEFORMAT AS 'auto' "
    param["copy_json_options"] = ""
    param["aws_access_key_id"] = "["aws_access_key_id"]"
    param["aws_secret_access_key"] = "["aws_secret_access_key"]"

    @property
    def host(self):
        return self.param["host"]

    @property
    def database(self):
        return self.param["database"]

    @property
    def user(self):
        return self.param["user"]

    @property
    def password(self):
        return self.param["password"]

    @property
    def table(self):
        return self.param["table"]

    def s3_load_path(self):
        return self.param["s3_load_path"].format(self.s3FilePath)

    @property
    def jsonpath(self):
        return self.param["jsonpath"]

    @property
    def aws_access_key_id(self):
        return self.param["aws_access_key_id"]

    @property
    def aws_secret_access_key(self):
        return self.param["aws_secret_access_key"]

    @property
    def copy_json_options(self):
        return self.param["copy_json_options"]

    @property
    def copy_options(self):
        return self.param["copy_options"]

実行結果

$ python ./useRedshift.py --local-scheduler copyJsonPath --s3FilePath strip3Manifest.json
DEBUG: Checking if copyJsonPath(s3FilePath=strip3Manifest.json) is complete
INFO: Informed scheduler that task   copyJsonPath_strip3Manifest_j_8bec9a68a0   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 36672] Worker Worker(salt=996099161, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=36672) running   copyJsonPath(s3FilePath=strip3Manifest.json)
INFO: Inserting file: s3://cm-kajiwara-redshift-load/strip3Manifest.json
INFO: Executing post copy queries
INFO: [pid 36672] Worker Worker(salt=996099161, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=36672) done      copyJsonPath(s3FilePath=strip3Manifest.json)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   copyJsonPath_strip3Manifest_j_8bec9a68a0   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=996099161, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=36672) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 copyJsonPath(s3FilePath=strip3Manifest.json)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====
url
s3://cm-kajiwara-redshift-load/100_199/000102.json
s3://cm-kajiwara-redshift-load/100_199/000111.json
s3://cm-kajiwara-redshift-load/100_199/000120.json
s3://cm-kajiwara-redshift-load/100_199/000130.json
s3://cm-kajiwara-redshift-load/100_199/000134.json
s3://cm-kajiwara-redshift-load/100_199/000138.json
s3://cm-kajiwara-redshift-load/100_199/000144.json
s3://cm-kajiwara-redshift-load/100_199/000156.json
s3://cm-kajiwara-redshift-load/100_199/000165.json
s3://cm-kajiwara-redshift-load/100_199/000174.json
s3://cm-kajiwara-redshift-load/100_199/000186.json
s3://cm-kajiwara-redshift-load/100_199/000195.json
s3://cm-kajiwara-redshift-load/100_199/000108.json
s3://cm-kajiwara-redshift-load/100_199/000117.json
s3://cm-kajiwara-redshift-load/100_199/000129.json
s3://cm-kajiwara-redshift-load/100_199/000133.json
s3://cm-kajiwara-redshift-load/100_199/000137.json
s3://cm-kajiwara-redshift-load/100_199/000143.json
s3://cm-kajiwara-redshift-load/100_199/000153.json
s3://cm-kajiwara-redshift-load/100_199/000163.json
s3://cm-kajiwara-redshift-load/100_199/000173.json
s3://cm-kajiwara-redshift-load/100_199/000183.json
s3://cm-kajiwara-redshift-load/100_199/000193.json
s3://cm-kajiwara-redshift-load/100_199/000105.json
s3://cm-kajiwara-redshift-load/100_199/000114.json
s3://cm-kajiwara-redshift-load/100_199/000126.json
s3://cm-kajiwara-redshift-load/100_199/000132.json
s3://cm-kajiwara-redshift-load/100_199/000136.json
s3://cm-kajiwara-redshift-load/100_199/000141.json
s3://cm-kajiwara-redshift-load/100_199/000150.json
s3://cm-kajiwara-redshift-load/100_199/000162.json
s3://cm-kajiwara-redshift-load/100_199/000171.json
s3://cm-kajiwara-redshift-load/100_199/000180.json
s3://cm-kajiwara-redshift-load/100_199/000192.json
s3://cm-kajiwara-redshift-load/100_199/000103.json
s3://cm-kajiwara-redshift-load/100_199/000113.json
s3://cm-kajiwara-redshift-load/100_199/000123.json
s3://cm-kajiwara-redshift-load/100_199/000131.json
s3://cm-kajiwara-redshift-load/100_199/000135.json
s3://cm-kajiwara-redshift-load/100_199/000139.json
s3://cm-kajiwara-redshift-load/100_199/000147.json
s3://cm-kajiwara-redshift-load/100_199/000159.json
s3://cm-kajiwara-redshift-load/100_199/000168.json
s3://cm-kajiwara-redshift-load/100_199/000177.json
s3://cm-kajiwara-redshift-load/100_199/000189.json
s3://cm-kajiwara-redshift-load/100_199/000198.json

まとめ

JSONの取り込み自体も実装自体はLuigi側でされているので、
実際に行う際は、必要なパラメータを与えるだけで取り込めることが確認できました。
明日は、LuigiのRedshiftモジュールをIAM Role取り込み対応にしてみたいと思います。