この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
おつかれさまです。新井です。
今回は、AWS Glueのデータカタログについてです。
AWS Glueのデータカタログは、
- Amazon Kinesis Firehose
- AWS Glue Job
- Amazon Athena
などのサービスから参照して利用できます。
便利な反面、データカタログ上のデータスキーマに変更(追加・削除・更新)が生じた際には結構面倒なことが起こります。
そもそもデータスキーマがころころ変わるなら使うべきではないのですが、今回は調査した結果の一部を共有したいと思います。
※ AWS Glueの利用歴が浅いので間違っている部分は指摘していただけると助かります。
事前準備
リソースの作成
リソース | 説明 |
---|---|
Kinesis Firehose | データの投入口として用意します。「Convert record format」の設定で、出力形式にApache Parquetを、テーブル定義にGlueのデータカタログを指定します。 |
Glue Data Catalog | 今回利用するGlueのデータカタログです。スキーマの詳細はテンプレートを参照してください。 |
Glue Crawler | パーティションが追加された際に、データカタログを更新するためのクローラーです。 |
S3 | ソースデータを保存しておくストレージです。 |
IAM Role/Policy | 権限周りの設定です。 |
- Terraformのテンプレート
variables
の部分をお好みで変更してください。
# Terraform Setting
terraform {
required_version = "0.12.6"
}
# Provider
provider "aws" {
region = "ap-northeast-1"
}
# Kinesis Firehose
resource "aws_kinesis_firehose_delivery_stream" "firehose_delivery_stream" {
name = "${var.firehose_name}"
destination = "extended_s3"
extended_s3_configuration {
bucket_arn = "${aws_s3_bucket.s3_bucket.arn}"
buffer_interval = "60"
buffer_size = "128"
compression_format = "UNCOMPRESSED"
data_format_conversion_configuration {
enabled = "true"
input_format_configuration {
deserializer {
open_x_json_ser_de {}
}
}
output_format_configuration {
serializer {
parquet_ser_de {}
}
}
schema_configuration {
database_name = "default"
region = "ap-northeast-1"
role_arn = "${aws_iam_role.firehose_iam_role.arn}"
table_name = "${aws_glue_catalog_table.glue_table.name}"
version_id = "LATEST"
}
}
error_output_prefix = "err/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}"
prefix = "data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
role_arn = "${aws_iam_role.firehose_iam_role.arn}"
}
}
# Glue Catalog
resource "aws_glue_catalog_table" "glue_table" {
name = "${var.glue_table_name}"
database_name = "default"
parameters = {
"classification" = "parquet"
}
retention = 0
table_type = "EXTERNAL_TABLE"
partition_keys {
name = "data"
type = "string"
}
partition_keys {
name = "year"
type = "string"
}
partition_keys {
name = "month"
type = "string"
}
partition_keys {
name = "day"
type = "string"
}
storage_descriptor {
location = "s3://${aws_s3_bucket.s3_bucket.id}/"
input_format = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
output_format = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
ser_de_info {
name = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
serialization_library = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
parameters = {
"serialization.format" = 1
}
}
columns {
name = "user_id"
type = "string"
}
columns {
name = "user_name"
type = "string"
}
columns {
name = "unknown_column"
type = "int"
}
}
}
# Glue Crawler
resource "aws_glue_crawler" "glue_crawler" {
name = "${var.glue_crawler_name}"
database_name = "default"
role = "${aws_iam_role.glue_iam_role.arn}"
catalog_target {
database_name = "default"
tables = ["${aws_glue_catalog_table.glue_table.name}"]
}
configuration = "{\"Version\":1.0,\"CrawlerOutput\":{\"Partitions\":{\"AddOrUpdateBehavior\":\"InheritFromTable\"}},\"Grouping\":{\"TableGroupingPolicy\":\"CombineCompatibleSchemas\"}}"
schema_change_policy {
delete_behavior = "LOG"
update_behavior = "LOG"
}
}
# S3
resource "aws_s3_bucket" "s3_bucket" {
bucket = "${var.bucket_name}"
region = "ap-northeast-1"
}
# IAM Role
resource "aws_iam_role" "firehose_iam_role" {
name = "${var.firehose_role_name}"
assume_role_policy = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "firehose.amazonaws.com"
},
"Effect": "Allow",
"Sid": ""
}
]
}
POLICY
}
resource "aws_iam_role" "glue_iam_role" {
name = "${var.glue_role_name}"
assume_role_policy = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "glue.amazonaws.com"
},
"Effect": "Allow",
"Sid": ""
}
]
}
POLICY
}
# IAM Policy
# ※面倒なので権限は絞っていません
resource "aws_iam_role_policy" "firehose_access_policy" {
name = "${var.firehose_policy_name}"
role = "${aws_iam_role.firehose_iam_role.id}"
policy = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:*",
"glue:*"
],
"Resource": "*"
}
]
}
POLICY
}
resource "aws_iam_role_policy" "glue_access_policy" {
name = "${var.glue_policy_name}"
role = "${aws_iam_role.glue_iam_role.id}"
policy = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:*",
"glue:*"
],
"Resource": "*"
}
]
}
POLICY
}
# Variables
variable "firehose_role_name" {
default = "catalog-test-firehose-role"
}
variable "glue_role_name" {
default = "catalog-test-glue-role"
}
variable "firehose_policy_name" {
default = "catalog-test-firehose-policy"
}
variable "glue_policy_name" {
default = "catalog-test-glue-policy"
}
variable "firehose_name" {
default = "catalog-test-firehose-delivery-stream"
}
variable "glue_table_name" {
default = "catalog-test-glue-table"
}
variable "glue_crawler_name" {
default = "catalog-test-glue-crawler"
}
variable "bucket_name" {
default = "catalog-test-s3-bucket"
}
開発エンドポイントの立ち上げ
今回は開発エンドポイントでPySparkジョブを実行します。Glueのバージョンは1.0(spark 2.4, Python3)を利用します。
開発エンドポイントの立ち上げ方法は公式ドキュメントを確認してください。
- ソースコード
%pyspark
from pyspark.sql import SQLContext
from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# 引数取得
args = {"JOB_NAME": "test"}
# セットアップ
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
DATABASE_NAME = "default"
TABLE_NAME = "catalog-test-glue-table"
# DynamicFrameの作成
datasource = glueContext.create_dynamic_frame.from_catalog(
database=DATABASE_NAME,
table_name=TABLE_NAME
)
datasource.printSchema()
datasource.toDF().show()
job.commit()
事前にデータを登録する
作成済みのGlueデータカタログのスキーマに沿った形式でデータを投入しておきます。
- 作成済みのスキーマ
列名 | データ型 | パーティションキー |
---|---|---|
user_id | string | |
user_name | string | |
unknown_column | int | ※後で型を更新するカラムです |
year | string | Partition (0) |
month | string | Partition (1) |
day | string | Partition (2) |
- JSONデータをKinesis Firehose経由でS3に保存
[
{
"user_id": "001",
"user_name": "arai",
"unknown_column": 1
},
{
"user_id": "002",
"user_name": "seiichi",
"unknown_column": 2
}
]
aws firehose put-record --delivery-stream-name catalog-test-firehose-delivery-stream --record '{"Data": "{\"user_id\": \"001\", \"user_name\": \"arai\", \"unknown_column\": 1}"}'
aws firehose put-record --delivery-stream-name catalog-test-firehose-delivery-stream --record '{"Data": "{\"user_id\": \"002\", \"user_name\": \"seiichi\", \"unknown_column\": 2}"}'
事前準備のまとめ
事前準備に結構時間がかかりました。
この時点で、
- リソースが作成されていること
- 開発エンドポイントが利用できること
- 事前のデータ登録が済んでいること
が確認できればOKです。
ここからデータカタログに変更(追加・削除・更新)を加えていきます。
追加
データカタログへカラムを追加してみる
まずは、特定のカラムを追加した後の、Glue Jobの挙動を見てみます。
- データカタログのスキーマ
列名 | データ型 | パーティションキー |
---|---|---|
user_id | string | |
user_name | string | |
unknown_column | int | |
appended_column | string | 追加されたカラム |
year | string | Partition (0) |
month | string | Partition (1) |
day | string | Partition (2) |
- ジョブ実行結果
root
|-- user_id: string
|-- user_name: string
|-- unknown_column: int
|-- year: int
|-- month: int
|-- day: int
|-- hour: int
+-------+---------+--------------+----+-----+---+----+
|user_id|user_name|unknown_column|year|month|day|hour|
+-------+---------+--------------+----+-----+---+----+
| 002| seiichi| 2|2019| 11| 4| 23|
| 001| arai| 1|2019| 11| 4| 23|
+-------+---------+--------------+----+-----+---+----+
DynamicFrameのスキーマ出力には、appended_column
が表示されていませんでした。
Crawlerを実行して、ジョブを再実行してみましたが、結果は変わりませんでした。
S3へappended_column
が含まれるファイルを追加してみる
この状態でappended_column
が含まれる下記のデータを投入して、再度Glue Jobの結果を見てみます。
- データ投入
aws firehose put-record --delivery-stream-name catalog-test-firehose-delivery-stream --record '{"Data": "{\"user_id\": \"001\", \"user_name\": \"arai\", \"unknown_column\": 1, \"appended_column\": \"string\"}"}'
- ジョブ実行結果
root
|-- user_id: string
|-- user_name: string
|-- unknown_column: int
|-- year: int
|-- month: int
|-- day: int
|-- hour: int
+-------+---------+--------------+----+-----+---+----+
|user_id|user_name|unknown_column|year|month|day|hour|
+-------+---------+--------------+----+-----+---+----+
| 001| arai| 1|2019| 11| 8| 11|
| 002| seiichi| 2|2019| 11| 4| 23|
| 001| arai| 1|2019| 11| 4| 23|
+-------+---------+--------------+----+-----+---+----+
出力データ件数は増えましたが、appended_column
は出力されませんでした。
しばらく調査してみると、stack overflowで同じ質問している人を見つけました。
どうやらadditional_options={"mergeSchema": "true"}
このオプションを追加したらよさそうです。
- DynamicFrameを以下のように修正
datasource = glueContext.create_dynamic_frame.from_catalog(
database=DATABASE_NAME,
table_name=TABLE_NAME,
additional_options={"mergeSchema": "true"}
)
- ジョブ実行結果(
additional_options
を指定した場合)
root
|-- user_id: string
|-- user_name: string
|-- unknown_column: int
|-- appended_column: string
|-- year: int
|-- month: int
|-- day: int
|-- hour: int
+-------+---------+--------------+---------------+----+-----+---+----+
|user_id|user_name|unknown_column|appended_column|year|month|day|hour|
+-------+---------+--------------+---------------+----+-----+---+----+
| 001| arai| 1| string|2019| 11| 8| 11|
| 002| seiichi| 2| null|2019| 11| 4| 23|
| 001| arai| 1| null|2019| 11| 4| 23|
+-------+---------+--------------+---------------+----+-----+---+----+
これで、appended_column
がDynamicFrameで読み込まれるようになりました。
削除
データカタログからカラムを削除してみる
次に、appended_column
をカタログから削除してみます。
- データカタログのスキーマ
列名 | データ型 | パーティションキー |
---|---|---|
user_id | string | |
user_name | string | |
unknown_column | int | |
year | string | Partition (0) |
month | string | Partition (1) |
day | string | Partition (2) |
- ジョブの実行結果
root
|-- user_id: string
|-- user_name: string
|-- unknown_column: int
|-- appended_column: string
|-- year: int
|-- month: int
|-- day: int
|-- hour: int
+-------+---------+--------------+---------------+----+-----+---+----+
|user_id|user_name|unknown_column|appended_column|year|month|day|hour|
+-------+---------+--------------+---------------+----+-----+---+----+
| 001| arai| 1| string|2019| 11| 8| 11|
| 002| seiichi| 2| null|2019| 11| 4| 23|
| 001| arai| 1| null|2019| 11| 4| 23|
+-------+---------+--------------+---------------+----+-----+---+----+
この時点では、変化はありませんでした。
データカタログ上から削除されたのにかかわらず、appended_column
は出力されています。
S3からappended_column
が含まれるファイルを削除してみる
appended_column
が含まれている対象のparquetファイルを削除した後、ジョブを再実行してみます。
- ジョブの実行結果
root
|-- user_id: string
|-- user_name: string
|-- unknown_column: int
|-- year: int
|-- month: int
|-- day: int
|-- hour: int
+-------+---------+--------------+----+-----+---+----+
|user_id|user_name|unknown_column|year|month|day|hour|
+-------+---------+--------------+----+-----+---+----+
| 002| seiichi| 2|2019| 11| 4| 23|
| 001| arai| 1|2019| 11| 4| 23|
+-------+---------+--------------+----+-----+---+----+
今度はappended_column
がスキーマから消えました。
どうやら、DynamicFrameでparquetファイルを読み込む際はparquetファイル側のスキーマが採用されている様です。
更新
データカタログの特定カラムのデータ型を更新してみる
unknown_column
のデータ型をint型からstring型に更新してみます。
- データカタログのスキーマ
列名 | データ型 | パーティションキー |
---|---|---|
user_id | string | |
user_name | string | |
unknown_column | string | |
year | string | Partition (0) |
month | string | Partition (1) |
day | string | Partition (2) |
- ジョブの実行結果
root
|-- user_id: string
|-- user_name: string
|-- unknown_column: int
|-- year: int
|-- month: int
|-- day: int
|-- hour: int
+-------+---------+--------------+----+-----+---+----+
|user_id|user_name|unknown_column|year|month|day|hour|
+-------+---------+--------------+----+-----+---+----+
| 002| seiichi| 2|2019| 11| 4| 23|
| 001| arai| 1|2019| 11| 4| 23|
+-------+---------+--------------+----+-----+---+----+
データカタログ上はstring型ですがint型として扱われています。 parquetファイル側のデータ型が採用されている様です。
s3へstring型のunknown_column
を含むファイルを追加してみる
データ型更新後のデータを投入してみます。
- データ追加
aws firehose put-record --delivery-stream-name catalog-test-firehose-delivery-stream --record '{"Data": "{\"user_id\": \"001\", \"user_name\": \"arai\", \"unknown_column\": \"string\"}"}'
- ジョブの実行結果
root
|-- user_id: string (nullable = true)
|-- user_name: string (nullable = true)
|-- unknown_column: string (nullable = true)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$11$$anonfun$apply$10.apply(ParquetFileFormat.scala:591)
...以下略
下記のエラーが出力されました。
Failed to merge incompatible data types IntegerType and StringType
parqeutファイル内の同一カラム(unknown_column
)のデータ型不一致によるエラーが出力されました。
まとめ
DynamicFrameでS3上のparquetファイルを読み込んだ際、データカタログ上のスキーマ(データ型)ではなく、ファイル中のスキーマが採用されているようです。
つまりparquetファイルに限った話であれば、データカタログ上のスキーマはあまり関係ないのかもしれません。 (csvファイルをDynamicFrameで読み込んだらデータカタログの型が反映されていました。)
parquetファイルにカラムが追加・削除された場合は、additional_options={"mergeSchema": "true"}
を追加すればうまく補完してくれそうです。
一方、カラムのデータ型が更新された場合は、データ型不一致でエラーになるので注意が必要です。
その他
上記を試している中で気づいたことを記載しておきます。
Kinesis FirehoseとGlueデータカタログ連携について
Glueのデータカタログのデータ型にある程度キャストしてくれるみたいです。
unknown_columnの値 | 登録 | 内容 |
---|---|---|
1 | 成功 | int型として登録 |
1.1 | エラー | "lastErrorMessage": "Data does not match the schema. For input string: \"1.1\"" |
922337203685477580 | エラー | "lastErrorMessage": "Data does not match the schema. For input string: \"922337203685477580\"" |
"111" | 成功 | int型として登録 |
データカタログのパーティション・プロパティについて
クローラー実行で更新される「パーティション」の「プロパティ」が何なのかずっと疑問でしたが、Athenaでの検索はここを参照しているぽいです。
- 公式ドキュメント
- テーブルメタデータの更新
- テーブルの詳細の表示と編集