[Node.js] Streamを使って、DynamoDBの全ItemをScanで吸い出しつつCSVに変換してS3バケットにアップロードしたい
広島の吉川です。
- Node.js Streamに入門してみた | DevelopersIO
- [Node.js] [AWS SDK v3] StreamでCSVファイルをS3からS3にコピーする | DevelopersIO
先日投稿した上の記事のさらに発展型として、「StreamでDynamoDBの全データをScanで読み込みつつ、CSVに変換してS3バケットにアップロードする」という処理を書いてみたいと思います。DynamoDBの内容をS3にテキストで出力すること自体はS3エクスポート機能で実現する方法もあり、基本的にはこちらに寄せた方が楽そうですが、エクスポート機能の仕様でカバーしきれない固有の要件に対応したい場合などは自前でLambda関数を組むことのメリットもあるかと思います。
環境
- node 16.14.0
- typescript 4.6.3
- csv 6.0.5
- @faker-js/faker 6.1.2
DynamoDBテーブル作成
AWSマネジメントコンソールより、DynamoDBコンソールを開き
- テーブル名: "Users"
- パーティションキー: "id" / 文字列
- 設定: 設定をカスタマイズ
- 読み込み/書き込みキャパシティーの設定: オンデマンド
の設定でテーブルを作成します。
DynamoDBにテストデータ投入
下記のスクリプトで50万件のテストデータを作成します。
// put-ddb-items.ts import { DynamoDB } from '@aws-sdk/client-dynamodb' import { DynamoDBDocument } from '@aws-sdk/lib-dynamodb' import { v4 } from 'uuid' import { faker } from '@faker-js/faker' const ddb = new DynamoDB({ region: 'ap-northeast-1', }) const ddbDoc = DynamoDBDocument.from(ddb) const main = async (): Promise<void> => { for (let i = 0; i < 500; i++) { await Promise.all( Array.from({ length: 1000 }).map(async () => ddbDoc.put({ TableName: 'Users', Item: { id: v4(), name: faker.name.findName(), address: faker.address.city(), company: faker.company.companyName(), image: faker.image.avatar(), lorem: faker.lorem.paragraphs(), phone: faker.phone.phoneNumber(), }, }) ) ) } } main()
直列に1件ずつ作成すると遅いので、1000件ずつパラレルにリクエストを投げるようにしました。本当は .put()
より .batchWriteItem()
を使った方が良いと思いますが、今回は雑にこのやり方としました。
なお、このデータから書き出されるCSVファイルのサイズは 約400MB でした。
カスタムReadableクラスを作る
Stream | Node.js v17.9.0 Documentation
こちらのドキュメントの「Implementing a readable stream」を見ながらカスタムReadableクラスを作りました。
// ddb-items-to-csv-converter.ts import stream from 'stream' import { stringify } from 'csv/sync' import { DynamoDBDocument } from '@aws-sdk/lib-dynamodb' import { DynamoDB } from '@aws-sdk/client-dynamodb' import { User } from './types' export class DdbItemsToCsvConverter extends stream.Readable { private _index: number private _lastEvaluatedKey: any private _ddbDoc: DynamoDBDocument constructor(options?: stream.ReadableOptions | undefined) { super(options) this._index = 0 this._lastEvaluatedKey = null const ddb = new DynamoDB({ region: 'ap-northeast-1', }) this._ddbDoc = DynamoDBDocument.from(ddb) } _read(): void { const i = this._index this._index += 1 if (this._lastEvaluatedKey === undefined) { // すべてScanし終わったら終了 this.push(null) return } this._ddbDoc .scan({ TableName: 'Users', ExclusiveStartKey: this._lastEvaluatedKey ?? undefined, }) .then((scanOutput) => { this._lastEvaluatedKey = scanOutput.LastEvaluatedKey const users = scanOutput.Items as User[] const str = stringify(users, { header: i === 0 }) // 最初のみCSVヘッダーを付ける const buf = Buffer.from(str, 'ascii') this.push(buf) }) } }
当初は結構ハードルが高いと思いましたが、以下の点を押さえることで何とか書くことができました。
- Readableを継承する
- 最低限
constructor()
と_read()
を実装すればいい this.push(buf)
でデータを流していくthis.push(null)
で終了- Counter Exampleを参考にする
特にCounter Exampleのコードを見ることで雰囲気がかなりつかめると思いますので、やや敷居が高いと感じた方も是非読んでみてください。
CSVファイルに書き出してみる
StreamでScan+CSV変換→ローカルCSVファイルに吐き出す
DdbItemsToCsvConverter
を作ったので、ほとんどこれを呼び出すだけでCSV書き出し処理を書くことができます。まずはローカル領域にファイルとして書き出してみます。
import { DdbItemsToCsvConverter } from './ddb-items-to-csv-converter' import fs from 'fs' new DdbItemsToCsvConverter().pipe(fs.createWriteStream('./ddb-items.csv'))
メモリ使用量は 146.755584MB でした。
StreamせずにScan+CSV変換→ローカルCSVファイルに吐き出す
今度は DdbItemsToCsvConverter
を使わずに愚直に全Itemをロードして書き出すようにしてみます。
// main.ts import { DynamoDB } from '@aws-sdk/client-dynamodb' import { DynamoDBDocument } from '@aws-sdk/lib-dynamodb' import { stringify } from 'csv/sync' import fs from 'fs' const ddb = new DynamoDB({ region: 'ap-northeast-1', }) const ddbDoc = DynamoDBDocument.from(ddb) const main = async (): Promise<void> => { let scanOutput = await ddbDoc.scan({ TableName: 'Users', }) let users = scanOutput.Items! while (scanOutput.LastEvaluatedKey !== undefined) { scanOutput = await ddbDoc.scan({ TableName: 'Users', ExclusiveStartKey: scanOutput.LastEvaluatedKey, }) for (const user of scanOutput.Items!) { users.push(user) } } fs.writeFileSync('ddb-items.csv', stringify(users, { header: true })) } main()
メモリ使用量は 1754.546176MB でした。
S3バケットにアップロードしてみる
StreamでScan+CSV変換→S3バケットにCSVファイルアップロード
続いて、Streamを使ってDynamoDBのデータから変換したCSVをS3バケットにアップロードしてみます。
この前の記事の「S3のWriteStream」をベースに、 Body
引数に new DdbItemsToCsvConverter()
を渡すようにしてみます。
// main.ts import { S3Client } from '@aws-sdk/client-s3' import { Upload } from '@aws-sdk/lib-storage' import { DdbItemsToCsvConverter } from './ddb-items-to-csv-converter' const bucketName = 'YOUR_BUCKET_NAME' const keyName = 'ddb-items.csv' const s3Client = new S3Client({ region: 'ap-northeast-1', }) const main = async (): Promise<void> => { const upload = new Upload({ client: s3Client, params: { Bucket: bucketName, Key: keyName, Body: new DdbItemsToCsvConverter(), }, }) upload.on('httpUploadProgress', (progress) => { console.log(progress) }) await upload.done() } main()
メモリ使用量は 314.372096MB でした。
StreamせずにScan+CSV変換→S3バケットにCSVファイルアップロード
続いて同じ処理をStreamを使わずに行います。
// main.ts import { DynamoDB } from '@aws-sdk/client-dynamodb' import { DynamoDBDocument } from '@aws-sdk/lib-dynamodb' import { S3 } from '@aws-sdk/client-s3' import { stringify } from 'csv/sync' const bucketName = 'YOUR_BUCKET_NAME' const keyName = 'from-ddb-items.csv' const s3 = new S3({ region: 'ap-northeast-1', }) const ddb = new DynamoDB({ region: 'ap-northeast-1', }) const ddbDoc = DynamoDBDocument.from(ddb) const main = async (): Promise<void> => { let scanOutput = await ddbDoc.scan({ TableName: 'Users', }) let users = scanOutput.Items! while (scanOutput.LastEvaluatedKey !== undefined) { scanOutput = await ddbDoc.scan({ TableName: 'Users', ExclusiveStartKey: scanOutput.LastEvaluatedKey, }) for (const user of scanOutput.Items!) { users.push(user) } } await s3.putObject({ Bucket: bucketName, Key: keyName, Body: stringify(users, { header: true }), }) } main()
メモリ使用量は 2569.887744MB でした。
まとめ
ケース | 時間 |
---|---|
StreamでScan+CSV変換→ローカルCSVファイルに吐き出す | 146.755584MB |
StreamせずにScan+CSV変換→ローカルCSVファイルに吐き出す | 1754.546176MB |
StreamでScan+CSV変換→S3バケットにCSVファイルアップロード | 314.372096MB |
StreamせずにScan+CSV変換→S3バケットにCSVファイルアップロード | 2569.887744MB |
今回もStreamを使った方が使用メモリ量を抑えることができる結果となりました。カスタムStreamクラスを使うことで適用できるシーンがより広がったように感じます。
参考になれば幸いです。