Cloud Data Fusion の HTTPプラグイン を使って BigQuery にデータを投入してみた

2022.05.29

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは!エノカワです。

Google Cloud の データ統合サービスである Cloud Data Fusion は、 データパイプラインを迅速に構築・管理するためのフルマネージドなサービスです。

インフラストラクチャを管理することなく、ノンコーディングでデータパイプラインを構築できるということで、
「手軽に外部データをBigQueryテーブルに投入なんかもできるのかな?」と気になっていました。

そこで今回は、Webサービスから取得したデータをBigQueryテーブルに投入することを目標に Cloud Data Fusion を触ってみました。

概要

Cloud Data Fusion では、ノードを接続してパイプラインを構成します。
今回作成するパイプラインはこんな感じです。

HTTPノードでWebサービスからデータを取得し、BigQueryノードでBigQueryテーブルにデータを保存するパイプラインを作成します。

データ形式

シンプルなHTTPリクエスト&レスポンスで試したかったところに、うってつけのサービスがありましたので利用させていただきました!

HTTPリクエストをすると様々な形式のレスポンスを返してくれるWebサービスです。
例えば、下記URLにリクエストするとJSON形式のレスポンスを返してくれます。

リクエストURL

https://httpbin.org/json

レスポンスボディ

{
  "slideshow": {
    "author": "Yours Truly",
    "date": "date of publication",
    "slides": [
      {
        "title": "Wake up to WonderWidgets!",
        "type": "all"
      },
      {
        "items": [
          "Why <em>WonderWidgets</em> are great",
          "Who <em>buys</em> WonderWidgets"
        ],
        "title": "Overview",
        "type": "all"
      }
    ],
    "title": "Sample Slide Show"
  }
}

今回は上記JSON形式のレスポンスボディをBigQueryテーブルに投入することを目指します。

インスタンス作成

まずは、Cloud Data Fusion のインスタンスを作成します。

メニューからData Fusionを選択します。
Cloud Data Fusion APIが有効になっていれば、インスタンスページが表示されます。

Cloud Data Fusion APIが有効になっていない場合は、有効にするボタンをクリックしてAPIを有効にします。

インスタンスを作成をクリックします。

インスタンス作成画面が表示されるので、作成するインスタンスの情報を入力します。
今回は以下の情報を入力しました。

  • インスタンス名
    fusion-training
  • リージョン
    asia-northeast1 を選択
  • バージョン
    6.6.0 を選択
  • エディション
    Basic を選択

Cloud Data Fusion は、パイプラインの実行にDataproc クラスタを使用します。

Dataproc クラスタでパイプラインをプロビジョニングおよび実行できる権限を持つサービスアカウントをDataproc サービス アカウントに指定する必要があります。
デフォルトでは Compute Engine アカウントがあらかじめ選択されていますが、任意のサービスアカウントを指定できます。
必要な権限が足りない場合は画像にあるような警告メッセージが表示されますが、ここで権限を付与できます。

今回はデフォルトの Compute Engine アカウントを指定することにして、必要な権限を付与しましょう。
権限を付与ボタンをクリックします。

権限を付与ボタンをクリック後、「IAM」のページを見に行ってみると以下の権限が付与されていました。

  • Cloud Data Fusion 実行者
  • Dataproc ワーカー

インスタンス情報の入力が完了したので、作成ボタンをクリックします。

ちなみに、詳細オプションでは以下を設定できます。
今回はデフォルト(すべてOFF)のまま作成しました。

  • プライベートIPを有効化
  • Stackdriver Logging サービスを有効にする
  • Stackdriver Monitoring サービスを有効にする
  • 顧客管理の暗号鍵(CMEK)を使用する

作成ボタンをクリック後、しばらくすると、インスタンスが作成されました。
インスタンスを表示の箇所をクリックします。

これで準備が整いました。パイラインの作成に入っていきましょう!

パイプライン作成

Cloud Data Fusion のホーム画面が表示されました。
メニューからStudioを選択します。

Pipeline Studio のページに切り替わりました。
このページで、ソース、変換、シンクなどのノードを接続することで、パイプラインを形成できます。

今回は画面左のSourceからHTTPノード、SinkからBigQueryノードを選択して Pipeline Studio に配置し、それらを接続してパイプラインを形成します。
デフォルトではSourceHTTPノードは無いようです。

HTTPプラグイン展開

Cloud Data Fusion には機能を拡張するために使用できるカスタマイズ可能なモジュールとして、プラグインがあります。

画面右上のHUBをクリックするとコンポーネントの一覧が表示されるので、ここからHTTPプラグインを探します。

検索バーにHTTPと入力すると、HTTPプラグインが見つかりました。

HTTPプラグインを選択し、Deployボタンをクリックします。

デプロイ画面が表示されるので、Finishボタンをクリックします。

HTTPプラグインのデプロイが完了しました。
続けてパイプラインの作成を行いますので、Create a pipelineボタンをクリックします。

HTTPノード設定

画面左のSource内にHTTPノードが追加されています。
選択すると、HTTPノードが Pipeline Studio に配置されました。

ソースとしてのHTTPノードが配置されましたので、httpbinにリクエストする設定をしていきましょう。
HTTPノードのPropertiesをクリックします。

HTTPノードの設定画面が表示されました。
左側はHTTP設定、右側は出力データの形式です。

HTTP設定

識別名、リクエストURLを入力、HTTPメソッドを選択します。
今回は以下の情報を入力しました。

  • Reference Name
    from-httpbin
  • URL
    https://httpbin.org/json
  • HTTP Method
    GET を選択

続けてHTTPレスポンスのフォーマットを入力します。
データ形式がJSONの場合、JSONパスとフィールドマッピングも指定できるので、レスポンスボディの内容をもとにマッピングします。

  • Format
    json
  • JSON/MXL Result Path
    /slideshow
  • JSON/XML Fields Mapping
    Field Name Field Path
    author /author
    date /date
    slides /slides
    title /title

レスポンスボディ(再掲)

{
  "slideshow": {
    "author": "Yours Truly",
    "date": "date of publication",
    "slides": [
      {
        "title": "Wake up to WonderWidgets!",
        "type": "all"
      },
      {
        "items": [
          "Why <em>WonderWidgets</em> are great",
          "Who <em>buys</em> WonderWidgets"
        ],
        "title": "Overview",
        "type": "all"
      }
    ],
    "title": "Sample Slide Show"
  }
}

出力データ構造

Output Schemaに出力データ構造を入力します。
マッピングしたフィールドをもとにスキーマを構成します。
このデータ構造がBigQueryノードに渡されます。

BigQueryノード設定

次にBigQueryノードを設定します。
画面左のSinkからBigQueryを選択して、 Pipeline Studio にBigQueryノードを配置します。

HTTPノードから矢印を引っ張ってBigQueryノードに接続します。

BigQueryノードのPropertiesをクリックします。

BigQueryノードの設定画面が表示されました。
左側は入力データ構造、右側はBigQueryテーブルの情報です。

入力データ構造

Input Schemaに入力データ構造が表示されています。編集はできません。
接続されたHTTPノードから渡されるデータ構造がそのまま入力データ構造となります。

テーブル設定

識別名、データセット名、テーブル名を入力します。
今回は以下の情報を入力しました。

  • Reference Name
    to-bigquery
  • Dataset
    df_work
  • Table
    httpbin_json

出力データ構造

Output SchemaにはBigQueryテーブルのスキーマ情報を入力します。
デフォルトで入力データ構造と同じ値が設定されていました。
今回はHTTPノードから渡されるデータをそのまま投入するので、変更は加えず入力データ構造と同じ設定のままとしました。

パイプライン確認

Cloud Data Fusionにはプレビュー機能があります。
作成したパイプラインが正しく動作するか確認してみましょう。

画面右上のPreviewをクリックすると、プレビューモードになります。
その状態で続けてRunをクリックします。

パイプラインのプレビュー実行が開始されました。

画面上部にメッセージが表示されました。

プレビュー実行は正常に完了したようです。

パイプライン展開

プレビュー実行でパイプラインに問題無いことが確認できたので、パイプラインをデプロイします。
デプロイするためにはパイプラインを保存する必要があります。

Pipeline Studio 左上のName your pipelineの箇所をクリックします。

任意のパイプライン名を入力し、Saveボタンをクリックします。
今回はhttp-to-bigqueryとしました。

パイプラインが保存できたので、デプロイします。
Pipeline Studio 右上のDeployをクリックします。

パイプラインのデプロイが開始されました。

しばらくすると、画面が切り替わりました。

画面上部のStatusDeployedと表示されています。
パイプラインのデプロイが完了したようです。

パイプライン実行

それではパイプラインを実行してみましょう。

Pipeline Studio 右上のRunをクリックします。
StatusProvisioningに変わりました。
時間経過とともにStatusが変わっていきます。

Status: Provisioning

Status: Running

Status: Succeeded

パイプラインの実行が正常に完了したようです。

Dataproc クラスタ

パイプライン実行の裏では、Datapoc クラスタが以下の変遷をたどっていました。

Status: Provisioning
→プロビジョニング中

Status: Running
→実行中

Status: Succeeded
→クラスタなし

データ確認

最後に、BigQueryテーブルにデータが投入されたか確認してみましょう。

BigQuerydf_workデータセットが作成され、その下にhttpbin_jsonテーブルが作成されています。

プレビューでデータを確認すると、データが投入されていました!
httpbin のレスポンスボディの内容がちゃんと入っています。

スキーマも見てみましょう。
BigQueryノードのプロパティで設定したデータ構造でテーブルが作成されています!

まとめ

以上、Webサービスから取得したデータをBigQueryテーブルに投入するパイプラインを Cloud Data Fusion で構築してみました。

取り敢えずパイプランを構築することを目的としていたので、細かい設定は気にせず構築を進めてみましたが、期待動作するパイプラインが出来上がりました。
ノードを配置して接続してプロパティの必須項目を入力して、、という作業がすべてGUI上で完結するので、直感的にパイプライン構築ができました。

また、パイプライン実行時に自動でDataproc クラスタをデプロイ、実行、削除してくれるので、インフラ管理も気にする必要が無い点も実感することができました。
パイプラインのデプロイ前にプレビュー実行できるのも有り難い機能だと思いました!

ノード(プラグイン)も豊富なのでETL領域を広くカバーできそうです。

今回は取得したデータをそのまま保存するシンプルなパイプラインだったので変換(Transform)のノードは使いませんでしたが、次回はその辺りも含めて試してみたいと思います。

参考