こんにちは、CX事業本部 Delivery部の若槻です。
S3 Bucket + Glue Table で管理しているデータを、次のようにスキーマの変換をしつつ移行(洗い替え)したいケースがありました。
移行元カラム名 | 移行先カラム名 | 移行後の値 |
---|---|---|
id | id | 引き継ぎ |
timestamp | timestamp | 引き継ぎ |
temperature | internal_temperature | 引き継ぎ |
- | external_temperature | 0 |
要するに、id
と timestamp
は同じスキーマ名および値を使用、temperature
は internal_temperature
にスキーマ名を変換、また external_temperature
スキーマを新しく設け既定値は 0
にするという洗い替えを行いたいです。
そこで今回は、大量のデータをサーバーレスな環境のみでクエリして分析することができる Amazon Athena を利用して、スキーマの変換を伴う洗い替えを行う方法を確認してみました。
環境作成
環境作成は AWS CDK (TypeScript)で行います。移行元および移行先となる S3 Bucket と Glue Table のリソースを作成しています。
lib/cdk-sample-app.ts
import { aws_s3, RemovalPolicy, Stack, StackProps } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as glue_alpha from '@aws-cdk/aws-glue-alpha';
export class CdkSampleStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// Source bucket
const mySourceBucket = new aws_s3.Bucket(this, 'MySourceBucket', {
removalPolicy: RemovalPolicy.DESTROY,
});
// Destination bucket
const myDestinationBucket = new aws_s3.Bucket(this, 'MyDestinationBucket', {
removalPolicy: RemovalPolicy.DESTROY,
});
const myDatabase = new glue_alpha.Database(this, 'MyDatabase', {
databaseName: 'my_database',
});
// Source table
new glue_alpha.Table(this, 'MySourceTable', {
database: myDatabase,
tableName: 'my_source_table',
columns: [
{
name: 'id',
type: glue_alpha.Schema.STRING,
},
{
name: 'timestamp',
type: glue_alpha.Schema.INTEGER,
},
{
name: 'temperature',
type: glue_alpha.Schema.FLOAT,
},
],
dataFormat: glue_alpha.DataFormat.JSON,
bucket: mySourceBucket,
s3Prefix: 'data',
});
// Destination table
new glue_alpha.Table(this, 'MyDestinationTable', {
database: myDatabase,
tableName: 'my_destination_table',
columns: [
{
name: 'id',
type: glue_alpha.Schema.STRING,
},
{
name: 'timestamp',
type: glue_alpha.Schema.INTEGER,
},
{
name: 'internal_temperature',
type: glue_alpha.Schema.FLOAT,
},
{
name: 'external_temperature',
type: glue_alpha.Schema.FLOAT,
},
],
dataFormat: glue_alpha.DataFormat.JSON,
bucket: myDestinationBucket,
s3Prefix: 'data',
});
}
}
洗い替えしてみる
移行元データ作成
まず INSERT INTO
文を利用したクエリで移行元テーブルにデータを作成します。
INSERT INTO my_source_table
VALUES
('d001', 1682866800, 12.7),
('d002', 1682953200, 29.3),
('d003', 1683039600, 7.9),
('d001', 1683126000, 26.5),
('d001', 1683212400, 18.2)
移行元テーブルに対して SELECT
クエリを実行してみます。
SELECT * FROM my_source_table
移行元のテーブルにデータが作成されています。
# | id | timestamp | temperature |
---|---|---|---|
1 | d001 | 1682866800 | 12.7 |
2 | d002 | 1682953200 | 29.3 |
3 | d003 | 1683039600 | 7.9 |
4 | d001 | 1683126000 | 26.5 |
5 | d001 | 1683212400 | 18.2 |
洗い替えする
あるテーブルから他のテーブルへ洗い替えを行う場合は、INSERT INTO ... SELECT ... FROM ...
というクエリを利用します。
データの取得元を先程のように VALUE
で直接指定するのではなく、SELECT ... FROM ...
で他のテーブルを指定します。
この時、スキーマの変換方法にいくつかパターンがあったので、それぞれ紹介します。
その1
まず、SELECT
の中で AS
を使用して移行元と移行先のスキーマを指定する方法です。
下記クエリを実行してみます。
INSERT INTO my_destination_table
SELECT
id,
timestamp,
temperature AS internal_temperature,
0 AS external_temperature
FROM my_source_table
そして移行先テーブルに対して SELECT
クエリを実行してみます。
SELECT * FROM my_destination_table
データが移行されていることが確認できました。
# | id | timestamp | internal_temperature | external_temperature |
---|---|---|---|---|
1 | d001 | 1682866800 | 12.7 | 0.0 |
2 | d002 | 1682953200 | 29.3 | 0.0 |
3 | d003 | 1683039600 | 7.9 | 0.0 |
4 | d001 | 1683126000 | 26.5 | 0.0 |
5 | d001 | 1683212400 | 18.2 | 0.0 |
その2
次に、INSERT INTO
側で移行先、SELECT
側で移行元のスキーマを指定する方法です。
下記クエリを実行してみます。
INSERT INTO my_destination_table
(id, timestamp, internal_temperature, external_temperature)
SELECT id, timestamp, temperature, 0
FROM my_source_table
そして移行先テーブルに対して SELECT
クエリを実行してみます。
SELECT * FROM my_destination_table
データが移行されていることが確認できました。(INSERT INTO
なので移行先のデータは上書きされずに残ります)
その3(非推奨)
非推奨ですが、移行先のスキーマの指定は必須ではありません。次のように省略することもできます。
INSERT INTO my_destination_table
SELECT id, timestamp, temperature, 0
FROM my_source_table
しかしこの場合は移行先のスキーマの順番が SELECT
で指定した順番と一致している必要があり、移行先のスキーマに変更があった場合はクエリを修正する必要があります。そのため移行先のスキーマを指定する方法を推奨します。
参考
以上