RedshiftのCopyに用いるManifestを生成してみた。 | Luigi Advent Calendar 2016 #18

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』18日目の内容となります。
RedshiftのCopyコマンドの発行にManifestを使う事ができますが、
今回はそのManifestファイルの生成を行えるタスクを触ってみました。

先日17日目はRedshiftに対してCopyコマンドを発行してみた でした。

下準備

指定したS3のパス配下にあるファイルをすべて含んだ形でmanifestを作ってくれるジョブのようなので、ちょっと意地悪な構成を作ってみました。

s3://cm-kajiwara-redshift-load
├── 0_99
│   ├── 000000.json
│   ├── 000001.json
(中略)
│   └── 000099.json
├── 100_199
│   ├── 000100.json
│   ├── 000101.json
(中略)
│   └── 000199.json
└── 200_5000
    ├── 000200.json
(中略)
    └── 004999.json
 

上記のようなjson構成を取り込めるManifestを生成したいと思います。

上記の構成を作るにおいて使ったスクリプトは以下になります。

#!/bin/sh
mkdir ./0_99
mkdir ./100_199
mkdir ./200_5000
count=0
while [ $count -ne 100 ]
do
    copyTarget=$(printf ./0_99/%06d.json $count)
    cp ./sample.json $copyTarget
    count=`expr $count + 1`
done
while [ $count -ne 200 ]
do
    copyTarget=$(printf ./100_199/%06d.json $count)
    cp ./sample.json $copyTarget
    count=`expr $count + 1`
done
while [ $count -ne 5000 ]
do
    copyTarget=$(printf ./200_5000/%06d.json $count)
    cp ./sample.json $copyTarget
    count=`expr $count + 1`
done
aws s3 sync ./0_99/ s3://cm-kajiwara-redshift-load 
aws s3 sync ./100_199/ s3://cm-kajiwara-redshift-load
aws s3 sync ./200_5000/ s3://cm-kajiwara-redshift-load

Manifestを生成してみた

import luigi
import luigi.contrib.redshift

class createManifest(luigi.contrib.redshift.RedshiftManifestTask):

    path = "s3://cm-kajiwara-redshift-load/manifest"
    folder_paths = []
    folder_paths.append("s3://cm-kajiwara-redshift-load/0_99")
    folder_paths.append("s3://cm-kajiwara-redshift-load/100_199")
    folder_paths.append("s3://cm-kajiwara-redshift-load/200_5000")

上記だけで、配下に置いたファイルをすべて取り込むmanifestの生成ができます。

実行結果

$ python ./useRedshift.py --local-scheduler createManifest
$ aws s3 cp s3://cm-kajiwara-redshift-load/manifest ./
$ cat ./manifest | jq .
{
  "entries": [
    {
      "url": "s3://cm-kajiwara-redshift-load/0_99/000000.json",
      "mandatory": true
    },
    {
      "url": "s3://cm-kajiwara-redshift-load/0_99/000001.json",
      "mandatory": true
    },
    {
      "url": "s3://cm-kajiwara-redshift-load/0_99/000002.json",
      "mandatory": true
    },
(省略)
    {
      "url": "s3://cm-kajiwara-redshift-load/0_99/000099.json",
      "mandatory": true
    },
    {
      "url": "s3://cm-kajiwara-redshift-load/100_199/000100.json",
      "mandatory": true
    },
    {
      "url": "s3://cm-kajiwara-redshift-load/100_199/000101.json",
      "mandatory": true
    },
(省略)
    {
      "url": "s3://cm-kajiwara-redshift-load/100_199/000199.json",
      "mandatory": true
    },
    {
      "url": "s3://cm-kajiwara-redshift-load/200_5000/000200.json",
      "mandatory": true
    },
    {
      "url": "s3://cm-kajiwara-redshift-load/200_5000/000201.json",
      "mandatory": true
    },
(省略)
    {
      "url": "s3://cm-kajiwara-redshift-load/200_5000/004998.json",
      "mandatory": true
    },
    {
      "url": "s3://cm-kajiwara-redshift-load/200_5000/004999.json",
      "mandatory": true
    }
  ]
}

Manifestを作って処理させたいのは、同一のフォルダ下に処理させたいファイルと扠せたくないファイルが混在している場合かと思うので、
3の倍数と3が付く数字を処理対象としたいと思います。

import luigi
import luigi.contrib.redshift
from luigi.s3 import S3Target


class createOnlyMultiplesOf3ORInclude3Manifest(luigi.contrib.redshift.RedshiftManifestTask):

    path = "s3://cm-kajiwara-redshift-load/only3manifest"
    folder_paths = []
    folder_paths.append("s3://cm-kajiwara-redshift-load/100_199")

    def run(self):
        p = re.compile('.*3.*')
        p2 = re.compile('(\d*).json')
        entries = []
        for folder_path in self.folder_paths:
            s3 = S3Target(folder_path)
            client = self.output().fs
            for file_name in client.list(s3.path):
                if p.match(file_name):
                    entries.append({
                        'url': '%s/%s' % (folder_path, file_name),
                        'mandatory': True
                    })
                else:
                    m = p2.search(file_name)
                    num = int(m.group(1))
                    if num % 3 == 0:
                        entries.append({
                            'url': '%s/%s' % (folder_path, file_name),
                            'mandatory': True
                        })
        manifest = {'entries': entries}
        target = self.output().open('w')
        dump = json.dumps(manifest)
        if not self.text_target:
            dump = dump.encode('utf8')
        target.write(dump)
        target.close()


実行結果

$ python ./useRedshift.py --local-scheduler createOnlyMultiplesOf3ORInclude3Manifest
$ aws s3 cp s3://cm-kajiwara-redshift-load/only3manifest ./
$ cat ./manifest | jq .
{
  "entries": [
    {
      "url": "s3://cm-kajiwara-redshift-load/100_199/000102.json",
      "mandatory": true
    },
    {
      "url": "s3://cm-kajiwara-redshift-load/100_199/000103.json",
      "mandatory": true
    },
    {
      "url": "s3://cm-kajiwara-redshift-load/100_199/000105.json",
      "mandatory": true
    },
(省略)
    {
      "url": "s3://cm-kajiwara-redshift-load/100_199/000130.json",
      "mandatory": true
    },
    {
      "url": "s3://cm-kajiwara-redshift-load/100_199/000131.json",
      "mandatory": true
    },
    {
      "url": "s3://cm-kajiwara-redshift-load/100_199/000132.json",
      "mandatory": true
    },
(省略)
    {
      "url": "s3://cm-kajiwara-redshift-load/100_199/000193.json",
      "mandatory": true
    },
    {
      "url": "s3://cm-kajiwara-redshift-load/100_199/000195.json",
      "mandatory": true
    },
    {
      "url": "s3://cm-kajiwara-redshift-load/100_199/000198.json",
      "mandatory": true
    }
  ]
}

まとめ

今回はManifestファイルの生成を行いました。
次回はJSONを取り込むCopyを実施するタスクを実装します。