【AWS Glue】AWS GlueのBigQueryコネクタを使って直接BigQueryへデータを書き込んでみよう
データ事業本部の川中子(かわなご)です。
先日ブログ記事をAIにレビューしてもらった際に以下のフィードバックを貰って、
日本全国に「川中子」を広められるようにこの命を燃やそうと誓いました。
「川中子」の読み方が「かわなご」となっていますが、一般的には「かわなかご」と読むのが自然です。
さて前回は、Cloud Run functions
を利用してS3 Tablesのデータの連携を検証しました。
ただこの実装の場合、取得したデータを一時的にジョブ内で保持するため、
Cloud Run functions
の関数作成時のメモリサイズの調整が必要になってしまいます。
またコストがAWSとGoogleの双方で発生する点も、管理面のデメリットとなります。
そこで今回はよりシンプルな代替案として、GlueのBigQueryコネクタを使用して、
S3 TablesからBigQueryへデータを直接連携する方法を試してみました。
前提
GlueのBigQueryコネクタについて
Glue 4.0
以降では、BigQueryへの読み書きがサポートされています。
AWSのドキュメントには以下のような記載があります。
If set to direct, your connector will write using the BigQuery Storage Write API.
BigQuery Storage Write API
についてGoogle Cloudのドキュメントによると、
ストリーミングなどの高速データ書き込みに特化したAPIのようです。
つまりデータは書き込み(INSERT)のみに対応していて、重複データの削除や、
UPDATEやDELETEなどのデータ操作には対応していないということでした。
この制限を踏まえ、今回はINSERTでのデータ連携についてのみ検証を行うことにしました。
コネクタの作成方法について
GlueのBigQueryコネクタを使用する際の事前準備は以下のようになります。
- Google Cloud側で連携に使用するサービスアカウントを作成する
- 作成したサービスアカウントの認証情報を取得
- 認証情報をAWS側のSecrets Managerに登録
- Secrets Managerを参照してGlueのBigQueryコネクタを作成
上記の手順については、同コネクタを利用したデータ取得を検証している
以下のブログで分かりやすく解説されていたので、こちらを参照してください。
検証準備
ソーステーブルのデータ確認
本検証においてソースとなるS3 Tables上のテーブルのデータを確認しておきます。
今回は検証用にdaily_sales
という名前でテーブルを作成しています。
-- データの内容確認
SELECT * FROM "s3tablescatalog"."cm_kawanago_tablebucket_namespace"."daily_sales"
ORDER BY sale_date
ソーステーブルには9レコードのデータが存在することが確認できます。
+------------+------------------+--------------+
| sale_date | product_category | sales_amount |
+------------+------------------+--------------+
| 2024-01-15 | Laptop | 900.0 |
| 2024-01-15 | Monitor | 250.0 |
| 2024-01-16 | Laptop | 1350.0 |
| 2024-02-01 | Monitor | 300.0 |
| 2024-02-01 | Keyboard | 60.0 |
| 2024-02-02 | Mouse | 25.0 |
| 2024-02-02 | Laptop | 1050.0 |
| 2024-02-03 | Laptop | 1200.0 |
| 2024-02-03 | Monitor | 375.0 |
+------------+------------------+--------------+
BigQueryテーブルの作成
ソーステーブルのスキーマに合わせて、
本検証のターゲットとなるBigQuery上のテーブルを作成します。
CREATE TABLE `<project-id>.cm_kawanago.daily_sales_from_aws` (
sale_date DATE,
product_category STRING,
sales_amount FLOAT64
)
PARTITION BY sale_date
OPTIONS(
description="AWS S3 Tables からのデータ連携用テーブル"
);
Glueジョブの作成
次にBigQuery
へデータを転送するGlue
ジョブを作成します。
スクリプトの大まかな流れは以下のようになっています。
- S3 Tablesのテーブルから
spark.sql
でデータを取得 - 取得したデータを
DynamicFrame
に変換 write_dynamic_frame.from_options
でデータを書き込み
Glueスクリプト
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.conf import SparkConf
# ジョブパラメータ
args = getResolvedOptions(sys.argv, [
'JOB_NAME',
'bigquery_connection_name',
'source_database',
'source_table',
'target_dataset',
'target_table',
'gcp_project_id'
])
# Spark設定
conf = SparkConf()
conf.set('spark.sql.catalog.s3tablesbucket', 'org.apache.iceberg.spark.SparkCatalog')
conf.set('spark.sql.catalog.s3tablesbucket.catalog-impl', 'software.amazon.s3tables.iceberg.S3TablesCatalog')
conf.set('spark.sql.catalog.s3tablesbucket.warehouse', 'arn:aws:s3tables:<region>:<account-id>:bucket/<bucket-name>')
conf.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
# SparkとGlueコンテキストの初期化
sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
print("=== Starting S3 Tables to BigQuery ETL Job ===")
try:
# S3TablesCatalogを使用してS3 Tablesからデータを読み取り
source_table_name = f"s3tablesbucket.{args['source_database']}.{args['source_table']}"
print(f"Reading from S3 Tables: {source_table_name}")
# SQLを使用してデータを読み取り
df_source = spark.sql(f"SELECT * FROM {source_table_name}")
# スキーマとレコード数を表示
print("Source data schema:")
df_source.printSchema()
record_count = df_source.count()
print(f"Number of records to transfer: {record_count}")
# Spark DataFrameをGlue用のDynamicFrameに変換
dynamic_frame = DynamicFrame.fromDF(df_source, glueContext, "dynamic_frame")
# BigQueryに書き込み
print("Writing data to BigQuery...")
bigquery_write = glueContext.write_dynamic_frame.from_options(
frame=dynamic_frame,
connection_type="bigquery",
connection_options={
"connectionName": args['bigquery_connection_name'],
"parentProject": args['gcp_project_id'],
"table": f"{args['target_dataset']}.{args['target_table']}",
"writeMethod": "direct"
},
transformation_ctx="bigquery_write"
)
print(f"Successfully transferred {record_count} records to BigQuery")
except Exception as e:
print(f"Error occurred during ETL process: {str(e)}")
raise e
finally:
job.commit()
上記のスクリプトをS3にアップロードし、Glue
ジョブを作成していきます。
# スクリプトをS3にアップロード
aws s3 cp glue_s3tables_to_bigquery.py s3://<your-bucket>/scripts/
# Glueジョブを作成
aws glue create-job --cli-input-json file://glue_job_definition.json
glue_job_definition.json
の設定は以下の通りです。
glue_job_definition.json
{
"Name": "s3tables-to-bigquery-daily-sales",
"Description": "S3 Tables から BigQuery へのデータ連携ジョブ",
"Role": "arn:aws:iam::<account-id>:role/<your-glue-role>",
"ExecutionProperty": {
"MaxConcurrentRuns": 1
},
"Command": {
"Name": "glueetl",
"ScriptLocation": "s3://<your-bucket>/scripts/glue_s3tables_to_bigquery.py",
"PythonVersion": "3"
},
"DefaultArguments": {
"--enable-metrics": "true",
"--enable-spark-ui": "true",
"--spark-event-logs-path": "s3://<your-bucket>/spark-logs/",
"--enable-job-insights": "true",
"--enable-continuous-cloudwatch-log": "true",
"--job-bookmark-option": "job-bookmark-disable",
"--job-language": "python",
"--datalake-formats": "iceberg",
"--additional-python-modules": "pyarrow==14.0.1",
"--extra-jars": "s3://<your-bucket>/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar",
"--bigquery_connection_name": "Bigquery_connection",
"--source_database": "cm_kawanago_tablebucket_namespace",
"--source_table": "daily_sales",
"--target_dataset": "cm_kawanago",
"--target_table": "daily_sales_from_aws",
"--gcp_project_id": "<your-gcp-project-id>"
},
"MaxRetries": 0,
"Timeout": 60,
"GlueVersion": "5.0",
"MaxCapacity": 2.0,
"Connections": {
"Connections": ["Bigquery_connection"]
}
}
検証結果
作成したGlueジョブを実行すると、正常に処理が完了しました。
出力されたログは以下のようになっています。
ログ出力
"=== Starting S3 Tables to BigQuery ETL Job ===
Source: cm_kawanago_tablebucket_namespace.daily_sales
Target: <your-gcp-project-id>.cm_kawanago.daily_sales_from_aws
Reading from S3 Tables: s3tablesbucket.cm_kawanago_tablebucket_namespace.daily_sales"
"Source data schema:"
"root
|-- sale_date: date (nullable = true)
|-- product_category: string (nullable = true)
|-- sales_amount: double (nullable = true)"
"Number of records to transfer: 9
Sample data (first 10 rows):"
"
+----------+----------------+------------+
|sale_date |product_category|sales_amount|
+----------+----------------+------------+
|2024-01-15|Laptop |900.0 |
|2024-01-15|Monitor |250.0 |
|2024-01-16|Laptop |1350.0 |
|2024-02-01|Monitor |300.0 |
|2024-02-01|Keyboard |60.0 |
|2024-02-02|Mouse |25.0 |
|2024-02-02|Laptop |1050.0 |
|2024-02-03|Laptop |1200.0 |
|2024-02-03|Monitor |375.0 |
+----------+----------------+------------+
"
"Writing data to BigQuery..."
"Successfully transferred 9 records to BigQuery
=== ETL Job Completed Successfully ==="
"Running autoDebugger shutdown hook."
ターゲットであるBigQuery
のテーブルを確認してみます。
-- データの確認
SELECT * FROM `<project-id>.cm_kawanago.daily_sales_from_aws` ORDER BY sale_date
ソースデータと同じデータが、そのままINSERTされたことが分かります。
+------------+------------------+--------------+
| sale_date | product_category | sales_amount |
+------------+------------------+--------------+
| 2024-01-15 | Laptop | 900.0 |
| 2024-01-15 | Monitor | 250.0 |
| 2024-01-16 | Laptop | 1350.0 |
| 2024-02-01 | Monitor | 300.0 |
| 2024-02-01 | Keyboard | 60.0 |
| 2024-02-02 | Mouse | 25.0 |
| 2024-02-02 | Laptop | 1050.0 |
| 2024-02-03 | Laptop | 1200.0 |
| 2024-02-03 | Monitor | 375.0 |
+------------+------------------+--------------+
なお上記のINSERT処理の確認後、同じジョブを再度実行したところ、
想定通り各レコードが重複して計18行のデータとなりました。
さいごに
専用のコネクタを使用することで、GlueによってS3 Tablesから
BigQueryへのデータ連携が可能であることが確認できました。
しかしコネクタ内部で利用されているBigQuery Storage Write API
の制限により、
UPDATE
やDELETE
などの操作ができないことが課題として挙げられます。
もしUPSERT
のような実装が必要な場合には、以下のような対策が考えられます。
- 一度BigQuery側のデータを取得して、Glueジョブ内で比較した後に新規データのみ連携する
- BigQueryの外部テーブルを中間テーブルとして利用し、もう一本のETLをBigQueryで実装する
この記事が少しでも参考になれば幸いです。
最後まで記事を閲覧いただき、ありがとうございました。