Airflow で GCS のデータを BigQuery にロードしてみた

2020.03.14

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、みかみです。

なんとなく Airflow をさわれるようになったものの、まだあまりバッチ処理の実行などの実処理部分を動かしてません。

Airflow で GCP 関連の処理実行してみたいなー。

やりたいこと

  • Airflow の GCP 関連オペレーターにどんなものがあるのか知りたい
  • Airflow で BigQuery にデータをロードしたい

Airflow の GCP 関連オペレーター

Airflow のJob( DAG )の実処理は Operator に記述されており、BashOperatorPythonOperator など、デフォルトでも様々なオペレーターが用意されていますが、GCP 関連のオペレーターもあるようです。

GCE や GCS、Cloud SQL に BigQuery など、GCP 関連処理には十分対応できそうです。

組み合わせれば、BigQuery へのデータロードも問題なくできそうな予感。

サンプル DAG で、GCS から BigQuery にデータをロードしてみる

サンプルで準備されている DAG を確認していると、example_gcs_to_bq_operator.py を発見しました。

これはそのものズバリ、GCS から BigQuery にデータロードしてくれる DAG では? と思ったので、とりあえず動かしてみたい!

コード確認すると、以下の処理を行っているようです。

  1. BigQuery のデータセットを作成
  2. 1 で作成したデータセットに、GCS に配置済みのファイルデータをロード
  3. 2 でデータロードしたデータセットを削除

実際の動きを、Amaxon Linux2 にインストール済みの Airflow v1.10.9 環境で確認しました。

なお、サーバーから bq コマンドの実行ができて、BigQuery API を使うためのアカウントキーファイルがサーバー上に配置してあることを前提としております。

BigQuery の接続情報設定

Airflow 管理画面の「Admin」タブから、BigQuery 接続情報を設定します。

「Connections」画面から「bigquery_default」接続設定の編集アイコンをクリックします。

「Project Id」、「Keyfile Path」、「copes (comma separated)」の項目を入力し、「Save」ボタンをクリック。

なお、各項目の設定値は、以下のページを参考にしました。

GCS にロード用データを配置

GCS に test-cm-mikami バケット を作成し、その下に airflow ディレクトリを作成して、データファイルを配置しました。

ロードデータは、BigQuery のスタートガイドにリンクのあった、一般公開データセットの アメリカの赤ちゃんの名前データです。

サンプル DAG を配置して実行

以下のサンプル DAG を動かしてみます。

/airflow/contrib/example_dags/example_gcs_to_bq_operator.py

バケット名やソースデータファイルの配置場所、スキーマなどを編集し、さらにロード後のデータを確認するためデータセット削除処理はコメントアウトして、airflow.cfg の「dags_folder」で指定されているディレクトリに配置しました。

example_gcs_to_bq_operator.py

(省略)
if gcs_to_bq is not None:
    args = {
        'owner': 'Airflow',
        'start_date': airflow.utils.dates.days_ago(2)
    }

    dag = models.DAG(
        dag_id='example_gcs_to_bq_operator', default_args=args,
        schedule_interval=None)

    create_test_dataset = bash_operator.BashOperator(
        task_id='create_airflow_test_dataset',
        bash_command='bq mk airflow_test',
        dag=dag)

    # [START howto_operator_gcs_to_bq]
    load_csv = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq_example',
# mod mikami
#        bucket='cloud-samples-data',
        bucket='test-cm-mikami',
#        source_objects=['bigquery/us-states/us-states.csv'],
        source_objects=['airflow/yob2000.txt'],
#        destination_project_dataset_table='airflow_test.gcs_to_bq_table',
        destination_project_dataset_table='airflow_test.name_2000',
#        schema_fields=[
#            {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
#            {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
#        ],
        schema_fields=[
            {'name': 'name', 'type': 'STRING', 'mode': 'REQUIRED'},
            {'name': 'gender', 'type': 'STRING', 'mode': 'REQUIRED'},
            {'name': 'count', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        ],
        write_disposition='WRITE_TRUNCATE',
        dag=dag)
    # [END howto_operator_gcs_to_bq]

# del mikami
#    delete_test_dataset = bash_operator.BashOperator(
#        task_id='delete_airflow_test_dataset',
#        bash_command='bq rm -rf airflow_test',
#        dag=dag)

#    create_test_dataset >> load_csv >> delete_test_dataset
    create_test_dataset >> load_csv

管理画面から実行します。

実行完了後、GCP の管理画面から BigQuery を確認してみると・・・

無事、新しいデータセットが作成されて、GCS に配置したデータがロードされました。

つまずいたところ

サンプルで準備されてるほどだし、たぶん簡単に挙動確認できるでしょ? と、甘く見てましたが、サンプル DAG の正常終了までに、実は何度もつまずきました。。

サンプル DAG が表示されない

動作確認に使用した Amaxon Linux2 では、Airflow インストール前に bq コマンドや BigQuery API の動作確認してました。

google-cloud-sdk のインストールや API キーの認証設定も実施済みだったので、lib のインストールなども不要かと思い、初め何も考えずに サンプル DAG ファイルを配置したところ、Airflow 管理画面に example_gcs_to_bq_operator が表示されません。。

DAG のディレクトリがおかしい? Airflow Server 再起動しないとダメ? など考えましたが一向に表示されず。。

デバッグ目的で DAG ファイルを、Exception を Throw するように変更してみたところ・・・

example_gcs_to_bq_operator.py

(省略)
gcs_to_bq = None  # type: Any
try:
    from airflow.contrib.operators import gcs_to_bq
except ImportError:
##    pass
    raise ImportError
(省略)

まさに、ImportError になってました。。

[2020-03-13 04:18:54,290] {dagbag.py:246} ERROR - Failed to import: /home/ec2-user/airflow/dags/example_gcs_to_bq_operator.py
Traceback (most recent call last):
  File "/home/ec2-user/airflow/dags/example_gcs_to_bq_operator.py", line 28, in <module>
    from airflow.contrib.operators import gcs_to_bq
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/operators/gcs_to_bq.py", line 22, in <module>
    from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/gcs_hook.py", line 25, in <module>
    from google.cloud import storage
ModuleNotFoundError: No module named 'google'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/dagbag.py", line 243, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/home/ec2-user/test_airflow/lib64/python3.7/imp.py", line 171, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 696, in _load
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/home/ec2-user/airflow/dags/example_gcs_to_bq_operator.py", line 31, in <module>
    raise ImportError
ImportError

ライブラリ足りなかったかなー。。 で、ちょっとちゃんとドキュメント読みました。。。

pip install 'apache-airflow[gcp]' 実行したら、ようやく Import エラー解消されて、DAG が表示されるようになりました。

ImportError: cannot import name opentype

google ライブラリは Import できるようになったものの、また出た。。

[2020-03-13 08:02:25,478] {bash_operator.py:115} INFO - Running command: bq mk airflow_test
[2020-03-13 08:02:25,482] {bash_operator.py:122} INFO - Output:
[2020-03-13 08:02:25,772] {bash_operator.py:126} INFO - Traceback (most recent call last):
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO -   File "/home/ec2-user/google-cloud-sdk/platform/bq/bq.py", line 57, in <module>
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO -     import oauth2client_4_0.service_account
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO -   File "/home/ec2-user/google-cloud-sdk/platform/bq/third_party/oauth2client_4_0/service_account.py", line 27, in <module>
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO -     from oauth2client_4_0 import crypt
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO -   File "/home/ec2-user/google-cloud-sdk/platform/bq/third_party/oauth2client_4_0/crypt.py", line 24, in <module>
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO -     from oauth2client_4_0 import _pure_python_crypt
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO -   File "/home/ec2-user/google-cloud-sdk/platform/bq/third_party/oauth2client_4_0/_pure_python_crypt.py", line 25, in <module>
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO -     from pyasn1_modules.rfc2459 import Certificate
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO -   File "/home/ec2-user/google-cloud-sdk/platform/bq/third_party/pyasn1_modules/rfc2459.py", line 21, in <module>
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO -     from pyasn1.type import opentype
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - ImportError: cannot import name opentype
[2020-03-13 08:02:25,820] {bash_operator.py:130} INFO - Command exited with return code 1
[2020-03-13 08:02:25,830] {taskinstance.py:1128} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 134, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed

google 先生に聞いてみたら、pyasn1 のバージョン問題の可能性が高い? と思って lib をアップデートしようとしたけど、依存関係があるため上がらず。。

(test_airflow) [ec2-user@ip-10-0-43-239 ~]$ pip install -U pyasn1 pyasn1-modules
Requirement already up-to-date: pyasn1 in ./test_airflow/lib/python3.7/site-packages (0.4.8)
Requirement already up-to-date: pyasn1-modules in ./test_airflow/lib/python3.7/site-packages (0.2.8)
(test_airflow) [ec2-user@ip-10-0-43-239 ~]$ pip list | grep pyasn1
pyasn1                         0.4.8
pyasn1-modules                 0.2.8

pyasn1.type の opentype がいないと怒られてるけど、実ソース確認したら、ちゃんといるんだけどなぁ。。。

(test_airflow) [ec2-user@ip-10-0-43-239 ~]$ pip show pyasn1
Name: pyasn1
Version: 0.4.8
Summary: ASN.1 types and codecs
Home-page: https://github.com/etingof/pyasn1
Author: Ilya Etingof
Author-email: etingof@gmail.com
License: BSD
Location: /home/ec2-user/test_airflow/lib/python3.7/site-packages
Requires:
Required-by: rsa, pyasn1-modules
(test_airflow) [ec2-user@ip-10-0-43-239 type]$ pwd
/home/ec2-user/test_airflow/lib/python3.7/site-packages/pyasn1/type
(test_airflow) [ec2-user@ip-10-0-43-239 type]$ ls -l
total 232
-rw-rw-r-- 1 ec2-user ec2-user  22386 Mar 13 04:28 base.py
-rw-rw-r-- 1 ec2-user ec2-user  11397 Mar 13 04:28 char.py
-rw-rw-r-- 1 ec2-user ec2-user  22132 Mar 13 04:28 constraint.py
-rw-rw-r-- 1 ec2-user ec2-user    246 Mar 13 04:28 error.py
-rw-rw-r-- 1 ec2-user ec2-user     59 Mar 13 04:28 __init__.py
-rw-rw-r-- 1 ec2-user ec2-user  16368 Mar 13 04:28 namedtype.py
-rw-rw-r-- 1 ec2-user ec2-user   4886 Mar 13 04:28 namedval.py
-rw-rw-r-- 1 ec2-user ec2-user   2848 Mar 13 04:28 opentype.py
drwxrwxr-x 2 ec2-user ec2-user   4096 Mar 13 04:28 __pycache__
-rw-rw-r-- 1 ec2-user ec2-user   2998 Mar 13 04:28 tagmap.py
-rw-rw-r-- 1 ec2-user ec2-user   9486 Mar 13 04:28 tag.py
-rw-rw-r-- 1 ec2-user ec2-user 108921 Mar 13 04:28 univ.py
-rw-rw-r-- 1 ec2-user ec2-user   5368 Mar 13 04:28 useful.py

どっか別のところ見てる?( venv 環境変えたりしてるのが影響してるのかなぁ・・・?

結局、google-cloud-sdk をインストールしなおして解決しました。

ERROR: (bq) You do not currently have an active account selected.

サンプル DAG では、まず bq コマンドで BigQuery のデータセットを作成します。

が、コマンド実行でアカウント設定エラー。。

[2020-03-13 09:19:52,097] {bash_operator.py:115} INFO - Running command: bq mk airflow_test
[2020-03-13 09:19:52,101] {bash_operator.py:122} INFO - Output:
[2020-03-13 09:19:52,699] {bash_operator.py:126} INFO - ERROR: (bq) You do not currently have an active account selected.
[2020-03-13 09:19:52,699] {bash_operator.py:126} INFO - Please run:
[2020-03-13 09:19:52,699] {bash_operator.py:126} INFO - 
[2020-03-13 09:19:52,699] {bash_operator.py:126} INFO -   $ gcloud auth login
[2020-03-13 09:19:52,699] {bash_operator.py:126} INFO - 
[2020-03-13 09:19:52,699] {bash_operator.py:126} INFO - to obtain new credentials, or if you have already logged in with a
[2020-03-13 09:19:52,700] {bash_operator.py:126} INFO - different account:
[2020-03-13 09:19:52,700] {bash_operator.py:126} INFO - 
[2020-03-13 09:19:52,700] {bash_operator.py:126} INFO -   $ gcloud config set account ACCOUNT
[2020-03-13 09:19:52,700] {bash_operator.py:126} INFO - 
[2020-03-13 09:19:52,700] {bash_operator.py:126} INFO - to select an already authenticated account to use.
[2020-03-13 09:19:52,797] {bash_operator.py:130} INFO - Command exited with return code 1
[2020-03-13 09:19:52,810] {taskinstance.py:1128} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 134, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed

コマンドラインから直接 bq コマンド実行しても、同じエラー発生。 google-cloud-sdk インストールし直したので、そりゃそうですよね。。

認証情報設定のため、gcloud init コマンド実行して、解決。

OSError: [Errno 12] Cannot allocate memory

なんですと? メモリが足りないですと? そんな高スペックが必要なのかしら?(メモリ関連のエラーって、けっこうドキッとしてしまいます。。。

[2020-03-13 09:40:30,323] {bash_operator.py:115} INFO - Running command: bq mk airflow_test
[2020-03-13 09:40:30,327] {bash_operator.py:122} INFO - Output:
[2020-03-13 09:40:31,269] {bash_operator.py:126} INFO - Traceback (most recent call last):
[2020-03-13 09:40:31,271] {bash_operator.py:126} INFO -   File "/home/ec2-user/google-cloud-sdk/bin/bootstrapping/bq.py", line 83, in <module>
[2020-03-13 09:40:31,271] {bash_operator.py:126} INFO -     exceptions.HandleError(e, 'bq')
[2020-03-13 09:40:31,271] {bash_operator.py:126} INFO -   File "/home/ec2-user/google-cloud-sdk/lib/googlecloudsdk/calliope/exceptions.py", line 526, in HandleError
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -     core_exceptions.reraise(exc)
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -   File "/home/ec2-user/google-cloud-sdk/lib/googlecloudsdk/core/exceptions.py", line 111, in reraise
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -     six.reraise(type(exc_value), exc_value, tb)
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -   File "/home/ec2-user/google-cloud-sdk/bin/bootstrapping/bq.py", line 81, in <module>
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -     main()
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -   File "/home/ec2-user/google-cloud-sdk/bin/bootstrapping/bq.py", line 76, in main
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -     'platform/bq', 'bq.py', *args)
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -   File "/home/ec2-user/google-cloud-sdk/bin/bootstrapping/bootstrapping.py", line 44, in ExecutePythonTool
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -     execution_utils.ArgsForPythonTool(_FullPath(tool_dir, exec_name), *args))
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -   File "/home/ec2-user/google-cloud-sdk/bin/bootstrapping/bootstrapping.py", line 105, in _ExecuteTool
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -     execution_utils.Exec(args + sys.argv[1:], env=_GetToolEnv())
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -   File "/home/ec2-user/google-cloud-sdk/lib/googlecloudsdk/core/execution_utils.py", line 300, in Exec
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -     p = subprocess.Popen(args, env=env, **extra_popen_kwargs)
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -   File "/usr/lib64/python2.7/subprocess.py", line 394, in __init__
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -     errread, errwrite)
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -   File "/usr/lib64/python2.7/subprocess.py", line 938, in _execute_child
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO -     self.pid = os.fork()
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - OSError: [Errno 12] Cannot allocate memory
[2020-03-13 09:40:31,323] {bash_operator.py:130} INFO - Command exited with return code 1
[2020-03-13 09:40:31,339] {taskinstance.py:1128} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 134, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed

こちらも bq コマンド実行時のエラーだったので、試しにまたコンソールから直接 bq コマンド実行してみると、今度はすんなり通った。。(なら、Airflow からでも行けるんじゃない・・・?

で、実行中のプロセス確認してみると・・・

(test_airflow) [ec2-user@ip-10-0-43-239 ~]$ ps aux | grep airflow
ec2-user  4372  0.1  7.0 415996 70888 ?        S    04:03   0:24 airflow scheduler -- DagFileProcessorManager
ec2-user 11597  0.1  7.9 455620 80092 ?        S    07:27   0:14 airflow scheduler -- DagFileProcessorManager
ec2-user 12521  0.1  8.7 449744 88392 pts/2    S+   07:55   0:10 /home/ec2-user/test_airflow/bin/python3 /home/ec2-user/test_airflow/bin/airflo  webserver -p 8080
ec2-user 12524  0.0  6.8 421624 68988 pts/2    S+   07:55   0:02 gunicorn: master [airflow-webserver]
ec2-user 15880  0.0  7.9 455636 80140 ?        S    09:19   0:02 airflow scheduler -- DagFileProcessorManager
ec2-user 17714  0.3  9.0 449976 91220 pts/2    S+   10:08   0:00 [ready] gunicorn: worker [airflow-webserver]
ec2-user 17730  0.5  9.0 449976 91224 pts/2    S+   10:08   0:00 [ready] gunicorn: worker [airflow-webserver]
ec2-user 17748  0.8  9.0 449976 91220 pts/2    S+   10:09   0:00 [ready] gunicorn: worker [airflow-webserver]
ec2-user 17766  2.6  9.0 449980 91216 pts/2    S+   10:09   0:00 [ready] gunicorn: worker [airflow-webserver]
ec2-user 17778  0.0  0.0 119416   964 pts/5    S+   10:10   0:00 grep --color=auto airflow

airflow scheduler ってこんなに多かったっけ? デーモンが残っているようです。。

念のため、サーバー再起動したら、エラー出なくなりました。

ERROR - INTERNAL: No default project is specified

うん、そういえば、GCP のプロジェクトID、どこでも指定してなかった気がするw

[2020-03-13 10:14:28,680] {taskinstance.py:1128} ERROR - INTERNAL: No default project is specified
Traceback (most recent call last):
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/operators/gcs_to_bq.py", line 288, in execute
    encryption_configuration=self.encryption_configuration)
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 1212, in run_load
    var_name='destination_project_dataset_table')
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 2189, in _split_tablename
    raise ValueError("INTERNAL: No default project is specified")
ValueError: INTERNAL: No default project is specified

どこで設定すればいいんだっけ? と調べて、Airflow 管理画面から各種接続設定ができることを知りました。

ERROR - Invalid key JSON.

まだ正常系通らない。。

[2020-03-13 11:06:56,491] {taskinstance.py:1128} ERROR - Invalid key JSON.
Traceback (most recent call last):
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/gcp_api_base_hook.py", line 165, in _get_credentials
    keyfile_dict = json.loads(keyfile_dict)
  File "/usr/lib64/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python3.7/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/operators/gcs_to_bq.py", line 246, in execute
    conn = bq_hook.get_conn()
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 68, in get_conn
    service = self.get_service()
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 82, in get_service
    http_authorized = self._authorize()
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/gcp_api_base_hook.py", line 195, in _authorize
    credentials = self._get_credentials()
  File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/gcp_api_base_hook.py", line 179, in _get_credentials
    raise AirflowException('Invalid key JSON.')
airflow.exceptions.AirflowException: Invalid key JSON.

例外吐いてるソースコードは以下。

gcp_api_base_hook.py

(省略)
        if not key_path and not keyfile_dict:
            self.log.info('Getting connection using `google.auth.default()` '
                          'since no key file is defined for hook.')
            credentials, _ = google.auth.default(scopes=scopes)
        elif key_path:
            # Get credentials from a JSON file.
            if key_path.endswith('.json'):
                self.log.debug('Getting connection using JSON key file %s' % key_path)
                credentials = (
                    google.oauth2.service_account.Credentials.from_service_account_file(
                        key_path, scopes=scopes)
                )
            elif key_path.endswith('.p12'):
                raise AirflowException('Legacy P12 key file are not supported, '
                                       'use a JSON key file.')
            else:
                raise AirflowException('Unrecognised extension for key file.')
        else:
            # Get credentials from JSON data provided in the UI.
            try:
                print(keyfile_dict)
                keyfile_dict = json.loads(keyfile_dict)

                # Depending on how the JSON was formatted, it may contain
                # escaped newlines. Convert those to actual newlines.
                keyfile_dict['private_key'] = keyfile_dict['private_key'].replace(
                    '\\n', '\n')

                credentials = (
                    google.oauth2.service_account.Credentials.from_service_account_info(
                        keyfile_dict, scopes=scopes)
                )
            except json.decoder.JSONDecodeError:
                raise AirflowException('Invalid key JSON.')
(省略)

JSON て、BigQuery API のアクセスキーファイルのことだよね? ファイルの中身のJSON構文がおかしい? と調べてみたけど問題なし。

デバッグプリント追加して keyfile_dict の値見てみたら、アクセスキーのファイルパスになってる。。

Airflow 管理画面からの接続設定で、ファイルパス書く場所間違ってました。。

  • 誤:Keyfile JSON
  • 正:Keyfile Path

まとめ(所感)

Airflow も GCP もまだあまり深く理解していない状態ですが、比較的簡単に目的達成できました。

既存のオペレーターを参考にして、他にもやりたいことに合わせて処理拡張できそうです。

各サービスの接続情報を画面から簡単に設定できるUIもあるので、Airflow を使えばいろいろなサービスと連携できて、バッチ処理の幅も広がりそうです。

参考