この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、みかみです。
なんとなく Airflow をさわれるようになったものの、まだあまりバッチ処理の実行などの実処理部分を動かしてません。
Airflow で GCP 関連の処理実行してみたいなー。
やりたいこと
- Airflow の GCP 関連オペレーターにどんなものがあるのか知りたい
- Airflow で BigQuery にデータをロードしたい
Airflow の GCP 関連オペレーター
Airflow のJob( DAG )の実処理は Operator に記述されており、BashOperator や PythonOperator など、デフォルトでも様々なオペレーターが用意されていますが、GCP 関連のオペレーターもあるようです。
- Google Cloud Operators | Apache Airflow Documentation
- airflow.contrib.operators | Apache Airflow Documentation
GCE や GCS、Cloud SQL に BigQuery など、GCP 関連処理には十分対応できそうです。
組み合わせれば、BigQuery へのデータロードも問題なくできそうな予感。
サンプル DAG で、GCS から BigQuery にデータをロードしてみる
サンプルで準備されている DAG を確認していると、example_gcs_to_bq_operator.py を発見しました。
これはそのものズバリ、GCS から BigQuery にデータロードしてくれる DAG では? と思ったので、とりあえず動かしてみたい!
コード確認すると、以下の処理を行っているようです。
- BigQuery のデータセットを作成
- 1 で作成したデータセットに、GCS に配置済みのファイルデータをロード
- 2 でデータロードしたデータセットを削除
実際の動きを、Amaxon Linux2 にインストール済みの Airflow v1.10.9 環境で確認しました。
なお、サーバーから bq コマンドの実行ができて、BigQuery API を使うためのアカウントキーファイルがサーバー上に配置してあることを前提としております。
BigQuery の接続情報設定
Airflow 管理画面の「Admin」タブから、BigQuery 接続情報を設定します。
「Connections」画面から「bigquery_default」接続設定の編集アイコンをクリックします。
「Project Id」、「Keyfile Path」、「copes (comma separated)」の項目を入力し、「Save」ボタンをクリック。
なお、各項目の設定値は、以下のページを参考にしました。
GCS にロード用データを配置
GCS に test-cm-mikami バケット を作成し、その下に airflow ディレクトリを作成して、データファイルを配置しました。
ロードデータは、BigQuery のスタートガイドにリンクのあった、一般公開データセットの アメリカの赤ちゃんの名前データです。
サンプル DAG を配置して実行
以下のサンプル DAG を動かしてみます。
/airflow/contrib/example_dags/example_gcs_to_bq_operator.py
バケット名やソースデータファイルの配置場所、スキーマなどを編集し、さらにロード後のデータを確認するためデータセット削除処理はコメントアウトして、airflow.cfg の「dags_folder」で指定されているディレクトリに配置しました。
example_gcs_to_bq_operator.py
(省略)
if gcs_to_bq is not None:
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = models.DAG(
dag_id='example_gcs_to_bq_operator', default_args=args,
schedule_interval=None)
create_test_dataset = bash_operator.BashOperator(
task_id='create_airflow_test_dataset',
bash_command='bq mk airflow_test',
dag=dag)
# [START howto_operator_gcs_to_bq]
load_csv = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bq_example',
# mod mikami
# bucket='cloud-samples-data',
bucket='test-cm-mikami',
# source_objects=['bigquery/us-states/us-states.csv'],
source_objects=['airflow/yob2000.txt'],
# destination_project_dataset_table='airflow_test.gcs_to_bq_table',
destination_project_dataset_table='airflow_test.name_2000',
# schema_fields=[
# {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
# {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
# ],
schema_fields=[
{'name': 'name', 'type': 'STRING', 'mode': 'REQUIRED'},
{'name': 'gender', 'type': 'STRING', 'mode': 'REQUIRED'},
{'name': 'count', 'type': 'INTEGER', 'mode': 'REQUIRED'},
],
write_disposition='WRITE_TRUNCATE',
dag=dag)
# [END howto_operator_gcs_to_bq]
# del mikami
# delete_test_dataset = bash_operator.BashOperator(
# task_id='delete_airflow_test_dataset',
# bash_command='bq rm -rf airflow_test',
# dag=dag)
# create_test_dataset >> load_csv >> delete_test_dataset
create_test_dataset >> load_csv
管理画面から実行します。
実行完了後、GCP の管理画面から BigQuery を確認してみると・・・
無事、新しいデータセットが作成されて、GCS に配置したデータがロードされました。
つまずいたところ
サンプルで準備されてるほどだし、たぶん簡単に挙動確認できるでしょ? と、甘く見てましたが、サンプル DAG の正常終了までに、実は何度もつまずきました。。
サンプル DAG が表示されない
動作確認に使用した Amaxon Linux2 では、Airflow インストール前に bq コマンドや BigQuery API の動作確認してました。
google-cloud-sdk のインストールや API キーの認証設定も実施済みだったので、lib のインストールなども不要かと思い、初め何も考えずに サンプル DAG ファイルを配置したところ、Airflow 管理画面に example_gcs_to_bq_operator が表示されません。。
DAG のディレクトリがおかしい? Airflow Server 再起動しないとダメ? など考えましたが一向に表示されず。。
デバッグ目的で DAG ファイルを、Exception を Throw するように変更してみたところ・・・
example_gcs_to_bq_operator.py
(省略)
gcs_to_bq = None # type: Any
try:
from airflow.contrib.operators import gcs_to_bq
except ImportError:
## pass
raise ImportError
(省略)
まさに、ImportError になってました。。
[2020-03-13 04:18:54,290] {dagbag.py:246} ERROR - Failed to import: /home/ec2-user/airflow/dags/example_gcs_to_bq_operator.py
Traceback (most recent call last):
File "/home/ec2-user/airflow/dags/example_gcs_to_bq_operator.py", line 28, in <module>
from airflow.contrib.operators import gcs_to_bq
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/operators/gcs_to_bq.py", line 22, in <module>
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/gcs_hook.py", line 25, in <module>
from google.cloud import storage
ModuleNotFoundError: No module named 'google'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/dagbag.py", line 243, in process_file
m = imp.load_source(mod_name, filepath)
File "/home/ec2-user/test_airflow/lib64/python3.7/imp.py", line 171, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 696, in _load
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/home/ec2-user/airflow/dags/example_gcs_to_bq_operator.py", line 31, in <module>
raise ImportError
ImportError
ライブラリ足りなかったかなー。。 で、ちょっとちゃんとドキュメント読みました。。。
pip install 'apache-airflow[gcp]' 実行したら、ようやく Import エラー解消されて、DAG が表示されるようになりました。
ImportError: cannot import name opentype
google ライブラリは Import できるようになったものの、また出た。。
[2020-03-13 08:02:25,478] {bash_operator.py:115} INFO - Running command: bq mk airflow_test
[2020-03-13 08:02:25,482] {bash_operator.py:122} INFO - Output:
[2020-03-13 08:02:25,772] {bash_operator.py:126} INFO - Traceback (most recent call last):
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/platform/bq/bq.py", line 57, in <module>
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - import oauth2client_4_0.service_account
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/platform/bq/third_party/oauth2client_4_0/service_account.py", line 27, in <module>
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - from oauth2client_4_0 import crypt
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/platform/bq/third_party/oauth2client_4_0/crypt.py", line 24, in <module>
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - from oauth2client_4_0 import _pure_python_crypt
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/platform/bq/third_party/oauth2client_4_0/_pure_python_crypt.py", line 25, in <module>
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - from pyasn1_modules.rfc2459 import Certificate
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/platform/bq/third_party/pyasn1_modules/rfc2459.py", line 21, in <module>
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - from pyasn1.type import opentype
[2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - ImportError: cannot import name opentype
[2020-03-13 08:02:25,820] {bash_operator.py:130} INFO - Command exited with return code 1
[2020-03-13 08:02:25,830] {taskinstance.py:1128} ERROR - Bash command failed
Traceback (most recent call last):
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 134, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
google 先生に聞いてみたら、pyasn1 のバージョン問題の可能性が高い? と思って lib をアップデートしようとしたけど、依存関係があるため上がらず。。
(test_airflow) [ec2-user@ip-10-0-43-239 ~]$ pip install -U pyasn1 pyasn1-modules
Requirement already up-to-date: pyasn1 in ./test_airflow/lib/python3.7/site-packages (0.4.8)
Requirement already up-to-date: pyasn1-modules in ./test_airflow/lib/python3.7/site-packages (0.2.8)
(test_airflow) [ec2-user@ip-10-0-43-239 ~]$ pip list | grep pyasn1
pyasn1 0.4.8
pyasn1-modules 0.2.8
pyasn1.type の opentype がいないと怒られてるけど、実ソース確認したら、ちゃんといるんだけどなぁ。。。
(test_airflow) [ec2-user@ip-10-0-43-239 ~]$ pip show pyasn1
Name: pyasn1
Version: 0.4.8
Summary: ASN.1 types and codecs
Home-page: https://github.com/etingof/pyasn1
Author: Ilya Etingof
Author-email: etingof@gmail.com
License: BSD
Location: /home/ec2-user/test_airflow/lib/python3.7/site-packages
Requires:
Required-by: rsa, pyasn1-modules
(test_airflow) [ec2-user@ip-10-0-43-239 type]$ pwd
/home/ec2-user/test_airflow/lib/python3.7/site-packages/pyasn1/type
(test_airflow) [ec2-user@ip-10-0-43-239 type]$ ls -l
total 232
-rw-rw-r-- 1 ec2-user ec2-user 22386 Mar 13 04:28 base.py
-rw-rw-r-- 1 ec2-user ec2-user 11397 Mar 13 04:28 char.py
-rw-rw-r-- 1 ec2-user ec2-user 22132 Mar 13 04:28 constraint.py
-rw-rw-r-- 1 ec2-user ec2-user 246 Mar 13 04:28 error.py
-rw-rw-r-- 1 ec2-user ec2-user 59 Mar 13 04:28 __init__.py
-rw-rw-r-- 1 ec2-user ec2-user 16368 Mar 13 04:28 namedtype.py
-rw-rw-r-- 1 ec2-user ec2-user 4886 Mar 13 04:28 namedval.py
-rw-rw-r-- 1 ec2-user ec2-user 2848 Mar 13 04:28 opentype.py
drwxrwxr-x 2 ec2-user ec2-user 4096 Mar 13 04:28 __pycache__
-rw-rw-r-- 1 ec2-user ec2-user 2998 Mar 13 04:28 tagmap.py
-rw-rw-r-- 1 ec2-user ec2-user 9486 Mar 13 04:28 tag.py
-rw-rw-r-- 1 ec2-user ec2-user 108921 Mar 13 04:28 univ.py
-rw-rw-r-- 1 ec2-user ec2-user 5368 Mar 13 04:28 useful.py
どっか別のところ見てる?( venv 環境変えたりしてるのが影響してるのかなぁ・・・?
結局、google-cloud-sdk をインストールしなおして解決しました。
ERROR: (bq) You do not currently have an active account selected.
サンプル DAG では、まず bq コマンドで BigQuery のデータセットを作成します。
が、コマンド実行でアカウント設定エラー。。
[2020-03-13 09:19:52,097] {bash_operator.py:115} INFO - Running command: bq mk airflow_test
[2020-03-13 09:19:52,101] {bash_operator.py:122} INFO - Output:
[2020-03-13 09:19:52,699] {bash_operator.py:126} INFO - ERROR: (bq) You do not currently have an active account selected.
[2020-03-13 09:19:52,699] {bash_operator.py:126} INFO - Please run:
[2020-03-13 09:19:52,699] {bash_operator.py:126} INFO -
[2020-03-13 09:19:52,699] {bash_operator.py:126} INFO - $ gcloud auth login
[2020-03-13 09:19:52,699] {bash_operator.py:126} INFO -
[2020-03-13 09:19:52,699] {bash_operator.py:126} INFO - to obtain new credentials, or if you have already logged in with a
[2020-03-13 09:19:52,700] {bash_operator.py:126} INFO - different account:
[2020-03-13 09:19:52,700] {bash_operator.py:126} INFO -
[2020-03-13 09:19:52,700] {bash_operator.py:126} INFO - $ gcloud config set account ACCOUNT
[2020-03-13 09:19:52,700] {bash_operator.py:126} INFO -
[2020-03-13 09:19:52,700] {bash_operator.py:126} INFO - to select an already authenticated account to use.
[2020-03-13 09:19:52,797] {bash_operator.py:130} INFO - Command exited with return code 1
[2020-03-13 09:19:52,810] {taskinstance.py:1128} ERROR - Bash command failed
Traceback (most recent call last):
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 134, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
コマンドラインから直接 bq コマンド実行しても、同じエラー発生。 google-cloud-sdk インストールし直したので、そりゃそうですよね。。
認証情報設定のため、gcloud init コマンド実行して、解決。
OSError: [Errno 12] Cannot allocate memory
なんですと? メモリが足りないですと? そんな高スペックが必要なのかしら?(メモリ関連のエラーって、けっこうドキッとしてしまいます。。。
[2020-03-13 09:40:30,323] {bash_operator.py:115} INFO - Running command: bq mk airflow_test
[2020-03-13 09:40:30,327] {bash_operator.py:122} INFO - Output:
[2020-03-13 09:40:31,269] {bash_operator.py:126} INFO - Traceback (most recent call last):
[2020-03-13 09:40:31,271] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/bin/bootstrapping/bq.py", line 83, in <module>
[2020-03-13 09:40:31,271] {bash_operator.py:126} INFO - exceptions.HandleError(e, 'bq')
[2020-03-13 09:40:31,271] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/lib/googlecloudsdk/calliope/exceptions.py", line 526, in HandleError
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - core_exceptions.reraise(exc)
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/lib/googlecloudsdk/core/exceptions.py", line 111, in reraise
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - six.reraise(type(exc_value), exc_value, tb)
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/bin/bootstrapping/bq.py", line 81, in <module>
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - main()
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/bin/bootstrapping/bq.py", line 76, in main
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - 'platform/bq', 'bq.py', *args)
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/bin/bootstrapping/bootstrapping.py", line 44, in ExecutePythonTool
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - execution_utils.ArgsForPythonTool(_FullPath(tool_dir, exec_name), *args))
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/bin/bootstrapping/bootstrapping.py", line 105, in _ExecuteTool
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - execution_utils.Exec(args + sys.argv[1:], env=_GetToolEnv())
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/lib/googlecloudsdk/core/execution_utils.py", line 300, in Exec
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - p = subprocess.Popen(args, env=env, **extra_popen_kwargs)
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - File "/usr/lib64/python2.7/subprocess.py", line 394, in __init__
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - errread, errwrite)
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - File "/usr/lib64/python2.7/subprocess.py", line 938, in _execute_child
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - self.pid = os.fork()
[2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - OSError: [Errno 12] Cannot allocate memory
[2020-03-13 09:40:31,323] {bash_operator.py:130} INFO - Command exited with return code 1
[2020-03-13 09:40:31,339] {taskinstance.py:1128} ERROR - Bash command failed
Traceback (most recent call last):
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 134, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
こちらも bq コマンド実行時のエラーだったので、試しにまたコンソールから直接 bq コマンド実行してみると、今度はすんなり通った。。(なら、Airflow からでも行けるんじゃない・・・?
で、実行中のプロセス確認してみると・・・
(test_airflow) [ec2-user@ip-10-0-43-239 ~]$ ps aux | grep airflow
ec2-user 4372 0.1 7.0 415996 70888 ? S 04:03 0:24 airflow scheduler -- DagFileProcessorManager
ec2-user 11597 0.1 7.9 455620 80092 ? S 07:27 0:14 airflow scheduler -- DagFileProcessorManager
ec2-user 12521 0.1 8.7 449744 88392 pts/2 S+ 07:55 0:10 /home/ec2-user/test_airflow/bin/python3 /home/ec2-user/test_airflow/bin/airflo webserver -p 8080
ec2-user 12524 0.0 6.8 421624 68988 pts/2 S+ 07:55 0:02 gunicorn: master [airflow-webserver]
ec2-user 15880 0.0 7.9 455636 80140 ? S 09:19 0:02 airflow scheduler -- DagFileProcessorManager
ec2-user 17714 0.3 9.0 449976 91220 pts/2 S+ 10:08 0:00 [ready] gunicorn: worker [airflow-webserver]
ec2-user 17730 0.5 9.0 449976 91224 pts/2 S+ 10:08 0:00 [ready] gunicorn: worker [airflow-webserver]
ec2-user 17748 0.8 9.0 449976 91220 pts/2 S+ 10:09 0:00 [ready] gunicorn: worker [airflow-webserver]
ec2-user 17766 2.6 9.0 449980 91216 pts/2 S+ 10:09 0:00 [ready] gunicorn: worker [airflow-webserver]
ec2-user 17778 0.0 0.0 119416 964 pts/5 S+ 10:10 0:00 grep --color=auto airflow
airflow scheduler ってこんなに多かったっけ? デーモンが残っているようです。。
念のため、サーバー再起動したら、エラー出なくなりました。
ERROR - INTERNAL: No default project is specified
うん、そういえば、GCP のプロジェクトID、どこでも指定してなかった気がするw
[2020-03-13 10:14:28,680] {taskinstance.py:1128} ERROR - INTERNAL: No default project is specified
Traceback (most recent call last):
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/operators/gcs_to_bq.py", line 288, in execute
encryption_configuration=self.encryption_configuration)
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 1212, in run_load
var_name='destination_project_dataset_table')
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 2189, in _split_tablename
raise ValueError("INTERNAL: No default project is specified")
ValueError: INTERNAL: No default project is specified
どこで設定すればいいんだっけ? と調べて、Airflow 管理画面から各種接続設定ができることを知りました。
ERROR - Invalid key JSON.
まだ正常系通らない。。
[2020-03-13 11:06:56,491] {taskinstance.py:1128} ERROR - Invalid key JSON.
Traceback (most recent call last):
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/gcp_api_base_hook.py", line 165, in _get_credentials
keyfile_dict = json.loads(keyfile_dict)
File "/usr/lib64/python3.7/json/__init__.py", line 348, in loads
return _default_decoder.decode(s)
File "/usr/lib64/python3.7/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib64/python3.7/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/operators/gcs_to_bq.py", line 246, in execute
conn = bq_hook.get_conn()
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 68, in get_conn
service = self.get_service()
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 82, in get_service
http_authorized = self._authorize()
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/gcp_api_base_hook.py", line 195, in _authorize
credentials = self._get_credentials()
File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/gcp_api_base_hook.py", line 179, in _get_credentials
raise AirflowException('Invalid key JSON.')
airflow.exceptions.AirflowException: Invalid key JSON.
例外吐いてるソースコードは以下。
gcp_api_base_hook.py
(省略)
if not key_path and not keyfile_dict:
self.log.info('Getting connection using `google.auth.default()` '
'since no key file is defined for hook.')
credentials, _ = google.auth.default(scopes=scopes)
elif key_path:
# Get credentials from a JSON file.
if key_path.endswith('.json'):
self.log.debug('Getting connection using JSON key file %s' % key_path)
credentials = (
google.oauth2.service_account.Credentials.from_service_account_file(
key_path, scopes=scopes)
)
elif key_path.endswith('.p12'):
raise AirflowException('Legacy P12 key file are not supported, '
'use a JSON key file.')
else:
raise AirflowException('Unrecognised extension for key file.')
else:
# Get credentials from JSON data provided in the UI.
try:
print(keyfile_dict)
keyfile_dict = json.loads(keyfile_dict)
# Depending on how the JSON was formatted, it may contain
# escaped newlines. Convert those to actual newlines.
keyfile_dict['private_key'] = keyfile_dict['private_key'].replace(
'\\n', '\n')
credentials = (
google.oauth2.service_account.Credentials.from_service_account_info(
keyfile_dict, scopes=scopes)
)
except json.decoder.JSONDecodeError:
raise AirflowException('Invalid key JSON.')
(省略)
JSON て、BigQuery API のアクセスキーファイルのことだよね? ファイルの中身のJSON構文がおかしい? と調べてみたけど問題なし。
デバッグプリント追加して keyfile_dict の値見てみたら、アクセスキーのファイルパスになってる。。
Airflow 管理画面からの接続設定で、ファイルパス書く場所間違ってました。。
- 誤:Keyfile JSON
- 正:Keyfile Path
まとめ(所感)
Airflow も GCP もまだあまり深く理解していない状態ですが、比較的簡単に目的達成できました。
既存のオペレーターを参考にして、他にもやりたいことに合わせて処理拡張できそうです。
各サービスの接続情報を画面から簡単に設定できるUIもあるので、Airflow を使えばいろいろなサービスと連携できて、バッチ処理の幅も広がりそうです。