
Amazon Athena でスキーマの変換を伴うデータの洗い替えしてみた
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、CX事業本部 Delivery部の若槻です。
S3 Bucket + Glue Table で管理しているデータを、次のようにスキーマの変換をしつつ移行(洗い替え)したいケースがありました。
| 移行元カラム名 | 移行先カラム名 | 移行後の値 | 
|---|---|---|
| id | id | 引き継ぎ | 
| timestamp | timestamp | 引き継ぎ | 
| temperature | internal_temperature | 引き継ぎ | 
| - | external_temperature | 0 | 
要するに、id と timestamp は同じスキーマ名および値を使用、temperature は internal_temperature にスキーマ名を変換、また external_temperature スキーマを新しく設け既定値は 0 にするという洗い替えを行いたいです。
そこで今回は、大量のデータをサーバーレスな環境のみでクエリして分析することができる Amazon Athena を利用して、スキーマの変換を伴う洗い替えを行う方法を確認してみました。
環境作成
環境作成は AWS CDK (TypeScript)で行います。移行元および移行先となる S3 Bucket と Glue Table のリソースを作成しています。
import { aws_s3, RemovalPolicy, Stack, StackProps } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as glue_alpha from '@aws-cdk/aws-glue-alpha';
export class CdkSampleStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);
    // Source bucket
    const mySourceBucket = new aws_s3.Bucket(this, 'MySourceBucket', {
      removalPolicy: RemovalPolicy.DESTROY,
    });
    // Destination bucket
    const myDestinationBucket = new aws_s3.Bucket(this, 'MyDestinationBucket', {
      removalPolicy: RemovalPolicy.DESTROY,
    });
    const myDatabase = new glue_alpha.Database(this, 'MyDatabase', {
      databaseName: 'my_database',
    });
    // Source table
    new glue_alpha.Table(this, 'MySourceTable', {
      database: myDatabase,
      tableName: 'my_source_table',
      columns: [
        {
          name: 'id',
          type: glue_alpha.Schema.STRING,
        },
        {
          name: 'timestamp',
          type: glue_alpha.Schema.INTEGER,
        },
        {
          name: 'temperature',
          type: glue_alpha.Schema.FLOAT,
        },
      ],
      dataFormat: glue_alpha.DataFormat.JSON,
      bucket: mySourceBucket,
      s3Prefix: 'data',
    });
    // Destination table
    new glue_alpha.Table(this, 'MyDestinationTable', {
      database: myDatabase,
      tableName: 'my_destination_table',
      columns: [
        {
          name: 'id',
          type: glue_alpha.Schema.STRING,
        },
        {
          name: 'timestamp',
          type: glue_alpha.Schema.INTEGER,
        },
        {
          name: 'internal_temperature',
          type: glue_alpha.Schema.FLOAT,
        },
        {
          name: 'external_temperature',
          type: glue_alpha.Schema.FLOAT,
        },
      ],
      dataFormat: glue_alpha.DataFormat.JSON,
      bucket: myDestinationBucket,
      s3Prefix: 'data',
    });
  }
}
洗い替えしてみる
移行元データ作成
まず INSERT INTO 文を利用したクエリで移行元テーブルにデータを作成します。
INSERT INTO my_source_table
VALUES
    ('d001', 1682866800, 12.7),
    ('d002', 1682953200, 29.3),
    ('d003', 1683039600, 7.9),
    ('d001', 1683126000, 26.5),
    ('d001', 1683212400, 18.2)
移行元テーブルに対して SELECT クエリを実行してみます。
SELECT * FROM my_source_table
移行元のテーブルにデータが作成されています。

| # | id | timestamp | temperature | 
|---|---|---|---|
| 1 | d001 | 1682866800 | 12.7 | 
| 2 | d002 | 1682953200 | 29.3 | 
| 3 | d003 | 1683039600 | 7.9 | 
| 4 | d001 | 1683126000 | 26.5 | 
| 5 | d001 | 1683212400 | 18.2 | 
洗い替えする
あるテーブルから他のテーブルへ洗い替えを行う場合は、INSERT INTO ... SELECT ... FROM ... というクエリを利用します。
データの取得元を先程のように VALUE で直接指定するのではなく、SELECT ... FROM ... で他のテーブルを指定します。
この時、スキーマの変換方法にいくつかパターンがあったので、それぞれ紹介します。
その1
まず、SELECT の中で AS を使用して移行元と移行先のスキーマを指定する方法です。
下記クエリを実行してみます。
INSERT INTO my_destination_table SELECT id, timestamp, temperature AS internal_temperature, 0 AS external_temperature FROM my_source_table
そして移行先テーブルに対して SELECT クエリを実行してみます。
SELECT * FROM my_destination_table
データが移行されていることが確認できました。

| # | id | timestamp | internal_temperature | external_temperature | 
|---|---|---|---|---|
| 1 | d001 | 1682866800 | 12.7 | 0.0 | 
| 2 | d002 | 1682953200 | 29.3 | 0.0 | 
| 3 | d003 | 1683039600 | 7.9 | 0.0 | 
| 4 | d001 | 1683126000 | 26.5 | 0.0 | 
| 5 | d001 | 1683212400 | 18.2 | 0.0 | 
その2
次に、INSERT INTO 側で移行先、SELECT 側で移行元のスキーマを指定する方法です。
下記クエリを実行してみます。
INSERT INTO my_destination_table (id, timestamp, internal_temperature, external_temperature) SELECT id, timestamp, temperature, 0 FROM my_source_table
そして移行先テーブルに対して SELECT クエリを実行してみます。
SELECT * FROM my_destination_table
データが移行されていることが確認できました。(INSERT INTO なので移行先のデータは上書きされずに残ります)

その3(非推奨)
非推奨ですが、移行先のスキーマの指定は必須ではありません。次のように省略することもできます。
INSERT INTO my_destination_table SELECT id, timestamp, temperature, 0 FROM my_source_table
しかしこの場合は移行先のスキーマの順番が SELECT で指定した順番と一致している必要があり、移行先のスキーマに変更があった場合はクエリを修正する必要があります。そのため移行先のスキーマを指定する方法を推奨します。
参考
以上






