ローカルファイルに対してカラム操作とファイル出力をしてみた | Luigi Advent Calendar 2016 #06

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

好物はインフラとフロントエンドのかじわらゆたかです。

このエントリは『Luigi Advent Calendar 2016』6日目の内容となります。
今回はローカルファイルを参照していきたいと思います。

先日5日目は複数のローカルファイルを参照してみるでした。

前回・前々回とファイルの参照を中心にやってきましたが、今回は参照したファイルの加工と加工した結果の出力を行いたいと思います。

下準備

前回と同様のTSVファイルを用います。

1	Customer#000000001	IVhzIApeRb	MOROCCO  0	MOROCCO	AFRICA	25-989-741-2988	BUILDING
2	Customer#000000002	XSTf4,NCwDVaWNe6tE	JORDAN   1	JORDAN	MIDDLE EAST	23-768-687-3665	AUTOMOBILE
3	Customer#000000003	MG9kdTD	ARGENTINA7	ARGENTINA	AMERICA	11-719-748-3364	AUTOMOBILE
4	Customer#000000004	XxVSJsL	EGYPT    4	EGYPT	MIDDLE EAST	14-128-190-5944	MACHINERY
5	Customer#000000005	KvpyuHCplrB84WgAi	CANADA   5	CANADA	AMERICA	13-750-942-6364	HOUSEHOLD
6	Customer#000000006	sKZz0CsnMD7mp4Xd0YrBvx	SAUDI ARA2	SAUDI ARABIA	MIDDLE EAST	30-114-968-4951	AUTOMOBILE
7	Customer#000000007	TcGe5gaZNgVePxU5kR	CHINA    0	CHINA	ASIA	28-190-982-9759	AUTOMOBILE
8	Customer#000000008	I0B10bB0AymmC, 0PrRYBC	PERU     6	PERU	AMERICA	27-147-574-9335	BUILDING
9	Customer#000000009	xKiAFTjUsCuxfele	INDIA    6	INDIA	ASIA	18-338-906-3675	FURNITURE
10	Customer#000000010	6LrEaV6KR6PLVcgl2ArL 	ETHIOPIA 9	ETHIOPIA	AFRICA	15-741-346-9870	HOUSEHOL

このTSVファイルは以下のような構成になっています。

c_custkey c_name c_address c_city c_nation c_region c_phone c_mktsegment
1 Customer#000000001 IVhzIApeRb MOROCCO 0 MOROCCO AFRICA 25-989-741-2988 BUILDING
2 Customer#000000002 XSTf4,NCwDVaWNe6tE JORDAN 1 JORDAN MIDDLE EAST 23-768-687-3665 AUTOMOBILE
3 Customer#000000003 MG9kdTD ARGENTINA7 ARGENTINA AMERICA 11-719-748-3364 AUTOMOBILE
4 Customer#000000004 XxVSJsL EGYPT 4 EGYPT MIDDLE EAST 14-128-190-5944 MACHINERY
5 Customer#000000005 KvpyuHCplrB84WgAi CANADA 5 CANADA AMERICA 13-750-942-6364 HOUSEHOLD
6 Customer#000000006 sKZz0CsnMD7mp4Xd0YrBvx SAUDI ARA2 SAUDI ARABIA MIDDLE EAST 30-114-968-4951 AUTOMOBILE
7 Customer#000000007 TcGe5gaZNgVePxU5kR CHINA 0 CHINA ASIA 28-190-982-9759 AUTOMOBILE
8 Customer#000000008 I0B10bB0AymmC, 0PrRYBC PERU 6 PERU AMERICA 27-147-574-9335 BUILDING
9 Customer#000000009 xKiAFTjUsCuxfele INDIA 6 INDIA ASIA 18-338-906-3675 FURNITURE
10 Customer#000000010 6LrEaV6KR6PLVcgl2ArL ETHIOPIA 9 ETHIOPIA AFRICA 15-741-346-9870 HOUSEHOLD

今回の実装では、3番目のカラムであるc_addressを削除し、7番目のカラムのc_phoneをマスキングしたTSVファイルを出力するタスクを実装していきます。

ファイルの加工と出力

ファイルの参照を行うタスクの実装は前回と変更ありません。
パラメータでファイル名を受け取れるようにしておき、
outputメソッドでパラメータで受け取ったファイルを返すといった実装になります。

import luigi

class TsvInput(luigi.Task):
    filename = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(self.filename)

次にファイルの加工についてですが、Luigiの話ではなくPythonではどうするかといった話になります。
実装内容としては下記のようになります。

class TsvColumnEdit(luigi.Task):
    fileName = luigi.Parameter(default="customer0.tsv")

    def requires(self):
        return TsvInput(self.fileName)

    def output(self):
        return luigi.LocalTarget("translate_" + self.fileName)

    def run(self):
        ## あとで説明するためここでは省略

filenameをluigi.Parameterとすることで、実行時に処理を行うファイル名を指定できるようにしています。
また、今回の処理対象ファイルであるcustomer0.tsvをデフォルトパラメータとして指定しています。

次にrequiresメソッドでファイルを参照しているタスクを依存関係として指定しています。

今回はファイルを出力するタスクのため、outputメソッドを実装しています。
こちらで出力するファイルを指定しています。

カラム削除

実際のタスクの挙動についてはrunメソッドに記載していきます。
そのため、今回のカラム削除や値のマスキングといった処理はすべてrunメソッド内に記載することになります。

    def run(self):
        with self.input().open('r') as input, self.output().open('w') as out_file:
            for line in input:
                c_custkey, c_name, c_address, c_city, c_nation, c_region, c_phone, c_mktsegment = line.split('\t')
                maskedPhone = []
                tmpPhoneArray = c_phone.split('-')
                for i, v in enumerate(tmpPhoneArray):
                    if i == len(tmpPhoneArray) - 1:
                        maskedPhone.append(v)
                    else:
                        maskedPhone.append(re.sub('\d', '*', v))

                out_file.write('{}\t{}\t{}\t{}\t{}\t{}\t{}\n'.format(c_custkey, c_name, c_city, c_nation, c_region, '-'.join(maskedPhone), c_mktsegment))

まず、requiresメソッドで参照したファイルとoutputメソッドで出力先として指定したファイルをwithステートメントを用いてオブジェクトを取得しています。
splitメソッドに引数にtab(\t)を指定することで、タブを区切り文字とした配列を取得することができます。
その際にその配列の要素数分変数を用意しておくことで、用意した変数に値が格納されることになります。(4行目)

その後、ファイルに書き出す際に、書き出さない変数を含まずに書き出しています。(13行目)

マスキング

先ほど書いたrunメソッドからマスキングに関する個所のみ抜き出します。

                maskedPhone = []
                tmpPhoneArray = c_phone.split('-')
                for i, v in enumerate(tmpPhoneArray):
                    if i == len(tmpPhoneArray) - 1:
                        maskedPhone.append(v)
                    else:
                        maskedPhone.append(re.sub('\d', '*', v))

カラム削除のところで変数に格納した値(c_phone)のマスキングを行います。
マスキング処理をする値はで区切り文字付(-)の電話番号のため、まずは区切り文字毎に分割をします。
これはカラム削除の時に用いたsplitメソッドに引数に区切り文字である"-"を指定することで実現できます。

次に、分割した配列をfor文で処理していきます。
今回は下4桁以外をマスキングするといった処理を実施しました。

for文で処理する際にindexも含めて取得しておき、配列の一番最後の要素以外のときは
正規表現の置き換えで数字をすべて * に置き換えた文字列を配列に追加していきます。
一番最後の要素の際は、そのまま配列に追加しています。

最後に配列を文字列として書き出す際に、joinメソッドを用いることで区切り文字を指定して書き出すことができるので、
-を区切り文字として、マスキングした値を含む配列の中身を出力しています。

最後に実装したタスク全体と、実行結果を記載しておきます。

import luigi
import re


class TsvInput(luigi.Task):
    filename = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(self.filename)


class TsvColumnEdit(luigi.Task):
    fileName = luigi.Parameter(default="customer0.tsv")

    def requires(self):
        return TsvInput(self.fileName)

    def output(self):
        return luigi.LocalTarget("translate_" + self.fileName)

    def run(self):
        with self.input().open('r') as input, self.output().open('w') as out_file:
            for line in input:
                c_custkey, c_name, c_address, c_city, c_nation, c_region, c_phone, c_mktsegment = line.split('\t')
                maskedPhone = []
                tmpPhoneArray = c_phone.split('-')
                for i, v in enumerate(tmpPhoneArray):
                    if i == len(tmpPhoneArray) - 1:
                        maskedPhone.append(v)
                    else:
                        maskedPhone.append(re.sub('\d', '*', v))

                out_file.write('{}\t{}\t{}\t{}\t{}\t{}\t{}\n'.format(c_custkey, c_name, c_city, c_nation, c_region, '-'.join(maskedPhone), c_mktsegment))




if __name__ == '__main__':
    luigi.run()

実行結果

$ python ./UseTsv.py  --local-scheduler TsvColumnEdit > result.txt
DEBUG: Checking if TsvColumnEdit(fileName=customer0.tsv) is complete
DEBUG: Checking if TsvInput(filename=customer0.tsv) is complete
INFO: Informed scheduler that task   TsvColumnEdit_customer0_tsv_41b288805d   has status   PENDING
INFO: Informed scheduler that task   TsvInput_customer0_tsv_3630d0d21c   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 35607] Worker Worker(salt=365300218, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=35607) running   TsvColumnEdit(fileName=customer0.tsv)
INFO: [pid 35607] Worker Worker(salt=365300218, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=35607) done      TsvColumnEdit(fileName=customer0.tsv)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   TsvColumnEdit_customer0_tsv_41b288805d   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=365300218, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=35607) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 present dependencies were encountered:
    - 1 TsvInput(filename=customer0.tsv)
* 1 ran successfully:
    - 1 TsvColumnEdit(fileName=customer0.tsv)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====
1	Customer#000000001	MOROCCO  0	MOROCCO	AFRICA	**-***-***-2988	BUILDING

2	Customer#000000002	JORDAN   1	JORDAN	MIDDLE EAST	**-***-***-3665	AUTOMOBILE

3	Customer#000000003	ARGENTINA7	ARGENTINA	AMERICA	**-***-***-3364	AUTOMOBILE

4	Customer#000000004	EGYPT    4	EGYPT	MIDDLE EAST	**-***-***-5944	MACHINERY

5	Customer#000000005	CANADA   5	CANADA	AMERICA	**-***-***-6364	HOUSEHOLD

6	Customer#000000006	SAUDI ARA2	SAUDI ARABIA	MIDDLE EAST	**-***-***-4951	AUTOMOBILE

7	Customer#000000007	CHINA    0	CHINA	ASIA	**-***-***-9759	AUTOMOBILE

8	Customer#000000008	PERU     6	PERU	AMERICA	**-***-***-9335	BUILDING

9	Customer#000000009	INDIA    6	INDIA	ASIA	**-***-***-3675	FURNITURE

10	Customer#000000010	ETHIOPIA 9	ETHIOPIA	AFRICA	**-***-***-9870	HOUSEHOLD

まとめ

Luigiの実装というよりかはPythonの実装の話になってはいますが、連携したデータに対してマスキング処理をかけることができました。 明日は設定ファイルについて調べてみたいと思います。