
Cloudflare の DNS ログを BigQuery のテキスト検索機能で分析してみる
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
ウィスキー、シガー、パイプをこよなく愛する大栗です。
先日 BigQuery でテキスト検索が効率的になるアップデートがあり、ブログを書いていましたが、ログの検索や分析に向いているということが分かり、何か良い応用例はないかと考えていたら Cloudflare のログを BigQuery へ配信して検索や分析すると良さそうだと思いやってみました。
Google BigQuery search index は2022年4月15日現在において、プレビューのステータスです。このプロダクトまたは機能は、Google Cloud Platform の利用規約の一般提供前のサービス規約の対象となります。一般提供前のプロダクトと機能では、サポートが制限されることがあります。また、一般提供前のプロダクトや機能に変更が加えられると、他の一般提供前バージョンと互換性がない場合があります。詳細については、リリースステージの説明をご覧ください。
Cloudflare Logpush
Cloudflare Logpush が標準で出力できるサービスは以下になっており、実は BigQuery に対応していません。
- Amazon S3
- S3-compatible endpoints
- Datadog
- Google Cloud Storage
- Microsoft Azure Blob Storage
- New Relic
- Splunk
- Sumo Logic
しかし、Cloudflare 社が Apache License 2.0 で作成しているツールlogpush-to-bigqueryを使って、Google Cloud Storage から BigQuery へデータを連携することができます。
cloudflare/cloudflare-gcp/logpush-to-bigquery
構成としては以下のようになっており、Cloudflare Logpush でCloud Storage へ出力して、Cloud Scheduler が毎分 Pub/Sub をトリガーして Cloud Functions を起動して BigQuery へデータを取り込みます。

Cloudflare Logpush 自体については以下のエントリを参照ください。
BigQuery search index
テキストデータを効率的に検索させるインデックスを作成可能になり、SEARCH 関数で高速な検索を実行可能になっています。あいまい検索やタイポ修正はないのでログデータの検索や分析などに向いている機能になっています。
Cloudflare Logpush 自体については以下のエントリを参照ください。
やってみる
今回は DNS ログを対象に、以下の様な流れで設定を行います
- Cloud Storage の作成
- BigQuery でのテーブル作成
- Cloud Functions の作成
- Cloudflare Logpush で DNS ログを設定
- search index の作成と SEARCH 関数での検索
1. Cloud Storage の作成
Cloud Storage のバケットを作成します。
Cloud Storage コンソールでCREATE BUCKETをクリックします。

バケットの名称を入力します。

ここではリージョンをasia-northeast1(Tokyo)を選択しました。ちなみに他のリージョンでも問題ないようです。他の項目はログの保存に都合の良い設定を行ってCREATEをクリックします。

2. BigQuery でのテーブル作成
BigQuery で事前にテーブルを作成します。logpush-to-bigquery はデータセットやテーブルがなければ自動で作成してくれるのですが、ログを効率的に分析するための設定を行うため、事前にテーブルを作成します。
今のプロジェクトの右側にあるメニューからCreate datasetをクリックします。

Dataset ID にcloudflare_logstream、Data location はUSを選択1してCREATE DATASETをクリックします。

右上のCOMPOSE NEW QUERYをクリックしてクエリエディタを開きます。

クエリエディタに以下のクエリを入力してRUNをクリックします。ここではTimestamp列に対して時間単位列パーティショニングをして、パーティションの有効期限を400日にしましたが、必要に応じて変更してください。
CREATE TABLE `cloudflare_logstream.cloudflare_logs` ( ColoCode STRING, EDNSSubnet STRING, EDNSSubnetLength INT64, QueryName STRING, QueryType INT64, ResponseCached BOOL, ResponseCode INT64, SourceIP STRING, Timestamp TIMESTAMP ) PARTITION BY TIMESTAMP_TRUNC(Timestamp, DAY) OPTIONS( partition_expiration_days=400, require_partition_filter=true );

作成した cloudflare_logs テーブルを確認すると以下のようになっています。
 

右上のCOMPOSE NEW QUERYをクリックしてクエリエディタを開きます。

search index を全カラムを対象に作成します。作成できない列は無視されます。
CREATE SEARCH INDEX log_index ON cloudflare_logstream.cloudflare_logs(ALL COLUMNS);

これで BigQuery の事前準備は終わりました。
3. Cloud Functions の作成
ここで logpush-to-bigquery をデプロイします。ここでは Cloud Shell を使います。
README.mdを開き、OPEN IN GOOGLE CLOUD SHELLをクリックします。

対象リポジトリがhttps://github.com/cloudflare/cloudflare-gcpであることを確認して、Trust repoをチェックして、確認をクリックします。

Cloud Shell Editor が起動します。

必要となる Google Cloud サービスの API を有効にします。既に有効化されている API の場合はプロンプトに何も表示されません。無効であった API を有効化した場合はOperation "operations/xxxx-xxx-xxxxx" finished successfully.と言ったメッセージが表示されます。
$ gcloud services enable pubsub.googleapis.com $ gcloud services enable cloudscheduler.googleapis.com $ gcloud services enable cloudfunctions.googleapis.com $ gcloud services enable cloudbuild.googleapis.com
使用するプロジェクトを設定します。
$ gcloud config set project <PROJECT_ID> Updated property [core/project].
2022年4月15日時点では、logpush-to-bigquery で DNS ログが対応していなかったためスキーマファイルを追加します。後で Github に Issue を立てようと思います。
左の EXPLORER でcloudshell_open/cloudflare-gcp/logpush-to-bigqueryフォルダを右クリックしてNew Fileをクリックします。

schema-dns.jsonと入力してOKをクリックします。

schema-dns.jsonに以下の内容を記載して保存します。
[
  {
    "name": "ColoCode",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "EDNSSubnet",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "EDNSSubnetLength",
    "type": "INTEGER",
    "mode": "NULLABLE"
  },
  {
    "name": "QueryName",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "QueryType",
    "type": "INTEGER",
    "mode": "NULLABLE"
  },
  {
    "name": "ResponseCached",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
  {
    "name": "ResponseCode",
    "type": "INTEGER",
    "mode": "NULLABLE"
  },
  {
    "name": "SourceIP",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "Timestamp",
    "type": "TIMESTAMP",
    "mode": "NULLABLE"
  }
]
次にdeploy.shを編集します。以下の内容に従って編集します。18行目はそのままだと実行時エラー発生2したので修正を入れています。gcloud scheduler jobs create pubsubのオプション追加は、後で Issue を作成するかもしれません。
| 行番号 | 項目 | 内容 | 備考 | 
|---|---|---|---|
| 3 | SCHEMA | schema-dns.json | |
| 6 | DIRECTORY | 任意のパス | Cloudflare でログを出力するパス | 
| 7 | BUCKET_NAME | 先程作成した GCS バケット名 | |
| 8 | DATASET | 必要に応じて変更 | BigQuery で作成されるデータセット名 | 
| 9 | TABLE | 必要に応じて変更 | BigQuery で作成されるテーブル名 | 
| 10 | REGION | ここでは asia-northeast1 を設定 | Cloud Scheduler, Cloud Functions を起動できるリージョン | 
| 12 | FN_NAME | Cloud Functions の関数名が衝突する場合に変更 | |
| 13 | TOPIC_NAME | Cloud Pub/Sub のトピック名が衝突する場合に変更 | |
| 18 | gcloud scheduler jobs create pubsub | 末尾に" --location=$REGION"を追加 | 
#!/bin/sh SCHEMA="schema-dns.json" # The name of the subdirectory in your bucket used for Cloudflare Logpush logs, # for example, "logs/". If there is no subdirectory, use "" DIRECTORY="<LOG_PATH>/" BUCKET_NAME="<BUCKET_NAME>" DATASET="cloudflare_logstream" TABLE="cloudflare_logs" REGION="asia-northeast1" # You probably don't need to change these values: FN_NAME="cf-logs-to-bigquery" TOPIC_NAME="every_minute" # Create pubsub topic gcloud pubsub topics create $TOPIC_NAME # Create cron job gcloud scheduler jobs create pubsub cf_logs_cron --schedule="* * * * *" --topic=$TOPIC_NAME --message-body="60 seconds passed" --location=$REGION # Deploy function gcloud functions deploy $FN_NAME \ --runtime nodejs12 \ --trigger-topic $TOPIC_NAME \ --region=$REGION \ --memory=1024MB \ --entry-point=runLoadJob \ --set-env-vars DATASET=$DATASET,TABLE=$TABLE,SCHEMA=$SCHEMA,BUCKET_NAME=$BUCKET_NAME,DIRECTORY=$DIRECTORY
deploy.shを実行して環境をデプロイします。
$ sh ./deploy.sh Created topic [projects/PROJECT_ID/topics/every_minute]. name: projects/PROJECT_ID/locations/asia-northeast1/jobs/cf_logs_cron pubsubTarget: data: NjAgc2Vjb25kcyBwYXNzZWQ= topicName: projects/PROJECT_ID/topics/every_minute ・ ・ ・
暫く待つと Cloud Pub/Sub, Cloud Scheduler, Cloud Functions の環境がデプロイされます。
 
 

4. Cloudflare Logpush の設定
Cloudflare 側のログ出力の設定を行います。
基本的には以前の Cloudflare Logpush のエントリと同様で、ログを HTTP requests から DNS logs に変更するだけです。
各 Web サイトのページで、左のメニューから [Analytics]-[Logs] を開きます。

今回は DNS のログを対象にするため、DNS logsを選択してNextをクリックします。

今回は全フィールドを出力させるのでAll fieldsにチェックを入れてNextをクリックします。

ログの配信先を選択します。今回はGoogle Cloud Storageを選択して、Nextをクリックします。

配信先の情報を記載して、Google Cloud Storage bucket permissionsのリンクをクリックします。
- Bucket path: // 形式でdeploy.shに設定したログ出力先を入力
- Daily subfolders: Yes, automatically organize logs in daily subfoldersクエリを楽にするためにログを毎日のサブフォルダに自動的に整理します

Cloud Storage コンソールで作成した GCS バケットを表示して、PERMISSIONSタブのADDをクリックします。

New principals にlogpush@cloudflare-data.iam.gserviceaccount.comを入力して、Select a role でStorage Object Creatorを選択してSaveをクリックします。

Cloudflare のコンソールに戻りValidate accessをクリックします。

所有権確認のファイルのリンクをクリックして、Cloud Storage のコンソールへ遷移します。

ひとつ上のフォルダに移動します。

所有権確認用ファイルをクリックします。

ファイルの内容をコピーして、Cloudflare 側のコンソールへ戻します。

Ownership token の欄にファイルの内容をペーストして、Pushをクリックします。

DNS logs を Google Cloud Storage へ出力する設定を行いました。これ以降の DNS log は Cloud Storage と BigQuery に配信されます。

5. search index の作成と SEARCH 関数での検索
しばらくすると実際にデータが溜まってくるので、検索してみます。
列に127.0.0.1が含まれるレコードを検索してみます。ログテーブルは時間単位列パーティション分割テーブルなので、パーティション対象のTimestamp列を検索条件に入れます。
SELECT * FROM cloudflare_logstream.cloudflare_logs WHERE SEARCH(cloudflare_logs, '127.0.0.1') AND Timestamp > '2022-01-01';

次に問い合わせが来たドメインで検索してみます。今度は検索対象をテーブル全体ではなく、QueryName列だけで検索してみます。(なおbgptools-wildcard-confirmedという存在しないサブドメインを問い合わせたのはbgp.toolsという、IP アドレスがどの AS に属するのかを検索できるサービスの模様です)
SELECT * FROM cloudflare_logstream.cloudflare_logs
WHERE
     SEARCH(QueryName,'bgptools\\-wildcard\\-confirmed')
 and Timestamp > '2022-01-01'
ORDER BY Timestamp;

さいごに
BigQuery はストレージ料金が Cloud Storage の Standard Storage(アクティブストレージの場合)と Nearline Storage(長期保存の場合)と同程度なので、ログを貯めるストレージとしても有用でした。その上でテキスト検索機能が出てきたので、大量のログを分析するユースケースで活用するべきだと思いました。是非皆さんもお試し頂ければと思います。
- 2022年4月15日現在のプレビュー段階では、seach index や SEARCH 関数が US と EU のみで有効であるため。 ↩
- 
以前は Cloud Scheduler の起動リージョンが App Engine のリージョンに限られていましたが、自由にリージョンを選択可能になったためリージョン指定が必要になったと思われます。
 Cloud Scheduler: GCP の 23 リージョンで利用可能に ↩












