Trying Out BigQuery Operations from Airflow

2020.03.17

Hello, this is Mikami.

I want to try out a variety of operations on BigQuery from Airflow.

So, let's get started.

What I want to do

From an Airflow DAG, I want to run the following operations against BigQuery:

  • Get the list of datasets
  • Get the list of tables
  • Get table data
  • Insert data into a table
  • Update table data
  • Delete table data
  • Export table data to GCS

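Checking the operator implementations

There already seem to be plenty of existing operators that could do what I want, but in case I need to extend one myself, I'll first look at how the existing operators are implemented.

Skimming through the operators with "bigquery" in their names, most of the operator classes appear to have a constructor and an execute() method.

Since the DAG files only create operator instances and define dependencies between them, it seems that once an instance has been created, the Airflow framework takes care of calling its execute() method.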
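To make that division of labor concrete, here is a deliberately simplified, hypothetical model of the pattern: the "DAG file" part only builds operator instances, and a runner (standing in for Airflow's scheduler and executor, which do far more) later calls execute(context) on each one. MiniOperator, run_dag and the shape of the context dict are invented for illustration; they are not Airflow APIs.

```python
from typing import Any, Callable, Dict, List


class MiniOperator:
    """Hypothetical stand-in for an Airflow operator: it only stores config."""

    def __init__(self, task_id: str, func: Callable[[Dict[str, Any]], Any]):
        self.task_id = task_id
        self.func = func

    def execute(self, context: Dict[str, Any]) -> Any:
        # Real operators do their work here; the framework calls it, not the DAG file.
        return self.func(context)


def run_dag(tasks: List[MiniOperator]) -> Dict[str, Any]:
    """Hypothetical runner: call execute() on each task in order and keep the
    return values, loosely mirroring how Airflow stores them in XCom."""
    results: Dict[str, Any] = {}
    for task in tasks:
        context = {"task_id": task.task_id, "results": results}
        results[task.task_id] = task.execute(context)
    return results


# "DAG file": only instantiation and ordering, no direct execute() calls.
tasks = [
    MiniOperator("list_datasets", lambda ctx: ["airflow_test", "test_s3"]),
    MiniOperator("count_datasets", lambda ctx: len(ctx["results"]["list_datasets"])),
]
results = run_dag(tasks)
print(results)
```

In real Airflow, a non-None value returned from execute() is pushed to XCom under the key return_value by default, which is one reason it can be handy for the operators in this article to return the API response.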

Checking what BigQueryGetDatasetOperator does

Let's look in more detail at the code of BigQueryGetDatasetOperator, which is implemented in /airflow/contrib/operators/bigquery_operator.py.

When execute() runs, it creates a BigQueryHook instance, obtains a connection, gets a cursor from that connection, and calls the cursor's get_dataset() method.

bigquery_operator.py

# (omitted)
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
# (omitted)
class BigQueryGetDatasetOperator(BaseOperator):
    # (omitted)

    template_fields = ('dataset_id', 'project_id')
    ui_color = '#f0eee4'

    @apply_defaults
    def __init__(self,
                 dataset_id,
                 project_id=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args, **kwargs):
        self.dataset_id = dataset_id
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        super(BigQueryGetDatasetOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
                               delegate_to=self.delegate_to)
        conn = bq_hook.get_conn()
        cursor = conn.cursor()

        self.log.info('Start getting dataset: %s:%s', self.project_id, self.dataset_id)

        return cursor.get_dataset(
            dataset_id=self.dataset_id,
            project_id=self.project_id)
    # (omitted)

Next, let's look inside BigQueryHook.

It is implemented in /airflow/contrib/hooks/bigquery_hook.py.

Since bigquery_hook.py imports googleapiclient.discovery, it appears to access GCP through googleapiclient.

BigQueryHook's get_conn() method creates and returns a BigQueryConnection instance. BigQueryConnection's cursor() method returns a BigQueryCursor instance, and the BigQueryCursor class inherits from BigQueryBaseCursor.

bigquery_hook.py

# (omitted)
from googleapiclient.discovery import build
# (omitted)
class BigQueryHook(GoogleCloudBaseHook, DbApiHook):
    """
    Interact with BigQuery. This hook uses the Google Cloud Platform
    connection.
    """
    conn_name_attr = 'bigquery_conn_id'

    def __init__(self,
                 bigquery_conn_id='bigquery_default',
                 delegate_to=None,
                 use_legacy_sql=True,
                 location=None):
        super(BigQueryHook, self).__init__(
            gcp_conn_id=bigquery_conn_id, delegate_to=delegate_to)
        self.use_legacy_sql = use_legacy_sql
        self.location = location

    def get_conn(self):
        """
        Returns a BigQuery PEP 249 connection object.
        """
        service = self.get_service()
        project = self._get_field('project')
        return BigQueryConnection(
            service=service,
            project_id=project,
            use_legacy_sql=self.use_legacy_sql,
            location=self.location,
            num_retries=self.num_retries
        )
# (omitted)
class BigQueryConnection(object):
    """
    BigQuery does not have a notion of a persistent connection. Thus, these
    objects are small stateless factories for cursors, which do all the real
    work.
    """

    def __init__(self, *args, **kwargs):
        self._args = args
        self._kwargs = kwargs

    def close(self):
        """ BigQueryConnection does not have anything to close. """

    def commit(self):
        """ BigQueryConnection does not support transactions. """

    def cursor(self):
        """ Return a new :py:class:`Cursor` object using the connection. """
        return BigQueryCursor(*self._args, **self._kwargs)

    def rollback(self):
        raise NotImplementedError(
            "BigQueryConnection does not have transactions")
# (omitted)
class BigQueryCursor(BigQueryBaseCursor):
    """
    A very basic BigQuery PEP 249 cursor implementation. The PyHive PEP 249
    implementation was used as a reference:

    https://github.com/dropbox/PyHive/blob/master/pyhive/presto.py
    https://github.com/dropbox/PyHive/blob/master/pyhive/common.py
    """

    def __init__(self, service, project_id, use_legacy_sql=True, location=None, num_retries=5):
        super(BigQueryCursor, self).__init__(
            service=service,
            project_id=project_id,
            use_legacy_sql=use_legacy_sql,
            location=location,
            num_retries=num_retries
        )
        self.buffersize = None
        self.page_token = None
        self.job_id = None
        self.buffer = []
        self.all_pages_loaded = False
    # (omitted)

Looking at the parent class BigQueryBaseCursor, its get_dataset() method calls datasets().get().

bigquery_hook.py

# (omitted)
class BigQueryBaseCursor(LoggingMixin):
    """
    The BigQuery base cursor contains helper methods to execute queries against
    BigQuery. The methods can be used directly by operators, in cases where a
    PEP 249 cursor isn't needed.
    """

    def __init__(self,
                 service,
                 project_id,
                 use_legacy_sql=True,
                 api_resource_configs=None,
                 location=None,
                 num_retries=5):

        self.service = service
        self.project_id = project_id
        self.use_legacy_sql = use_legacy_sql
        if api_resource_configs:
            _validate_value("api_resource_configs", api_resource_configs, dict)
        self.api_resource_configs = api_resource_configs \
            if api_resource_configs else {}
        self.running_job_id = None
        self.location = location
        self.num_retries = num_retries
    # (omitted)
    def get_dataset(self, dataset_id, project_id=None):
        """
        Method returns dataset_resource if dataset exist
        and raised 404 error if dataset does not exist

        :param dataset_id: The BigQuery Dataset ID
        :type dataset_id: str
        :param project_id: The GCP Project ID
        :type project_id: str
        :return: dataset_resource

            .. seealso::
                For more information, see Dataset Resource content:
                https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
        """

        if not dataset_id or not isinstance(dataset_id, str):
            raise ValueError("dataset_id argument must be provided and has "
                             "a type 'str'. You provided: {}".format(dataset_id))

        dataset_project_id = project_id if project_id else self.project_id

        try:
            dataset_resource = self.service.datasets().get(
                datasetId=dataset_id, projectId=dataset_project_id).execute(num_retries=self.num_retries)
            self.log.info("Dataset Resource: %s", dataset_resource)
        except HttpError as err:
            raise AirflowException(
                'BigQuery job failed. Error was: {}'.format(err.content))

        return dataset_resource
    # (omitted)
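The call `self.service.datasets().get(datasetId=..., projectId=...).execute(num_retries=...)` follows googleapiclient's usual pattern: pick a resource collection, build a request with keyword arguments, then execute it. The sketch below imitates that chain with a hypothetical in-memory FakeService so the control flow of get_dataset() (argument check, fallback to the cursor's default project, error on a missing dataset) can run without GCP credentials. FakeService, FakeRequest, NotFound and the sample data are all invented; only the method names mirror the real client.

```python
from typing import Any, Dict, Optional, Tuple


class NotFound(Exception):
    """Hypothetical stand-in for googleapiclient's HttpError with status 404."""


class FakeRequest:
    def __init__(self, payload: Optional[Dict[str, Any]]):
        self._payload = payload

    def execute(self, num_retries: int = 0) -> Dict[str, Any]:
        # The real client retries transient errors up to num_retries times; the fake does not.
        if self._payload is None:
            raise NotFound("404: dataset not found")
        return self._payload


class FakeDatasets:
    def __init__(self, store: Dict[Tuple[str, str], Dict[str, Any]]):
        self._store = store

    def get(self, datasetId: str, projectId: str) -> FakeRequest:
        return FakeRequest(self._store.get((projectId, datasetId)))


class FakeService:
    def __init__(self, store: Dict[Tuple[str, str], Dict[str, Any]]):
        self._store = store

    def datasets(self) -> FakeDatasets:
        return FakeDatasets(self._store)


def get_dataset(service: Any, dataset_id: str, project_id: Optional[str],
                default_project: str, num_retries: int = 5) -> Dict[str, Any]:
    """Mirrors the shape of BigQueryBaseCursor.get_dataset() shown above."""
    if not dataset_id or not isinstance(dataset_id, str):
        raise ValueError("dataset_id must be a non-empty str")
    project = project_id if project_id else default_project
    return service.datasets().get(
        datasetId=dataset_id, projectId=project).execute(num_retries=num_retries)


store = {("my-project", "airflow_test"): {"id": "my-project:airflow_test"}}
service = FakeService(store)
resource = get_dataset(service, "airflow_test", None, default_project="my-project")
print(resource)
```

In the actual hook, the HttpError raised by execute() is caught and re-raised as an AirflowException, which is what marks the task as failed.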

Getting the list of datasets

The list of datasets should be available through the datasets().list() API.

The BigQueryBaseCursor class already has a get_datasets_list() method that calls datasets().list().

bigquery_hook.py

# (omitted)
class BigQueryBaseCursor(LoggingMixin):
    # (omitted)
    def get_datasets_list(self, project_id=None):
        """
        Method returns full list of BigQuery datasets in the current project

        .. seealso::
            For more information, see:
            https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list

        :param project_id: Google Cloud Project for which you
            try to get all datasets
        :type project_id: str
        :return: datasets_list

            Example of returned datasets_list: ::

                   {
                      "kind":"bigquery#dataset",
                      "location":"US",
                      "id":"your-project:dataset_2_test",
                      "datasetReference":{
                         "projectId":"your-project",
                         "datasetId":"dataset_2_test"
                      }
                   },
                   {
                      "kind":"bigquery#dataset",
                      "location":"US",
                      "id":"your-project:dataset_1_test",
                      "datasetReference":{
                         "projectId":"your-project",
                         "datasetId":"dataset_1_test"
                      }
                   }
                ]
        """
        dataset_project_id = project_id if project_id else self.project_id

        try:
            datasets_list = self.service.datasets().list(
                projectId=dataset_project_id).execute(num_retries=self.num_retries)['datasets']
            self.log.info("Datasets List: %s", datasets_list)

        except HttpError as err:
            raise AirflowException(
                'BigQuery job failed. Error was: {}'.format(err.content))

        return datasets_list
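    # (omitted)

I grepped the existing code, and no operator seems to use get_datasets_list(), so I'll write a new one, using the other operator classes as a reference.

I added the following class to /airflow/contrib/operators/bigquery_operator.py.

Note: for code management and maintainability, extension code would be better kept in a separate directory and file, but this time I prioritized simplicity and edited the existing file directly.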

bigquery_operator.py

class BigQueryGetDatasetListOperator(BaseOperator):

    template_fields = ('project_id',)
    ui_color = '#f0eee4'

    @apply_defaults
    def __init__(self,
                 project_id=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args, **kwargs):
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        super(BigQueryGetDatasetListOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
                               delegate_to=self.delegate_to)
        conn = bq_hook.get_conn()
        cursor = conn.cursor()

        self.log.info('Start getting dataset list: %s', self.project_id)

        return cursor.get_datasets_list(project_id=self.project_id)

I also added a DAG file that runs the new operator.

example_bigquery_getdatasetlist_operator.py

# -*- coding: utf-8 -*-

from typing import Any

import airflow
from airflow import models

bigquery_operator = None  # type: Any
try:
    from airflow.contrib.operators import bigquery_operator
except ImportError:
    pass

if bigquery_operator is not None:
    args = {
        'owner': 'Airflow',
        'start_date': airflow.utils.dates.days_ago(2)
    }

    dag = models.DAG(
        dag_id='example_bigquery_getdatasetlist_operator', default_args=args,
        schedule_interval=None)

    get_dataset = bigquery_operator.BigQueryGetDatasetListOperator(
        task_id='bigquery_getdatasetlist_example',
        project_id='cm-da-mikami-yuki-258308',
        dag=dag)

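The target GCP project currently contains two datasets: airflow_test and test_s3.

I placed the DAG file in the dags directory and ran it.

It appears to have completed successfully.

Checking the log, the retrieved list of datasets has been written out.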

*** Reading local file: /home/ec2-user/airflow/logs/example_bigquery_getdatasetlist_operator/bigquery_getdatasetlist_example/2020-03-17T08:11:54.852025+00:00/1.log
[2020-03-17 08:12:05,305] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_getdatasetlist_operator.bigquery_getdatasetlist_example 2020-03-17T08:11:54.852025+00:00 [queued]>
[2020-03-17 08:12:05,312] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_getdatasetlist_operator.bigquery_getdatasetlist_example 2020-03-17T08:11:54.852025+00:00 [queued]>
[2020-03-17 08:12:05,312] {taskinstance.py:866} INFO - 
--------------------------------------------------------------------------------
[2020-03-17 08:12:05,312] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-03-17 08:12:05,312] {taskinstance.py:868} INFO - 
--------------------------------------------------------------------------------
[2020-03-17 08:12:05,322] {taskinstance.py:887} INFO - Executing <Task(BigQueryGetDatasetListOperator): bigquery_getdatasetlist_example> on 2020-03-17T08:11:54.852025+00:00
[2020-03-17 08:12:05,323] {standard_task_runner.py:53} INFO - Started process 12393 to run task
[2020-03-17 08:12:05,382] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: example_bigquery_getdatasetlist_operator.bigquery_getdatasetlist_example 2020-03-17T08:11:54.852025+00:00 [running]> ip-10-0-43-239.ap-northeast-1.compute.internal
[2020-03-17 08:12:05,399] {logging_mixin.py:112} INFO - [2020-03-17 08:12:05,399] {gcp_api_base_hook.py:146} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2020-03-17 08:12:05,402] {logging_mixin.py:112} INFO - [2020-03-17 08:12:05,402] {discovery.py:275} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2020-03-17 08:12:05,701] {bigquery_operator.py:851} INFO - Start getting dataset list: cm-da-mikami-yuki-258308
[2020-03-17 08:12:05,703] {logging_mixin.py:112} INFO - [2020-03-17 08:12:05,703] {discovery.py:894} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/datasets?alt=json
[2020-03-17 08:12:06,214] {logging_mixin.py:112} INFO - [2020-03-17 08:12:06,213] {bigquery_hook.py:1809} INFO - Datasets List: [{'kind': 'bigquery#dataset', 'id': 'cm-da-mikami-yuki-258308:airflow_test', 'datasetReference': {'datasetId': 'airflow_test', 'projectId': 'cm-da-mikami-yuki-258308'}, 'location': 'US'}, {'kind': 'bigquery#dataset', 'id': 'cm-da-mikami-yuki-258308:test_s3', 'datasetReference': {'datasetId': 'test_s3', 'projectId': 'cm-da-mikami-yuki-258308'}, 'location': 'asia-northeast1'}]
[2020-03-17 08:12:06,225] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=example_bigquery_getdatasetlist_operator, task_id=bigquery_getdatasetlist_example, execution_date=20200317T081154, start_date=20200317T081205, end_date=20200317T081206
[2020-03-17 08:12:15,303] {logging_mixin.py:112} INFO - [2020-03-17 08:12:15,303] {local_task_job.py:103} INFO - Task exited with return code 0

Here is an excerpt of the response part:

Datasets List: [
	{
		'kind': 'bigquery#dataset', 
		'id': 'cm-da-mikami-yuki-258308:airflow_test', 
		'datasetReference': {
			'datasetId': 'airflow_test', 
			'projectId': 'cm-da-mikami-yuki-258308'
		}, 
		'location': 'US'
	}, 
	{
		'kind': 'bigquery#dataset', 
		'id': 'cm-da-mikami-yuki-258308:test_s3', 
		'datasetReference': {
			'datasetId': 'test_s3', 
			'projectId': 'cm-da-mikami-yuki-258308'
		}, 
		'location': 'asia-northeast1'
	}
]

The datasets were retrieved successfully.
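Each element carries its ID in datasetReference.datasetId, so a downstream task often only needs those strings. One caveat worth checking: get_datasets_list() indexes ['datasets'] directly, and as far as I know the BigQuery REST API omits that key entirely when a project has no datasets, which would surface as a KeyError. A defensive extraction might look like this; dataset_ids() is a hypothetical helper, and the sample input is trimmed from the log above.

```python
from typing import Any, Dict, List


def dataset_ids(response: Dict[str, Any]) -> List[str]:
    """Return dataset IDs from a datasets.list response (hypothetical helper).

    Uses .get() so that a response without a 'datasets' key yields []
    instead of raising KeyError.
    """
    return [d["datasetReference"]["datasetId"] for d in response.get("datasets", [])]


response = {
    "datasets": [
        {"id": "cm-da-mikami-yuki-258308:airflow_test",
         "datasetReference": {"datasetId": "airflow_test",
                              "projectId": "cm-da-mikami-yuki-258308"},
         "location": "US"},
        {"id": "cm-da-mikami-yuki-258308:test_s3",
         "datasetReference": {"datasetId": "test_s3",
                              "projectId": "cm-da-mikami-yuki-258308"},
         "location": "asia-northeast1"},
    ],
}
print(dataset_ids(response))  # ['airflow_test', 'test_s3']
print(dataset_ids({}))        # []
```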

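Getting the list of tables

The list of tables can be obtained through the tables().list() API.

Looking through the existing code, there doesn't seem to be a method for getting the table list.

So I added a get_table_list() method that calls tables().list() to the BigQueryBaseCursor class.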

bigquery_hook.py

    def get_table_list(self, dataset_id, project_id=None):
        """
        Method returns the tables.list response for the given dataset
        (first page of results only; no pageToken handling).
        """
        table_project_id = project_id if project_id else self.project_id

        try:
            tables_list = self.service.tables().list(
                projectId=table_project_id, datasetId=dataset_id).execute(num_retries=self.num_retries)
            self.log.info("Tables List: %s", tables_list)

        except HttpError as err:
            raise AirflowException(
                'BigQuery job failed. Error was: {}'.format(err.content))

        return tables_list
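As written, get_table_list() returns only the first page of tables.list. The API pages its results (the response carries nextPageToken while more remain, and the request accepts pageToken and maxResults), so a dataset with many tables could come back truncated. The sketch below follows the page tokens; it runs against a hypothetical in-memory fake that mimics the service.tables().list(...).execute() chain, so the paging logic can be exercised without GCP. The fake's token format (a stringified offset) is invented; real tokens are opaque strings. The same loop should carry over to a real service object, but treat it as untested against the live API.

```python
from typing import Any, Dict, List, Optional


class _FakeTablesRequest:
    def __init__(self, response: Dict[str, Any]):
        self._response = response

    def execute(self, num_retries: int = 0) -> Dict[str, Any]:
        return self._response


class _FakeTables:
    """Hypothetical fake of service.tables() that serves fixed-size pages."""

    def __init__(self, table_ids: List[str]):
        self._ids = table_ids

    def list(self, projectId: str, datasetId: str, maxResults: int = 2,
             pageToken: Optional[str] = None) -> _FakeTablesRequest:
        start = int(pageToken) if pageToken else 0
        page = self._ids[start:start + maxResults]
        response: Dict[str, Any] = {
            "tables": [{"tableReference": {"projectId": projectId,
                                           "datasetId": datasetId,
                                           "tableId": t}} for t in page],
            "totalItems": len(self._ids),
        }
        if start + maxResults < len(self._ids):
            response["nextPageToken"] = str(start + maxResults)
        return _FakeTablesRequest(response)


class FakeTablesService:
    def __init__(self, table_ids: List[str]):
        self._tables = _FakeTables(table_ids)

    def tables(self) -> _FakeTables:
        return self._tables


def list_all_tables(service: Any, project_id: str, dataset_id: str,
                    page_size: int = 2, num_retries: int = 5) -> List[Dict[str, Any]]:
    """Follow nextPageToken until every page of tables.list has been read."""
    tables: List[Dict[str, Any]] = []
    page_token: Optional[str] = None
    while True:
        response = service.tables().list(
            projectId=project_id, datasetId=dataset_id,
            maxResults=page_size, pageToken=page_token).execute(num_retries=num_retries)
        tables.extend(response.get("tables", []))  # tolerate a missing 'tables' key
        page_token = response.get("nextPageToken")
        if not page_token:
            return tables


service = FakeTablesService(["name_1880", "name_2000", "name_2018"])
tables = list_all_tables(service, "my-project", "airflow_test")
print([t["tableReference"]["tableId"] for t in tables])
```

With page_size=2 the fake needs two round trips for three tables, which is exactly the case the single-call version would miss.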

I also added an operator for getting the table list, along with a DAG file that runs it.

bigquery_operator.py

class BigQueryGetTableListOperator(BaseOperator):
    template_fields = ('dataset_id', 'project_id')
    ui_color = '#f0eee4'

    @apply_defaults
    def __init__(self,
                 dataset_id,
                 project_id=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args, **kwargs):
        self.dataset_id = dataset_id
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        super(BigQueryGetTableListOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
                               delegate_to=self.delegate_to)
        conn = bq_hook.get_conn()
        cursor = conn.cursor()

        self.log.info('Start getting dataset: %s:%s', self.project_id, self.dataset_id)

        return cursor.get_table_list(
            dataset_id=self.dataset_id,
            project_id=self.project_id)

example_bigquery_gettablelist_operator.py

# -*- coding: utf-8 -*-

from typing import Any

import airflow
from airflow import models

bigquery_operator = None  # type: Any
try:
    from airflow.contrib.operators import bigquery_operator
except ImportError:
    pass

if bigquery_operator is not None:
    args = {
        'owner': 'Airflow',
        'start_date': airflow.utils.dates.days_ago(2)
    }

    dag = models.DAG(
        dag_id='example_bigquery_gettablelist_operator', default_args=args,
        schedule_interval=None)

    get_dataset = bigquery_operator.BigQueryGetTableListOperator(
        task_id='bigquery_gettablelist_example',
        dataset_id='airflow_test',
        project_id='cm-da-mikami-yuki-258308',
        dag=dag)

The target dataset, airflow_test, currently contains three tables named name_[YYYY].

Let's run the DAG and check the log.

*** Reading local file: /home/ec2-user/airflow/logs/example_bigquery_gettablelist_operator/bigquery_gettablelist_example/2020-03-17T08:58:24.271847+00:00/1.log
[2020-03-17 08:58:27,986] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_gettablelist_operator.bigquery_gettablelist_example 2020-03-17T08:58:24.271847+00:00 [queued]>
[2020-03-17 08:58:27,994] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_gettablelist_operator.bigquery_gettablelist_example 2020-03-17T08:58:24.271847+00:00 [queued]>
[2020-03-17 08:58:27,994] {taskinstance.py:866} INFO - 
--------------------------------------------------------------------------------
[2020-03-17 08:58:27,994] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-03-17 08:58:27,994] {taskinstance.py:868} INFO - 
--------------------------------------------------------------------------------
[2020-03-17 08:58:28,003] {taskinstance.py:887} INFO - Executing <Task(BigQueryGetTableListOperator): bigquery_gettablelist_example> on 2020-03-17T08:58:24.271847+00:00
[2020-03-17 08:58:28,005] {standard_task_runner.py:53} INFO - Started process 14010 to run task
[2020-03-17 08:58:28,079] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: example_bigquery_gettablelist_operator.bigquery_gettablelist_example 2020-03-17T08:58:24.271847+00:00 [running]> ip-10-0-43-239.ap-northeast-1.compute.internal
[2020-03-17 08:58:28,106] {logging_mixin.py:112} INFO - [2020-03-17 08:58:28,105] {gcp_api_base_hook.py:146} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2020-03-17 08:58:28,111] {logging_mixin.py:112} INFO - [2020-03-17 08:58:28,111] {discovery.py:275} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2020-03-17 08:58:28,417] {bigquery_operator.py:879} INFO - Start getting dataset: cm-da-mikami-yuki-258308:airflow_test
[2020-03-17 08:58:28,421] {logging_mixin.py:112} INFO - [2020-03-17 08:58:28,421] {discovery.py:894} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/datasets/airflow_test/tables?alt=json
[2020-03-17 08:58:28,789] {logging_mixin.py:112} INFO - [2020-03-17 08:58:28,789] {bigquery_hook.py:1437} INFO - Tables List: {'kind': 'bigquery#tableList', 'etag': '4rK75YI14CTPa28HhVLyEQ==', 'tables': [{'kind': 'bigquery#table', 'id': 'cm-da-mikami-yuki-258308:airflow_test.name_1880', 'tableReference': {'projectId': 'cm-da-mikami-yuki-258308', 'datasetId': 'airflow_test', 'tableId': 'name_1880'}, 'type': 'TABLE', 'creationTime': '1584434061049'}, {'kind': 'bigquery#table', 'id': 'cm-da-mikami-yuki-258308:airflow_test.name_2000', 'tableReference': {'projectId': 'cm-da-mikami-yuki-258308', 'datasetId': 'airflow_test', 'tableId': 'name_2000'}, 'type': 'TABLE', 'creationTime': '1584102254437'}, {'kind': 'bigquery#table', 'id': 'cm-da-mikami-yuki-258308:airflow_test.name_2018', 'tableReference': {'projectId': 'cm-da-mikami-yuki-258308', 'datasetId': 'airflow_test', 'tableId': 'name_2018'}, 'type': 'TABLE', 'creationTime': '1584434135714'}], 'totalItems': 3}
[2020-03-17 08:58:28,801] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=example_bigquery_gettablelist_operator, task_id=bigquery_gettablelist_example, execution_date=20200317T085824, start_date=20200317T085827, end_date=20200317T085828

Here is the retrieved table list:

Tables List: {
	'kind': 'bigquery#tableList', 
	'etag': '4rK75YI14CTPa28HhVLyEQ==', 
	'tables': [
		{
			'kind': 'bigquery#table', 
			'id': 'cm-da-mikami-yuki-258308:airflow_test.name_1880', 
			'tableReference': {
				'projectId': 'cm-da-mikami-yuki-258308', 
				'datasetId': 'airflow_test', 
				'tableId': 'name_1880'
			}, 
			'type': 'TABLE', 
			'creationTime': '1584434061049'
		}, 
		{
			'kind': 'bigquery#table', 
			'id': 'cm-da-mikami-yuki-258308:airflow_test.name_2000', 
			'tableReference': {
				'projectId': 'cm-da-mikami-yuki-258308', 
				'datasetId': 'airflow_test', 
				'tableId': 'name_2000'
			}, 
			'type': 'TABLE', 
			'creationTime': '1584102254437'
		}, 
		{
			'kind': 'bigquery#table', 
			'id': 'cm-da-mikami-yuki-258308:airflow_test.name_2018', 
			'tableReference': {
				'projectId': 'cm-da-mikami-yuki-258308', 
				'datasetId': 'airflow_test', 
				'tableId': 'name_2018'
			}, 
			'type': 'TABLE', 
			'creationTime': '1584434135714'
		}
	], 
	'totalItems': 3
}

As expected, all three tables were retrieved.

Getting table data

To retrieve table data, calling tabledata().list() looks like the way to go.

The existing BigQueryGetDataOperator in /airflow/contrib/operators/bigquery_get_data.py looks usable for this.

I'll add a DAG file and check that it works.
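In this response, creationTime is a string holding milliseconds since the Unix epoch. If a later task needs, for example, the most recently created table, it helps to convert it to a timezone-aware datetime. A small sketch; table_creation_times() is a hypothetical helper, and the input is trimmed from the response above.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def table_creation_times(response: Dict[str, Any]) -> Dict[str, datetime]:
    """Map tableId -> creation time (UTC) from a tables.list response.

    timedelta(milliseconds=...) keeps the millisecond part exact, avoiding
    the float rounding you can get from dividing by 1000.
    """
    return {
        t["tableReference"]["tableId"]: EPOCH + timedelta(milliseconds=int(t["creationTime"]))
        for t in response.get("tables", [])
    }


response = {"tables": [
    {"tableReference": {"tableId": "name_1880"}, "creationTime": "1584434061049"},
    {"tableReference": {"tableId": "name_2000"}, "creationTime": "1584102254437"},
    {"tableReference": {"tableId": "name_2018"}, "creationTime": "1584434135714"},
]}
times = table_creation_times(response)
newest = max(times, key=times.get)
print(newest, times[newest].isoformat())  # name_2018 2020-03-17T08:35:35.714000+00:00
```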

example_bigquery_getdata_operator.py

# -*- coding: utf-8 -*-

from typing import Any

import airflow
from airflow import models

bigquery_get_data = None  # type: Any
try:
    from airflow.contrib.operators import bigquery_get_data
except ImportError:
    pass

if bigquery_get_data is not None:
    args = {
        'owner': 'Airflow',
        'start_date': airflow.utils.dates.days_ago(2)
    }

    dag = models.DAG(
        dag_id='example_bigquery_getdata_operator', default_args=args,
        schedule_interval=None)

    get_dataset = bigquery_get_data.BigQueryGetDataOperator(
        task_id='bigquery_getdata_example',
        dataset_id='airflow_test',
        table_id='name_2000',
        max_results='10',
        dag=dag)

It seems to have completed, so let's check the log.

*** Reading local file: /home/ec2-user/airflow/logs/example_bigquery_getdata_operator/bigquery_getdata_example/2020-03-17T09:40:56.917329+00:00/1.log
[2020-03-17 09:41:06,239] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_getdata_operator.bigquery_getdata_example 2020-03-17T09:40:56.917329+00:00 [queued]>
[2020-03-17 09:41:06,246] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_getdata_operator.bigquery_getdata_example 2020-03-17T09:40:56.917329+00:00 [queued]>
[2020-03-17 09:41:06,246] {taskinstance.py:866} INFO - 
--------------------------------------------------------------------------------
[2020-03-17 09:41:06,246] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-03-17 09:41:06,247] {taskinstance.py:868} INFO - 
--------------------------------------------------------------------------------
[2020-03-17 09:41:06,255] {taskinstance.py:887} INFO - Executing <Task(BigQueryGetDataOperator): bigquery_getdata_example> on 2020-03-17T09:40:56.917329+00:00
[2020-03-17 09:41:06,257] {standard_task_runner.py:53} INFO - Started process 15511 to run task
[2020-03-17 09:41:06,316] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: example_bigquery_getdata_operator.bigquery_getdata_example 2020-03-17T09:40:56.917329+00:00 [running]> ip-10-0-43-239.ap-northeast-1.compute.internal
[2020-03-17 09:41:06,330] {bigquery_get_data.py:92} INFO - Fetching Data from:
[2020-03-17 09:41:06,330] {bigquery_get_data.py:94} INFO - Dataset: airflow_test ; Table: name_2000 ; Max Results: 10
[2020-03-17 09:41:06,335] {logging_mixin.py:112} INFO - [2020-03-17 09:41:06,335] {discovery.py:275} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2020-03-17 09:41:06,676] {logging_mixin.py:112} INFO - [2020-03-17 09:41:06,676] {discovery.py:894} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/datasets/airflow_test/tables/name_2000/data?maxResults=10&alt=json
[2020-03-17 09:41:07,286] {bigquery_get_data.py:106} INFO - Total Extracted rows: 29772
[2020-03-17 09:41:07,297] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=example_bigquery_getdata_operator, task_id=bigquery_getdata_example, execution_date=20200317T094056, start_date=20200317T094106, end_date=20200317T094107
[2020-03-17 09:41:16,236] {logging_mixin.py:112} INFO - [2020-03-17 09:41:16,236] {local_task_job.py:103} INFO - Task exited with return code 0

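The reported row count matches the actual number of records in the table, so it appears to have run correctly.

However, I'd also like to see the contents of the data, so I added a log line to BigQueryGetDataOperator that outputs the retrieved rows, and ran it again.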

bigquery_get_data.py

# (omitted)
class BigQueryGetDataOperator(BaseOperator):
    # (omitted)
    def execute(self, context):
        self.log.info('Fetching Data from:')
        self.log.info('Dataset: %s ; Table: %s ; Max Results: %s',
                      self.dataset_id, self.table_id, self.max_results)

        hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                            delegate_to=self.delegate_to)

        conn = hook.get_conn()
        cursor = conn.cursor()
        response = cursor.get_tabledata(dataset_id=self.dataset_id,
                                        table_id=self.table_id,
                                        max_results=self.max_results,
                                        selected_fields=self.selected_fields)

        self.log.info('Total Extracted rows: %s', response['totalRows'])
        rows = response['rows']
        # added by Mikami: log the raw rows returned by tabledata.list
        self.log.info('Data: %s', rows)

        table_data = []
        for dict_row in rows:
            single_row = []
            for fields in dict_row['f']:
                single_row.append(fields['v'])
            table_data.append(single_row)

        return table_data
    # (omitted)

This time, the retrieved data was written to the log as well.

*** Reading local file: /home/ec2-user/airflow/logs/example_bigquery_getdata_operator/bigquery_getdata_example/2020-03-17T09:53:54.223409+00:00/1.log
[2020-03-17 09:55:10,380] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_getdata_operator.bigquery_getdata_example 2020-03-17T09:53:54.223409+00:00 [queued]>
[2020-03-17 09:55:10,387] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_getdata_operator.bigquery_getdata_example 2020-03-17T09:53:54.223409+00:00 [queued]>
[2020-03-17 09:55:10,387] {taskinstance.py:866} INFO - 
--------------------------------------------------------------------------------
[2020-03-17 09:55:10,387] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-03-17 09:55:10,387] {taskinstance.py:868} INFO - 
--------------------------------------------------------------------------------
[2020-03-17 09:55:10,396] {taskinstance.py:887} INFO - Executing <Task(BigQueryGetDataOperator): bigquery_getdata_example> on 2020-03-17T09:53:54.223409+00:00
[2020-03-17 09:55:10,398] {standard_task_runner.py:53} INFO - Started process 16017 to run task
[2020-03-17 09:55:10,456] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: example_bigquery_getdata_operator.bigquery_getdata_example 2020-03-17T09:53:54.223409+00:00 [running]> ip-10-0-43-239.ap-northeast-1.compute.internal
[2020-03-17 09:55:10,469] {bigquery_get_data.py:92} INFO - Fetching Data from:
[2020-03-17 09:55:10,469] {bigquery_get_data.py:94} INFO - Dataset: airflow_test ; Table: name_2000 ; Max Results: 10
[2020-03-17 09:55:10,474] {logging_mixin.py:112} INFO - [2020-03-17 09:55:10,474] {discovery.py:275} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2020-03-17 09:55:10,796] {logging_mixin.py:112} INFO - [2020-03-17 09:55:10,796] {discovery.py:894} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/datasets/airflow_test/tables/name_2000/data?maxResults=10&alt=json
[2020-03-17 09:55:12,168] {bigquery_get_data.py:106} INFO - Total Extracted rows: 29772
[2020-03-17 09:55:12,168] {bigquery_get_data.py:109} INFO - Data: [{'f': [{'v': 'Emily'}, {'v': 'F'}, {'v': '25956'}]}, {'f': [{'v': 'Hannah'}, {'v': 'F'}, {'v': '23082'}]}, {'f': [{'v': 'Madison'}, {'v': 'F'}, {'v': '19968'}]}, {'f': [{'v': 'Ashley'}, {'v': 'F'}, {'v': '17997'}]}, {'f': [{'v': 'Sarah'}, {'v': 'F'}, {'v': '17702'}]}, {'f': [{'v': 'Alexis'}, {'v': 'F'}, {'v': '17629'}]}, {'f': [{'v': 'Samantha'}, {'v': 'F'}, {'v': '17265'}]}, {'f': [{'v': 'Jessica'}, {'v': 'F'}, {'v': '15709'}]}, {'f': [{'v': 'Elizabeth'}, {'v': 'F'}, {'v': '15099'}]}, {'f': [{'v': 'Taylor'}, {'v': 'F'}, {'v': '15078'}]}]
[2020-03-17 09:55:12,180] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=example_bigquery_getdata_operator, task_id=bigquery_getdata_example, execution_date=20200317T095354, start_date=20200317T095510, end_date=20200317T095512
Data: [
	{'f': [{'v': 'Emily'}, {'v': 'F'}, {'v': '25956'}]}, 
	{'f': [{'v': 'Hannah'}, {'v': 'F'}, {'v': '23082'}]}, 
	{'f': [{'v': 'Madison'}, {'v': 'F'}, {'v': '19968'}]}, 
	{'f': [{'v': 'Ashley'}, {'v': 'F'}, {'v': '17997'}]}, 
	{'f': [{'v': 'Sarah'}, {'v': 'F'}, {'v': '17702'}]}, 
	{'f': [{'v': 'Alexis'}, {'v': 'F'}, {'v': '17629'}]}, 
	{'f': [{'v': 'Samantha'}, {'v': 'F'}, {'v': '17265'}]}, 
	{'f': [{'v': 'Jessica'}, {'v': 'F'}, {'v': '15709'}]}, 
	{'f': [{'v': 'Elizabeth'}, {'v': 'F'}, {'v': '15099'}]}, 
	{'f': [{'v': 'Taylor'}, {'v': 'F'}, {'v': '15078'}]}
]

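This API doesn't seem to return column names, but for now, we were able to retrieve the table data.

The BigQuery REST API reference describes the returned rows as follows: "Repeated rows as result. The REST-based representation of this data leverages a series of JSON f,v objects for indicating fields and values."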
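Because tabledata.list returns only f/v cells without column names, a caller has to pair them with the table schema (returned by tables.get under schema.fields). Assuming the column order of name_2000 is name, gender, count (the columns used in the DML queries later in this article), the conversion could look like this. rows_to_dicts() is a hypothetical helper; note that every v value arrives as a string, including count.

```python
from typing import Any, Dict, List


def rows_to_dicts(rows: List[Dict[str, Any]], field_names: List[str]) -> List[Dict[str, Any]]:
    """Zip each row's f/v cells with column names taken from the table schema.

    Only handles flat columns; RECORD/REPEATED columns have nested 'v' values.
    """
    return [
        {name: cell["v"] for name, cell in zip(field_names, row["f"])}
        for row in rows
    ]


rows = [
    {'f': [{'v': 'Emily'}, {'v': 'F'}, {'v': '25956'}]},
    {'f': [{'v': 'Hannah'}, {'v': 'F'}, {'v': '23082'}]},
]
# Assumed column order for name_2000; in practice read it from the table schema.
field_names = ["name", "gender", "count"]
records = rows_to_dicts(rows, field_names)
print(records[0])  # {'name': 'Emily', 'gender': 'F', 'count': '25956'}
```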

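Inserting, updating, and deleting table data

How can you run arbitrary SQL queries against BigQuery from a DAG? After looking into it, it seems you can do this by passing the SQL to BigQueryOperator.

I added and ran the following DAG, which inserts a record with name='Mikami' into the name_2000 table.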

example_bigquery_sql_operator.py

# -*- coding: utf-8 -*-

from typing import Any

import airflow
from airflow import models

bigquery_operator = None  # type: Any
try:
    from airflow.contrib.operators import bigquery_operator
except ImportError:
    pass

if bigquery_operator is not None:
    args = {
        'owner': 'Airflow',
        'start_date': airflow.utils.dates.days_ago(2)
    }

    dag = models.DAG(
        dag_id='example_bigquery_sql_operator', default_args=args,
        schedule_interval=None)

    query="INSERT INTO `cm-da-mikami-yuki-258308.airflow_test.name_2000` VALUES ('Mikami', 'F', 1)"
    bigquery_sql = bigquery_operator.BigQueryOperator(
        task_id='bigquery_sql_example',
        sql=query,
        use_legacy_sql=False,
        dag=dag)

It finished successfully, so let's check the data from the GCP console.

The record was indeed added from the DAG.

Next, let's update the record we just inserted, changing name to 'Yuki'.

I changed the SQL in the DAG file to an UPDATE query and ran it.

example_bigquery_sql_operator.py

# (omitted)
    dag = models.DAG(
        dag_id='example_bigquery_sql_operator', default_args=args,
        schedule_interval=None)

#    query="INSERT INTO `cm-da-mikami-yuki-258308.airflow_test.name_2000` VALUES ('Mikami', 'F', 1)"
    query="UPDATE `cm-da-mikami-yuki-258308.airflow_test.name_2000` SET name = 'Yuki' WHERE name = 'Mikami' and gender = 'F' and count = 1"
    bigquery_sql = bigquery_operator.BigQueryOperator(
        task_id='bigquery_sql_example',
        sql=query,
        use_legacy_sql=False,
        dag=dag)

The DAG finished successfully, so I ran a SELECT query from the GCP management console to check the data.

I confirmed that the record was updated correctly.
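The console check can also be automated inside Airflow. Below is a minimal sketch, assuming the Airflow 1.10 contrib `BigQueryCheckOperator` (`airflow/contrib/operators/bigquery_check_operator.py`) and the default BigQuery connection. The `dag_id` and `task_id` are hypothetical. The operator fails the task when any value in the first result row is falsy, so the query is written to return a single boolean.

```python
# -*- coding: utf-8 -*-
# Sketch: verify the UPDATE from inside Airflow instead of the GCP console.
# dag_id and task_id are hypothetical; the default BigQuery connection is assumed.

TABLE = 'cm-da-mikami-yuki-258308.airflow_test.name_2000'

# BigQueryCheckOperator fails the task if any value in the first result row
# is falsy, so the query returns one boolean: "exactly one updated row exists".
CHECK_SQL = (
    "SELECT COUNT(*) = 1 FROM `{table}` "
    "WHERE name = 'Yuki' AND gender = 'F' AND count = 1"
).format(table=TABLE)

try:
    from airflow import models
    from airflow.utils.dates import days_ago
    from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
except ImportError:
    BigQueryCheckOperator = None  # Airflow 1.10 contrib not available

if BigQueryCheckOperator is not None:
    dag = models.DAG(
        dag_id='example_bigquery_check_update',
        default_args={'owner': 'Airflow', 'start_date': days_ago(2)},
        schedule_interval=None)

    check_update = BigQueryCheckOperator(
        task_id='check_updated_row',
        sql=CHECK_SQL,
        use_legacy_sql=False,  # backtick-quoted table names are standard SQL syntax
        dag=dag)
```

If the UPDATE did not take effect, the query returns `false` and the task is marked as failed.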

Finally, I delete the record that I inserted and updated above.

example_bigquery_sql_operator.py

(omitted)
    dag = models.DAG(
        dag_id='example_bigquery_sql_operator', default_args=args,
        schedule_interval=None)

#    query="INSERT INTO `cm-da-mikami-yuki-258308.airflow_test.name_2000` VALUES ('Mikami', 'F', 1)"
#    query="UPDATE `cm-da-mikami-yuki-258308.airflow_test.name_2000` SET name = 'Yuki' WHERE name = 'Mikami' and gender = 'F' and count = 1"
    query="DELETE FROM `cm-da-mikami-yuki-258308.airflow_test.name_2000` WHERE name = 'Yuki' and gender = 'F' and count = 1"
    bigquery_sql = bigquery_operator.BigQueryOperator(
        task_id='bigquery_sql_example',
        sql=query,
        use_legacy_sql=False,
        dag=dag)

After running the DAG, I queried the data from the GCP management console again.

The delete also ran as expected.
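Instead of editing the query and re-running the DAG each time, the three statements above can also be chained as separate tasks in one DAG. Below is a minimal sketch of that approach, assuming the same Airflow 1.10 contrib `BigQueryOperator`. The `dag_id`, the task IDs and the `DML_STEPS` list are hypothetical. Airflow's `>>` operator sets the dependencies so the tasks run strictly in order.

```python
# -*- coding: utf-8 -*-
# Sketch: run INSERT -> UPDATE -> DELETE as three chained tasks in one DAG.
# dag_id, task_ids and DML_STEPS are hypothetical names for illustration.

TABLE = 'cm-da-mikami-yuki-258308.airflow_test.name_2000'

# (step name, SQL template); {t} is filled with the table name below
DML_STEPS = [
    ('insert', "INSERT INTO `{t}` VALUES ('Mikami', 'F', 1)"),
    ('update', "UPDATE `{t}` SET name = 'Yuki' "
               "WHERE name = 'Mikami' AND gender = 'F' AND count = 1"),
    ('delete', "DELETE FROM `{t}` "
               "WHERE name = 'Yuki' AND gender = 'F' AND count = 1"),
]

try:
    from airflow import models
    from airflow.utils.dates import days_ago
    from airflow.contrib.operators.bigquery_operator import BigQueryOperator
except ImportError:
    BigQueryOperator = None  # Airflow not installed: only the SQL list is defined

if BigQueryOperator is not None:
    dag = models.DAG(
        dag_id='example_bigquery_dml_chain',
        default_args={'owner': 'Airflow', 'start_date': days_ago(2)},
        schedule_interval=None)

    previous = None
    for step, template in DML_STEPS:
        task = BigQueryOperator(
            task_id='bigquery_{}'.format(step),
            sql=template.format(t=TABLE),
            use_legacy_sql=False,  # DML is only available in standard SQL
            dag=dag)
        if previous is not None:
            previous >> task  # run in order: insert -> update -> delete
        previous = task
```

The SQL is formatted before it is passed to the operator, so no `{...}` braces are left for Airflow's Jinja templating of `sql`.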

Export table data to GCS

Last, let's write the table data out to a file in GCS.

It looks like BigQueryToCloudStorageOperator in /airflow/contrib/operators/bigquery_to_gcs.py can do this just by adding a DAG.

I added and ran the following DAG file, which exports the data in the name_2000 table to gs://test-cm-mikami/test_export/exp_name_2000.txt.

example_bigquery_to_gcs.py

# -*- coding: utf-8 -*-

from typing import Any

import airflow
from airflow import models

bigquery_to_gcs = None  # type: Any
try:
    from airflow.contrib.operators import bigquery_to_gcs
except ImportError:
    pass

if bigquery_to_gcs is not None:
    args = {
        'owner': 'Airflow',
        'start_date': airflow.utils.dates.days_ago(2)
    }

    dag = models.DAG(
        dag_id='example_bigquery_to_gcs_operator', default_args=args,
        schedule_interval=None)

    # Export the whole table; with the default settings this writes a CSV file with a header row
    export_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
        task_id='bigquery_to_gcs_example',
        source_project_dataset_table='cm-da-mikami-yuki-258308.airflow_test.name_2000',
        destination_cloud_storage_uris=['gs://test-cm-mikami/test_export/exp_name_2000.txt'],
        dag=dag)
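The same operator also accepts format options. Below is a minimal sketch of a JSON/gzip variant, assuming the Airflow 1.10 contrib `BigQueryToCloudStorageOperator` parameters `export_format` (default `'CSV'`) and `compression` (default `'NONE'`). The `dag_id`, `task_id` and output file name are hypothetical. BigQuery writes at most 1 GB per file, so larger tables need a URI with a single `*` wildcard, which BigQuery replaces with a zero-padded file number.

```python
# -*- coding: utf-8 -*-
# Sketch: export name_2000 as gzipped newline-delimited JSON, split into files.
# dag_id, task_id and the output file name are hypothetical.

SOURCE_TABLE = 'cm-da-mikami-yuki-258308.airflow_test.name_2000'
# One '*' wildcard: required when the export exceeds 1 GB
DESTINATION_URIS = ['gs://test-cm-mikami/test_export/exp_name_2000-*.json.gz']

try:
    from airflow import models
    from airflow.utils.dates import days_ago
    from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
except ImportError:
    BigQueryToCloudStorageOperator = None  # Airflow 1.10 contrib not available

if BigQueryToCloudStorageOperator is not None:
    dag = models.DAG(
        dag_id='example_bigquery_to_gcs_json',
        default_args={'owner': 'Airflow', 'start_date': days_ago(2)},
        schedule_interval=None)

    export_json = BigQueryToCloudStorageOperator(
        task_id='bigquery_to_gcs_json',
        source_project_dataset_table=SOURCE_TABLE,
        destination_cloud_storage_uris=DESTINATION_URIS,
        export_format='NEWLINE_DELIMITED_JSON',  # default is 'CSV'
        compression='GZIP',                       # default is 'NONE'
        dag=dag)
```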

Before the run, there are no files at the target GCS path.

I confirmed that the DAG run completed successfully:

*** Reading local file: /home/ec2-user/airflow/logs/example_bigquery_to_gcs_operator/bigquery_to_gcs_example/2020-03-17T11:34:24.397858+00:00/1.log
[2020-03-17 11:34:42,238] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_to_gcs_operator.bigquery_to_gcs_example 2020-03-17T11:34:24.397858+00:00 [queued]>
[2020-03-17 11:34:42,245] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: example_bigquery_to_gcs_operator.bigquery_to_gcs_example 2020-03-17T11:34:24.397858+00:00 [queued]>
[2020-03-17 11:34:42,245] {taskinstance.py:866} INFO - 
--------------------------------------------------------------------------------
[2020-03-17 11:34:42,245] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-03-17 11:34:42,245] {taskinstance.py:868} INFO - 
--------------------------------------------------------------------------------
[2020-03-17 11:34:42,255] {taskinstance.py:887} INFO - Executing <Task(BigQueryToCloudStorageOperator): bigquery_to_gcs_example> on 2020-03-17T11:34:24.397858+00:00
[2020-03-17 11:34:42,257] {standard_task_runner.py:53} INFO - Started process 19929 to run task
[2020-03-17 11:34:42,315] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: example_bigquery_to_gcs_operator.bigquery_to_gcs_example 2020-03-17T11:34:24.397858+00:00 [running]> ip-10-0-43-239.ap-northeast-1.compute.internal
[2020-03-17 11:34:42,329] {bigquery_to_gcs.py:93} INFO - Executing extract of cm-da-mikami-yuki-258308.airflow_test.name_2000 into: gs://test-cm-mikami/test_export/exp_name_2000.txt
[2020-03-17 11:34:42,334] {logging_mixin.py:112} INFO - [2020-03-17 11:34:42,334] {discovery.py:275} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2020-03-17 11:34:42,645] {logging_mixin.py:112} INFO - [2020-03-17 11:34:42,644] {discovery.py:894} INFO - URL being requested: POST https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/jobs?alt=json
[2020-03-17 11:34:43,677] {logging_mixin.py:112} INFO - [2020-03-17 11:34:43,677] {discovery.py:894} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/jobs/job_NptEvLRIZj6NzF6O8fy2mgtoMlKY?location=US&alt=json
[2020-03-17 11:34:44,198] {logging_mixin.py:112} INFO - [2020-03-17 11:34:44,198] {bigquery_hook.py:1347} INFO - Waiting for job to complete : cm-da-mikami-yuki-258308, job_NptEvLRIZj6NzF6O8fy2mgtoMlKY
[2020-03-17 11:34:49,204] {logging_mixin.py:112} INFO - [2020-03-17 11:34:49,203] {discovery.py:894} INFO - URL being requested: GET https://bigquery.googleapis.com/bigquery/v2/projects/cm-da-mikami-yuki-258308/jobs/job_NptEvLRIZj6NzF6O8fy2mgtoMlKY?location=US&alt=json
[2020-03-17 11:34:49,493] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=example_bigquery_to_gcs_operator, task_id=bigquery_to_gcs_example, execution_date=20200317T113424, start_date=20200317T113442, end_date=20200317T113449
[2020-03-17 11:34:52,237] {logging_mixin.py:112} INFO - [2020-03-17 11:34:52,236] {local_task_job.py:103} INFO - Task exited with return code 0

Checking GCS from the GCP management console, I could see that the file had been written.

I downloaded the output file and checked its contents:

name,gender,count
Lisette,F,256
Monserrat,F,256
Kalyn,F,256
Julianne,F,512
Tayler,F,512
Kayla,F,13312
Madison,F,19968
Keara,F,257
Randi,F,257
Reanna,F,257
Ingrid,F,257
Eryn,F,257
Alexus,F,1281
(omitted)

I confirmed that the table data was exported correctly.
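Since the default export is CSV with a header row, the downloaded file can also be checked with Python's standard `csv` module rather than by eye. The sketch below is minimal and local-only. `load_export` is a hypothetical helper, and `SAMPLE` contains the first rows of the file shown above.

```python
import csv
import io

# First rows of the downloaded exp_name_2000.txt, copied from the output above
SAMPLE = """name,gender,count
Lisette,F,256
Monserrat,F,256
Kalyn,F,256
"""


def load_export(text):
    """Parse exported CSV text (with header row) into dicts, casting 'count' to int."""
    reader = csv.DictReader(io.StringIO(text))
    return [dict(row, count=int(row['count'])) for row in reader]


if __name__ == '__main__':
    for record in load_export(SAMPLE):
        print(record)
```

`csv.DictReader` takes its keys from the first line, so the column names come straight from the header that the export wrote.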

Summary (impressions)

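Airflow's BigQuery operators access BigQuery through the BigQuery API, and bigquery_hook.py is the place to look to see which API calls are actually made.

Processing that the existing operators don't cover could also be implemented freely by checking the BigQuery API specification.

On top of that, BashOperator makes it easy to run the gcloud CLI, so handling BigQuery-related operations through Airflow shouldn't be a problem.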

References