GCSToBigQueryOperatorでGCSからBigQueryにJSONの必要なフィールドのみロードしたい

2023.08.23

Apache AirflowでGCSにあるファイルをBigQueryにロードさせるには、GCSToBigQueryOperatorを使ってワークフローを構築していくことができます。

JSONファイルをロードする際、全てではなく必要なフィールドのみをロードしてみたかったので、方法を残しておこうと思います。

AIrflowのバージョンは、

$ airflow version  
2.6.3

です。

jsonファイルはGoogle Cloudのドキュメントに載っているものを利用します。

{"id":"1","first_name":"John","last_name":"Doe","dob":"1968-01-22","addresses":[{"status":"current","address":"123 First Avenue","city":"Seattle","state":"WA","zip":"11111","numberOfYears":"1"},{"status":"previous","address":"456 Main Street","city":"Portland","state":"OR","zip":"22222","numberOfYears":"5"}]}
{"id":"2","first_name":"Jane","last_name":"Doe","dob":"1980-10-16","addresses":[{"status":"current","address":"789 Any Avenue","city":"New York","state":"NY","zip":"33333","numberOfYears":"2"},{"status":"previous","address":"321 Main Street","city":"Hoboken","state":"NJ","zip":"44444","numberOfYears":"3"}]}

ここで欲しいものは、

id, first_name, last_name, dob とします。

schema_fieldsで定義

スキーマのリストをでフィールドを指定します。

※ schema_object'がNULLでautodetectがFalseの場合、パラメータを定義する必要がある

ドキュメントを参考にしながら、DAGのタスク部分を以下のようなコードにしてみました。

GCSToBigQueryOperator(
        gcp_conn_id=CONNECTION_ID,
        task_id="load",
        bucket='airflow-events',
        source_objects=["sample/year=2023/month=08/day=01/*.json"],
        destination_project_dataset_table=f'{DATASET_NAME}.users',
        source_format='NEWLINE_DELIMITED_JSON',
        write_disposition='WRITE_APPEND',
        create_disposition='CREATE_IF_NEEDED',
        autodetect=False,
        ignore_unknown_values=True,
        schema_update_options=['ALLOW_FIELD_ADDITION'],
        schema_fields=[
                {"name": "id", "type": "INTEGER"},
                {"name": "first_name", "type": "STRING"},
                {"name": "last_name", "type": "STRING"},
                {"name": "dob", "type": "DATE"},
            ],  
)

このオペレーターを実装したDAGを実行すると、BigQuery側にデータがロードされていました。

schema_objectで定義

テーブルのスキーマを含む .json ファイルを指すGCSオブジェクト・パスを指定。

※ 'schema_fields' が NULL で autodetect が False の場合に定義する必要があります。

今回の場合だと、以下のような形式でスキーマの情報を定義し、GCSにjsonファイルとして保存しておく必要があります。

[
    {
        "name": "id",
        "type": "INTEGER",
        "mode": "NULLABLE"
    },
    {
        "name": "first_name",
        "type": "STRING",
        "mode": "NULLABLE"
    },
    {
        "name": "last_name",
        "type": "STRING",
        "mode": "NULLABLE"
    },
    {
        "name": "dob",
        "type": "DATE",
        "mode": "NULLABLE"
    }
 ]

DAGのタスク部分を以下のようなコードにしてみました。

保存したJSONが、例えば

gs://airflow-sample-events/myschema.json

だったとしたら、schema_objectは パスの部分 (myschema.json) となります。

GCSToBigQueryOperator(
        gcp_conn_id=CONNECTION_ID,
        task_id="load",
        bucket='airflow-events',
        source_objects=["sample/year=2023/month=08/day=01/*.json"],
        destination_project_dataset_table=f'{DATASET_NAME}.users',
        source_format='NEWLINE_DELIMITED_JSON',
        write_disposition='WRITE_APPEND',
        create_disposition='CREATE_IF_NEEDED',
        autodetect=False,
        ignore_unknown_values=True,
        schema_update_options=['ALLOW_FIELD_ADDITION'],
        schema_object="myschema.json",
)

このオペレーターを実装したDAGを実行すると、schema_objectと同様にBigQuery側にデータがロードされていました。