Fluentdのコネクタを使ってBigQueryへのストリーミング挿入を試してみる

2021.04.30

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

データアナリティクス事業本部のkobayashiです。

BigQueryへのデータ取り込みの方法としてストリーミングがあります。その機能を試してみたいと思いGCPの公式ドキュメントにある「Fluentd と BigQuery を使用したリアルタイムのログ分析 」の内容に沿ってFluentdからのリアルタイムロギングを試してみたのでその内容をまとめます。

BigQueryのストリーミング挿入

BigQueryにデータを取り込む際には主にバルクロードとストリーミングがあります。 バルクロードはデータソースとしてCSV・ParquetなどのファイルをCloud Storageから読み込んでBigQueryに取り込んだり、BigQuery Data Transfer Service で他のサービスからデータを取り込む方法になります。一方ストリーミングはデータが発生するたびにBigQueryのストリーミングAPIを呼び出してBigQueryにデータを取り込む方法になります。

ストリーミングでデータを取り込むことによってほぼリアルタイムでBigQueryでデータを分析することができるようになります。

今回はこのストリーミング取り込みを試してみるために「Fluentd と BigQuery を使用したリアルタイムのログ分析  |  Cloud アーキテクチャ センター」 の内容を実施しますが、ドキュメントと違いGCP外にあるWebサーバーからのログ転送を試してみます。これは公式ドキュメントとはちょっと違うことをやってみたかったのと単に手持ちのサーバーでNginxのログを取れるサーバーが他のサービスにあったためです。

BigQueryのストリーミング挿入を試してみる

すでにNginxが動いているサーバーがある前提で進めるため、行う作業として以下になります。

  • WebサーバーにFluentdのインストールとBigQueryコネクタのインストールを行う
  • BigQueryにNginxのログ用のテーブルを作成する
  • BigQueryコネクタ用のサービスアカウントを作りロールを割り当てる
  • Fluentdの設定ファイルにBigQueryコネクタの設定する

環境

  • centos 7.8.2003
  • nginx 1.18.0

それではFluentd と BigQuery を使用したリアルタイムのログ分析を進めます。

WebサーバーにFluentdとBigQueryコネクタのインストールする

FluentdとFluentd-BigQueryコネクタのインストールは以下のコマンドで実行します。 ただコマンドを実行するだけなので特に詰まる箇所はないと思います。

$ sudo curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent4.sh | sh

$ sudo systemctl start td-agent.service
$ sudo systemctl enable td-agent.service

$ sudo /usr/sbin/td-agent-gem install fluent-plugin-BigQuery

BigQueryにNginxのログ用のテーブルを作成する

BigQueryウェブUIでテーブルを作成します。はじめにデータセットを作成し、そのデータセット内にテーブルを作成します。

手順1) プロジェクトを選択しデータセットを作成を押下し、データセットを作成する。

手順2) 作成したデータセットを選択しテーブルを作成を押下し、テーブルを作成する。その際、スキーマでテキストとして編集を選択し、以下のカラム定義を入力する。

[
  {
    "type": "TIMESTAMP",
    "name": "time"
  },
  {
    "type": "STRING",
    "name": "remote"
  },
  {
    "type": "STRING",
    "name": "host"
  },
  {
    "type": "STRING",
    "name": "user"
  },
  {
    "type": "STRING",
    "name": "method"
  },
  {
    "type": "STRING",
    "name": "path"
  },
  {
    "type": "STRING",
    "name": "code"
  },
  {
    "type": "INTEGER",
    "name": "size"
  },
  {
    "type": "STRING",
    "name": "referer"
  },
  {
    "type": "STRING",
    "name": "agent"
  },
  {
    "type": "STRING",
    "name": "http_x_forwarded_for"
  }
]

以上でFluentdのログを保存するテーブルが作成できました。

BigQueryコネクタ用のサービスアカウントを作りロールを割り当てる

次にWebサーバーのBigQueryコネクタで使用するサービスアカウントを作成し、必要なロールを割り当てます。

手順1) GCPのウェブUIからIAMと管理 > サービスアカウントの画面でサービスアカウントを作成を押下しサービスを作成する。

  • サービスアカウント名 : 適当な名前の入力
  • 「サービスアカウントにプロジェクトへのアクセスを許可する」「ユーザーにこのサービスアカウントへのアクセスを許可」は何も入力せずに省略する

手順2) 作成したサービスアカントを選択し、キーのタブでサービスアカウントの鍵を作成する。

  • json形式のファイルがダウンロードされるので保存しておく。

これでサービスアカウントの作成は完了したので次にBigQueryのテーブルへのアクセス権を設定します。

手順3) BigQueryウェブUIへ移動し先程作成したテーブルを選択しテーブルを共有を押下する。

手順4) テーブルの権限ダイアログで、メンバーを追加で先程作成したサービスアカウントを選択し、権限にてBigQueryデータ編集者を選択して追加を押下する。

これでテーブルの権限にBigQueryデータ編集者としてサービスアカウントが追加されました。

Fluentdの設定ファイルにBigQueryコネクタの設定する

最後にWebサーバーのFluentdの設定ファイルにBigQueryコネクタの設定を行います。

手順1) Webサーバーへサーバーアカウントの鍵を送る。

$ scp {project_name}-1c3a5abc2e41.json {Webサーバー}:

手順2) Fluentdの設定ファイルに以下を追記する。

  • 公式のドキュメントと異なり認証方式をサービスアカウントの鍵方式へ変更しています。
$ vi /etc/td-agent/td-agent.conf

<source>
  @type tail
  @id input_tail
  <parse>
    @type nginx
  </parse>
  path /var/log/nginx/www.example.com/access.log # nginxのログを指定
  pos_file /var/log/td-agent/httpd-access.log.pos
  tag nginx.access
</source>
<match nginx.access>
  @type BigQuery_insert

  # Authenticate with BigQuery using the VM's service account.
  auth_method json_key
  json_key /home/centos/{project_name}-1c3a5abc2e41.json # サービスアカウントの鍵を指定
  project {project_name}
  dataset fluentd # BigQueryのデータセット名
  table nginx_access # BigQueryのテーブル名
  fetch_schema true

  <inject>
    # Convert fluentd timestamp into TIMESTAMP string
    time_key time
    time_type string
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </inject>
</match>

手順3) Fluentdを再起動する。

$ sudo systemctl restart td-agent.service

その際にFluentdのログを確認すると設定に問題がなければ以下のようなログが出ますが、

$ sudo tail -f  /var/log/td-agent/td-agent.log
2021-04-20 05:28:34 +0900 [info]: #0 shutting down input plugin type=:debug_agent plugin_id="input_debug_agent"
2021-04-20 05:28:34 +0900 [info]: #0 shutting down output plugin type=:tdlog plugin_id="output_td"
2021-04-20 05:28:34 +0900 [info]: #0 shutting down output plugin type=:BigQuery_insert plugin_id="object:7a8"
2021-04-20 05:28:34 +0900 [info]: #0 shutting down output plugin type=:stdout plugin_id="output_stdout"
2021-04-20 05:28:34 +0900 [info]: Worker 0 finished with status 0

サービスアカウントの権限周りでミスが有ると以下のようなログが出ます。最初、テーブルへの権限不足で出してしまいました。

$ sudo tail -f  /var/log/td-agent/td-agent.log
2021-04-20 04:54:25 +0900 [info]: #0 fluentd worker is now running worker=0
2021-04-20 04:58:39 +0900 [error]: #0 tables.get API project_id="{project_name}" dataset="fluentd" table="nginx_access" code=403 message="accessDenied: Access Denied: Table {project_name}:fluentd.nginx_access: Permission BigQuery.tables.get denied on table {project_name}:fluentd.nginx_access (or it may not exist)."
2021-04-20 04:58:39 +0900 [warn]: #0 emit transaction failed: error_class=RuntimeError error="failed to fetch schema from BigQuery" location="/opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-BigQuery-2.2.0/lib/fluent/plugin/out_BigQuery_base.rb:196:in `fetch_schema'" tag="nginx.access"

以上で設定は終わりです。

Webサーバーにアクセスがあった後に BigQueryウェブUIを確認するとログがBigQueryへストリーミング挿入されていることが確認できます。

まとめ

BigQueryへのストリーミングデータの取り込みをFluentdのBigQueryコネクタを使って試してみました。少しの設定でリアルタイムのログをBigQueryへ取り込めました。

今回は単純にWebサーバーからBigQueryへ直接ストリーミングを行いましたが、BigQuery以外のGCPのサービスでFluentdのデータを使いたい場合はPub/Subを経由する方法もありますのでそちらの方も試してみたいと思います。

最後まで読んで頂いてありがとうございました。