【AWS Glue】AWS GlueのBigQueryコネクタを使って直接BigQueryへデータを書き込んでみよう

【AWS Glue】AWS GlueのBigQueryコネクタを使って直接BigQueryへデータを書き込んでみよう

2025.08.04

データ事業本部の川中子(かわなご)です。

先日ブログ記事をAIにレビューしてもらった際に以下のフィードバックを貰って、
日本全国に「川中子」を広められるようにこの命を燃やそうと誓いました。

「川中子」の読み方が「かわなご」となっていますが、一般的には「かわなかご」と読むのが自然です。

さて前回は、Cloud Run functionsを利用してS3 Tablesのデータの連携を検証しました。

https://dev.classmethod.jp/articles/bigquery-jwt-s3-tables-to-bigquery/

ただこの実装の場合、取得したデータを一時的にジョブ内で保持するため、
Cloud Run functionsの関数作成時のメモリサイズの調整が必要になってしまいます。
またコストがAWSとGoogleの双方で発生する点も、管理面のデメリットとなります。

そこで今回はよりシンプルな代替案として、GlueのBigQueryコネクタを使用して、
S3 TablesからBigQueryへデータを直接連携する方法を試してみました。

前提

GlueのBigQueryコネクタについて

Glue 4.0以降では、BigQueryへの読み書きがサポートされています。

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-connect-bigquery-home.html

AWSのドキュメントには以下のような記載があります。

If set to direct, your connector will write using the BigQuery Storage Write API.

BigQuery Storage Write APIについてGoogle Cloudのドキュメントによると、
ストリーミングなどの高速データ書き込みに特化したAPIのようです。

https://cloud.google.com/bigquery/docs/write-api

つまりデータは書き込み(INSERT)のみに対応していて、重複データの削除や、
UPDATEやDELETEなどのデータ操作には対応していないということでした。

この制限を踏まえ、今回はINSERTでのデータ連携についてのみ検証を行うことにしました。

コネクタの作成方法について

GlueのBigQueryコネクタを使用する際の事前準備は以下のようになります。

  • Google Cloud側で連携に使用するサービスアカウントを作成する
  • 作成したサービスアカウントの認証情報を取得
  • 認証情報をAWS側のSecrets Managerに登録
  • Secrets Managerを参照してGlueのBigQueryコネクタを作成

上記の手順については、同コネクタを利用したデータ取得を検証している
以下のブログで分かりやすく解説されていたので、こちらを参照してください。

https://dev.classmethod.jp/articles/aws-glue-connector-for-google-bigquery-bigquery-to-s3/

検証準備

ソーステーブルのデータ確認

本検証においてソースとなるS3 Tables上のテーブルのデータを確認しておきます。
今回は検証用にdaily_salesという名前でテーブルを作成しています。

-- データの内容確認
SELECT * FROM "s3tablescatalog"."cm_kawanago_tablebucket_namespace"."daily_sales"
ORDER BY sale_date

ソーステーブルには9レコードのデータが存在することが確認できます。

+------------+------------------+--------------+
| sale_date  | product_category | sales_amount |
+------------+------------------+--------------+
| 2024-01-15 | Laptop           |        900.0 |
| 2024-01-15 | Monitor          |        250.0 |
| 2024-01-16 | Laptop           |       1350.0 |
| 2024-02-01 | Monitor          |        300.0 |
| 2024-02-01 | Keyboard         |         60.0 |
| 2024-02-02 | Mouse            |         25.0 |
| 2024-02-02 | Laptop           |       1050.0 |
| 2024-02-03 | Laptop           |       1200.0 |
| 2024-02-03 | Monitor          |        375.0 |
+------------+------------------+--------------+

BigQueryテーブルの作成

ソーステーブルのスキーマに合わせて、
本検証のターゲットとなるBigQuery上のテーブルを作成します。

CREATE TABLE `<project-id>.cm_kawanago.daily_sales_from_aws` (
  sale_date DATE,
  product_category STRING,
  sales_amount FLOAT64
)
PARTITION BY sale_date
OPTIONS(
  description="AWS S3 Tables からのデータ連携用テーブル"
);

Glueジョブの作成

次にBigQueryへデータを転送するGlueジョブを作成します。
スクリプトの大まかな流れは以下のようになっています。

  • S3 Tablesのテーブルからspark.sqlでデータを取得
  • 取得したデータをDynamicFrameに変換
  • write_dynamic_frame.from_optionsでデータを書き込み
Glueスクリプト
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.conf import SparkConf

# ジョブパラメータ
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'bigquery_connection_name',
    'source_database',
    'source_table',
    'target_dataset',
    'target_table',
    'gcp_project_id'
])

# Spark設定
conf = SparkConf()
conf.set('spark.sql.catalog.s3tablesbucket', 'org.apache.iceberg.spark.SparkCatalog')
conf.set('spark.sql.catalog.s3tablesbucket.catalog-impl', 'software.amazon.s3tables.iceberg.S3TablesCatalog')
conf.set('spark.sql.catalog.s3tablesbucket.warehouse', 'arn:aws:s3tables:<region>:<account-id>:bucket/<bucket-name>')
conf.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')

# SparkとGlueコンテキストの初期化
sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("=== Starting S3 Tables to BigQuery ETL Job ===")

try:
    # S3TablesCatalogを使用してS3 Tablesからデータを読み取り
    source_table_name = f"s3tablesbucket.{args['source_database']}.{args['source_table']}"
    print(f"Reading from S3 Tables: {source_table_name}")

    # SQLを使用してデータを読み取り
    df_source = spark.sql(f"SELECT * FROM {source_table_name}")

    # スキーマとレコード数を表示
    print("Source data schema:")
    df_source.printSchema()

    record_count = df_source.count()
    print(f"Number of records to transfer: {record_count}")

    # Spark DataFrameをGlue用のDynamicFrameに変換
    dynamic_frame = DynamicFrame.fromDF(df_source, glueContext, "dynamic_frame")

    # BigQueryに書き込み
    print("Writing data to BigQuery...")

    bigquery_write = glueContext.write_dynamic_frame.from_options(
        frame=dynamic_frame,
        connection_type="bigquery",
        connection_options={
            "connectionName": args['bigquery_connection_name'],
            "parentProject": args['gcp_project_id'],
            "table": f"{args['target_dataset']}.{args['target_table']}",
            "writeMethod": "direct"
        },
        transformation_ctx="bigquery_write"
    )

    print(f"Successfully transferred {record_count} records to BigQuery")

except Exception as e:
    print(f"Error occurred during ETL process: {str(e)}")
    raise e
finally:
    job.commit()

上記のスクリプトをS3にアップロードし、Glueジョブを作成していきます。

# スクリプトをS3にアップロード
aws s3 cp glue_s3tables_to_bigquery.py s3://<your-bucket>/scripts/

# Glueジョブを作成
aws glue create-job --cli-input-json file://glue_job_definition.json

glue_job_definition.jsonの設定は以下の通りです。

glue_job_definition.json
{
    "Name": "s3tables-to-bigquery-daily-sales",
    "Description": "S3 Tables から BigQuery へのデータ連携ジョブ",
    "Role": "arn:aws:iam::<account-id>:role/<your-glue-role>",
    "ExecutionProperty": {
        "MaxConcurrentRuns": 1
    },
    "Command": {
        "Name": "glueetl",
        "ScriptLocation": "s3://<your-bucket>/scripts/glue_s3tables_to_bigquery.py",
        "PythonVersion": "3"
    },
    "DefaultArguments": {
        "--enable-metrics": "true",
        "--enable-spark-ui": "true",
        "--spark-event-logs-path": "s3://<your-bucket>/spark-logs/",
        "--enable-job-insights": "true",
        "--enable-continuous-cloudwatch-log": "true",
        "--job-bookmark-option": "job-bookmark-disable",
        "--job-language": "python",
        "--datalake-formats": "iceberg",
        "--additional-python-modules": "pyarrow==14.0.1",
        "--extra-jars": "s3://<your-bucket>/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar",
        "--bigquery_connection_name": "Bigquery_connection",
        "--source_database": "cm_kawanago_tablebucket_namespace",
        "--source_table": "daily_sales",
        "--target_dataset": "cm_kawanago",
        "--target_table": "daily_sales_from_aws",
        "--gcp_project_id": "<your-gcp-project-id>"
    },
    "MaxRetries": 0,
    "Timeout": 60,
    "GlueVersion": "5.0",
    "MaxCapacity": 2.0,
    "Connections": {
        "Connections": ["Bigquery_connection"]
    }
}

検証結果

作成したGlueジョブを実行すると、正常に処理が完了しました。
出力されたログは以下のようになっています。

ログ出力
"=== Starting S3 Tables to BigQuery ETL Job ===
Source: cm_kawanago_tablebucket_namespace.daily_sales
Target: <your-gcp-project-id>.cm_kawanago.daily_sales_from_aws
Reading from S3 Tables: s3tablesbucket.cm_kawanago_tablebucket_namespace.daily_sales"

"Source data schema:"
"root
 |-- sale_date: date (nullable = true)
 |-- product_category: string (nullable = true)
 |-- sales_amount: double (nullable = true)"

"Number of records to transfer: 9
Sample data (first 10 rows):"

"
+----------+----------------+------------+
|sale_date |product_category|sales_amount|
+----------+----------------+------------+
|2024-01-15|Laptop          |900.0       |
|2024-01-15|Monitor         |250.0       |
|2024-01-16|Laptop          |1350.0      |
|2024-02-01|Monitor         |300.0       |
|2024-02-01|Keyboard        |60.0        |
|2024-02-02|Mouse           |25.0        |
|2024-02-02|Laptop          |1050.0      |
|2024-02-03|Laptop          |1200.0      |
|2024-02-03|Monitor         |375.0       |
+----------+----------------+------------+
"

"Writing data to BigQuery..."
"Successfully transferred 9 records to BigQuery
=== ETL Job Completed Successfully ==="

"Running autoDebugger shutdown hook."

ターゲットであるBigQueryのテーブルを確認してみます。

-- データの確認
SELECT * FROM `<project-id>.cm_kawanago.daily_sales_from_aws` ORDER BY sale_date

ソースデータと同じデータが、そのままINSERTされたことが分かります。

+------------+------------------+--------------+
| sale_date  | product_category | sales_amount |
+------------+------------------+--------------+
| 2024-01-15 | Laptop           |        900.0 |
| 2024-01-15 | Monitor          |        250.0 |
| 2024-01-16 | Laptop           |       1350.0 |
| 2024-02-01 | Monitor          |        300.0 |
| 2024-02-01 | Keyboard         |         60.0 |
| 2024-02-02 | Mouse            |         25.0 |
| 2024-02-02 | Laptop           |       1050.0 |
| 2024-02-03 | Laptop           |       1200.0 |
| 2024-02-03 | Monitor          |        375.0 |
+------------+------------------+--------------+

なお上記のINSERT処理の確認後、同じジョブを再度実行したところ、
想定通り各レコードが重複して計18行のデータとなりました。

さいごに

専用のコネクタを使用することで、GlueによってS3 Tablesから
BigQueryへのデータ連携が可能であることが確認できました。

しかしコネクタ内部で利用されているBigQuery Storage Write APIの制限により、
UPDATEDELETEなどの操作ができないことが課題として挙げられます。

もしUPSERTのような実装が必要な場合には、以下のような対策が考えられます。

  • 一度BigQuery側のデータを取得して、Glueジョブ内で比較した後に新規データのみ連携する
  • BigQueryの外部テーブルを中間テーブルとして利用し、もう一本のETLをBigQueryで実装する

この記事が少しでも参考になれば幸いです。
最後まで記事を閲覧いただき、ありがとうございました。

この記事をシェアする

facebookのロゴhatenaのロゴtwitterのロゴ

© Classmethod, Inc. All rights reserved.