AWS SDK for JavaScript v3 の DynamoDB Document Client を使って paginate scan をしてみた
こんにちは、CX 事業本部製造ビジネステクノロジー部の若槻です。
AWS SDK for JavaScript v3 では、DynamoDB Client を DynamoDB Document Client でラップすることにより、通常(ネイティブの JavaScript 型)の JSON 形式でパラメーターに指定したり、レスポンスを受け取ることができます。
lib-dynamodb
の README はこちらです。
今回は、この DynamoDB Document Client を使って paginate scan をしてみました。
試してみた
サンプルデータ
テーブルに投入するサンプルデータとして、1 列目がパーティションキー(id)、2 列目がソートキー(timestamp)の CSV ファイルを作成します。
d001,1700625273 d001,1699658818 d001,1703858878 d001,1681316462 d001,1695108297 d001,1694674832 d001,1680945699 d001,1701799579 d001,1696271173 d001,1685651084 d002,1706301230 d002,1679314750 d002,1701457171 d002,1685919651 d002,1684091128
リソース作成(AWS CDK)
必要なリソースの作成は、AWS CDK(TypeScript)を使って行います。またテーブルへの paginate scan は Lambda 関数上で実行します。
import { aws_lambda, aws_logs, aws_lambda_nodejs, aws_dynamodb, aws_s3, aws_s3_deployment, Stack, RemovalPolicy, CfnOutput, } from 'aws-cdk-lib'; import { Construct } from 'constructs'; export class CdkSampleStack extends Stack { constructor(scope: Construct, id: string) { super(scope, id); // サンプルデータアップロード用の S3 バケット const bucket = new aws_s3.Bucket(this, 'Bucket', { removalPolicy: RemovalPolicy.DESTROY, autoDeleteObjects: true, }); // サンプルデータのアップロード new aws_s3_deployment.BucketDeployment(this, 'DeploySampleTableData', { sources: [aws_s3_deployment.Source.asset('./src/tableData')], destinationBucket: bucket, }); // DynamoDB テーブル const sampleTable = new aws_dynamodb.Table(this, 'SampleTable', { partitionKey: { name: 'id', type: aws_dynamodb.AttributeType.STRING }, sortKey: { name: 'timestamp', type: aws_dynamodb.AttributeType.NUMBER }, billingMode: aws_dynamodb.BillingMode.PAY_PER_REQUEST, removalPolicy: RemovalPolicy.DESTROY, importSource: { inputFormat: aws_dynamodb.InputFormat.csv({ delimiter: ',', headerList: ['id', 'timestamp'], }), bucket, }, }); // Lambda 関数 const sampleFunc = new aws_lambda_nodejs.NodejsFunction( this, 'SampleFunc', { architecture: aws_lambda.Architecture.ARM_64, runtime: aws_lambda.Runtime.NODEJS_20_X, logGroup: new aws_logs.LogGroup(this, 'SampleFuncLogGroup', { removalPolicy: RemovalPolicy.DESTROY, }), environment: { SAMPLE_TABLE_NAME: sampleTable.tableName, }, } ); // Lambda 関数に DynamoDB テーブルへのアクセス権限を付与 sampleTable.grantReadData(sampleFunc); // Lambda 関数名の出力 new CfnOutput(this, 'SampleFuncName', { value: sampleFunc.functionName, }); } }
Lambda コード
テーブルへの paginate scan を行う Lambda 関数のコードです。paginateScan()
では非同期なイテラブルが返されるため、for await...of
によりページごとのデータを順番(直列)に行います。
import { paginateScan, DynamoDBDocument } from '@aws-sdk/lib-dynamodb'; import { DynamoDBClient } from '@aws-sdk/client-dynamodb'; const SAMPLE_TABLE_NAME = process.env.SAMPLE_TABLE_NAME || ''; interface DataItem { id: string; timestamp: number; } const ddbDocClient = DynamoDBDocument.from( new DynamoDBClient({ region: 'ap-northeast-1', apiVersion: '2012-08-10', }) ); export const handler = async (): Promise<void> => { const paginator = paginateScan( { client: ddbDocClient, }, { TableName: SAMPLE_TABLE_NAME, } ); const items: DataItem[] = []; for await (const page of paginator) { console.log(page.Count); items.push(...(page.Items as DataItem[])); } console.log(items); };
Lambda 関数を含むリソースを CDK でデプロイして、関数を実行します。
# CKD デプロイ npx cdk deploy --require-approval never --method=direct # Lambda 関数実行 aws lambda invoke --function-name ${sampleFuncName} outputfile.txt
実行結果のログです。パーティションキーごとに、ソートキーによる昇順で取得されています。また取得データは通常の JSON 形式となっています。
15 [ { id: 'd001', timestamp: 1680945699 }, { id: 'd001', timestamp: 1681316462 }, { id: 'd001', timestamp: 1685651084 }, { id: 'd001', timestamp: 1694674832 }, { id: 'd001', timestamp: 1695108297 }, { id: 'd001', timestamp: 1696271173 }, { id: 'd001', timestamp: 1699658818 }, { id: 'd001', timestamp: 1700625273 }, { id: 'd001', timestamp: 1701799579 }, { id: 'd001', timestamp: 1703858878 }, { id: 'd002', timestamp: 1679314750 }, { id: 'd002', timestamp: 1684091128 }, { id: 'd002', timestamp: 1685919651 }, { id: 'd002', timestamp: 1701457171 }, { id: 'd002', timestamp: 1706301230 } ]
ProjectionExpression を指定
ProjectionExpression を指定して、レスポンスデータの属性を絞り込みます。
export const handler = async (): Promise<void> => { const paginator = paginateScan( { client: ddbDocClient, }, { TableName: SAMPLE_TABLE_NAME, ProjectionExpression: 'id', // 指定 } ); const items: DataItem[] = []; for await (const page of paginator) { console.log(page.Count); items.push(...(page.Items as DataItem[])); } console.log(items); };
Lambda 関数をデプロイして実行すると、データの属性が id
に絞り込まれて返されました。
15 [ { id: 'd001' }, { id: 'd001' }, { id: 'd001' }, { id: 'd001' }, { id: 'd001' }, { id: 'd001' }, { id: 'd001' }, { id: 'd001' }, { id: 'd001' }, { id: 'd001' }, { id: 'd002' }, { id: 'd002' }, { id: 'd002' }, { id: 'd002' }, { id: 'd002' } ]
FilterExpression を指定
FilterExpression を指定して、条件に合致するデータのみを取得します。client-dynamodb
との違いとして、ExpressionAttributeValues
では通常の JSON 形式を指定します。
export const handler = async (): Promise<void> => { const paginator = paginateScan( { client: ddbDocClient, }, { TableName: SAMPLE_TABLE_NAME, ExpressionAttributeNames: { '#timestamp': 'timestamp', }, ExpressionAttributeValues: { ':timestamp': 1700000000, }, FilterExpression: '#timestamp >= :timestamp', } ); const items: DataItem[] = []; for await (const page of paginator) { console.log(page.Count); items.push(...(page.Items as DataItem[])); } console.log(items); };
Lambda 関数をデプロイして実行すると、timestamp
が 1700000000
以上のデータのみが取得されました。
5 [ { id: 'd001', timestamp: 1700625273 }, { id: 'd001', timestamp: 1701799579 }, { id: 'd001', timestamp: 1703858878 }, { id: 'd002', timestamp: 1701457171 }, { id: 'd002', timestamp: 1706301230 } ]
pageSize を指定
pageSize を指定して、ページごとのデータ取得数を制限します。
export const handler = async (): Promise<void> => { const paginator = paginateScan( { client: ddbDocClient, pageSize: 3, // 指定 }, { TableName: SAMPLE_TABLE_NAME, } ); const items: DataItem[] = []; for await (const page of paginator) { console.log(page.Count); items.push(...(page.Items as DataItem[])); } console.log(items); };
Lambda 関数をデプロイして実行すると、ページごとに 3 件ずつのデータが取得されました。
3 3 3 3 3 0 [ { id: 'd001', timestamp: 1680945699 }, { id: 'd001', timestamp: 1681316462 }, { id: 'd001', timestamp: 1685651084 }, { id: 'd001', timestamp: 1694674832 }, { id: 'd001', timestamp: 1695108297 }, { id: 'd001', timestamp: 1696271173 }, { id: 'd001', timestamp: 1699658818 }, { id: 'd001', timestamp: 1700625273 }, { id: 'd001', timestamp: 1701799579 }, { id: 'd001', timestamp: 1703858878 }, { id: 'd002', timestamp: 1679314750 }, { id: 'd002', timestamp: 1684091128 }, { id: 'd002', timestamp: 1685919651 }, { id: 'd002', timestamp: 1701457171 }, { id: 'd002', timestamp: 1706301230 } ]
Limit を指定(効果なし)
ここで、ページネーション処理においては、 Limit を指定して取得するデータ数を制限できるのでしょうか?
export const handler = async (): Promise<void> => { const paginator = paginateScan( { client: ddbDocClient, }, { TableName: SAMPLE_TABLE_NAME, Limit: 5, // 指定 } ); const items: DataItem[] = []; for await (const page of paginator) { console.log(page.Count); items.push(...(page.Items as DataItem[])); } console.log(items); };
Lambda 関数をデプロイして実行すると、ページごとに 15 件のデータが取得されました。TypeScript の型定義では指定可能であるにもかかわらず、Limit の指定は効果が無いようです。
15 [ { id: 'd001', timestamp: 1680945699 }, { id: 'd001', timestamp: 1681316462 }, { id: 'd001', timestamp: 1685651084 }, { id: 'd001', timestamp: 1694674832 }, { id: 'd001', timestamp: 1695108297 }, { id: 'd001', timestamp: 1696271173 }, { id: 'd001', timestamp: 1699658818 }, { id: 'd001', timestamp: 1700625273 }, { id: 'd001', timestamp: 1701799579 }, { id: 'd001', timestamp: 1703858878 }, { id: 'd002', timestamp: 1679314750 }, { id: 'd002', timestamp: 1684091128 }, { id: 'd002', timestamp: 1685919651 }, { id: 'd002', timestamp: 1701457171 }, { id: 'd002', timestamp: 1706301230 } ]
ページごとの取得処理を並列化したい場合
for await...of
内でデータの取得まで行う場合は、直列で処理が行われるためデータ数が多い場合は取得に時間を要します。並列で取得する場合は下記のように Promise を使ってページごとのデータ取得処理を並列化することができます。
並列取得により処理が高速化される動作とならなかっため確認中。
export const handler = async (): Promise<void> => { const paginator = paginateScan( { client: ddbDocClient, pageSize: 1, }, { TableName: SAMPLE_TABLE_NAME, } ); const pagePromises: Promise<DataItem[]>[] = []; for await (const page of paginator) { const pagePromise = new Promise<DataItem[]>((resolve) => { resolve(page.Items as DataItem[]); }); pagePromises.push(pagePromise); } const items = (await Promise.all(pagePromises)).flat(); console.log(items); };
おわりに
AWS SDK for JavaScript v3 の DynamoDB Document Client を使って paginate scan をしてみました。
記述が冗長となる client-dynamodb
と比べて、lib-dynamodb
では JSON 形式でデータを扱うことができるため、コードがシンプルになります。一部パラメーターが上手く動作しないことなどに気をつければ便利に利用することができます。
参考
以上