Trying Out BigQuery Operations from Airflow
Hello, this is Mikami.
I want to try various operations against BigQuery from Airflow.
So, let's get to it.
What I Want to Do
From an Airflow DAG, I want to run the following operations against BigQuery:
- Get the list of datasets
- Get the list of tables
- Get table data
- Insert data into a table
- Update table data
- Delete table data
- Export table data to GCS
Checking How the Existing Operators Are Implemented
There already seem to be plenty of operators that could do what I want, but in case I need to extend them myself, I will first look at how the existing operators are implemented.
I skimmed the operators whose names include "bigquery", and most of the operator classes appear to have a constructor and an execute() method.
Since a DAG file only instantiates operator classes and defines their dependencies, it seems that once you create a class instance, the Airflow framework takes care of calling its execute() method.
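To make the pattern concrete, here is a minimal sketch of my own (not code from the Airflow source; the class name and parameter are made up for illustration):

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class MyMinimalOperator(BaseOperator):
    """Hypothetical minimal operator: the constructor just stores parameters,
    and the Airflow framework calls execute() when the task instance runs."""

    @apply_defaults
    def __init__(self, some_param, *args, **kwargs):
        super(MyMinimalOperator, self).__init__(*args, **kwargs)
        self.some_param = some_param

    def execute(self, context):
        # The return value of execute() is pushed to XCom by default.
        self.log.info('Running with some_param=%s', self.some_param)
        return self.some_param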
Checking What BigQueryGetDatasetOperator Does
Let's look in a bit more detail at the code of BigQueryGetDatasetOperator, implemented in /airflow/contrib/operators/bigquery_operator.py.
When execute() runs, it creates a BigQueryHook instance, opens a connection, obtains a cursor, and calls the cursor's get_dataset() method.
# (snip)
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
# (snip)


class BigQueryGetDatasetOperator(BaseOperator):
    # (snip)
    template_fields = ('dataset_id', 'project_id')
    ui_color = '#f0eee4'

    @apply_defaults
    def __init__(self,
                 dataset_id,
                 project_id=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args, **kwargs):
        self.dataset_id = dataset_id
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        super(BigQueryGetDatasetOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
                               delegate_to=self.delegate_to)

        conn = bq_hook.get_conn()
        cursor = conn.cursor()

        self.log.info('Start getting dataset: %s:%s', self.project_id, self.dataset_id)

        return cursor.get_dataset(
            dataset_id=self.dataset_id,
            project_id=self.project_id)
# (snip)
Let's also look inside BigQueryHook.
The actual implementation lives in /airflow/contrib/hooks/bigquery_hook.py.
Since bigquery_hook.py imports googleapiclient.discovery, this appears to be where GCP is accessed via googleapiclient.
BigQueryHook's get_conn() method creates and returns a BigQueryConnection instance. BigQueryConnection's cursor() method returns a BigQueryCursor instance, and the BigQueryCursor class inherits from BigQueryBaseCursor.
# (snip)
from googleapiclient.discovery import build
# (snip)


class BigQueryHook(GoogleCloudBaseHook, DbApiHook):
    """
    Interact with BigQuery. This hook uses the Google Cloud Platform
    connection.
    """
    conn_name_attr = 'bigquery_conn_id'

    def __init__(self,
                 bigquery_conn_id='bigquery_default',
                 delegate_to=None,
                 use_legacy_sql=True,
                 location=None):
        super(BigQueryHook, self).__init__(
            gcp_conn_id=bigquery_conn_id, delegate_to=delegate_to)
        self.use_legacy_sql = use_legacy_sql
        self.location = location

    def get_conn(self):
        """
        Returns a BigQuery PEP 249 connection object.
        """
        service = self.get_service()
        project = self._get_field('project')
        return BigQueryConnection(
            service=service,
            project_id=project,
            use_legacy_sql=self.use_legacy_sql,
            location=self.location,
            num_retries=self.num_retries
        )
# (snip)


class BigQueryConnection(object):
    """
    BigQuery does not have a notion of a persistent connection. Thus, these
    objects are small stateless factories for cursors, which do all the real
    work.
    """

    def __init__(self, *args, **kwargs):
        self._args = args
        self._kwargs = kwargs

    def close(self):
        """ BigQueryConnection does not have anything to close. """

    def commit(self):
        """ BigQueryConnection does not support transactions. """

    def cursor(self):
        """ Return a new :py:class:`Cursor` object using the connection. """
        return BigQueryCursor(*self._args, **self._kwargs)

    def rollback(self):
        raise NotImplementedError(
            "BigQueryConnection does not have transactions")
# (snip)


class BigQueryCursor(BigQueryBaseCursor):
    """
    A very basic BigQuery PEP 249 cursor implementation. The PyHive PEP 249
    implementation was used as a reference:

    https://github.com/dropbox/PyHive/blob/master/pyhive/presto.py
    https://github.com/dropbox/PyHive/blob/master/pyhive/common.py
    """

    def __init__(self,
                 service,
                 project_id,
                 use_legacy_sql=True,
                 location=None,
                 num_retries=5):
        super(BigQueryCursor, self).__init__(
            service=service,
            project_id=project_id,
            use_legacy_sql=use_legacy_sql,
            location=location,
            num_retries=num_retries
        )
        self.buffersize = None
        self.page_token = None
        self.job_id = None
        self.buffer = []
        self.all_pages_loaded = False
# (snip)
Looking at the code of the parent class BigQueryBaseCursor, we can confirm that its get_dataset() method calls datasets().get().
# (snip)
class BigQueryBaseCursor(LoggingMixin):
    """
    The BigQuery base cursor contains helper methods to execute queries against
    BigQuery. The methods can be used directly by operators, in cases where a
    PEP 249 cursor isn't needed.
    """

    def __init__(self,
                 service,
                 project_id,
                 use_legacy_sql=True,
                 api_resource_configs=None,
                 location=None,
                 num_retries=5):

        self.service = service
        self.project_id = project_id
        self.use_legacy_sql = use_legacy_sql
        if api_resource_configs:
            _validate_value("api_resource_configs", api_resource_configs, dict)
        self.api_resource_configs = api_resource_configs \
            if api_resource_configs else {}
        self.running_job_id = None
        self.location = location
        self.num_retries = num_retries

    # (snip)

    def get_dataset(self, dataset_id, project_id=None):
        """
        Method returns dataset_resource if dataset exist
        and raised 404 error if dataset does not exist

        :param dataset_id: The BigQuery Dataset ID
        :type dataset_id: str
        :param project_id: The GCP Project ID
        :type project_id: str
        :return: dataset_resource

            .. seealso::
                For more information, see Dataset Resource content:
                https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
        """

        if not dataset_id or not isinstance(dataset_id, str):
            raise ValueError("dataset_id argument must be provided and has "
                             "a type 'str'. You provided: {}".format(dataset_id))

        dataset_project_id = project_id if project_id else self.project_id

        try:
            dataset_resource = self.service.datasets().get(
                datasetId=dataset_id, projectId=dataset_project_id).execute(num_retries=self.num_retries)
            self.log.info("Dataset Resource: %s", dataset_resource)
        except HttpError as err:
            raise AirflowException(
                'BigQuery job failed. Error was: {}'.format(err.content))

        return dataset_resource
# (snip)
Getting the List of Datasets
The list of datasets looks like it can be retrieved with the datasets().list() API.
The BigQueryBaseCursor class already has a get_datasets_list() method that calls the datasets().list() API.
# (snip)
class BigQueryBaseCursor(LoggingMixin):
    # (snip)
    def get_datasets_list(self, project_id=None):
        """
        Method returns full list of BigQuery datasets in the current project

        .. seealso::
            For more information, see:
            https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list

        :param project_id: Google Cloud Project for which you
            try to get all datasets
        :type project_id: str
        :return: datasets_list

            Example of returned datasets_list: ::

                   {
                      "kind":"bigquery#dataset",
                      "location":"US",
                      "id":"your-project:dataset_2_test",
                      "datasetReference":{
                         "projectId":"your-project",
                         "datasetId":"dataset_2_test"
                      }
                   },
                   {
                      "kind":"bigquery#dataset",
                      "location":"US",
                      "id":"your-project:dataset_1_test",
                      "datasetReference":{
                         "projectId":"your-project",
                         "datasetId":"dataset_1_test"
                      }
                   }
                ]
        """
        dataset_project_id = project_id if project_id else self.project_id

        try:
            datasets_list = self.service.datasets().list(
                projectId=dataset_project_id).execute(num_retries=self.num_retries)['datasets']
            self.log.info("Datasets List: %s", datasets_list)

        except HttpError as err:
            raise AirflowException(
                'BigQuery job failed. Error was: {}'.format(err.content))

        return datasets_list
# (snip)
I grepped the existing code, but no operator seems to use get_datasets_list(), so I will write a new one, using the other operator classes as a reference.
I added the following class to /airflow/contrib/operators/bigquery_operator.py.
Note: for code management and maintainability, extension code should really go in a separate directory and file, but here I am updating the existing file for simplicity.
class BigQueryGetDatasetListOperator(BaseOperator):
    template_fields = ('project_id',)
    ui_color = '#f0eee4'

    @apply_defaults
    def __init__(self,
                 project_id=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args, **kwargs):
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        super(BigQueryGetDatasetListOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
                               delegate_to=self.delegate_to)

        conn = bq_hook.get_conn()
        cursor = conn.cursor()

        self.log.info('Start getting dataset list: %s', self.project_id)

        return cursor.get_datasets_list(project_id=self.project_id)
I also added a DAG file that runs the new operator.
# -*- coding: utf-8 -*-
from typing import Any

import airflow
from airflow import models

bigquery_operator = None  # type: Any
try:
    from airflow.contrib.operators import bigquery_operator
except ImportError:
    pass

if bigquery_operator is not None:
    args = {
        'owner': 'Airflow',
        'start_date': airflow.utils.dates.days_ago(2)
    }

    dag = models.DAG(
        dag_id='example_bigquery_getdatasetlist_operator',
        default_args=args,
        schedule_interval=None)

    get_dataset = bigquery_operator.BigQueryGetDatasetListOperator(
        task_id='bigquery_getdatasetlist_example',
        project_id='cm-da-mikami-yuki-258308',
        dag=dag)
The target GCP project currently has two datasets: airflow_test and test_s3.
I placed the DAG file in the dags directory and ran it.
It seems to have run successfully.
Checking the log, the retrieved list of datasets is printed.
*** Reading local file: /home/ec2-user/airflow/logs/example_bigquery_getdatasetlist_operator/bigquery_getdatasetlist_example/2020-03-17T08:11:54.852025+00:00/1.log
[2020-03-17 08:12:05,305] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_getdatasetlist_operator.bigquery_getdatasetlist_example 2020-03-17T08:11:54.852025+00:00 [queued]>
[2020-03-17 08:12:05,312] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_getdatasetlist_operator.bigquery_getdatasetlist_example 2020-03-17T08:11:54.852025+00:00 [queued]>
[2020-03-17 08:12:05,312] {taskinstance.py:866} INFO - --------------------------------------------------------------------------------
[2020-03-17 08:12:05,312] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-03-17 08:12:05,312] {taskinstance.py:868} INFO - --------------------------------------------------------------------------------
[2020-03-17 08:12:05,322] {taskinstance.py:887} INFO - Executing <Task(BigQueryGetDatasetListOperator): bigquery_getdatasetlist_example> on 2020-03-17T08:11:54.852025+00:00
[2020-03-17 08:12:05,323] {standard_task_runner.py:53} INFO - Started process 12393 to run task
[2020-03-17 08:12:05,382] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: example_bigquery_getdatasetlist_operator.bigquery_getdatasetlist_example 2020-03-17T08:11:54.852025+00:00 [running]> ip-10-0-43-239.ap-northeast-1.compute.internal
[2020-03-17 08:12:05,399] {logging_mixin.py:112} INFO - [2020-03-17 08:12:05,399] {gcp_api_base_hook.py:146} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2020-03-17 08:12:05,402] {logging_mixin.py:112} INFO - [2020-03-17 08:12:05,402] {discovery.py:275} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2020-03-17 08:12:05,701] {bigquery_operator.py:851} INFO - Start getting dataset list: cm-da-mikami-yuki-258308
[2020-03-17 08:12:05,703] {logging_mixin.py:112} INFO - [2020-03-17 08:12:05,703] {discovery.py:894} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/datasets?alt=json
[2020-03-17 08:12:06,214] {logging_mixin.py:112} INFO - [2020-03-17 08:12:06,213] {bigquery_hook.py:1809} INFO - Datasets List: [{'kind': 'bigquery#dataset', 'id': 'cm-da-mikami-yuki-258308:airflow_test', 'datasetReference': {'datasetId': 'airflow_test', 'projectId': 'cm-da-mikami-yuki-258308'}, 'location': 'US'}, {'kind': 'bigquery#dataset', 'id': 'cm-da-mikami-yuki-258308:test_s3', 'datasetReference': {'datasetId': 'test_s3', 'projectId': 'cm-da-mikami-yuki-258308'}, 'location': 'asia-northeast1'}]
[2020-03-17 08:12:06,225] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=example_bigquery_getdatasetlist_operator, task_id=bigquery_getdatasetlist_example, execution_date=20200317T081154, start_date=20200317T081205, end_date=20200317T081206
[2020-03-17 08:12:15,303] {logging_mixin.py:112} INFO - [2020-03-17 08:12:15,303] {local_task_job.py:103} INFO - Task exited with return code 0
Here is an excerpt of the response part:
Datasets List: [
    {
        'kind': 'bigquery#dataset',
        'id': 'cm-da-mikami-yuki-258308:airflow_test',
        'datasetReference': {
            'datasetId': 'airflow_test',
            'projectId': 'cm-da-mikami-yuki-258308'
        },
        'location': 'US'
    },
    {
        'kind': 'bigquery#dataset',
        'id': 'cm-da-mikami-yuki-258308:test_s3',
        'datasetReference': {
            'datasetId': 'test_s3',
            'projectId': 'cm-da-mikami-yuki-258308'
        },
        'location': 'asia-northeast1'
    }
]
The datasets were retrieved successfully.
Getting the List of Tables
The list of tables can apparently be retrieved with the tables().list() API.
Looking at the existing code, there does not seem to be a method for retrieving the table list.
So I added a get_table_list() method, which calls the tables().list() API, to the BigQueryBaseCursor class.
def get_table_list(self, dataset_id, project_id=None):
    table_project_id = project_id if project_id else self.project_id

    try:
        tables_list = self.service.tables().list(
            projectId=table_project_id,
            datasetId=dataset_id).execute(num_retries=self.num_retries)
        self.log.info("Tables List: %s", tables_list)

    except HttpError as err:
        raise AirflowException(
            'BigQuery job failed. Error was: {}'.format(err.content))

    return tables_list
In addition, I added an operator for retrieving the table list and a DAG file that runs it.
class BigQueryGetTableListOperator(BaseOperator):
    template_fields = ('dataset_id', 'project_id')
    ui_color = '#f0eee4'

    @apply_defaults
    def __init__(self,
                 dataset_id,
                 project_id=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args, **kwargs):
        self.dataset_id = dataset_id
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        super(BigQueryGetTableListOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
                               delegate_to=self.delegate_to)

        conn = bq_hook.get_conn()
        cursor = conn.cursor()

        self.log.info('Start getting dataset: %s:%s', self.project_id, self.dataset_id)

        return cursor.get_table_list(
            dataset_id=self.dataset_id,
            project_id=self.project_id)
# -*- coding: utf-8 -*-
from typing import Any

import airflow
from airflow import models

bigquery_operator = None  # type: Any
try:
    from airflow.contrib.operators import bigquery_operator
except ImportError:
    pass

if bigquery_operator is not None:
    args = {
        'owner': 'Airflow',
        'start_date': airflow.utils.dates.days_ago(2)
    }

    dag = models.DAG(
        dag_id='example_bigquery_gettablelist_operator',
        default_args=args,
        schedule_interval=None)

    get_dataset = bigquery_operator.BigQueryGetTableListOperator(
        task_id='bigquery_gettablelist_example',
        dataset_id='airflow_test',
        project_id='cm-da-mikami-yuki-258308',
        dag=dag)
The target airflow_test dataset currently contains three name_[YYYY] tables.
Let's run the DAG and check the log.
*** Reading local file: /home/ec2-user/airflow/logs/example_bigquery_gettablelist_operator/bigquery_gettablelist_example/2020-03-17T08:58:24.271847+00:00/1.log
[2020-03-17 08:58:27,986] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_gettablelist_operator.bigquery_gettablelist_example 2020-03-17T08:58:24.271847+00:00 [queued]>
[2020-03-17 08:58:27,994] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_gettablelist_operator.bigquery_gettablelist_example 2020-03-17T08:58:24.271847+00:00 [queued]>
[2020-03-17 08:58:27,994] {taskinstance.py:866} INFO - --------------------------------------------------------------------------------
[2020-03-17 08:58:27,994] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-03-17 08:58:27,994] {taskinstance.py:868} INFO - --------------------------------------------------------------------------------
[2020-03-17 08:58:28,003] {taskinstance.py:887} INFO - Executing <Task(BigQueryGetTableListOperator): bigquery_gettablelist_example> on 2020-03-17T08:58:24.271847+00:00
[2020-03-17 08:58:28,005] {standard_task_runner.py:53} INFO - Started process 14010 to run task
[2020-03-17 08:58:28,079] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: example_bigquery_gettablelist_operator.bigquery_gettablelist_example 2020-03-17T08:58:24.271847+00:00 [running]> ip-10-0-43-239.ap-northeast-1.compute.internal
[2020-03-17 08:58:28,106] {logging_mixin.py:112} INFO - [2020-03-17 08:58:28,105] {gcp_api_base_hook.py:146} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2020-03-17 08:58:28,111] {logging_mixin.py:112} INFO - [2020-03-17 08:58:28,111] {discovery.py:275} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2020-03-17 08:58:28,417] {bigquery_operator.py:879} INFO - Start getting dataset: cm-da-mikami-yuki-258308:airflow_test
[2020-03-17 08:58:28,421] {logging_mixin.py:112} INFO - [2020-03-17 08:58:28,421] {discovery.py:894} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/datasets/airflow_test/tables?alt=json
[2020-03-17 08:58:28,789] {logging_mixin.py:112} INFO - [2020-03-17 08:58:28,789] {bigquery_hook.py:1437} INFO - Tables List: {'kind': 'bigquery#tableList', 'etag': '4rK75YI14CTPa28HhVLyEQ==', 'tables': [{'kind': 'bigquery#table', 'id': 'cm-da-mikami-yuki-258308:airflow_test.name_1880', 'tableReference': {'projectId': 'cm-da-mikami-yuki-258308', 'datasetId': 'airflow_test', 'tableId': 'name_1880'}, 'type': 'TABLE', 'creationTime': '1584434061049'}, {'kind': 'bigquery#table', 'id': 'cm-da-mikami-yuki-258308:airflow_test.name_2000', 'tableReference': {'projectId': 'cm-da-mikami-yuki-258308', 'datasetId': 'airflow_test', 'tableId': 'name_2000'}, 'type': 'TABLE', 'creationTime': '1584102254437'}, {'kind': 'bigquery#table', 'id': 'cm-da-mikami-yuki-258308:airflow_test.name_2018', 'tableReference': {'projectId': 'cm-da-mikami-yuki-258308', 'datasetId': 'airflow_test', 'tableId': 'name_2018'}, 'type': 'TABLE', 'creationTime': '1584434135714'}], 'totalItems': 3}
[2020-03-17 08:58:28,801] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=example_bigquery_gettablelist_operator, task_id=bigquery_gettablelist_example, execution_date=20200317T085824, start_date=20200317T085827, end_date=20200317T085828
Here is the table list that was retrieved:
Tables List: {
    'kind': 'bigquery#tableList',
    'etag': '4rK75YI14CTPa28HhVLyEQ==',
    'tables': [
        {
            'kind': 'bigquery#table',
            'id': 'cm-da-mikami-yuki-258308:airflow_test.name_1880',
            'tableReference': {
                'projectId': 'cm-da-mikami-yuki-258308',
                'datasetId': 'airflow_test',
                'tableId': 'name_1880'
            },
            'type': 'TABLE',
            'creationTime': '1584434061049'
        },
        {
            'kind': 'bigquery#table',
            'id': 'cm-da-mikami-yuki-258308:airflow_test.name_2000',
            'tableReference': {
                'projectId': 'cm-da-mikami-yuki-258308',
                'datasetId': 'airflow_test',
                'tableId': 'name_2000'
            },
            'type': 'TABLE',
            'creationTime': '1584102254437'
        },
        {
            'kind': 'bigquery#table',
            'id': 'cm-da-mikami-yuki-258308:airflow_test.name_2018',
            'tableReference': {
                'projectId': 'cm-da-mikami-yuki-258308',
                'datasetId': 'airflow_test',
                'tableId': 'name_2018'
            },
            'type': 'TABLE',
            'creationTime': '1584434135714'
        }
    ],
    'totalItems': 3
}
As expected, the three tables were retrieved.
Getting Table Data
To retrieve table data, it looks like we can call tabledata().list().
The existing BigQueryGetDataOperator in /airflow/contrib/operators/bigquery_get_data.py should do the job.
I added a DAG file and checked that it works.
# -*- coding: utf-8 -*-
from typing import Any

import airflow
from airflow import models

bigquery_get_data = None  # type: Any
try:
    from airflow.contrib.operators import bigquery_get_data
except ImportError:
    pass

if bigquery_get_data is not None:
    args = {
        'owner': 'Airflow',
        'start_date': airflow.utils.dates.days_ago(2)
    }

    dag = models.DAG(
        dag_id='example_bigquery_getdata_operator',
        default_args=args,
        schedule_interval=None)

    get_dataset = bigquery_get_data.BigQueryGetDataOperator(
        task_id='bigquery_getdata_example',
        dataset_id='airflow_test',
        table_id='name_2000',
        max_results='10',
        dag=dag)
The task seems to have completed, so let's check the log.
*** Reading local file: /home/ec2-user/airflow/logs/example_bigquery_getdata_operator/bigquery_getdata_example/2020-03-17T09:40:56.917329+00:00/1.log
[2020-03-17 09:41:06,239] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_getdata_operator.bigquery_getdata_example 2020-03-17T09:40:56.917329+00:00 [queued]>
[2020-03-17 09:41:06,246] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_getdata_operator.bigquery_getdata_example 2020-03-17T09:40:56.917329+00:00 [queued]>
[2020-03-17 09:41:06,246] {taskinstance.py:866} INFO - --------------------------------------------------------------------------------
[2020-03-17 09:41:06,246] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-03-17 09:41:06,247] {taskinstance.py:868} INFO - --------------------------------------------------------------------------------
[2020-03-17 09:41:06,255] {taskinstance.py:887} INFO - Executing <Task(BigQueryGetDataOperator): bigquery_getdata_example> on 2020-03-17T09:40:56.917329+00:00
[2020-03-17 09:41:06,257] {standard_task_runner.py:53} INFO - Started process 15511 to run task
[2020-03-17 09:41:06,316] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: example_bigquery_getdata_operator.bigquery_getdata_example 2020-03-17T09:40:56.917329+00:00 [running]> ip-10-0-43-239.ap-northeast-1.compute.internal
[2020-03-17 09:41:06,330] {bigquery_get_data.py:92} INFO - Fetching Data from:
[2020-03-17 09:41:06,330] {bigquery_get_data.py:94} INFO - Dataset: airflow_test ; Table: name_2000 ; Max Results: 10
[2020-03-17 09:41:06,335] {logging_mixin.py:112} INFO - [2020-03-17 09:41:06,335] {discovery.py:275} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2020-03-17 09:41:06,676] {logging_mixin.py:112} INFO - [2020-03-17 09:41:06,676] {discovery.py:894} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/datasets/airflow_test/tables/name_2000/data?maxResults=10&alt=json
[2020-03-17 09:41:07,286] {bigquery_get_data.py:106} INFO - Total Extracted rows: 29772
[2020-03-17 09:41:07,297] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=example_bigquery_getdata_operator, task_id=bigquery_getdata_example, execution_date=20200317T094056, start_date=20200317T094106, end_date=20200317T094107
[2020-03-17 09:41:16,236] {logging_mixin.py:112} INFO - [2020-03-17 09:41:16,236] {local_task_job.py:103} INFO - Task exited with return code 0
The retrieved row count matches the actual number of records in the table, so the task seems to have run correctly.
However, I also want to see the data itself, so I added a log statement to BigQueryGetDataOperator that outputs the retrieved data, and ran it again.
# (snip)
class BigQueryGetDataOperator(BaseOperator):
    # (snip)
    def execute(self, context):
        self.log.info('Fetching Data from:')
        self.log.info('Dataset: %s ; Table: %s ; Max Results: %s',
                      self.dataset_id, self.table_id, self.max_results)

        hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                            delegate_to=self.delegate_to)

        conn = hook.get_conn()
        cursor = conn.cursor()
        response = cursor.get_tabledata(dataset_id=self.dataset_id,
                                        table_id=self.table_id,
                                        max_results=self.max_results,
                                        selected_fields=self.selected_fields)

        self.log.info('Total Extracted rows: %s', response['totalRows'])
        rows = response['rows']

        # add mikami
        self.log.info('Data: %s', rows)

        table_data = []
        for dict_row in rows:
            single_row = []
            for fields in dict_row['f']:
                single_row.append(fields['v'])
            table_data.append(single_row)

        return table_data
# (snip)
This time the retrieved data is also written to the log.
*** Reading local file: /home/ec2-user/airflow/logs/example_bigquery_getdata_operator/bigquery_getdata_example/2020-03-17T09:53:54.223409+00:00/1.log
[2020-03-17 09:55:10,380] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_getdata_operator.bigquery_getdata_example 2020-03-17T09:53:54.223409+00:00 [queued]>
[2020-03-17 09:55:10,387] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_getdata_operator.bigquery_getdata_example 2020-03-17T09:53:54.223409+00:00 [queued]>
[2020-03-17 09:55:10,387] {taskinstance.py:866} INFO - --------------------------------------------------------------------------------
[2020-03-17 09:55:10,387] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-03-17 09:55:10,387] {taskinstance.py:868} INFO - --------------------------------------------------------------------------------
[2020-03-17 09:55:10,396] {taskinstance.py:887} INFO - Executing <Task(BigQueryGetDataOperator): bigquery_getdata_example> on 2020-03-17T09:53:54.223409+00:00
[2020-03-17 09:55:10,398] {standard_task_runner.py:53} INFO - Started process 16017 to run task
[2020-03-17 09:55:10,456] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: example_bigquery_getdata_operator.bigquery_getdata_example 2020-03-17T09:53:54.223409+00:00 [running]> ip-10-0-43-239.ap-northeast-1.compute.internal
[2020-03-17 09:55:10,469] {bigquery_get_data.py:92} INFO - Fetching Data from:
[2020-03-17 09:55:10,469] {bigquery_get_data.py:94} INFO - Dataset: airflow_test ; Table: name_2000 ; Max Results: 10
[2020-03-17 09:55:10,474] {logging_mixin.py:112} INFO - [2020-03-17 09:55:10,474] {discovery.py:275} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2020-03-17 09:55:10,796] {logging_mixin.py:112} INFO - [2020-03-17 09:55:10,796] {discovery.py:894} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/datasets/airflow_test/tables/name_2000/data?maxResults=10&alt=json
[2020-03-17 09:55:12,168] {bigquery_get_data.py:106} INFO - Total Extracted rows: 29772
[2020-03-17 09:55:12,168] {bigquery_get_data.py:109} INFO - Data: [{'f': [{'v': 'Emily'}, {'v': 'F'}, {'v': '25956'}]}, {'f': [{'v': 'Hannah'}, {'v': 'F'}, {'v': '23082'}]}, {'f': [{'v': 'Madison'}, {'v': 'F'}, {'v': '19968'}]}, {'f': [{'v': 'Ashley'}, {'v': 'F'}, {'v': '17997'}]}, {'f': [{'v': 'Sarah'}, {'v': 'F'}, {'v': '17702'}]}, {'f': [{'v': 'Alexis'}, {'v': 'F'}, {'v': '17629'}]}, {'f': [{'v': 'Samantha'}, {'v': 'F'}, {'v': '17265'}]}, {'f': [{'v': 'Jessica'}, {'v': 'F'}, {'v': '15709'}]}, {'f': [{'v': 'Elizabeth'}, {'v': 'F'}, {'v': '15099'}]}, {'f': [{'v': 'Taylor'}, {'v': 'F'}, {'v': '15078'}]}]
[2020-03-17 09:55:12,180] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=example_bigquery_getdata_operator, task_id=bigquery_getdata_example, execution_date=20200317T095354, start_date=20200317T095510, end_date=20200317T095512
Data: [
    {'f': [{'v': 'Emily'}, {'v': 'F'}, {'v': '25956'}]},
    {'f': [{'v': 'Hannah'}, {'v': 'F'}, {'v': '23082'}]},
    {'f': [{'v': 'Madison'}, {'v': 'F'}, {'v': '19968'}]},
    {'f': [{'v': 'Ashley'}, {'v': 'F'}, {'v': '17997'}]},
    {'f': [{'v': 'Sarah'}, {'v': 'F'}, {'v': '17702'}]},
    {'f': [{'v': 'Alexis'}, {'v': 'F'}, {'v': '17629'}]},
    {'f': [{'v': 'Samantha'}, {'v': 'F'}, {'v': '17265'}]},
    {'f': [{'v': 'Jessica'}, {'v': 'F'}, {'v': '15709'}]},
    {'f': [{'v': 'Elizabeth'}, {'v': 'F'}, {'v': '15099'}]},
    {'f': [{'v': 'Taylor'}, {'v': 'F'}, {'v': '15078'}]}
]
This API apparently does not return column names, but for now I was able to retrieve the table data. The rows come back in the f/v representation described as follows:
Repeated rows as result. The REST-based representation of this data leverages a series of JSON f,v objects for indicating fields and values.
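As a rough sketch of my own (not part of the operator): if you separately know the column order, for example from the selected_fields argument or the table schema, you can zip it with the f/v rows to rebuild records keyed by column name. The helper name and the column list below are hypothetical.

def rows_to_dicts(rows, column_names):
    """Convert tabledata-style rows ([{'f': [{'v': ...}, ...]}, ...])
    into a list of dicts keyed by the given column names."""
    return [
        {name: field['v'] for name, field in zip(column_names, row['f'])}
        for row in rows
    ]


# Example with the assumed column order of the name_2000 table: name, gender, count.
records = rows_to_dicts(
    [{'f': [{'v': 'Emily'}, {'v': 'F'}, {'v': '25956'}]}],
    ['name', 'gender', 'count'])
# -> [{'name': 'Emily', 'gender': 'F', 'count': '25956'}]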
Inserting / Updating / Deleting Table Data
How do I run arbitrary SQL queries against BigQuery from a DAG? Looking into it, it seems you can simply pass the SQL to BigQueryOperator.
I added and ran the following DAG, which inserts a record with name='Mikami' into the name_2000 table.
# -*- coding: utf-8 -*-
from typing import Any

import airflow
from airflow import models

bigquery_operator = None  # type: Any
try:
    from airflow.contrib.operators import bigquery_operator
except ImportError:
    pass

if bigquery_operator is not None:
    args = {
        'owner': 'Airflow',
        'start_date': airflow.utils.dates.days_ago(2)
    }

    dag = models.DAG(
        dag_id='example_bigquery_sql_operator',
        default_args=args,
        schedule_interval=None)

    query = "INSERT INTO `cm-da-mikami-yuki-258308.airflow_test.name_2000` VALUES ('Mikami', 'F', 1)"
    bigquery_sql = bigquery_operator.BigQueryOperator(
        task_id='bigquery_sql_example',
        sql=query,
        use_legacy_sql=False,
        dag=dag)
It finished successfully, so let's check the data from the GCP console.
The record was indeed added by the DAG.
Next, let's update the record we just inserted, setting name='Yuki'.
I changed the SQL in the DAG file to an UPDATE query and ran it again.
# (snip)

    dag = models.DAG(
        dag_id='example_bigquery_sql_operator',
        default_args=args,
        schedule_interval=None)

    # query = "INSERT INTO `cm-da-mikami-yuki-258308.airflow_test.name_2000` VALUES ('Mikami', 'F', 1)"
    query = "UPDATE `cm-da-mikami-yuki-258308.airflow_test.name_2000` SET name = 'Yuki' WHERE name = 'Mikami' and gender = 'F' and count = 1"
    bigquery_sql = bigquery_operator.BigQueryOperator(
        task_id='bigquery_sql_example',
        sql=query,
        use_legacy_sql=False,
        dag=dag)
The DAG finished successfully, so I ran a SELECT query from the GCP console to check the data.
I confirmed that the record was updated correctly.
Finally, let's delete the record we just inserted and updated.
# (snip)

    dag = models.DAG(
        dag_id='example_bigquery_sql_operator',
        default_args=args,
        schedule_interval=None)

    # query = "INSERT INTO `cm-da-mikami-yuki-258308.airflow_test.name_2000` VALUES ('Mikami', 'F', 1)"
    # query = "UPDATE `cm-da-mikami-yuki-258308.airflow_test.name_2000` SET name = 'Yuki' WHERE name = 'Mikami' and gender = 'F' and count = 1"
    query = "DELETE FROM `cm-da-mikami-yuki-258308.airflow_test.name_2000` WHERE name = 'Yuki' and gender = 'F' and count = 1"
    bigquery_sql = bigquery_operator.BigQueryOperator(
        task_id='bigquery_sql_example',
        sql=query,
        use_legacy_sql=False,
        dag=dag)
After running the DAG, I queried the data again from the GCP console.
The DELETE also worked as expected.
Exporting Table Data to GCS
Finally, let's export table data to a file in GCS.
Using BigQueryToCloudStorageOperator from /airflow/contrib/operators/bigquery_to_gcs.py, this should work just by adding a DAG.
I added the following DAG file, which exports the data of the name_2000 table to gs://test-cm-mikami/test_export/exp_name_2000.txt, and ran it.
# -*- coding: utf-8 -*-
from typing import Any

import airflow
from airflow import models

bigquery_to_gcs = None  # type: Any
try:
    from airflow.contrib.operators import bigquery_to_gcs
except ImportError:
    pass

if bigquery_to_gcs is not None:
    args = {
        'owner': 'Airflow',
        'start_date': airflow.utils.dates.days_ago(2)
    }

    dag = models.DAG(
        dag_id='example_bigquery_to_gcs_operator',
        default_args=args,
        schedule_interval=None)

    get_dataset = bigquery_to_gcs.BigQueryToCloudStorageOperator(
        task_id='bigquery_to_gcs_example',
        source_project_dataset_table='cm-da-mikami-yuki-258308.airflow_test.name_2000',
        destination_cloud_storage_uris='gs://test-cm-mikami/test_export/exp_name_2000.txt',
        dag=dag)
Before the run, there were no files at the target GCS path.
I confirmed that the DAG run completed successfully:
*** Reading local file: /home/ec2-user/airflow/logs/example_bigquery_to_gcs_operator/bigquery_to_gcs_example/2020-03-17T11:34:24.397858+00:00/1.log
[2020-03-17 11:34:42,238] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_to_gcs_operator.bigquery_to_gcs_example 2020-03-17T11:34:24.397858+00:00 [queued]>
[2020-03-17 11:34:42,245] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_to_gcs_operator.bigquery_to_gcs_example 2020-03-17T11:34:24.397858+00:00 [queued]>
[2020-03-17 11:34:42,245] {taskinstance.py:866} INFO - --------------------------------------------------------------------------------
[2020-03-17 11:34:42,245] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-03-17 11:34:42,245] {taskinstance.py:868} INFO - --------------------------------------------------------------------------------
[2020-03-17 11:34:42,255] {taskinstance.py:887} INFO - Executing <Task(BigQueryToCloudStorageOperator): bigquery_to_gcs_example> on 2020-03-17T11:34:24.397858+00:00
[2020-03-17 11:34:42,257] {standard_task_runner.py:53} INFO - Started process 19929 to run task
[2020-03-17 11:34:42,315] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: example_bigquery_to_gcs_operator.bigquery_to_gcs_example 2020-03-17T11:34:24.397858+00:00 [running]> ip-10-0-43-239.ap-northeast-1.compute.internal
[2020-03-17 11:34:42,329] {bigquery_to_gcs.py:93} INFO - Executing extract of cm-da-mikami-yuki-258308.airflow_test.name_2000 into: gs://test-cm-mikami/test_export/exp_name_2000.txt
[2020-03-17 11:34:42,334] {logging_mixin.py:112} INFO - [2020-03-17 11:34:42,334] {discovery.py:275} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2020-03-17 11:34:42,645] {logging_mixin.py:112} INFO - [2020-03-17 11:34:42,644] {discovery.py:894} INFO - URL being requested: POST https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/jobs?alt=json
[2020-03-17 11:34:43,677] {logging_mixin.py:112} INFO - [2020-03-17 11:34:43,677] {discovery.py:894} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/jobs/job_NptEvLRIZj6NzF6O8fy2mgtoMlKY?location=US&alt=json
[2020-03-17 11:34:44,198] {logging_mixin.py:112} INFO - [2020-03-17 11:34:44,198] {bigquery_hook.py:1347} INFO - Waiting for job to complete : cm-da-mikami-yuki-258308, job_NptEvLRIZj6NzF6O8fy2mgtoMlKY
[2020-03-17 11:34:49,204] {logging_mixin.py:112} INFO - [2020-03-17 11:34:49,203] {discovery.py:894} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/jobs/job_NptEvLRIZj6NzF6O8fy2mgtoMlKY?location=US&alt=json
[2020-03-17 11:34:49,493] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=example_bigquery_to_gcs_operator, task_id=bigquery_to_gcs_example, execution_date=20200317T113424, start_date=20200317T113442, end_date=20200317T113449
[2020-03-17 11:34:52,237] {logging_mixin.py:112} INFO - [2020-03-17 11:34:52,236] {local_task_job.py:103} INFO - Task exited with return code 0
Checking GCS from the GCP console,
the file has been written.
I downloaded the output file and checked its contents:
name,gender,count
Lisette,F,256
Monserrat,F,256
Kalyn,F,256
Julianne,F,512
Tayler,F,512
Kayla,F,13312
Madison,F,19968
Keara,F,257
Randi,F,257
Reanna,F,257
Ingrid,F,257
Eryn,F,257
Alexus,F,1281
(snip)
I confirmed that the table data was exported correctly.
Summary (Impressions)
Airflow's BigQuery-related operators access BigQuery through the BigQuery API, and to see exactly which API is being called, checking bigquery_hook.py is the way to go.
Processing that the existing operators do not cover could also be implemented freely while consulting the BigQuery API specification.
Beyond that, the gcloud CLI can easily be run via BashOperator, so it looks like there will be little trouble doing BigQuery-related operations through Airflow.
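For example, here is a minimal sketch of my own (not code from the DAGs above) that lists datasets with the Cloud SDK's bq command via BashOperator. The DAG ID and task ID are hypothetical, and it assumes the bq CLI is installed and authenticated on the Airflow worker.

# -*- coding: utf-8 -*-
import airflow
from airflow import models
from airflow.operators.bash_operator import BashOperator

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = models.DAG(
    dag_id='example_bq_cli_operator',
    default_args=args,
    schedule_interval=None)

# List the datasets in the project with the bq command-line tool.
list_datasets = BashOperator(
    task_id='bq_ls_example',
    bash_command='bq ls --project_id=cm-da-mikami-yuki-258308',
    dag=dag)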