AWS Glueデータカタログのスキーマが後から変更された際の挙動について調べてみた

2019.11.11

おつかれさまです。新井です。

今回は、AWS Glueのデータカタログについてです。

AWS Glueのデータカタログは、

  • Amazon Kinesis Firehose
  • AWS Glue Job
  • Amazon Athena

などのサービスから参照して利用できます。

便利な反面、データカタログ上のデータスキーマに変更(追加・削除・更新)が生じた際には結構面倒なことが起こります。

そもそもデータスキーマがころころ変わるなら使うべきではないのですが、今回は調査した結果の一部を共有したいと思います。

※ AWS Glueの利用歴が浅いので間違っている部分は指摘していただけると助かります。

事前準備

リソースの作成

リソース 説明
Kinesis Firehose データの投入口として用意します。「Convert record format」の設定で、出力形式にApache Parquetを、テーブル定義にGlueのデータカタログを指定します。
Glue Data Catalog 今回利用するGlueのデータカタログです。スキーマの詳細はテンプレートを参照してください。
Glue Crawler パーティションが追加された際に、データカタログを更新するためのクローラーです。
S3 ソースデータを保存しておくストレージです。
IAM Role/Policy 権限周りの設定です。
  • Terraformのテンプレート

variablesの部分をお好みで変更してください。

# Terraform Setting
terraform {
required_version = "0.12.6"
}

# Provider
provider "aws" {
region = "ap-northeast-1"
}

# Kinesis Firehose
resource "aws_kinesis_firehose_delivery_stream" "firehose_delivery_stream" {
name = "${var.firehose_name}"
destination = "extended_s3"
extended_s3_configuration {
bucket_arn = "${aws_s3_bucket.s3_bucket.arn}"
buffer_interval = "60"
buffer_size = "128"
compression_format = "UNCOMPRESSED"
data_format_conversion_configuration {
enabled = "true"
input_format_configuration {
deserializer {
open_x_json_ser_de {}
}
}
output_format_configuration {
serializer {
parquet_ser_de {}
}
}
schema_configuration {
database_name = "default"
region = "ap-northeast-1"
role_arn = "${aws_iam_role.firehose_iam_role.arn}"
table_name = "${aws_glue_catalog_table.glue_table.name}"
version_id = "LATEST"
}
}
error_output_prefix = "err/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}"
prefix = "data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
role_arn = "${aws_iam_role.firehose_iam_role.arn}"
}
}

# Glue Catalog
resource "aws_glue_catalog_table" "glue_table" {
name = "${var.glue_table_name}"
database_name = "default"
parameters = {
"classification" = "parquet"
}
retention = 0
table_type = "EXTERNAL_TABLE"
partition_keys {
name = "data"
type = "string"
}
partition_keys {
name = "year"
type = "string"
}
partition_keys {
name = "month"
type = "string"
}
partition_keys {
name = "day"
type = "string"
}
storage_descriptor {
location = "s3://${aws_s3_bucket.s3_bucket.id}/"
input_format = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
output_format = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"

ser_de_info {
name = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
serialization_library = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
parameters = {
"serialization.format" = 1
}
}
columns {
name = "user_id"
type = "string"
}
columns {
name = "user_name"
type = "string"
}
columns {
name = "unknown_column"
type = "int"
}
}
}

# Glue Crawler
resource "aws_glue_crawler" "glue_crawler" {
name = "${var.glue_crawler_name}"
database_name = "default"
role = "${aws_iam_role.glue_iam_role.arn}"
catalog_target {
database_name = "default"
tables = ["${aws_glue_catalog_table.glue_table.name}"]
}
configuration = "{\"Version\":1.0,\"CrawlerOutput\":{\"Partitions\":{\"AddOrUpdateBehavior\":\"InheritFromTable\"}},\"Grouping\":{\"TableGroupingPolicy\":\"CombineCompatibleSchemas\"}}"
schema_change_policy {
delete_behavior = "LOG"
update_behavior = "LOG"
}
}

# S3
resource "aws_s3_bucket" "s3_bucket" {
bucket = "${var.bucket_name}"
region = "ap-northeast-1"
}

# IAM Role
resource "aws_iam_role" "firehose_iam_role" {
name = "${var.firehose_role_name}"
assume_role_policy = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "firehose.amazonaws.com"
},
"Effect": "Allow",
"Sid": ""
}
]
}
POLICY
}

resource "aws_iam_role" "glue_iam_role" {
name = "${var.glue_role_name}"

assume_role_policy = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "glue.amazonaws.com"
},
"Effect": "Allow",
"Sid": ""
}
]
}
POLICY
}

# IAM Policy
# ※面倒なので権限は絞っていません
resource "aws_iam_role_policy" "firehose_access_policy" {
name = "${var.firehose_policy_name}"
role = "${aws_iam_role.firehose_iam_role.id}"
policy = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:*",
"glue:*"
],
"Resource": "*"
}
]
}
POLICY
}

resource "aws_iam_role_policy" "glue_access_policy" {
name = "${var.glue_policy_name}"
role = "${aws_iam_role.glue_iam_role.id}"
policy = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:*",
"glue:*"
],
"Resource": "*"
}
]
}
POLICY
}

# Variables
variable "firehose_role_name" {
default = "catalog-test-firehose-role"
}
variable "glue_role_name" {
default = "catalog-test-glue-role"
}
variable "firehose_policy_name" {
default = "catalog-test-firehose-policy"
}
variable "glue_policy_name" {
default = "catalog-test-glue-policy"
}
variable "firehose_name" {
default = "catalog-test-firehose-delivery-stream"
}
variable "glue_table_name" {
default = "catalog-test-glue-table"
}
variable "glue_crawler_name" {
default = "catalog-test-glue-crawler"
}
variable "bucket_name" {
default = "catalog-test-s3-bucket"
}

開発エンドポイントの立ち上げ

今回は開発エンドポイントでPySparkジョブを実行します。Glueのバージョンは1.0(spark 2.4, Python3)を利用します。

開発エンドポイントの立ち上げ方法は公式ドキュメントを確認してください。

  • ソースコード
%pyspark
from pyspark.sql import SQLContext
from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# 引数取得
args = {"JOB_NAME": "test"}

# セットアップ
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

DATABASE_NAME = "default"
TABLE_NAME = "catalog-test-glue-table"

# DynamicFrameの作成
datasource = glueContext.create_dynamic_frame.from_catalog(
database=DATABASE_NAME,
table_name=TABLE_NAME
)
datasource.printSchema()
datasource.toDF().show()

job.commit()

事前にデータを登録する

作成済みのGlueデータカタログのスキーマに沿った形式でデータを投入しておきます。

  • 作成済みのスキーマ
列名 データ型 パーティションキー
user_id string
user_name string
unknown_column int ※後で型を更新するカラムです
year string Partition (0)
month string Partition (1)
day string Partition (2)
  • JSONデータをKinesis Firehose経由でS3に保存
[
{
"user_id": "001",
"user_name": "arai",
"unknown_column": 1
},
{
"user_id": "002",
"user_name": "seiichi",
"unknown_column": 2
}
]
aws firehose put-record --delivery-stream-name catalog-test-firehose-delivery-stream --record '{"Data": "{\"user_id\": \"001\", \"user_name\": \"arai\", \"unknown_column\": 1}"}'

aws firehose put-record --delivery-stream-name catalog-test-firehose-delivery-stream --record '{"Data": "{\"user_id\": \"002\", \"user_name\": \"seiichi\", \"unknown_column\": 2}"}'

事前準備のまとめ

事前準備に結構時間がかかりました。

この時点で、

  • リソースが作成されていること
  • 開発エンドポイントが利用できること
  • 事前のデータ登録が済んでいること

が確認できればOKです。

ここからデータカタログに変更(追加・削除・更新)を加えていきます。

追加

データカタログへカラムを追加してみる

まずは、特定のカラムを追加した後の、Glue Jobの挙動を見てみます。

  • データカタログのスキーマ
列名 データ型 パーティションキー
user_id string
user_name string
unknown_column int
appended_column string 追加されたカラム
year string Partition (0)
month string Partition (1)
day string Partition (2)
  • ジョブ実行結果
root
|-- user_id: string
|-- user_name: string
|-- unknown_column: int
|-- year: int
|-- month: int
|-- day: int
|-- hour: int
+-------+---------+--------------+----+-----+---+----+
|user_id|user_name|unknown_column|year|month|day|hour|
+-------+---------+--------------+----+-----+---+----+
| 002| seiichi| 2|2019| 11| 4| 23|
| 001| arai| 1|2019| 11| 4| 23|
+-------+---------+--------------+----+-----+---+----+

DynamicFrameのスキーマ出力には、appended_columnが表示されていませんでした。 Crawlerを実行して、ジョブを再実行してみましたが、結果は変わりませんでした。

S3へappended_columnが含まれるファイルを追加してみる

この状態でappended_columnが含まれる下記のデータを投入して、再度Glue Jobの結果を見てみます。

  • データ投入
aws firehose put-record --delivery-stream-name catalog-test-firehose-delivery-stream --record '{"Data": "{\"user_id\": \"001\", \"user_name\": \"arai\", \"unknown_column\": 1, \"appended_column\": \"string\"}"}'
  • ジョブ実行結果
root
|-- user_id: string
|-- user_name: string
|-- unknown_column: int
|-- year: int
|-- month: int
|-- day: int
|-- hour: int
+-------+---------+--------------+----+-----+---+----+
|user_id|user_name|unknown_column|year|month|day|hour|
+-------+---------+--------------+----+-----+---+----+
| 001| arai| 1|2019| 11| 8| 11|
| 002| seiichi| 2|2019| 11| 4| 23|
| 001| arai| 1|2019| 11| 4| 23|
+-------+---------+--------------+----+-----+---+----+

出力データ件数は増えましたが、appended_columnは出力されませんでした。

しばらく調査してみると、stack overflowで同じ質問している人を見つけました。

どうやらadditional_options={"mergeSchema": "true"}このオプションを追加したらよさそうです。

  • DynamicFrameを以下のように修正
datasource = glueContext.create_dynamic_frame.from_catalog(
database=DATABASE_NAME,
table_name=TABLE_NAME,
additional_options={"mergeSchema": "true"}
)
  • ジョブ実行結果(additional_optionsを指定した場合)
root
|-- user_id: string
|-- user_name: string
|-- unknown_column: int
|-- appended_column: string
|-- year: int
|-- month: int
|-- day: int
|-- hour: int
+-------+---------+--------------+---------------+----+-----+---+----+
|user_id|user_name|unknown_column|appended_column|year|month|day|hour|
+-------+---------+--------------+---------------+----+-----+---+----+
| 001| arai| 1| string|2019| 11| 8| 11|
| 002| seiichi| 2| null|2019| 11| 4| 23|
| 001| arai| 1| null|2019| 11| 4| 23|
+-------+---------+--------------+---------------+----+-----+---+----+

これで、appended_columnがDynamicFrameで読み込まれるようになりました。

削除

データカタログからカラムを削除してみる

次に、appended_columnをカタログから削除してみます。

  • データカタログのスキーマ
列名 データ型 パーティションキー
user_id string
user_name string
unknown_column int
year string Partition (0)
month string Partition (1)
day string Partition (2)
  • ジョブの実行結果
root
|-- user_id: string
|-- user_name: string
|-- unknown_column: int
|-- appended_column: string
|-- year: int
|-- month: int
|-- day: int
|-- hour: int
+-------+---------+--------------+---------------+----+-----+---+----+
|user_id|user_name|unknown_column|appended_column|year|month|day|hour|
+-------+---------+--------------+---------------+----+-----+---+----+
| 001| arai| 1| string|2019| 11| 8| 11|
| 002| seiichi| 2| null|2019| 11| 4| 23|
| 001| arai| 1| null|2019| 11| 4| 23|
+-------+---------+--------------+---------------+----+-----+---+----+

この時点では、変化はありませんでした。 データカタログ上から削除されたのにかかわらず、appended_columnは出力されています。

S3からappended_columnが含まれるファイルを削除してみる

appended_columnが含まれている対象のparquetファイルを削除した後、ジョブを再実行してみます。

  • ジョブの実行結果
root
|-- user_id: string
|-- user_name: string
|-- unknown_column: int
|-- year: int
|-- month: int
|-- day: int
|-- hour: int
+-------+---------+--------------+----+-----+---+----+
|user_id|user_name|unknown_column|year|month|day|hour|
+-------+---------+--------------+----+-----+---+----+
| 002| seiichi| 2|2019| 11| 4| 23|
| 001| arai| 1|2019| 11| 4| 23|
+-------+---------+--------------+----+-----+---+----+

今度はappended_columnがスキーマから消えました。

どうやら、DynamicFrameでparquetファイルを読み込む際はparquetファイル側のスキーマが採用されている様です。

更新

データカタログの特定カラムのデータ型を更新してみる

unknown_columnのデータ型をint型からstring型に更新してみます。

  • データカタログのスキーマ
列名 データ型 パーティションキー
user_id string
user_name string
unknown_column string
year string Partition (0)
month string Partition (1)
day string Partition (2)
  • ジョブの実行結果
root
|-- user_id: string
|-- user_name: string
|-- unknown_column: int
|-- year: int
|-- month: int
|-- day: int
|-- hour: int
+-------+---------+--------------+----+-----+---+----+
|user_id|user_name|unknown_column|year|month|day|hour|
+-------+---------+--------------+----+-----+---+----+
| 002| seiichi| 2|2019| 11| 4| 23|
| 001| arai| 1|2019| 11| 4| 23|
+-------+---------+--------------+----+-----+---+----+

データカタログ上はstring型ですがint型として扱われています。 parquetファイル側のデータ型が採用されている様です。

s3へstring型のunknown_columnを含むファイルを追加してみる

データ型更新後のデータを投入してみます。

  • データ追加
aws firehose put-record --delivery-stream-name catalog-test-firehose-delivery-stream --record '{"Data": "{\"user_id\": \"001\", \"user_name\": \"arai\", \"unknown_column\": \"string\"}"}'
  • ジョブの実行結果
root
|-- user_id: string (nullable = true)
|-- user_name: string (nullable = true)
|-- unknown_column: string (nullable = true)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$11$$anonfun$apply$10.apply(ParquetFileFormat.scala:591)
...以下略

下記のエラーが出力されました。

Failed to merge incompatible data types IntegerType and StringType

parqeutファイル内の同一カラム(unknown_column)のデータ型不一致によるエラーが出力されました。

まとめ

DynamicFrameでS3上のparquetファイルを読み込んだ際、データカタログ上のスキーマ(データ型)ではなく、ファイル中のスキーマが採用されているようです。

つまりparquetファイルに限った話であれば、データカタログ上のスキーマはあまり関係ないのかもしれません。 (csvファイルをDynamicFrameで読み込んだらデータカタログの型が反映されていました。)

parquetファイルにカラムが追加・削除された場合は、additional_options={"mergeSchema": "true"}を追加すればうまく補完してくれそうです。

一方、カラムのデータ型が更新された場合は、データ型不一致でエラーになるので注意が必要です。

その他

上記を試している中で気づいたことを記載しておきます。

Kinesis FirehoseとGlueデータカタログ連携について

Glueのデータカタログのデータ型にある程度キャストしてくれるみたいです。

unknown_columnの値 登録 内容
1 成功 int型として登録
1.1 エラー "lastErrorMessage": "Data does not match the schema. For input string: \"1.1\""
922337203685477580 エラー "lastErrorMessage": "Data does not match the schema. For input string: \"922337203685477580\""
"111" 成功 int型として登録

データカタログのパーティション・プロパティについて

クローラー実行で更新される「パーティション」の「プロパティ」が何なのかずっと疑問でしたが、Athenaでの検索はここを参照しているぽいです。