Cloud Composer(Airflow)でGCSFileTransformOperatorを使ってみる

2022.12.19

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

データアナリティクス事業本部のkobayashiです。

最近データパイプラインツールとしてGoogleCloudのCloud Composer(Airflow)をよく使っているのですが、Google Cloud Storage(以降GCS)上のファイルを加工してから次の工程へ流す場合に使われるオペレータを使ってみたのでその内容をまとめます。

Cloud Composer  |  Google Cloud

GCSFileTransformOperator

GCS上のファイルをソースとしてそのファイルを加工してから再度GCSへアップロードする用途としてGCSFileTransformOperatorがあります。このオペレータはソースであるGCS上のファイルをローカルに一時的に保存してから変換スクリプトにてファイを変換し、変換後に再びGCSへアップロードするといった処理を行います。出力先は指定しなければソースファイルを上書きを行い、出力先を指定した場合はそのパスへ変換後のファイルを新規オブジェクトとしてアップロードします。

GCSFileTransformOperatorの使い方

GCSFileTransformOperator(
    task_id="gcs_transform",
    gcp_conn_id="{GCSのConnection}",
    source_bucket="{ソースバケット名}",
    source_object="{ソースオブジェクトのパス}",
    destination_bucket="{出力バケット名}",
    destination_object="{出力オブジェクトのパス}",
    transform_script=['python', 'script.py', 10],
)
  • gcp_conn_id: Airflowの管理コンソールで設定したConnection名
  • source_bucket: ソースオブジェクトのあるバケット
  • source_object: ソースオブジェクトのパス
  • destination_bucket: 出力先のバケット
    • 指定しない場合はsource_bucketと同一になる
  • destination_object: 出力先のオブジェクトパス
    • 指定しない場合はsource_objectと同一になる
  • transform_script: 変換するスクリプトをリスト形式で指定

transform_scriptの書き方ですがリスト形式で指定します。ドキュメントを読んだだけでは少し指定の仕方が理解できなかったのですが、GCSFileTransformOperatorのソースファイルを読んだところリストで渡した値をsubprocess.Popenで実行しているだけだったのでPopenの使い方で記述すれば良いだけです。したがって変換スクリプトではPythonだけでなくシェルスクリプトなども使うことができます。

実際の処理はtransform_scriptのリストにソースファイルと出力先ファイル名を付け加えてPopenで実行しています。変換スクリプトの書き方としてはソースファイルが第一引数、出力ファイルが第二引数として与えられるのでpythonですと以下のような形になります。

import sys
import pandas as pd

src_file=sys.argv[1]
dest_file=sys.argv[2]

df = pd.read_csv(src_file)
# 何らかの変換
df.to_csv(dest_file)

またシェルスクリプトで変換ファイルを行う場合は以下のような形になります。

#! /bin/bash

# (例)tsvをcsvへ変換
tr "\\t" "," < $1 > $2

Cloud ComposerでGCSFileTransformOperatorを使ってみる

では実際にGCSFileTransformOperatorを使ってみたいと思います。処理の内容としてはGCS上にUTF-16でエンコードされたCSVがあり、これをUTF-8に変換するような処理を行います。

環境

  • Composerバージョン: 2.0.28
  • Airflowバージョン: 2.3.3

行う内容としては、AirflowでDAGを作成するdagファイルをPythonで記述します。また変換ファイルはPythonで記述してもよいのですが、ただ単にUTF-16をUTF-8に変換するだけなのでiconvコマンドを使うシェルスクリプトを記述します。

ディレクトリ構成は以下になります。

Composer用のDAGファイルパス
├── transform_utf16.py
└── scripts
    └── transform_utf16.sh

transform_utf16.py

import os
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.gcs import GCSFileTransformOperator

work_dir = os.path.dirname(os.path.abspath(__file__))

with DAG(
    dag_id="transfer_utf16",
    start_date=datetime.now(),
    schedule_interval=None,
    default_args={"retries": 1},
    tags=["gcs2bq"],
) as dag:

    gcs_to_gcs = GCSFileTransformOperator(
        task_id="gcs_transform",
        gcp_conn_id="google_cloud_storage_default",
        source_bucket="sample_bucket",
        source_object="user_events/20221219/utf16_sample.csv",
        destination_object="user_events/20221219/utf8_sample.csv",
        transform_script=["bash", f"{work_dir}/scripts/transform_utf16.sh"],
    )

ファイルを変換して同じパスに別名で保存するためdestination_bucketは指定していません。

transform_utf16.sh

#! /bin/bash

iconv -f UTF-16 -t UTF-8 $1 > $2

ソースファイルは$1で取得でき出力先ファイルは$2で取得できるので上記のような非常に簡単なワンライナーのコードになります。

あとはこれをCloud ComposerでTrigger DAGで実行すればGCS上のUTF-16のcsvファイルがUTF-8のcsvファイルに変換されます。

Cloud Composerを動かしているインスタンスのスペックにもよりますが今回の検証では12GB程度のファイルで14分ほどでUTF-16からUTF-8に変換が行うことができました。

まとめ

GoogleCloudのCloud Composer(Airflow)でGCSFileTransformOperatorを使ってGCS上のファイルを変換スクリプトで変換してみました。データパイプラインにてソースデータを加工することは往々にしてあることなのでGCSFileTransformOperatorは使い勝手が良いと思います。

ただ1点GCSFileTransformOperatorの使いにくい点としてGCSFileTransformOperatorではsource_objectとしてGCSオブジェクトを指定する必要があり、Prefixで複数ファイルを処理することができません。そのような場合はGCSTimeSpanFileTransformOperatorをつかってみるのが良いかと思います。

最後まで読んで頂いてありがとうございました。