[Node.js] Streamを使って、DynamoDBの全ItemをScanで吸い出しつつCSVに変換してS3バケットにアップロードしたい

2022.04.13

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

広島の吉川です。

先日投稿した上の記事のさらに発展型として、「StreamでDynamoDBの全データをScanで読み込みつつ、CSVに変換してS3バケットにアップロードする」という処理を書いてみたいと思います。DynamoDBの内容をS3にテキストで出力すること自体はS3エクスポート機能で実現する方法もあり、基本的にはこちらに寄せた方が楽そうですが、エクスポート機能の仕様でカバーしきれない固有の要件に対応したい場合などは自前でLambda関数を組むことのメリットもあるかと思います。

環境

  • node 16.14.0
  • typescript 4.6.3
  • csv 6.0.5
  • @faker-js/faker 6.1.2

DynamoDBテーブル作成

AWSマネジメントコンソールより、DynamoDBコンソールを開き

  • テーブル名: "Users"
  • パーティションキー: "id" / 文字列
  • 設定: 設定をカスタマイズ
  • 読み込み/書き込みキャパシティーの設定: オンデマンド

の設定でテーブルを作成します。

DynamoDBにテストデータ投入

下記のスクリプトで50万件のテストデータを作成します。

// put-ddb-items.ts

import { DynamoDB } from '@aws-sdk/client-dynamodb'
import { DynamoDBDocument } from '@aws-sdk/lib-dynamodb'
import { v4 } from 'uuid'
import { faker } from '@faker-js/faker'

const ddb = new DynamoDB({
  region: 'ap-northeast-1',
})
const ddbDoc = DynamoDBDocument.from(ddb)

const main = async (): Promise<void> => {
  for (let i = 0; i < 500; i++) {
    await Promise.all(
      Array.from({ length: 1000 }).map(async () =>
        ddbDoc.put({
          TableName: 'Users',
          Item: {
            id: v4(),
            name: faker.name.findName(),
            address: faker.address.city(),
            company: faker.company.companyName(),
            image: faker.image.avatar(),
            lorem: faker.lorem.paragraphs(),
            phone: faker.phone.phoneNumber(),
          },
        })
      )
    )
  }
}

main()

直列に1件ずつ作成すると遅いので、1000件ずつパラレルにリクエストを投げるようにしました。本当は .put() より .batchWriteItem() を使った方が良いと思いますが、今回は雑にこのやり方としました。

なお、このデータから書き出されるCSVファイルのサイズは 約400MB でした。

カスタムReadableクラスを作る

Stream | Node.js v17.9.0 Documentation

こちらのドキュメントの「Implementing a readable stream」を見ながらカスタムReadableクラスを作りました。

// ddb-items-to-csv-converter.ts

import stream from 'stream'
import { stringify } from 'csv/sync'
import { DynamoDBDocument } from '@aws-sdk/lib-dynamodb'
import { DynamoDB } from '@aws-sdk/client-dynamodb'
import { User } from './types'

export class DdbItemsToCsvConverter extends stream.Readable {
  private _index: number
  private _lastEvaluatedKey: any
  private _ddbDoc: DynamoDBDocument

  constructor(options?: stream.ReadableOptions | undefined) {
    super(options)

    this._index = 0
    this._lastEvaluatedKey = null

    const ddb = new DynamoDB({
      region: 'ap-northeast-1',
    })
    this._ddbDoc = DynamoDBDocument.from(ddb)
  }

  _read(): void {
    const i = this._index
    this._index += 1

    if (this._lastEvaluatedKey === undefined) {
      // すべてScanし終わったら終了
      this.push(null)
      return
    }

    this._ddbDoc
      .scan({
        TableName: 'Users',
        ExclusiveStartKey: this._lastEvaluatedKey ?? undefined,
      })
      .then((scanOutput) => {
        this._lastEvaluatedKey = scanOutput.LastEvaluatedKey

        const users = scanOutput.Items as User[]
        const str = stringify(users, { header: i === 0 }) // 最初のみCSVヘッダーを付ける
        const buf = Buffer.from(str, 'ascii')
        this.push(buf)
      })
  }
}

当初は結構ハードルが高いと思いましたが、以下の点を押さえることで何とか書くことができました。

  • Readableを継承する
  • 最低限 constructor()_read() を実装すればいい
  • this.push(buf) でデータを流していく
  • this.push(null) で終了
  • Counter Exampleを参考にする

特にCounter Exampleのコードを見ることで雰囲気がかなりつかめると思いますので、やや敷居が高いと感じた方も是非読んでみてください。

CSVファイルに書き出してみる

StreamでScan+CSV変換→ローカルCSVファイルに吐き出す

DdbItemsToCsvConverter を作ったので、ほとんどこれを呼び出すだけでCSV書き出し処理を書くことができます。まずはローカル領域にファイルとして書き出してみます。

import { DdbItemsToCsvConverter } from './ddb-items-to-csv-converter'
import fs from 'fs'

new DdbItemsToCsvConverter().pipe(fs.createWriteStream('./ddb-items.csv'))

メモリ使用量は 146.755584MB でした。

StreamせずにScan+CSV変換→ローカルCSVファイルに吐き出す

今度は DdbItemsToCsvConverter を使わずに愚直に全Itemをロードして書き出すようにしてみます。

// main.ts

import { DynamoDB } from '@aws-sdk/client-dynamodb'
import { DynamoDBDocument } from '@aws-sdk/lib-dynamodb'
import { stringify } from 'csv/sync'
import fs from 'fs'

const ddb = new DynamoDB({
  region: 'ap-northeast-1',
})
const ddbDoc = DynamoDBDocument.from(ddb)

const main = async (): Promise<void> => {
  let scanOutput = await ddbDoc.scan({
    TableName: 'Users',
  })
  let users = scanOutput.Items!

  while (scanOutput.LastEvaluatedKey !== undefined) {
    scanOutput = await ddbDoc.scan({
      TableName: 'Users',
      ExclusiveStartKey: scanOutput.LastEvaluatedKey,
    })
    for (const user of scanOutput.Items!) {
      users.push(user)
    }
  }

  fs.writeFileSync('ddb-items.csv', stringify(users, { header: true }))
}

main()

メモリ使用量は 1754.546176MB でした。

S3バケットにアップロードしてみる

StreamでScan+CSV変換→S3バケットにCSVファイルアップロード

続いて、Streamを使ってDynamoDBのデータから変換したCSVをS3バケットにアップロードしてみます。

この前の記事の「S3のWriteStream」をベースに、 Body 引数に new DdbItemsToCsvConverter() を渡すようにしてみます。

// main.ts

import { S3Client } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'
import { DdbItemsToCsvConverter } from './ddb-items-to-csv-converter'

const bucketName = 'YOUR_BUCKET_NAME'
const keyName = 'ddb-items.csv'

const s3Client = new S3Client({
  region: 'ap-northeast-1',
})

const main = async (): Promise<void> => {
  const upload = new Upload({
    client: s3Client,
    params: {
      Bucket: bucketName,
      Key: keyName,
      Body: new DdbItemsToCsvConverter(),
    },
  })
  upload.on('httpUploadProgress', (progress) => {
    console.log(progress)
  })
  await upload.done()
}

main()

メモリ使用量は 314.372096MB でした。

StreamせずにScan+CSV変換→S3バケットにCSVファイルアップロード

続いて同じ処理をStreamを使わずに行います。

// main.ts

import { DynamoDB } from '@aws-sdk/client-dynamodb'
import { DynamoDBDocument } from '@aws-sdk/lib-dynamodb'
import { S3 } from '@aws-sdk/client-s3'
import { stringify } from 'csv/sync'

const bucketName = 'YOUR_BUCKET_NAME'
const keyName = 'from-ddb-items.csv'

const s3 = new S3({
  region: 'ap-northeast-1',
})
const ddb = new DynamoDB({
  region: 'ap-northeast-1',
})
const ddbDoc = DynamoDBDocument.from(ddb)

const main = async (): Promise<void> => {
  let scanOutput = await ddbDoc.scan({
    TableName: 'Users',
  })
  let users = scanOutput.Items!

  while (scanOutput.LastEvaluatedKey !== undefined) {
    scanOutput = await ddbDoc.scan({
      TableName: 'Users',
      ExclusiveStartKey: scanOutput.LastEvaluatedKey,
    })
    for (const user of scanOutput.Items!) {
      users.push(user)
    }
  }

  await s3.putObject({
    Bucket: bucketName,
    Key: keyName,
    Body: stringify(users, { header: true }),
  })
}

main()

メモリ使用量は 2569.887744MB でした。

まとめ

ケース 時間
StreamでScan+CSV変換→ローカルCSVファイルに吐き出す 146.755584MB
StreamせずにScan+CSV変換→ローカルCSVファイルに吐き出す 1754.546176MB
StreamでScan+CSV変換→S3バケットにCSVファイルアップロード 314.372096MB
StreamせずにScan+CSV変換→S3バケットにCSVファイルアップロード 2569.887744MB

今回もStreamを使った方が使用メモリ量を抑えることができる結果となりました。カスタムStreamクラスを使うことで適用できるシーンがより広がったように感じます。

参考になれば幸いです。

参考