S3からBigQueryへ連携する方法いろいろ(Omni/Transfer Service)

Google Cloudのサービス/機能を使って、Amazon S3上のデータをBigQueryに連携する。 BigQuery Omni、BigQuery Data Transfer Service、Cloud Storage Transfer Service + BigQuery 外部テーブルをそれぞれ利用。
2022.04.14

データアナリティクス事業本部、池田です。花粉症です。
Google CloudBigQuery Omniを使ってみたかったので、他の連携方法と比較してみました。 切りが無さそうだったので、ソースはS3に限定し、 Google Cloudのサービスや機能でできる(作り込みが要らない)方法として3つを試しました。
また、AWS側は再利用できるように CloudFormation でテンプレートにしました。

先にまとめ

各ガイドや触ってみた結果を基に、 個人的に選定時のポイントになりそうと思ったところをまとめてみました。
※制約や挙動などは執筆時点(2022/04/14)のものです。

BigQuery Omni
  • 定額料金のみ
  • リージョンが限られている
  • インタラクティブ クエリの結果の最大サイズは2MB
  • 標準テーブルを作成できない(→CTASやINSERT…SELECT…などでS3のデータをBigQueryへ持ち出せなかった)  ※Cross-cloud transferがプレビュー中
  • Cloud StorageやBigQueryのストレージにファイルを置かない
  • S3への変更はすぐにクエリ結果に反映された
BigQuery Data Transfer Service
  • 定期的な転送の最小間隔時間は24時間(ただしオンデマンドでの実行は可能?)
  • Cloud Storageにファイルを置かない
Cloud Storage Transfer Service

BigQuery 外部テーブル
  • 最短のスケジュール間隔は1時間(ただしオンデマンドでの実行は可能?)
  • Cloud Storageにファイルを置く


(04/19追記) Cross-cloud transfer (「LOAD DATA INTO」文)についてはGAされたら追記します。


実際の開発では目的とこの辺の条件に、データの性質(upsert要るかなど)やらネットワーク構成やら料金やらを交えて検討する感じでしょーか…

BigQuery Omni

初めは BigQuery Omni を使ってBigQueryからS3に対してクエリを発行してみます。
Omniは定額料金のみなので、 Flex Slots の100スロットを設定して検証しました。

↓公式ガイドの構築手順。
Use BigQuery Omni with AWS
※執筆時点(2022/04/14)では日本語版のガイドは古いと思われる記載が見受けられたので英語での参照をお勧めします。

Google Cloud側準備

BigQuery Connection APIを有効化します。


Cloud Shell からbqコマンドで BigQueryの接続を作成します。

app_name=bqomni-s3-access
aws_account_id=<AWSのアカウントID>
aws_iam_role_id="arn:aws:iam::${aws_account_id}:role/${app_name}-role"

bq mk --connection --connection_type='AWS' \
--iam_role_id=$aws_iam_role_id \
--location=aws-us-east-1 \
${app_name}-conn

執筆時点で選択できるリージョンは aws-us-east-1 だけでした。(というかこういう名前のリージョン(location)が有ることに少し驚きました。)

作成できると以下のような結果が返ってくるので、あとで使用する<Identity>の部分を控えておきます。

Connection <プロジェクト番号>.aws-us-east-1.bqomni-s3-access-conn successfully created
Please add the following identity to your AWS IAM Role 'arn:aws:iam::<AWSのアカウントID>:role/bqomni-s3-access-role'
Identity: '<Identity>'

BigQueryのコンソールからも作成結果やIdentityを確認できます。

※ガイドの手順だと「AWSのIAMロール仮作成→接続作成→AWSのIAMロールにIdentity設定」のような流れですが、 私はめんどくさいので「IAMロール名は決め打ちで接続作成→AWSのIAMロール作成」で作成しています。

AWS側準備

Google Cloudに接続を許可するため、IAMのロールとポリシーを作成します。 フェデレーションで設定できるようです。

↓CloudFormationで。前節のIdentityをパラメータにしてやります。

AWSTemplateFormatVersion: "2010-09-09"
Description: "IAM Role for BigQuery Omni"

Parameters:
  AppName:
    Type: String
    Default: "bqomni-s3-access"
  BucketName:
    Type: String
  BucketPrefix:
    Type: String
  BqIdentity:
    Type: String
    Description: "the result of 'bq mk --connection ...'"

Resources:
  BqS3AccessRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "${AppName}-role"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRoleWithWebIdentity
            Principal:
              Federated: "accounts.google.com"
            Condition:
              StringEquals:
                accounts.google.com:sub: !Ref BqIdentity
      MaxSessionDuration: 43200
      ManagedPolicyArns:
        - !Ref BqS3AccessPolicy

  BqS3AccessPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      ManagedPolicyName: !Sub "${AppName}-policy"
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - s3:ListBucket
            Resource:
              - !Sub "arn:aws:s3:::${BucketName}"
            Condition:
              StringLike:
                s3:prefix: !Sub "${BucketPrefix}/*"
          - Effect: Allow
            Action:
              - s3:GetObject
              - s3:PutObject  # If you need to write to S3.
            Resource:
              - !Sub "arn:aws:s3:::${BucketName}/${BucketPrefix}/*"

S3は作成済みのものを使用しました。
us-east-1 リージョンでないバケットでは、あとのクエリ実行でエラーになりました。)

BigQueryテーブル作成

BigQueryのコンソールからSQLを発行してデータセットと外部テーブルを作成します。

データセットaws-us-east-1 で作成。

CREATE SCHEMA bqomni_s3_access
OPTIONS(
    location="aws-us-east-1"
);

この時、定額料金を割り当てていないとエラーになりました。(逆に言えば、ここまではコミットを準備しなくても構築できそうです。)
BQ Omni Region aws-us-east-1 currently only supports flat rate slots. To continue running a query, please purchase a BQ Omni commitment and create a reservation.

↓作成した接続を使って外部テーブルを作成。

CREATE EXTERNAL TABLE bqomni_s3_access.sample_tbl
(
    station_number INTEGER, year INTEGER, month INTEGER, day INTEGER,
    fog BOOLEAN, rain BOOLEAN, snow BOOLEAN, hail BOOLEAN, thunder BOOLEAN, tornado BOOLEAN
)
WITH CONNECTION `aws-us-east-1.bqomni-s3-access-conn`
OPTIONS(
    format="JSON",
    compression="GZIP",
    uris=["s3://<S3バケット名>/bq-s3-access/*"]
);

(2~5行目のカラム定義は無くても良いです。自動で検出してくれます。)

使ってみる・感想

↓テーブルを作成すると、BigQueryのコンソールからクエリが発行でき、S3の中身を参照できました

S3に新たにファイルを追加してみると、その結果はすぐにクエリ結果にも反映されました。

ただし、出力結果が多過ぎると以下のエラーになりました。
Response too large to return. Please consider using EXPORT DATA.
ガイド によると2MBまでの制限があるそうです。 それ以上はS3にエクスポートしろってことですかね。

The maximum result size for interactive queries is 2 MB.

また、作成した外部テーブルをBigQuery内に永続化できないかとCTASやINSERTをしようとしてみましたが、 できなそうな感じでした。 S3からデータを持ち出させないための仕組みなのでしょうか…?

You can't create standard tables in BigQuery Omni. BigQuery Omni only supports external tables.


現時点では制約も多いのですが、S3を直接参照できるのは便利だと思いました。 Flex Slotsの準備も含め、それほど時間をかけずに構築できました。

BigQuery Data Transfer Service

次に BigQuery Data Transfer Service で、S3のデータをBigQueryのテーブルに転送してみます。

↓公式ガイドの設定手順。
Overview of Amazon S3 transfers

AWS側準備

アクセスキーでの設定方法しか無いようなので、IAMユーザーを作成します。

↓CloudFormation。

AWSTemplateFormatVersion: "2010-09-09"
Description: "IAM User for BigQuery Data Transfer Service"

Parameters:
  AppName:
    Type: String
    Default: "bqdts-s3-access"

Resources:
  BqS3AccessUser:
    Type: AWS::IAM::User
    Properties:
      UserName: !Sub "${AppName}-user"
      ManagedPolicyArns:
         - "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"

前章のOmniと同じようなポリシーだとエラーになったので、 ガイド に従って AmazonS3ReadOnlyAccess を与えています。

At a minimum, the Amazon S3 source data must have the AWS managed policy AmazonS3ReadOnlyAccess applied to it.

IAMユーザー作成後、コンソールからアクセスキー(アクセスキー ID&シークレットアクセスキー)を発行しておきます。

BigQueryテーブル作成

BigQueryのコンソールからSQLを発行して、データセットと宛先になるテーブルを作成しておきます。

CREATE SCHEMA bqdts_s3_access
OPTIONS(
    location="US"
);

CREATE TABLE bqdts_s3_access.sample_tbl
(
    station_number INTEGER, year INTEGER, month INTEGER, day INTEGER,
    fog BOOLEAN, rain BOOLEAN, snow BOOLEAN, hail BOOLEAN, thunder BOOLEAN, tornado BOOLEAN
);

ちなみに、S3のバケットが ap-northeast-1 、データセットが US という構成でも動作はしました。

Google Cloud側準備

BigQuery Data Transfer APIを有効化します。


Cloud ShellからbqコマンドでBigQueryの転送構成を作成します。 前々節のアクセスキーをここで使います。

app_name=bqdts-s3-access

bq mk --transfer_config --data_source=amazon_s3 \
--display_name=${app_name}-dts \
--target_dataset=bqdts_s3_access \
--params='{
"destination_table_name_template":"sample_tbl",
"data_path":"s3://<S3バケット名>/bq-s3-access/*",
"access_key_id":"<AWSのアクセスキー ID>",
"secret_access_key":"<AWSのシークレットアクセスキー>",
"file_format":"JSON"
}'

↓作成結果。

コマンドで作成するとスケジュールは「24時間ごと」で設定されるようです。 また、 ガイド によると転送の最小間隔も24時間とのことです。

The minimum interval time between recurring transfers is 24 hours. The default interval for a recurring transfer is 24 hours.

作成が成功すると、すぐに転送が始まりました。

使ってみる・感想

↓転送実行の完了後、BigQueryのコンソールからクエリが発行でき、転送されたデータを参照できました

S3にファイルを追加して動きを試してみたのですが、 24時間以内でも手動で転送を実行することで、追加分も連携することができました。
ただし、ファイルアップロードした直後は転送対象にならず、少し待つ必要があるようです。

When you transfer data from Amazon S3, it is possible that some of your data will not be transferred to BigQuery, particularly if the files were added to the bucket very recently. It should take approximately 10 minutes for a file to become available to the BigQuery Data Transfer Service after it is added to the bucket.


今回やってみた中では一番設定が楽だった気がします。たぶん。
S3だけでなく Redshiftでも使える ようなので、便利そうです。

Cloud Storage Transfer Service + BigQuery 外部テーブル

最後は Cloud Storage Transfer Service です。 このサービスでは「S3バケット→Cloud Storageバケット」の転送になるので、 BigQueryの外部テーブル を使って、 Cloud StorageのファイルをBigQueryで参照します。

↓公式ガイドの設定手順はこの辺。
Configuring access to data sources and sinks >Amazon S3 >Federated identity】
CREATE EXTERNAL TABLE statement

Google Cloud側準備1

Storage Transfer APIを有効化します。


S3とはアクセスキーとフェデレーションでの接続方法が選べるようなので、後者で進めます。

googleServiceAccounts.get のAPIを使って転送時に使用されるサービスアカウントの情報を取得します。
Method: googleServiceAccounts.get
このサイトからAPIを実行して…

以下のような情報が手に入るので、あとで使用する<subjectId>の値を控えておきます。

{
  "accountEmail": "project-<プロジェクト番号>@storage-transfer-service.iam.gserviceaccount.com",
  "subjectId": "<subjectId>"
}

また、前述のサービスアカウントがCloud Storageを操作できるように、 Cloud Shellからgsutilコマンドで権限を付与します。 Storage レガシー バケット書き込みストレージ オブジェクト閲覧者 が必要とのことです。

gsutil iam ch serviceAccount:project-<プロジェクト番号>@storage-transfer-service.iam.gserviceaccount.com:legacyBucketWriter \
gs://<Cloud Storageバケット名>

gsutil iam ch serviceAccount:project-<プロジェクト番号>@storage-transfer-service.iam.gserviceaccount.com:objectViewer \
gs://<Cloud Storageバケット名>

Cloud Storageは作成済みのものを使用しました。
ちなみに、S3のバケットが ap-northeast-1 、Cloud Storageのバケットが us-east1 という構成でも動作はしました。

AWS側準備

フェデレーション設定のIAMのロールとポリシーを作成します。

↓CloudFormation。前節でAPIで取得したサービスアカウントのsubjectIdを使います。

AWSTemplateFormatVersion: "2010-09-09"
Description: "IAM Role for Cloud Storage Transfer Service"

Parameters:
  AppName:
    Type: String
    Default: "sts-s3-access"
  BucketName:
    Type: String
  BucketPrefix:
    Type: String
  StsSubjectId:
    Type: String
    Description: "the result of 'https://cloud.google.com/storage-transfer/docs/reference/rest/v1/googleServiceAccounts/get'"

Resources:
  StsS3AccessRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "${AppName}-role"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRoleWithWebIdentity
            Principal:
              Federated: "accounts.google.com"
            Condition:
              StringEquals:
                accounts.google.com:sub: !Ref StsSubjectId
      ManagedPolicyArns:
        - !Ref StsS3AccessPolicy

  StsS3AccessPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      ManagedPolicyName: !Sub "${AppName}-policy"
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - s3:ListBucket
            Resource:
              - !Sub "arn:aws:s3:::${BucketName}"
            Condition:
              StringLike:
                s3:prefix: !Sub "${BucketPrefix}/*"
          - Effect: Allow
            Action:
              - s3:GetBucketLocation
            Resource:
              - !Sub "arn:aws:s3:::${BucketName}"
          - Effect: Allow
            Action:
              - s3:GetObject
              # - s3:DeleteObject  # If you need to delete from S3.
            Resource:
              - !Sub "arn:aws:s3:::${BucketName}/${BucketPrefix}/*"

ポリシーの内容は この辺のガイド を参考にしています。

Google Cloud側準備2

transferJobs.create のAPIを使って転送のジョブを作成します。
(gcloudコマンドでもジョブ作成はできますが、 執筆時点でフェデレーションのロールの指定の方法がよく分からない(未対応?)ので、APIにしました。)
Method: transferJobs.create
このサイトから,、以下のようなリクエストボディでAPIを実行します。

{
    "name": "transferJobs/s3-access",
    "projectId": "<プロジェクト番号>",
    "transferSpec": {
        "awsS3DataSource": {
            "bucketName": "<S3バケット名>",
            "roleArn": "arn:aws:iam::<AWSのアカウントID>:role/sts-s3-access-role"
        },
        "gcsDataSink": {
            "bucketName": "<Cloud Storageバケット名>",
            "path": "sts/"
        },
        "objectConditions": {
            "maxTimeElapsedSinceLastModification": "604800s",
            "includePrefixes": "bq-s3-access/"
        }
    },
    "schedule": {
        "repeatInterval": "3600s",
        "scheduleStartDate": {
            "day": 1,
            "month": 4,
            "year": 2022
        }
    },
    "status": "ENABLED"
}

↑では7日(604800s)以内に作成/更新されたファイルを1時間(3600s)ごとに転送するような設定にしています。 開始日( scheduleStartDate )は必須と言われたので、適当な過去日付を入れました。

作成の際に、AWS側の設定などが誤っているとエラーになり、ジョブを作成できませんでした。
また、転送の間隔( repeatInterval )は1時間以上にする必要があるそうです。

↓作成結果。

「ENABLED」を設定していたので、作成が成功すると、転送が始まりました。

BigQueryテーブル作成

BigQueryのコンソールからSQLを発行してデータセットと外部テーブルを作成します。

CREATE SCHEMA sts_s3_access
OPTIONS(
    location="us-east1"
);

CREATE EXTERNAL TABLE sts_s3_access.sample_tbl
(
    station_number INTEGER, year INTEGER, month INTEGER, day INTEGER,
    fog BOOLEAN, rain BOOLEAN, snow BOOLEAN, hail BOOLEAN, thunder BOOLEAN, tornado BOOLEAN
)
OPTIONS(
    format="JSON",
    compression="GZIP",
    uris=["gs://<Cloud Storageバケット名>/sts/bq-s3-access/*"]
);

(7~10行目のカラム定義は無くてもOK。)
接続の記述が無いこと以外は先述のOmniの時のテーブル作成文と同じ構文ですね。

データセットはCloud Storageのバケットと同じリージョンである必要がありました。

使ってみる・感想

↓Cloud Storageにファイルがあれば、BigQueryのコンソールから転送されたデータを参照できました。

こちらも、BigQuery Data Transfer Serviceと同じく、 手動で実行すれば、スケジュール間隔よりも短い間隔で転送が可能なようです。 (BigQuery Data Transfer Serviceと異なり、ファイルアップロード直後でも転送対象になっていました。)


今回試した方法の中では一番手間がかかった気がします。たぶん。 その分、一番自由が利くかなといった感触です。転送間隔も短いし。

おわりに

いろいろ試してみました。 ちょっと検証に時間がかかりましたが、CloudFormationやコマンドにしておいたので、 今後楽に構築できるでしょう。たぶん。(ネットワーク周りは…)


今回はAWS側をS3に限定しましたが、他のサービスであれば他にも方法がありそうです。
Amazon AuroraのデータをリアルタイムにGoogle BigQueryに連携してみた / Realtime data linkage from Amazon Aurora to Google BigQuery
へー。

逆にBigQueryからAWS環境に連携だと、Glueのカスタムコネクタとか。
AWS Glue Connector for Google BigQueryを使ってBigQueryからS3にデータを転送する

いろいろですね。

関連情報/参考にさせていただいたページ