Cloud Data FusionのチュートリアルでBigQueryにデータを作ってみる

2020.12.08

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

データアナリティクス事業本部のkobayashiです。

クラスメソッド BigQuery Advent Calendar 2020 の8日目のエントリになります。

データパイプラインGCPのフルマネージドサービスであるCloud Data Fusionは気になっていたサービスなのですが、なかな時間を取って調査することができずにいました。今回その機会ができたので公式ドキュメントに記載されているチュートリアルを実践して概要を掴みます。

Cloud Data Fusionとは

Cloud Data Fusionはフルマネージドサービスで迅速にデータパイプラインを構築・管理できるデータ統合サービスです。ノンコーディングでETL、ELTデータパイプラインを構築・管理できるので、データ アナリストやビジネスユーザーもデータを管理できるようになります。

特徴として主なところを挙げると以下になります。

  • CDAP というOSSベースのマネージドサービス
  • GUI上の直感的な操作で作成
  • GCP上のデータ、GCP以外の様々なデータソースに対応
  • バッチとストリーミング処理が可能
  • DataprocやGCP以外のHadoopの環境でも実行可能
  • データリネージュ、統合メタデータなどのデータ管理機能

Cloud Data Fusionの主な機能

パイプライン

GUIで、ノード(コンポーネント)を接続してデータパイプライン(DAG)を作成できます。

ノードの種類としては、

  • Sources
  • Transforms
  • Analytics
  • Actions
  • Sinks
  • Error Handling

があり、これらを組み合わせてデータパイプラインを作成します。

Wrangler UI

実際のデータを確認しながらインタラクティブに変換を作成できます。Wrangler UIで作成した変換はTransformとしてパイプラインに組み込めます。

メタデータ

データセットのメタデータを管理し、簡単にデータセットの検索を行えます。

リネージュ

データセットレベルと項目レベルでデータどのパイプラインで使われているかを可視化してトラッキングが行えます。

HUB

再利用可能なコンポーネント(各種データベースのドライバ、AWS、Azureなどのクラウドサービスへのコネクタ)やサンプルパイプラインなどが利用できます。

まだまだ説明し足りないですが、ここでチュートリアルを実践してみます。

パイプラインの作成

実践するチュートリアルは ターゲティング キャンペーン パイプライン | Google Cloud を行ってみます。

これは、CSVファイルとBigQueryにあるデータを統合してデータを作成し、そのデータをBigQueryへ新たなテーブルとして書き込むパイプラインになります。

最終的に作成するパイプラインは以下になります。

事前準備

Cloud Data Fusionでパイプラインを作成して実行するためには、Cloud Data FusionとCloud DataprocのAPIが有効にする必要があります。また今回はCloud Strage、BigQueryも使うのでそれらのAPIも有効にする必要があります。

次にCloud Data Fusionインスタンスを作成する必要があります。こちらは公式ドキュメンで詳しく説明されていますのでそちらを実施します。 Cloud Data Fusion インスタンスの作成 | Google Cloud

権限設定

当初、上記の事前準備だけでデータパイプラインを実行できると考えパイプラインの構築を進めたのですが、パイプライン作成して実行するとすぐにエラーになってしまい、詳細ログを確認すると下記のエラーが発生していました。

PROVISION task failed in REQUESTING_CREATE state for program run program_run:default.CampaignPipeline_v1.-SNAPSHOT.workflow.DataPipelineWorkflow.xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx due to Dataproc operation failure: INVALID_ARGUMENT: User not authorized to act as service account 'xxxxxxxxxxxx-compute@developer.gserviceaccount.com'. To act as a service account, user must have one of [Owner, Editor, Service Account Actor] roles. See https://cloud.google.com/iam/docs/understanding-service-accounts for additional details..

パイプラインの構築途中でPreviewで実行確認ができるのですが、この時点ではエラーがでていなかったため気づくのに時間がかかってしまいました。これは権限設定が足りないため発生するエラーできちんとドキュメントを読めば記載があったのですが、当初読み飛ばしていました。

Cloud Data Fusion サービス アカウント | Cloud Data Fusion のドキュメント

従って、事前準備に加え権限設定を予め行います。設定方法は サービス アカウントの権限借用の管理 | Google Cloud に沿って進めます。

手順1)IAMと管理コンソールに移動し、IAMを選択する。

手順2)画面右のGoogle提供のロール付与を含みますをチェックし、service-{プロジェクト番号}@gcp-sa- datafusion.iam.gserviceaccount.comのメンバーを選択する。

手順3)ロールの編集画面でサービスアカウントユーザー(roles/iam.serviceAccountUser)のロールを付与する。

service-{プロジェクト番号}@gcp-sa- datafusion.iam.gserviceaccount.comのメンバーのロールにサービスアカウントユーザーが追加されていればパイプラインの実行が行えます。

ここまでで準備が整ったのでパイプラインを作成していきます。

Wrangler UIによるデータ変換

手順1)初めに上記で作成したCloud Data Fusionインスタンスでインスタンスを表示を押下し、Cloud Data Fusionのホーム画面へ進む。

手順2)Wrangleを選択してWranglerへ進む。

手順3)左のメニューからGoogle Cloud Storage > Sample Bucketsを選択し、campaign-tutorial配下のcustomer.csvを選択する。

手順4)ファイルの中身がbody列に表示されるので、body列のプルダウンをクリックしParse > CSVを選択し、Commaで分割する。

手順5)左端のbody列は必要ないので、body列のプルダウンをクリックしDelete columnを選択し、削除する。

手順6)カラム名をわかりやすいものに変えるため、右ペインの全選択を押下し全カラムを選択状態にしたあと、Column names > Set allを選択するとモーダルが表示されるのでName,StreetAddress,City,State,Countryを入力しApplyを押下する。

ここでWranglerの目玉機能の一つであるInsightsでデータの中身を確認してみます。画面中央のInsightsを選択すると下図の様な画面が開き、1000個分のサブセットデータが表示されます。これにより変換やフィルタ条件を見積もることができ大変便利です。

手順7)City列の値を使ってフィルタリングを行うため、City列のプルダウンをクリックしFilter > Keep rows > value matches regexを選択し、テキストボックスに^(California|Oregon|Washington)$を入力しApplyを押下する。

*これでCity列がCalifornia、Oregon、Washingtonだけのデータになります

手順8)StyreetAddress列の値を使ってフィルタリングを行うため、StyreetAddress列のプルダウンをクリックしFilter > Keep rows > value containsを選択し、テキストボックスにAvenueを入力しApplyを押下する。

StyreetAddress列にAvenueが含まれているデータになります。

これでWranglerを使ってのデータ変換は完了です。これをパイプラインのソースとして組み込むので右上のを押下します。

Pipeline Studioによるパイプライン作成

下図がWranglerから遷移したPipeline Studioの画面になります。これにノードを付け足して行きパイプラインを完成させます。

手順1)GCSFileノードの上にマウスカーソルを合わせるとPropertiesが表示されるので押下し、Wranglerでの設定を確認する。

DirectivesのRecipeにWranglerで行った操作が表示されます。またWrangleボタンを押下するとWrangler経戻り再度変換設定を行えます。

次にCSVファイルのデータと結合するBigQueryのstate_abbreviationsデータをソースとして設定しますが、その前にBigQueryでデータの中身を確認します。 BigQueryに移動しクエリエディタで次のクエリを実行します。

SELECT * FROM `dis-user-guide.campaign_tutorial.state_abbreviations`

すると、上図のようなデータが表示されます。これをCSVのデータと結合します。

手順2)左のメニューのSource > BigQueryを選択し、BigQueryノードを作成しPropertiesを押下しBigQueryをデータソスとする設定を行う。

  • Reference Name : state_abbreviationsを入力
  • Dataset Project ID Name : dis-user-guideを入力
  • Dataset : campaign_tutorialを入力
  • Table : state_abbreviationsを入力

手順3)左のメニューのAnalytics > Joinerを選択し、WranglerノードとBigQueryノードを接続する。

手順4)JoinerノードのPropertiesを押下し、Joinerの設定を行う。

*行っていることとしてはWranglerから出力されたレコードとBigQueryから出力されたレコードをStateとnameで外部結合しています。

  • Fields
    • Wrangler
      • Name : チェックする
      • StreetAddress : チェックする
      • City : チェックする
      • State : チェックしない
    • BigQuery
      • name : チェックしない
      • abbreviation : チェックする
  • Join Type : Outerを選択
    • Requiree Inputs : Wranglerをチェック
  • Join Condition
    • Wrangler : State
    • BigQuery : name

手順5)左のメニューのSink > BigQueryを選択し、Joinerノードを接続する。

手順6)BigQueryノードのPropertiesを押下し、SinkのBigQuery設定を行う。

  • Reference Name : customer_data_abbreviated_statesを入力
  • Dataset : dis_user_guideを入力
  • Table : customer_data_abbreviated_statesを入力

パイプラインの確認

Cloud Data FusionにはPreview機能があり、作成しているパイプラインをテストして実行に問題がなく想定通りかを確認することができます。ここまで作成したパイプラインでもこの機能を使って実行結果を確認してみます。

手順1)Pipeline Studioの画面上部のPreviewを押下する。

手順2)Preview画面のRunを押下してPreviewを実行する。

実行中は下図のようになり実行時間が表示されます。

Previewが完了すると下図のような画面になり、ログと各ノードでのデータが確認できます。ここでパイプラインが想定通りなのかを確認したり、エラー終了した際にはログを確認してパイプラインの修正を行えます。

  • Previewの実行ログ

  • Joinerノードのデータ

Previewで問題がなければPipeline Studioの画面上部のDeployを押下してパイプラインをデプロイします。

パイプラインの実行

デプロイが終わるとパイプラインを実行することができるようになリます。

手順1)画面上部のRunを押下してパイプラインを実行する。

パイプラインの実行に問題がなければStatusがProvisioning、Running、Successと遷移します。

Successが表示されデータパイプラインの実行が終わると下図のような画面になります。各ノードで処理されたレコード数とエラー数が表示され、これをクリックすると詳細な情報を見ることができます。

実行結果をBigQueryで確認すると変換されたデータが登録されていることがわかります。

パイプラインのエクスポート

作成したパイプラインはJson形式でエクスポートが行えます。これを利用すれば他のプロジェクトで同じパイプラインをインポートしたり、Gitなどでバージョン管理をすることができます。

今回作成したパイプラインを出力してみると以下のような形になります。

{
    "name": "CampaignPipeline",
    "description": "Data Pipeline Application",
    "artifact": {
        "name": "cdap-data-pipeline",
        "version": "6.2.3",
        "scope": "SYSTEM"
    },
    "config": {
        "resources": {
            "memoryMB": 2048,
            "virtualCores": 1
        },
        "driverResources": {
            "memoryMB": 2048,
            "virtualCores": 1
        },
        "connections": [
            {
                "from": "GCSFile",
                "to": "Wrangler"
            },
            {
                "from": "Wrangler",
                "to": "Joiner"
            },
            {
                "from": "Joiner",
                "to": "BigQuery1"
            },
            {
                "from": "BigQuery",
                "to": "Joiner"
            }
        ],
        "comments": [],
        "postActions": [],
        "properties": {},
        "processTimingEnabled": true,
        "stageLoggingEnabled": true,
        "stages": [
            {
                "name": "GCSFile",
                "plugin": {
                    "name": "GCSFile",
                    "type": "batchsource",
                    "label": "GCSFile",
                    "artifact": {
                        "name": "google-cloud",
                        "version": "0.15.3",
                        "scope": "SYSTEM"
                    },
                    "properties": {
                        "filenameOnly": "false",
                        "copyHeader": "false",
                        "schema": "{\"type\":\"record\",\"name\":\"text\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}",
                        "path": "gs://campaign-tutorial/customers.csv",
                        "format": "text",
                        "project": "xxxxxxxxx",
                        "recursive": "false",
                        "referenceName": "campaign-tutorial.customers.csv"
                    }
                },
                "outputSchema": "{\"type\":\"record\",\"name\":\"text\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}",
                "id": "GCSFile",
                "type": "batchsource",
                "label": "GCSFile",
                "icon": "fa-plug"
            },
            
        ....
        
        "schedule": "0 * * * *",
        "engine": "spark",
        "numOfRecordsPreview": 100,
        "description": "Data Pipeline Application",
        "maxConcurrentRuns": 1
    }
}

メタデータとリネージュ

メータデータとリネージュも確認してみます。

手順1)Cloud Data Fusionのホーム画面よりMetadataを選択する。

手順2)テキストボックスに検索したい項目名を入力し検索ボタンを押下する。

*ここでの検索はスキーマ名、フィールド名、タグ等様々なメタデータで検索が行なえます。

検索結果が表示されるので、確認したいデータセットを選択し詳細を確認します。

またLineageを選択するとそのデータセットがどのパイプラインでいつ使われ、どこへデータが流れていったかを確認することができます。

まとめ

Cloud Data Fusionを使ってデータパイプラインを構築し実行してみました。Cloud Data Fusionはノンコーディングで視覚的にデータパイプラインを作成できるので操作さえなれてしまえば複雑な処理も簡単に組むことができます。またメタデータとリネージュも確認しました。

今回はGCP内のデータソスでしたが、次回はHUB機能を使い他のクラウドサービスをデータソースとするパイプラインを作成してみたいと思います。

最後まで読んで頂いてありがとうございました。

クラスメソッド BigQuery Advent Calendar 2020 9日目は、しんや さんです。お楽しみに。