Cloud Composer(Airflow)でGCSTimeSpanFileTransformOperatorを使ってみる

2023.01.02

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

データアナリティクス事業本部のkobayashiです。

前回GoogleCloudのCloud Composer(Airflow)でGCSFileTransformOperatorを使ったファイル変換処理のエントリを書きました。まとめの中で複数ファイルの処理ではGCSTimeSpanFileTransformOperatorのほうが良さそうだとお伝えしたのでそちらも試してみます。

GCSTimeSpanFileTransformOperatorとは

前回ご紹介したGCSFileTransformOperatorではソースであるGCS上のファイルをローカルに一時的に保存してから変換スクリプトにてファイを変換し、変換後に再びGCSへアップロードするといった処理を行いました。GCSFileTransformOperatorに関してはGCSFileTransformOperatorの公式ドキュメント のパラメータの項目からもわかるようにsource_objectでは任意のオブジェクトのパスを指定して与えなくてはならなく、ディレクトリのパスを指定して複数ファイルを処理することはできません。

複数ファイルの変換を行いたい場合はGCSTimeSpanFileTransformOperatorを使う必要があります。このオペレータはDAG実行の期間にGCS上の指定パスに追加・更新されたオブジェクトを検索し、それらのオブジェクトをローカルにダウンロードしてから変換スクリプトにて変換を行い、変換後に再びGCSへアップロードするといった処理を行います。

GCSFileTransformOperatorと違い注意しなくてはならないのが期間です。これはDAG開始時間から次のDAGがスケジュールされている時間までを意味しています。したがって、DAG実行前に作成・更新されたオブジェクトは処理対象から外れます。

airflow.providers.google.cloud.operators.gcs GCSTimeSpanFileTransformOperator— apache-airflow-providers-google Documentation

GCSTimeSpanFileTransformOperatorの使い方

GCSTimeSpanFileTransformOperator(
    task_id="gcs_transform",
    source_bucket="{ソースバケット名}",
    source_prefix="{ソースオブジェクトのパスPrefix}",
    source_gcp_conn_id="{GCSのConnection}",
    destination_bucket="{出力バケット名}",
    destination_prefix="{出力オブジェクトのパスPrefix}",
    destination_gcp_conn_id="{GCSのConnection}",
    transform_script=['python', 'script.py', 10],
)
  • gcp_conn_id: Airflowの管理コンソールで設定したConnection名
  • source_bucket: ソースオブジェクトのあるバケット
  • source_object: ソースオブジェクトのパスPrefix
    • Prefixなので*を使って複数オブジェクトを指定できる
  • destination_bucket: 出力先のバケット
  • destination_object: 出力先のオブジェクトパスPrefix
  • transform_script: 変換するスクリプトをリスト形式で指定

transform_scriptの書き方ですがリスト形式で指定します。こちらもGCSFileTransformOperator同様にリストで渡した値をsubprocess.Popenで実行しているだけなのでPopenの使い方で記述すれば良いだけです。また変換スクリプトではPythonだけでなくシェルスクリプトなども使うことができます。

変換スクリプトの書き方としてはソースファイル群が保存されているローカルディレクトリが第一引数、アップロードされるディレクトリが第二引数として与えられます。また第三引数として期間の開始日時、第四引数として期間の終了日時が与えられます。

Cloud ComposerでGCSTimeSpanFileTransformOperatorを使ってみる

では実際にGCSTimeSpanFileTransformOperatorを使ってみたいと思います。処理の内容としてはGCS上にUTF-16でエンコードされたCSVがあり、これをUTF-8に変換するような前回と同じ処理を行います。

環境

  • Composerバージョン: 2.0.28
  • Airflowバージョン: 2.3.3

行う内容としては、AirflowでDAGを作成するdagファイルをPythonで記述します。変換ファイルは今回もUTF-16をUTF-8に変換するだけなのでお手軽にiconvコマンドを使うシェルスクリプトを記述します。

ディレクトリ構成は以下になります。

Composer用のDAGファイルパス
├── transform_utf16_ts.py
└── scripts
    └── transform_utf16_ts.sh

transform_utf16_ts.py

import os
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.gcs import (
    GCSTimeSpanFileTransformOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

work_dir = os.path.dirname(os.path.abspath(__file__))

with DAG(
    dag_id="gcs2bq-transfer_json_ts",
    start_date=datetime.now(),
    schedule_interval=None,
    default_args={"retries": 1},
    tags=["gcs2bq"],
) as dag:
    # DAG内のファイル更新時間になるようにファイルをCopyする
    copy_files = GCSToGCSOperator(
        task_id="copy_files",
        source_bucket="sample_bucket",
        source_object="user_events/20230101/7001000000*.json",
        destination_bucket="sample_bucket",
        destination_object="user_events_tgt/20230101/",
    )

    # 実処理
    gcs_transform = GCSTimeSpanFileTransformOperator(
        task_id="gcs_transform",
        source_bucket="sample_bucket",
        source_prefix="user_events_tgt/20230101",
        source_gcp_conn_id="google_cloud_storage_default",
        destination_bucket="sample_bucket",
        destination_prefix="user_events_tgt/20230101",
        destination_gcp_conn_id="google_cloud_storage_default",
        transform_script=["bash", f"{work_dir}/test_script/transform_utf16_ts.sh"],
    )

    copy_files >> gcs_transform

GCSToGCSOperatorですが、GCSTimeSpanFileTransformOperatorでは先程説明した通りsourceパス配下のオブジェクトのうち、DAGが実行された日時以降に作成・更新されたオブジェクトが対象となるため、DAG中でオブジェクトをコピーしてオブジェクト作成日が期間に含まれるようにしています。

transform_utf16_ts.sh

transform_utf16_ts.sh

#! /bin/bash

find $1 -name "*.csv"  | while read -r fname
do
  iconv -f UTF-16 -t UTF-8 $fname > "utf8-$fname"
done

find $1 -name "utf8-*.csv"  | xargs -I% mv % $2

ソースディレクトリは$1で取得できアップロードディレクトリは$2で取得できるので上記のような非常に簡単なワンライナーのコードになります。

GCSTimeSpanFileTransformOperatorの動きとしては

  1. $1に対象のパスからオブジェクトがダウンロードされる
  2. スクリプトが実行される
  3. $2にあるファイルがdestinationにアップロードされる

といった流れになるため、2で行われるのがファイル個別のパスが与えられるのではなく、あくまでディレクトリが渡されることに注意してスクリプトを記述する必要があります。

あとはこれをCloud ComposerでTrigger DAGで実行すればGCS上のUTF-16のcsvファイル群がUTF-8のcsvファイルに変換されます。

まとめ

GoogleCloudのCloud Composer(Airflow)でGCSTimeSpanFileTransformOperatorを使ってGCS上のファイルを変換スクリプトで変換してみました。GCSFileTransformOperatorと違って複数オブジェクトを変換対象にできるため幅広い使い方ができるかと思います。

注意点としてはスクリプトに与えられるのがパスであるという点と期間内に作成・変更されたオブジェクトのみが対象になる点です。

最後まで読んで頂いてありがとうございました。