firehoseを使ってs3にparquet形式でデータを保存する

firehoseを使ってs3にparquet形式でデータを保存する

firehoseを使ってs3にparquet形式でデータを保存してみました。 IoT Core * firehose * S3 はよく使うのですが、parquet形式はやったことなかったので勉強になりました。
Clock Icon2025.03.04

firehoseを使ってs3にparquet形式でデータを保存する

firehoseを使ってs3にparquet形式でデータを保存してみました。
IoT Core * firehose * S3 はよく使うのですが、parquet形式はやったことなかったので勉強になりました。

コードは以下のリポジトリにおいてあります。

https://github.com/yamatatsu/play-firehose-s3-glue-athena

今回やること

  • IoT Core -> Firehose -> S3の構成でデータをS3に保存する
    • データはparquet形式で保存する
    • データはSNAPPYで圧縮する
  • データはathenaからクエリできるようにする
    • パーティションを用いてコストとパフォーマンスを最適化する
    • パーティションのキーは日付(JST)を用いる
  • CDKで構築する
    • cdkデプロイするだけでテストデータの投入も行われる

構成図

Untitled diagram-2025-03-04-102931

CDK

import * as iot from '@aws-cdk/aws-iot-alpha';
import * as iotActions from '@aws-cdk/aws-iot-actions-alpha';
import * as glue from '@aws-cdk/aws-glue-alpha';
import * as cdk from 'aws-cdk-lib';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as logs from 'aws-cdk-lib/aws-logs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
import * as firehose from 'aws-cdk-lib/aws-kinesisfirehose';

const app = new cdk.App();
const stack = new cdk.Stack(app, 'PlayFirehoseS3GlueAthenaStack');

// ================================================
// MQTTで送信されたデータを溜めておくS3バケット
// firehose&glueがparquetに変換してここに置く

const bucket = new s3.Bucket(stack, "Bucket", {
  encryption: s3.BucketEncryption.S3_MANAGED,
  blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
  // サンプル実装なので
  removalPolicy: cdk.RemovalPolicy.DESTROY,
  autoDeleteObjects: true,
});

// ================================================
// Glueの設定
// 以下の二つの目的のためにGlueを使用する
// - FirehoseがデータをS3に保存する際のparquetスキーマを提供
// - Athenaでデータをクエリするためのスキーマを提供

const glueDatabase = new glue.Database(stack, "Database");
const glueTable = new glue.S3Table(stack, "S3Table", {
  database: glueDatabase,
  partitionKeys: [
    // glue tableでは、カラム名は全て小文字で扱う必要がある
    { name: "devicename", type: glue.Schema.STRING },
    { name: "date", type: glue.Schema.STRING },
  ],
  columns: [
    { name: "hour", type: glue.Schema.STRING },
    { name: "minute", type: glue.Schema.STRING },
    { name: "second", type: glue.Schema.STRING },
    { name: "metrics", type: glue.Schema.map(glue.Schema.STRING, glue.Schema.FLOAT) },
  ],
  bucket,
  s3Prefix: "data/",
  dataFormat: glue.DataFormat.PARQUET,
  storageParameters: [
    glue.StorageParameter.compressionType(glue.CompressionType.SNAPPY),
  ],
  // partition projectionの設定
  parameters: {
    "projection.enabled": "true",
    "projection.devicename.type": "injected",
    "projection.date.type": "date",
    "projection.date.range": "2025/03/01,NOW",
    "projection.date.format": "yyyy/MM/dd",
    "projection.date.interval": "1",
    "projection.date.interval.unit": "DAYS",
    "storage.location.template": "s3://" + bucket.bucketName + "/data/${devicename}/${date}/",
  },
});

/**
 * JSTタイムゾーンでパーティショニングを行うためにLambda Processorを使用する。
 */
const handler = new NodejsFunction(stack, "FirehoseEnricher", {
  runtime: lambda.Runtime.NODEJS_22_X,
  architecture: lambda.Architecture.ARM_64,
  // firehose上のコンソールで1分以上の値にするように警告された
  timeout: cdk.Duration.minutes(1),
  environment: {
    // 日付を扱うため、重要
    TZ: "Asia/Tokyo",
  },
});

/**
 * firehoseがGlueのSchemaを参照するためのIAMロール
 * 本当はDeliveryStreamから取得したかったけど、公開されてなかった。。。なぜ。。。。
 */
const firehoseSchemaRole = new iam.Role(stack, "FirehoseSchemaRole", {
  assumedBy: new iam.ServicePrincipal("firehose.amazonaws.com"),
  inlinePolicies: {
    catalogPolicy: new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: ["glue:GetTableVersions"],
          resources: [stack.formatArn({ service: "glue", resource: "*" })],
        }),
      ],
    }),
  },
});
glueTable.grantRead(firehoseSchemaRole);

// ================================================
// firehoseの設定

const firehoseStream = new firehose.DeliveryStream(stack, "FirehoseStream", {
  destination: new firehose.S3Bucket(bucket, {
    // バッファは最大に
    bufferingInterval: cdk.Duration.minutes(15),
    bufferingSize: cdk.Size.mebibytes(128),
    // `OutputFormatConfiguration.Serializer.ParquetSerDe.Compression`に指定がある場合、`compression`には`UNCOMPRESSED`を指定する。
    compression: firehose.Compression.of("UNCOMPRESSED"),
    processor: new firehose.LambdaFunctionProcessor(handler),
    dataOutputPrefix: "data/!{partitionKeyFromLambda:deviceName}/!{partitionKeyFromLambda:date}/",
    errorOutputPrefix: "error/",
  }),
});
const cfnFirehoseStream = firehoseStream.node.defaultChild as firehose.CfnDeliveryStream;
cfnFirehoseStream.addPropertyOverride(
  "ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration",
  { Enabled: true },
);
cfnFirehoseStream.addPropertyOverride(
  "ExtendedS3DestinationConfiguration.DataFormatConversionConfiguration",
  /**
   * Glue TableのSchemaを参照してparquetに変換する設定
   */
  {
    Enabled: true,
    /** Glue Tableへの参照 */
    SchemaConfiguration: {
      CatalogId: glueDatabase.catalogId,
      RoleARN: firehoseSchemaRole.roleArn,
      DatabaseName: glueDatabase.databaseName,
      TableName: glueTable.tableName,
      Region: stack.region,
      VersionId: "LATEST",
    },
    /**
     * 入力設定
     * glueでは列名に大文字を含めることができないため、ここで小文字に変換する
     */
    InputFormatConfiguration: {
      Deserializer: {
        OpenXJsonSerDe: { CaseInsensitive: false },
      },
    },
    /**
     * 出力設定
     * SNAPPYで圧縮したparquetを出力する
     */
    OutputFormatConfiguration: {
      Serializer: {
        ParquetSerDe: { Compression: "SNAPPY" },
      },
    },
  },
);

// ================================================
// IoT Coreの設定

const errorLog = new logs.LogGroup(stack, "ErrorLog", {
  retention: logs.RetentionDays.ONE_DAY,
});

new iot.TopicRule(stack, "TopicRule", {
  sql: iot.IotSql.fromStringAsVer20160323("SELECT * FROM 'iot-data'"),
  actions: [
    new iotActions.FirehosePutRecordAction(firehoseStream)
  ],
  errorAction: new iotActions.CloudWatchLogsAction(errorLog),
});

// ================================================
// 擬似的にIoTデータを投入する

const dataPublisher = new NodejsFunction(stack, "DataPublisher", {
  runtime: lambda.Runtime.NODEJS_22_X,
  architecture: lambda.Architecture.ARM_64,
  timeout: cdk.Duration.minutes(15),
  initialPolicy: [
    new iam.PolicyStatement({
      actions: ["iot:Publish"],
      resources: ["*"],
    }),
  ]
});

for (const num of [...Array(3).keys()]) {
  new cdk.triggers.Trigger(stack, `Trigger${num}`, {
    timeout: cdk.Duration.minutes(15),
    handler: dataPublisher,
  })
}

まとめ

Glue TableはAthenaのためにスキーマ定義を書くときに使っていて「なぜAthenaの機能じゃないんだ?」と思っていました。
今回parquet変換のためにスキーマが使われるのを見て「Athenaのサブ機能ではなかったのか。。。」ということを理解できました。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.