S3のファイル一覧習得と削除を行なってみた | Luigi Advent Calendar 2016 #10

2016.12.10

はじめに

好物はインフラとフロントエンドのかじわらゆたかです。

このエントリは『Luigi Advent Calendar 2016』10日目の内容となります。
今回はS3のファイル一覧習得と削除を行なってみたいと思います。

先日9日目はS3のファイルを参照してみたでした。

LuigiはS3の操作用にBotoをラップしたモジュールを提供しています。
前回、前々回とファイルの書き込み・参照を行なっていました。
今回はLuigiのS3モジュールが提供している他の機能について調べてみます。

下準備

S3に以下のようなファイルが置かれていることを前提とします。

$ aws s3 ls  s3://cm-kajiwara-luigi-study/target
2016-11-18 20:34:50          0 target1.tsv
2016-11-18 20:34:59          0 target2.tsv
2016-11-18 20:35:06          0 target3.tsv
2016-11-18 20:35:12          0 target4.tsv
2016-11-18 20:35:19          0 target5.tsv

S3からの一覧習得・削除

一度のタスク処理で一覧から一つだけ取得し処理を行いS3の別のパスに配置し、配置済みのファイルは削除すると言ったタスクを実装してみたいと思います。
なんと、ここで落とし穴にハマりました。
LuigiのS3モジュールのListメソッドは前方一致でのファイル習得が行なえません。
そのため、上記のような構成でファイルを取得するためには、ディレクトリ単位で取得してローカルで必要な項目のみ取り出すといったことが必要になります。

今回の作例ではその点も考慮した作りになっています。
本来は、S3側で絞れる分は絞るべきと考えるのですが、その場合はluigiのS3モジュールを継承した形で実装する必要が出てくるものと思われます。

UseS3.py

import luigi
import luigi.s3
import re


class S3FileFlowSample(luigi.Task):
    path = luigi.Parameter(default="s3://cm-kajiwara-luigi-study")

    def run(self):
        p = re.compile("target*")
        for s3FilePath in luigi.s3.S3Client().list(path=self.path, return_key=False):
            if p.match(s3FilePath):
                luigi.s3.S3Client().remove("s3://cm-kajiwara-luigi-study/{}".format(s3FilePath))


if __name__ == '__main__':
    luigi.run()

実行結果

実行前S3環境

$ aws s3 ls  s3://cm-kajiwara-luigi-study/target
2016-11-18 20:34:50          0 target1.tsv
2016-11-18 20:34:59          0 target2.tsv
2016-11-18 20:35:06          0 target3.tsv
2016-11-18 20:35:12          0 target4.tsv
2016-11-18 20:35:19          0 target5.tsv

Luigi実行結果

ython ./UseS3.py --local-scheduler S3FileFlowSample
DEBUG: Checking if S3FileFlowSample(path=s3://cm-kajiwara-luigi-study) is complete
/Users/kajiwarayutaka/.pyenv/versions/luigiStudy/lib/python2.7/site-packages/luigi/worker.py:295: UserWarning: Task S3FileFlowSample(path=s3://cm-kajiwara-luigi-study) without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   S3FileFlowSample_s3___cm_kajiwara_5bb07c65b9   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 97087] Worker Worker(salt=759903535, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=97087) running   S3FileFlowSample(path=s3://cm-kajiwara-luigi-study)
DEBUG: Deleting target1.tsv from bucket cm-kajiwara-luigi-study
DEBUG: Deleting target2.tsv from bucket cm-kajiwara-luigi-study
DEBUG: Deleting target3.tsv from bucket cm-kajiwara-luigi-study
DEBUG: Deleting target4.tsv from bucket cm-kajiwara-luigi-study
DEBUG: Deleting target5.tsv from bucket cm-kajiwara-luigi-study
INFO: [pid 97087] Worker Worker(salt=759903535, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=97087) done      S3FileFlowSample(path=s3://cm-kajiwara-luigi-study)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   S3FileFlowSample_s3___cm_kajiwara_5bb07c65b9   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=759903535, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=97087) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 S3FileFlowSample(path=s3://cm-kajiwara-luigi-study)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

実行後S3環境

$ aws s3 ls  s3://cm-kajiwara-luigi-study/target

まとめ

S3に対してのファイルの一覧習得やファイル削除が行えることが分かりました。
明日からは各DBの環境から値の取得するタスクを作成していきたいと思います。