Apache AirflowでGCSにあるファイルをBigQueryにロードさせるには、GCSToBigQueryOperatorを使ってワークフローを構築していくことができます。
JSONファイルをロードする際、全てではなく必要なフィールドのみをロードしてみたかったので、方法を残しておこうと思います。
AIrflowのバージョンは、
$ airflow version
2.6.3
です。
jsonファイルはGoogle Cloudのドキュメントに載っているものを利用します。
{"id":"1","first_name":"John","last_name":"Doe","dob":"1968-01-22","addresses":[{"status":"current","address":"123 First Avenue","city":"Seattle","state":"WA","zip":"11111","numberOfYears":"1"},{"status":"previous","address":"456 Main Street","city":"Portland","state":"OR","zip":"22222","numberOfYears":"5"}]}
{"id":"2","first_name":"Jane","last_name":"Doe","dob":"1980-10-16","addresses":[{"status":"current","address":"789 Any Avenue","city":"New York","state":"NY","zip":"33333","numberOfYears":"2"},{"status":"previous","address":"321 Main Street","city":"Hoboken","state":"NJ","zip":"44444","numberOfYears":"3"}]}
ここで欲しいものは、
id
, first_name
, last_name
, dob
とします。
schema_fieldsで定義
スキーマのリストをでフィールドを指定します。
※ schema_object'がNULLでautodetectがFalseの場合、パラメータを定義する必要がある
ドキュメントを参考にしながら、DAGのタスク部分を以下のようなコードにしてみました。
GCSToBigQueryOperator(
gcp_conn_id=CONNECTION_ID,
task_id="load",
bucket='airflow-events',
source_objects=["sample/year=2023/month=08/day=01/*.json"],
destination_project_dataset_table=f'{DATASET_NAME}.users',
source_format='NEWLINE_DELIMITED_JSON',
write_disposition='WRITE_APPEND',
create_disposition='CREATE_IF_NEEDED',
autodetect=False,
ignore_unknown_values=True,
schema_update_options=['ALLOW_FIELD_ADDITION'],
schema_fields=[
{"name": "id", "type": "INTEGER"},
{"name": "first_name", "type": "STRING"},
{"name": "last_name", "type": "STRING"},
{"name": "dob", "type": "DATE"},
],
)
このオペレーターを実装したDAGを実行すると、BigQuery側にデータがロードされていました。
schema_objectで定義
テーブルのスキーマを含む .json ファイルを指すGCSオブジェクト・パスを指定。
※ 'schema_fields' が NULL で autodetect が False の場合に定義する必要があります。
今回の場合だと、以下のような形式でスキーマの情報を定義し、GCSにjsonファイルとして保存しておく必要があります。
[
{
"name": "id",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "first_name",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "last_name",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "dob",
"type": "DATE",
"mode": "NULLABLE"
}
]
DAGのタスク部分を以下のようなコードにしてみました。
保存したJSONが、例えば
gs://airflow-sample-events/myschema.json
だったとしたら、schema_object
は パスの部分 (myschema.json
) となります。
GCSToBigQueryOperator(
gcp_conn_id=CONNECTION_ID,
task_id="load",
bucket='airflow-events',
source_objects=["sample/year=2023/month=08/day=01/*.json"],
destination_project_dataset_table=f'{DATASET_NAME}.users',
source_format='NEWLINE_DELIMITED_JSON',
write_disposition='WRITE_APPEND',
create_disposition='CREATE_IF_NEEDED',
autodetect=False,
ignore_unknown_values=True,
schema_update_options=['ALLOW_FIELD_ADDITION'],
schema_object="myschema.json",
)
このオペレーターを実装したDAGを実行すると、schema_objectと同様にBigQuery側にデータがロードされていました。