AWS CLIによるはじめてのAWS Glue

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

中山(順)です

クラスメソッドでAWSソリューションアーキテクトをやっていると、AWSに関するあらゆることを聞かれます。 全てのサービスについて詳しくなるのは物理的に厳しいのですが、少なくとも大半のサービスについて「コノサービスチョットワカル」くらいは言えないといけないのかなーと思っています。

ただ、Webinerを見たりするだけでは全然理解度上がらないわけです。 やっぱり、やってみてナンボです。 で、マネージメントコンソールでポチポチやって理解する、ってパターンが多いです。 しかし、最初はそれが良いものの、理解を深めるために繰り返しているとだんだんめんどくさくなってきました。

というわけで、今日はCLIでGlueをさわった結果をまとめます。

Glueとは?

一言で言うと、マネージドなETLサービスです。 基本的にインスタンスを管理する必要はありません。

ETLスクリプトはPythonもしくはScalaで書けます。

詳細は弊社ブログをご覧ください(丸投げ)。

Glue – 特集カテゴリー –

Glueで登場するリソース

今回、以下のリソースを利用します。

  • Database
  • Table
  • Connection
  • Crawler
  • Job

これ以外に以下のようなリソースがあります。この記事ではこれらの説明は割愛します。

  • Classifier
  • Trrigger
  • Partition
  • DevEndpoint

やってみる

今回は最初の一歩ということで、「MySQLのテーブルをJSONに変換」してみたいと思います。

流れは以下のような感じです。

  • 事前準備
    • VPCの作成(DNS、S3 Endpoint、Security Group)
    • サービス用IAM Role
    • データソース(今回はMySQL)
  • Glue環境構築
    • データソースへの"Connection"の定義
    • "Database"(Glue)の作成
    • "Crawler"によるデータカタログ("Table"(Glue))の作成
    • "Job"の定義
  • Jobの実行と動作確認

構成図

MySQL上のテーブルに対して何らかの処理を施し、S3バケットに変換したデータをロードしたいと思います。

やってみた

VPCの作成

データソースがVPC上に存在する場合、いくつか留意すべき点があります。

  • 名前解決
    • GlueをVPCで利用する場合、VPCネットワーク属性であるenableDnsHostnamesとenableDnsSupportをtrueに設定する必要があります。
    • VPC での DNS のセットアップ
  • S3へのアクセス
    • Glueで利用するENIからS3にアクセスする必要があります。このENIにはEIPおよびPublic IPを割り当てることができませんので、NAT GatewayへのルートもしくはS3 Endpointへのルートを構成する必要があります。
    • Amazon S3 における Amazon VPC エンドポイント
  • セキュリティグループ

これを踏まえてVPCの設定をやっていきましょう。

まず、VPCを作成します。

VPC_CIDR="10.0.0.0/16"

aws ec2 create-vpc \
    --cidr-block ${VPC_CIDR}

VPCの属性を確認します。

VPC_ID="vpc-xxxxxxxxxxxxxxxxx"

aws ec2 describe-vpc-attribute \
    --attribute enableDnsSupport \
    --vpc-id ${VPC_ID}
{
    "VpcId": "vpc-02de374b7879fa4da",
    "EnableDnsSupport": {
        "Value": true
    }
}
aws ec2 describe-vpc-attribute \
    --attribute enableDnsHostnames \
    --vpc-id ${VPC_ID}
{
    "VpcId": "vpc-02de374b7879fa4da",
    "EnableDnsHostnames": {
        "Value": false
    }
}

enableDnsHostnamesがfalseなので、動作要件を満たすためにtrueへ変更します。

aws ec2 modify-vpc-attribute \
    --enable-dns-hostnames \
    --vpc-id ${VPC_ID}

属性が変更されたことを確認します。

aws ec2 describe-vpc-attribute \
    --attribute enableDnsHostnames \
    --vpc-id ${VPC_ID}
{
    "VpcId": "vpc-02de374b7879fa4da",
    "EnableDnsHostnames": {
        "Value": true
    }
}

次に、S3 Endpointを構成します。

今回はデフォルトのルートテーブルにルートを追加します。 Route Table IDを確認します。

aws ec2 describe-route-tables \
    --filters Name=vpc-id,Values=${VPC_ID} Name=association.main,Values=true \
    --query RouteTables[0].RouteTableId \
    --output text
rtb-09ee148f5ed4f48c1

確認できたら、S3 Endpointを作成と併せてRoute TableにS3へのルートを追加します。

ROUTE_TABLE_ID="rtb-09ee148f5ed4f48c1"

aws ec2 create-vpc-endpoint \
    --vpc-endpoint-type Gateway \
    --vpc-id ${VPC_ID} \
    --service-name "com.amazonaws.ap-northeast-1.s3" \
    --route-table-ids ${ROUTE_TABLE_ID}
{
    "VpcEndpoint": {
        "PolicyDocument": "{\"Version\":\"2008-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":\"*\",\"Action\":\"*\",\"Resource\":\"*\"}]}",
        "VpcId": "vpc-02de374b7879fa4da",
        "NetworkInterfaceIds": [],
        "SubnetIds": [],
        "PrivateDnsEnabled": false,
        "State": "available",
        "ServiceName": "com.amazonaws.ap-northeast-1.s3",
        "RouteTableIds": [
            "rtb-09ee148f5ed4f48c1"
        ],
        "Groups": [],
        "VpcEndpointId": "vpce-01ff913755a7e9e6c",
        "VpcEndpointType": "Gateway",
        "CreationTimestamp": "2018-07-31T09:37:06Z",
        "DnsEntries": []
    }
}

次に、セキュリティグループを作成します。 今回は公式ドキュメントに則って自己参照ルールを作成します。

まずはセキュリティグループを作成します。

aws ec2 create-security-group \
    --description "for AWS Glue" \
    --group-name glue \
    --vpc-id ${VPC_ID}
{
    "GroupId": "sg-0bca817b34c2654d0"
}

次に、インバウンドの自己参照ルールを追加します。

SECURITY_GROUP_ID="sg-0bca817b34c2654d0"

aws ec2 authorize-security-group-ingress \
    --group-id ${SECURITY_GROUP_ID} \
    --protocol tcp \
    --port 0-65535 \
    --source-group ${SECURITY_GROUP_ID}

次に、アウトバウンドの自己参照ルールを追加します。

aws ec2 authorize-security-group-egress \
    --group-id ${SECURITY_GROUP_ID} \
    --protocol tcp \
    --port 0-65535 \
    --source-group ${SECURITY_GROUP_ID}

最後にセキュリティグループの内容を確認します。

aws ec2 describe-security-groups \
    --group-ids ${SECURITY_GROUP_ID}
{
    "SecurityGroups": [
        {
            "IpPermissionsEgress": [
                {
                    "PrefixListIds": [],
                    "FromPort": 0,
                    "IpRanges": [],
                    "ToPort": 65535,
                    "IpProtocol": "tcp",
                    "UserIdGroupPairs": [
                        {
                            "UserId": "XXXXXXXXXXXX",
                            "GroupId": "sg-0bca817b34c2654d0"
                        }
                    ],
                    "Ipv6Ranges": []
                },
                {
                    "IpProtocol": "-1",
                    "PrefixListIds": [],
                    "IpRanges": [
                        {
                            "CidrIp": "0.0.0.0/0"
                        }
                    ],
                    "UserIdGroupPairs": [],
                    "Ipv6Ranges": []
                }
            ],
            "Description": "for AWS Glue",
            "IpPermissions": [
                {
                    "PrefixListIds": [],
                    "FromPort": 0,
                    "IpRanges": [],
                    "ToPort": 65535,
                    "IpProtocol": "tcp",
                    "UserIdGroupPairs": [
                        {
                            "UserId": "XXXXXXXXXXXX",
                            "GroupId": "sg-0bca817b34c2654d0"
                        }
                    ],
                    "Ipv6Ranges": []
                }
            ],
            "GroupName": "glue",
            "VpcId": "vpc-02de374b7879fa4da",
            "OwnerId": "XXXXXXXXXXXX",
            "GroupId": "sg-0bca817b34c2654d0"
        }
    ]
}

サービス用IAM Role

Glueを利用するためにはサービス用のロールが必要です。

必要な権限等についてはドキュメントをご確認ください。

AWS Glue の IAM アクセス許可のセットアップ

まず、信頼ポリシーを記述します。 信頼するPrincipalはGlueです。

TRUST_POLICY_FILE='trust-glue.json'

cat << EOF > ${TRUST_POLICY_FILE}
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "glue.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
EOF

cat ${TRUST_POLICY_FILE}

次にポリシーの準備を行います。 今回はAWS管理ポリシーをそのまま使いたいと思います。 本番環境では必要に応じてポリシーを調整してください。 詳細はドキュメントを確認してください。

ステップ 1: AWS Glue サービスの IAM ポリシーを作成します。

利用するポリシーは"AWSGlueServiceRole"および"AmazonS3FullAccess"です。

GLUE_POLICY_ARN="arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
S3_POLICY_ARN="arn:aws:iam::aws:policy/AmazonS3FullAccess"

ポリシーの準備ができたので、まずはロールを作成します。 この際、ロール名のプレフィックスが"AWSGlueServiceRole"となるように設定してください。

ROLE_NAME="AWSGlueServiceRole-DevIO"

aws iam create-role \
    --role-name ${ROLE_NAME} \
    --assume-role-policy-document file://${TRUST_POLICY_FILE}
{
    "Role": {
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "sts:AssumeRole",
                    "Effect": "Allow",
                    "Principal": {
                        "Service": [
                            "glue.amazonaws.com"
                        ]
                    }
                }
            ]
        },
        "RoleId": "AROAJLFYD3I7TU7C7AWIU",
        "CreateDate": "2018-07-31T10:23:20.492Z",
        "RoleName": "AWSGlueServiceRole-DevIO",
        "Path": "/",
        "Arn": "arn:aws:iam::XXXXXXXXXXXX:role/AWSGlueServiceRole-DevIO"
    }
}

最後にポリシーをロールにアタッチします。

aws iam attach-role-policy \
    --role-name ${ROLE_NAME} \
    --policy-arn ${GLUE_POLICY_ARN}

aws iam attach-role-policy \
    --role-name ${ROLE_NAME} \
    --policy-arn ${S3_POLICY_ARN}

ロールにポリシーがアタッチされていることを確認できれば、IAM関連の準備は完了です。

aws iam list-attached-role-policies \
    --role-name ${ROLE_NAME}
{
    "AttachedPolicies": [
        {
            "PolicyName": "AmazonS3FullAccess",
            "PolicyArn": "arn:aws:iam::aws:policy/AmazonS3FullAccess"
        },
        {
            "PolicyName": "AWSGlueServiceRole",
            "PolicyArn": "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
        }
    ]
}

データソース(今回はMySQL)

このパートで前準備は最後です。

データソースを用意します。 今回はMySQLです。

まず、サブネットを作成します。

aws ec2 create-subnet \
    --availability-zone ap-northeast-1a \
    --cidr-block "10.0.0.0/24" \
    --vpc-id ${VPC_ID}
{
    "Subnet": {
        "AvailabilityZone": "ap-northeast-1a",
        "AvailableIpAddressCount": 251,
        "DefaultForAz": false,
        "Ipv6CidrBlockAssociationSet": [],
        "VpcId": "vpc-02de374b7879fa4da",
        "State": "pending",
        "MapPublicIpOnLaunch": false,
        "SubnetId": "subnet-01a116ee9cc4b9741",
        "CidrBlock": "10.0.0.0/24",
        "AssignIpv6AddressOnCreation": false
    }
}
aws ec2 create-subnet \
    --availability-zone ap-northeast-1d \
    --cidr-block "10.0.1.0/24" \
    --vpc-id ${VPC_ID}
{
    "Subnet": {
        "AvailabilityZone": "ap-northeast-1d",
        "AvailableIpAddressCount": 251,
        "DefaultForAz": false,
        "Ipv6CidrBlockAssociationSet": [],
        "VpcId": "vpc-02de374b7879fa4da",
        "State": "pending",
        "MapPublicIpOnLaunch": false,
        "SubnetId": "subnet-0e03b5c0c27d8bd28",
        "CidrBlock": "10.0.1.0/24",
        "AssignIpv6AddressOnCreation": false
    }
}

次に、サブネットグループの作成を行います。

SUBNET_ID_1="subnet-01a116ee9cc4b9741"
SUBNET_ID_2="subnet-0e03b5c0c27d8bd28"
DB_SUBNET_GROUP_NAME="glue-data-source"

aws rds create-db-subnet-group \
    --db-subnet-group-name ${DB_SUBNET_GROUP_NAME} \
    --db-subnet-group-description "AWS Glue Data Source" \
    --subnet-ids ${SUBNET_ID_1} ${SUBNET_ID_2}
{
    "DBSubnetGroup": {
        "Subnets": [
            {
                "SubnetStatus": "Active",
                "SubnetIdentifier": "subnet-01a116ee9cc4b9741",
                "SubnetAvailabilityZone": {
                    "Name": "ap-northeast-1a"
                }
            },
            {
                "SubnetStatus": "Active",
                "SubnetIdentifier": "subnet-0e03b5c0c27d8bd28",
                "SubnetAvailabilityZone": {
                    "Name": "ap-northeast-1d"
                }
            }
        ],
        "VpcId": "vpc-02de374b7879fa4da",
        "DBSubnetGroupDescription": "AWS Glue Data Source",
        "SubnetGroupStatus": "Complete",
        "DBSubnetGroupArn": "arn:aws:rds:ap-northeast-1:XXXXXXXXXXXX:subgrp:glue-data-source",
        "DBSubnetGroupName": "glue-data-source"
    }
}

RDSインスタンスを作成します。 パラメーターは適宜調整してください。 ここで指定するセキュリティグループはGlueの設定("Connection"の作成)でも利用します。

DB_INSTANCE_ID="myinstance-devio"
DB_MASTER_USER="master"

aws rds create-db-instance \
    --db-name mydatabase \
    --db-instance-identifier ${DB_INSTANCE_ID} \
    --allocated-storage 20 \
    --db-instance-class db.t2.micro \
    --engine mysql \
    --master-username ${DB_MASTER_USER} \
    --master-user-password mypassword \
    --vpc-security-group-ids ${SECURITY_GROUP_ID} \
    --db-subnet-group-name ${DB_SUBNET_GROUP_NAME} \
    --no-multi-az \
    --storage-type gp2
{
    "DBInstance": {
        "PubliclyAccessible": false,
        "MasterUsername": "master",
        "MonitoringInterval": 0,
        "LicenseModel": "general-public-license",
        "VpcSecurityGroups": [
            {
                "Status": "active",
                "VpcSecurityGroupId": "sg-0bca817b34c2654d0"
            }
        ],
        "CopyTagsToSnapshot": false,
        "OptionGroupMemberships": [
            {
                "Status": "in-sync",
                "OptionGroupName": "default:mysql-5-6"
            }
        ],
        "PendingModifiedValues": {
            "MasterUserPassword": "****"
        },
        "Engine": "mysql",
        "MultiAZ": false,
        "DBSecurityGroups": [],
        "DBParameterGroups": [
            {
                "DBParameterGroupName": "default.mysql5.6",
                "ParameterApplyStatus": "in-sync"
            }
        ],
        "PerformanceInsightsEnabled": false,
        "AutoMinorVersionUpgrade": true,
        "PreferredBackupWindow": "15:32-16:02",
        "DBSubnetGroup": {
            "Subnets": [
                {
                    "SubnetStatus": "Active",
                    "SubnetIdentifier": "subnet-01a116ee9cc4b9741",
                    "SubnetAvailabilityZone": {
                        "Name": "ap-northeast-1a"
                    }
                },
                {
                    "SubnetStatus": "Active",
                    "SubnetIdentifier": "subnet-0e03b5c0c27d8bd28",
                    "SubnetAvailabilityZone": {
                        "Name": "ap-northeast-1d"
                    }
                }
            ],
            "DBSubnetGroupName": "glue-data-source",
            "VpcId": "vpc-02de374b7879fa4da",
            "DBSubnetGroupDescription": "AWS Glue Data Source",
            "SubnetGroupStatus": "Complete"
        },
        "ReadReplicaDBInstanceIdentifiers": [],
        "AllocatedStorage": 20,
        "DBInstanceArn": "arn:aws:rds:ap-northeast-1:XXXXXXXXXXXX:db:myinstance-devio",
        "BackupRetentionPeriod": 1,
        "DBName": "mydatabase",
        "PreferredMaintenanceWindow": "wed:19:08-wed:19:38",
        "DBInstanceStatus": "creating",
        "IAMDatabaseAuthenticationEnabled": false,
        "EngineVersion": "5.6.39",
        "DomainMemberships": [],
        "StorageType": "gp2",
        "DbiResourceId": "db-EZ75PXOYWUQWC77SL63P2EGPLE",
        "CACertificateIdentifier": "rds-ca-2015",
        "StorageEncrypted": false,
        "DBInstanceClass": "db.t2.micro",
        "DbInstancePort": 0,
        "DBInstanceIdentifier": "myinstance-devio"
    }
}

作成できたら、エンドポイントを確認します。

aws rds describe-db-instances \
    --db-instance-identifier ${DB_INSTANCE_ID} \
    --query DBInstances[0].Endpoint.Address \
    --output text
myinstance-devio.cd8id7gqyjw5.ap-northeast-1.rds.amazonaws.com

現時点ではRDSインスタンスにデータが入っていない状態のため、データを投入します。 今回はGitHubのaws-samplesより拝借します(こういうのほんと助かります!)。

aws-samples/aws-database-migration-samples

今回はEC2で踏み台となるインスタンスを作成してデータの投入を行います。 EC2インスタンスの作成やセキュリティグループの設定、インターネットゲートウェイの設定などの手順は割愛します。 踏み台となるEC2インスタンスからMySQLクライアントでRDSインスタンスに接続できる状態のところから説明を続けます。

まず、gitでリポジトリーをクローンします。

git clone https://github.com/aws-samples/aws-database-migration-samples.git

MySQLサーバーにアクセスし、データベースを作成します。

cd aws-database-migration-samples/mysql/sampledb/v1/
DB_ENDPOINT="myinstance-devio.cd8id7gqyjw5.ap-northeast-1.rds.amazonaws.com"

mysql -h ${DB_ENDPOINT} -u ${DB_MASTER_USER} -p 

MySQLにログインできたら、データベースを作成するSQL文を実行します。 移動したディレクトリ上に存在します。

データベースの作成には1時間弱かかります。 途中でフリーズしたような感じになりますが、ちゃんと動いていると思うので気長に待ちましょう。

\. install-rds.sql

ここまでで、Glueをお試しするための環境が整いました。 ちなみに、S3をデータソースにするにはIAMロールの作成だけで事前準備はOKです。

データソースへの"Connection"の定義

データソースへのアクセスにアクセスするための接続情報を定義します。 クロールおよびジョブの実行時に利用します。 この際に事前に作成したセキュリティグループを指定し、RDSインスタンスと通信できるように構成してください。

GLUE_CONN_NAME="connection-devio"
GLUE_CONN_DESC="for Developers.IO"
GLUE_CONN_TYPE="JDBC"

aws glue create-connection \
    --connection-input "Name=${GLUE_CONN_NAME},Description=\"${GLUE_CONN_DESC}\",ConnectionType=${GLUE_CONN_TYPE},ConnectionProperties={USERNAME=${DB_MASTER_USER},PASSWORD=mypassword,JDBC_CONNECTION_URL=\"jdbc:mysql://${DB_ENDPOINT}:3306/dms_sample\"},PhysicalConnectionRequirements={SubnetId=${SUBNET_ID_1},SecurityGroupIdList=[${SECURITY_GROUP_ID}]}"

"Database"(Glue)の作成

データベースを作成します。

GLUE_DB_NAME="glue-db"
GLUE_DB_DESC="Developers.IO"

aws glue create-database \
    --database-input Name=${GLUE_DB_NAME},Description="${GLUE_DB_DESC}"

データベースが作成されたことを確認します。

aws glue get-database \
    --name ${GLUE_DB_NAME}
{
    "Database": {
        "Name": "glue-db",
        "Description": "Developers.IO"
    }
}

"Crawler"によるデータカタログ("Table"(Glue))の作成

クローラを作成します。

クローラを定義するための事前準備として、ターゲット(テータソース)情報を定義します。 ここではConnectionをクロール対象のテーブルを指定します。 今回は作成したデータベース上の全てのテーブルを指定します。

CRAWLER_TARGET_FILE='crawler-target.json'

cat << EOF > ${CRAWLER_TARGET_FILE}
{
  "JdbcTargets": [
    {
      "ConnectionName": "${GLUE_CONN_NAME}",
      "Path": "dms_sample/%"
    }
  ]
}
EOF

cat ${CRAWLER_TARGET_FILE}

次にクローラを作成します。

GLUE_DRAWLER_NAME="crawler-devio"

aws glue create-crawler \
    --name ${GLUE_DRAWLER_NAME} \
    --role ${ROLE_NAME} \
    --database-name ${GLUE_DB_NAME} \
    --targets file://${CRAWLER_TARGET_FILE}

クローラが正常に定義できたことを確認します。

aws glue get-crawler \
    --name ${GLUE_DRAWLER_NAME}
{
    "Crawler": {
        "CrawlElapsedTime": 0,
        "Name": "crawler-devio",
        "CreationTime": 1533039226.0,
        "LastUpdated": 1533039226.0,
        "Targets": {
            "JdbcTargets": [
                {
                    "Path": "dms_sample/%",
                    "Exclusions": [],
                    "ConnectionName": "connection-devio"
                }
            ],
            "S3Targets": []
        },
        "State": "READY",
        "Version": 1,
        "Role": "AWSGlueServiceRole-DevIO",
        "DatabaseName": "glue-db",
        "SchemaChangePolicy": {
            "DeleteBehavior": "DEPRECATE_IN_DATABASE",
            "UpdateBehavior": "UPDATE_IN_DATABASE"
        },
        "Classifiers": []
    }
}

クロールを実行します。

aws glue start-crawler \
    --name ${GLUE_DRAWLER_NAME}

データベースが作成されたことを確認します。 全てのテーブル情報を表示すると長すぎるので、一つだけ表示します。

aws glue get-tables \
    --database-name ${GLUE_DB_NAME} \
    --query TableList[?Name==\`dms_sample_sporting_event\`]
[
    {
        "StorageDescriptor": {
            "Parameters": {
                "CrawlerSchemaDeserializerVersion": "1.0",
                "compressionType": "none",
                "UPDATED_BY_CRAWLER": "crawler-devio",
                "classification": "mysql",
                "typeOfData": "table",
                "CrawlerSchemaSerializerVersion": "1.0",
                "connectionName": "connection-devio"
            },
            "SortColumns": [],
            "BucketColumns": [],
            "Columns": [
                {
                    "Type": "int",
                    "Name": "home_team_id"
                },
                {
                    "Type": "boolean",
                    "Name": "sold_out"
                },
                {
                    "Type": "bigint",
                    "Name": "id"
                },
                {
                    "Type": "int",
                    "Name": "away_team_id"
                },
                {
                    "Type": "timestamp",
                    "Name": "start_date_time"
                },
                {
                    "Type": "string",
                    "Name": "sport_type_name"
                },
                {
                    "Type": "smallint",
                    "Name": "location_id"
                },
                {
                    "Type": "date",
                    "Name": "start_date"
                }
            ],
            "Location": "dms_sample.sporting_event",
            "NumberOfBuckets": -1,
            "StoredAsSubDirectories": false,
            "SerdeInfo": {
                "Parameters": {}
            },
            "Compressed": false
        },
        "UpdateTime": 1533051032.0,
        "PartitionKeys": [],
        "Name": "dms_sample_sporting_event",
        "Parameters": {
            "CrawlerSchemaDeserializerVersion": "1.0",
            "compressionType": "none",
            "UPDATED_BY_CRAWLER": "crawler-devio",
            "classification": "mysql",
            "typeOfData": "table",
            "CrawlerSchemaSerializerVersion": "1.0",
            "connectionName": "connection-devio"
        },
        "LastAccessTime": 1533051032.0,
        "CreatedBy": "arn:aws:sts::XXXXXXXXXXXX:assumed-role/AWSGlueServiceRole-DevIO/AWS-Crawler",
        "TableType": "EXTERNAL_TABLE",
        "Owner": "owner",
        "CreateTime": 1533051032.0,
        "Retention": 0
    }
]

"Job"の定義

ここまで来たらあと少しです。

最後にETLジョブを作って実行するのですが、そのためにスクリプトの作成とジョブの実行に必要なS3バケットの作成を行います。

まず、ジョブの実行に必要なS3バケットは以下の通りです。

  • ETL処理対象の保存(データソースがS3の場合)
  • ライブラリの保存(外部のライブラリを利用する場合)
  • ".jar"ファイルの保存
  • 任意のファイルの保存
  • ETLスクリプトの配置
  • 中間結果の一時保存
  • ETL処理結果の保存(ロード先をS3にする場合)

今回は以下の用途のバケットのみを作成します。

  • ETLスクリプトの配置
  • 中間結果の一時保存(今回利用しませんが、利用する氏シーンが多そうなので設定だけ行います)
  • ETL処理結果の保存
SCRIPT_BUCKET_NAME="aws-glue-script-devio-ap-northeast-1"
TEMP_BUCKET_NAME="aws-glue-temporary-devio-ap-northeast-1"
RESULT_BUCKET_NAME="aws-glue-result-devio-ap-northeast-1"

aws s3 mb s3://${SCRIPT_BUCKET_NAME}
aws s3 mb s3://${TEMP_BUCKET_NAME}
aws s3 mb s3://${RESULT_BUCKET_NAME}

スクリプトを作成します。 今回は指定したテーブルを読み込んでJSONに変換しS3にロードするだけのスクリプトです。

ETL_SCRIPT_FILE='tb2json'

cat << EOF > ${ETL_SCRIPT_FILE}
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "${GLUE_DB_NAME}", table_name = "dms_sample_sporting_event", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("home_team_id", "int", "home_team_id", "int"), ("sold_out", "boolean", "sold_out", "boolean"), ("id", "long", "id", "long"), ("away_team_id", "int", "away_team_id", "int"), ("start_date_time", "timestamp", "start_date_time", "timestamp"), ("sport_type_name", "string", "sport_type_name", "string"), ("location_id", "short", "location_id", "short"), ("start_date", "date", "start_date", "date")], transformation_ctx = "applymapping1")

datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://${RESULT_BUCKET_NAME}"}, format = "json", transformation_ctx = "datasink2")

job.commit()
EOF

cat ${ETL_SCRIPT_FILE}

作成したスクリプトをS3バケットに配置します。

aws s3 cp ${ETL_SCRIPT_FILE} s3://${SCRIPT_BUCKET_NAME}/${ETL_SCRIPT_FILE}

ジョブを定義します。

DEFAULT_ARGUMENT_FILE='argument.json'

cat << EOF > ${DEFAULT_ARGUMENT_FILE}
{
  "--TempDir": "s3://${TEMP_BUCKET_NAME}",
  "--job-language": "python",
  "--job-bookmark-option": "job-bookmark-disable"
}
EOF

cat ${DEFAULT_ARGUMENT_FILE}

GLUE_JOB_NAME="job-devio"

aws glue create-job \
    --name ${GLUE_JOB_NAME} \
    --role ${ROLE_NAME} \
    --command "Name=glueetl,ScriptLocation=s3://${SCRIPT_BUCKET_NAME}/${ETL_SCRIPT_FILE}" \
    --connections Connections=${GLUE_CONN_NAME} \
    --default-arguments file://${DEFAULT_ARGUMENT_FILE}
{
    "Name": "job-devio"
}

最後にジョブを実行します。

aws glue start-job-run \
    --job-name ${GLUE_JOB_NAME}
{
    "JobRunId": "jr_a032b148dd4f46d9ae243301515fc7668dee371630a9fd0e0f1bd77be0f1ca75"
}

ETL処理の結果を確認します。

aws s3 ls s3://${RESULT_BUCKET_NAME}
2018-08-01 03:11:06      10125 run-1533060658972-part-r-00000
2018-08-01 03:11:06      10300 run-1533060658972-part-r-00001
2018-08-01 03:11:06      10302 run-1533060658972-part-r-00002
2018-08-01 03:11:06      10301 run-1533060658972-part-r-00003
2018-08-01 03:11:11      10302 run-1533060658972-part-r-00004
2018-08-01 03:11:11      10302 run-1533060658972-part-r-00005
2018-08-01 03:11:11      10303 run-1533060658972-part-r-00006
2018-08-01 03:11:11      10305 run-1533060658972-part-r-00007
2018-08-01 03:11:06      10305 run-1533060658972-part-r-00008
2018-08-01 03:11:06      10304 run-1533060658972-part-r-00009
2018-08-01 03:11:06      10307 run-1533060658972-part-r-00010
2018-08-01 03:11:06      10308 run-1533060658972-part-r-00011
2018-08-01 03:11:06      10308 run-1533060658972-part-r-00012
2018-08-01 03:11:06      10307 run-1533060658972-part-r-00013
2018-08-01 03:11:06      10306 run-1533060658972-part-r-00014
2018-08-01 03:11:06      10301 run-1533060658972-part-r-00015
2018-08-01 03:11:06      10301 run-1533060658972-part-r-00016
2018-08-01 03:11:06      10301 run-1533060658972-part-r-00017
2018-08-01 03:11:06      10301 run-1533060658972-part-r-00018
2018-08-01 03:11:07      10123 run-1533060658972-part-r-00019

ファイルをダウンロードし、中身を確認します。

aws s3 cp s3://${RESULT_BUCKET_NAME}/run-1533060658972-part-r-00000 run-1533060658972-part-r-00000

head aws s3 cp s3://${RESULT_BUCKET_NAME}/run-1533060658972-part-r-00000 run-1533060658972-part-r-00000
{"home_team_id":33,"sold_out":false,"id":20,"away_team_id":53,"start_date_time":"2018-08-11 15:00:00.0","sport_type_name":"baseball","location_id":20,"start_date":"2018-08-11"}
{"home_team_id":34,"sold_out":false,"id":40,"away_team_id":45,"start_date_time":"2018-06-09 17:00:00.0","sport_type_name":"baseball","location_id":26,"start_date":"2018-06-09"}
{"home_team_id":35,"sold_out":false,"id":60,"away_team_id":37,"start_date_time":"2018-04-07 17:00:00.0","sport_type_name":"baseball","location_id":27,"start_date":"2018-04-07"}
{"home_team_id":35,"sold_out":false,"id":80,"away_team_id":57,"start_date_time":"2018-08-25 13:00:00.0","sport_type_name":"baseball","location_id":27,"start_date":"2018-08-25"}
{"home_team_id":36,"sold_out":false,"id":100,"away_team_id":49,"start_date_time":"2018-06-23 13:00:00.0","sport_type_name":"baseball","location_id":30,"start_date":"2018-06-23"}
{"home_team_id":37,"sold_out":false,"id":120,"away_team_id":41,"start_date_time":"2018-04-21 15:00:00.0","sport_type_name":"baseball","location_id":11,"start_date":"2018-04-21"}
{"home_team_id":37,"sold_out":false,"id":140,"away_team_id":61,"start_date_time":"2018-09-08 13:00:00.0","sport_type_name":"baseball","location_id":11,"start_date":"2018-09-08"}
{"home_team_id":38,"sold_out":false,"id":160,"away_team_id":53,"start_date_time":"2018-07-07 17:00:00.0","sport_type_name":"baseball","location_id":28,"start_date":"2018-07-07"}
{"home_team_id":39,"sold_out":false,"id":180,"away_team_id":45,"start_date_time":"2018-05-05 19:00:00.0","sport_type_name":"baseball","location_id":19,"start_date":"2018-05-05"}
{"home_team_id":39,"sold_out":false,"id":200,"away_team_id":35,"start_date_time":"2018-09-29 14:00:00.0","sport_type_name":"baseball","location_id":19,"start_date":"2018-09-29"}

ということで、ETL処理できることまで確認できました。

まとめ

このように、ETL処理を簡単に実現できることが確認できました。

実際に手を動かすことで理解も深まりますし、設定できること・できないことも見えてきます。

考えるのもいいんですけど、感じるのいいんじゃないでしょうか?

現場からは以上です。