【AWS CDK】AWS AppSync で DynamoDB を直接 UpdateItemするマッピングテンプレートを使いたい

StackOverflow が生んだすばらしいテンプレートをひろめたい
2020.12.14

AWS CDKを利用すると、リソースを定義するときにさまざまな便利機能を使えます。マッピングテンプレートの流用はその一つです。実際にコードをみていただくのが1番わかりやすいかと思います。次のCDKコードを見てください。

    entryTableDataSource.createResolver({
        typeName: 'Query',
        fieldName: 'getEntry',
        requestMappingTemplate: MappingTemplate.dynamoDbGetItem('id', 'userId'),
        responseMappingTemplate: MappingTemplate.dynamoDbResultItem(),
    });

これは、AWS CDK で AppSync を使うときに用意されているユーティリティ関数で、DynamoDB から直接Itemを取得するリゾルバを定義しています。デプロイすると以下のようなマッピングテンプレートが生成されます。

{"version": "2017-02-28", "operation": "GetItem", "key": {"id": $util.dynamodb.toDynamoDBJson($ctx.args.userId)}}

これに対して Query を実行してみましょう。

get-entry

マッピングテンプレートを一行も書かずして、DynamoDBデータソースの GraphQL Query を実行できました。IDを指定しての Query であれば、AWS CDK のユーティリティ関数を使うことで簡単に定義できそうです。では、Dynamodb Item の更新、とりわけ UpdateItem についてはどうでしょうか。残念ながら AWS CDK には UpdateItem のユーティリティは用意されていません。自分たちで用意する必要があります。

DynamoDB UpdateItem の厄介さ

問答無用で上書きする PutItem は、AWS CDK で ユーティリティ関数が用意されています。

mapping-template.d.ts

static dynamoDbPutItem(key: PrimaryKey, values: AttributeValues): MappingTemplate;

一方、UpdateItem に相当するマッピングテンプレートはありません。その理由は、ひとことで言えば汎用化しづらいから、といえます。更新処理は状況や要件によって具体的な処理はまちまちです。こういうときは更新したい、こういうときは更新したくない、このケースでは文字列を結合して更新する…など、しばしば、ビジネスロジックと深く関わっており、そういった処理は Lambda Function に任せられます。

Lambda Function に毎回頼るのは確実ではありますが、効率が良いとは限らないシーンもあります。プロトタイプや検証など、とにかくデプロイして動作するアプリケーションが求められるシーンです。本稿では、ユースケースを限定的にした上で、ある程度使い回せる UpdateItem のマッピングテンプレート を検討してみます。

なお、以下のソース、とりわけ StackOverflow の質問者様のテンプレートは大いに流用させていただいています。皆、欲しい物は一緒ですね。

amazon web services - Automated DynamoDB createdAt, updatedAt, & version attributes using resolver - Stack Overflow

条件式 - Amazon DynamoDB

事前情報

バージョン

  • aws-cdk: 1.78.0

やること

  • UpdateItem の要件を整理する
  • GraphQL スキーマと AWS CDK コードを書く
  • 作成、更新、削除を実行する

UpdateItem の要件を整理する

UpdateItem で何をしたいか整理しましょう。対象とする状況は、個人アプリ、プロトタイプ、検証などを想定します。

  1. ID を指定して、存在するときは上書き、存在しないときは作成する
  2. Item は JSON で渡すことができ、各JSONキーを DynamoDB の属性にする。属性のマッピングも 基本的なスカラー型(String, Number, Boolean)くらいやってほしい。オブジェクト{}やリスト[]も渡せるとなお嬉しい
  3. {"key":null} といったように null を渡したときは属性を削除してほしい
  4. キーが存在しないときは更新も削除もしないでその属性はそっとしてほしい
  5. 競合更新を避けるため、更新日時 updateAtMillis でロックしてほしい

この要件をもとにマッピングテンプレートを考えていきます。

1は、DynamoDB の UpdateItem の動作がデフォルトでこのようになっている(IDがないときは作成、あるときは更新)ため、実現できそうです。2,3,4 は、マッピングテンプレートのif文をうまく使えば実現できそうです。そして5は、1との兼ね合いを考える必要があり、なおかつ、一般化が難しい箇所です。競合の回避事態は楽観的ロックを使えば実現できそうですが、何のデータをロックに使うかは、アプリケーションによってまちまちです。少し例を挙げると、

  • 更新日時をロックに使う。UpdateItemするたびに、更新日時も上書きする
  • バージョンIDをロックに使う。UpdateItemするたびに、バージョンIDをインクリメントする
  • ハッシュ値をロックに使う。UpdateItemするたびに、入力から計算されたハッシュ値で上書きする

パッと思いつくだけでもこれだけあります。今回は、「更新日時をロックに使う。UpdateItemするたびに、更新日時も上書きする」に限定してテンプレートを考えます。

キャンペーンへの応募を管理する仮想アプリケーション

実装を示すにあたり、仮想的なアプリケーションをたてます。AWS AppSync と DynamoDB を使って、キャンペーンへの応募を管理する GraphQL API を作ります。といっても説明用なので、用意するスキーマは getEntryupdateEntry だけです。

arch

AWS CDK コードと GraphQL スキーマを書く

DynamoDB や AppSync のリソース定義は AWS CDK 側の仕事なので、まずはそちらから書いていきます。

entry-service-stack.ts

import * as cdk from '@aws-cdk/core';
import { CfnOutput, Stack } from '@aws-cdk/core';
import * as ssm from '@aws-cdk/aws-ssm';
import * as dynamodb from '@aws-cdk/aws-dynamodb';
import { AttributeType, BillingMode } from '@aws-cdk/aws-dynamodb';
import * as appsync from '@aws-cdk/aws-appsync';
import { AuthorizationType, FieldLogLevel } from '@aws-cdk/aws-appsync';
import { GlobalProps } from './global-props';
import * as path from 'path';

export async function greetingServiceApplicationStack(
    app: cdk.App,
    id: string,
    global: GlobalProps,
): Promise<Stack> {
    const stack = new cdk.Stack(app, id, {
        stackName: global.getStackName(id),
    });

    const entryTable = new dynamodb.Table(stack, 'EntryTable', {
        tableName: global.getTableName('Entry'),
        billingMode: BillingMode.PROVISIONED,
        readCapacity: 1,
        writeCapacity: 1,
        partitionKey: {
            type: AttributeType.STRING,
            name: 'id',
        },
    });

    const graphApi = new appsync.GraphqlApi(stack, 'EntryBff', {
        name: global.getGraphApiName('EntryBff'),
        logConfig: {
            excludeVerboseContent: true,
            fieldLogLevel: FieldLogLevel.ALL,
        },
        authorizationConfig: {
            defaultAuthorization: {
                authorizationType: AuthorizationType.API_KEY,
            },
            additionalAuthorizationModes: [],
        },
        schema: appsync.Schema.fromAsset(
            path.join(__dirname, 'schema.graphql'),
        ),
        xrayEnabled: true,
    });

    const entryTableDataSource = graphApi.addDynamoDbDataSource(
        'EntryTableDataSource',
        entryTable,
    );

    new ssm.StringParameter(stack, 'EntryGraphApiEndpoint', {
        stringValue: graphApi.graphqlUrl,
        parameterName: global.pm.fullKeyOf('EntryGraphApiEndpoint'),
    });

    new CfnOutput(stack, 'EntryGraphApiEndpointOutput', {
        exportName: 'EntryGraphApiEndpointOutput',
        value: graphApi.graphqlUrl,
    });

    return stack;
}

次に、GraphQL スキーマとリゾルバを書きます。

schema.graphql

input MetadataInput {
    entryAtMillis: Float
}
input EntryInput {
    name: String
    message: String
    age: Int
    winMark: Boolean
    meta: MetadataInput
    friends: [String]
}

type Metadata {
    entryAtMillis: Float
}
type Entry {
    id: String!
    name: String
    message: String
    age: Int
    winMark: Boolean
    meta: Metadata
    friends: [String]
    updateAtMillis: Float!
}

type Query {
    getEntry(userId: String!): Entry!
}

type Mutation {
    updateEntry(userId: ID!, lastUpdateAtMillis: Float!, input: EntryInput!): Entry!
}

Entry

  • id: ユーザーID
  • name: ユーザー名
  • message: エントリーの意気込み
  • age: 年齢(数値の確認用)
  • winMark: 当選マーク(Booleanの確認用)
  • meta: メタデータ(オブジェクト=>Map型の確認用)
  • friends: 紹介した友達の配列(配列=>List型の確認用)
  • updateAtMillis: 更新日時ミリ秒(ロック用)

getEntry Query

  • userId: 取得するユーザーIDを指定します

updateEntry Mutation

  • userId: 更新/作成するユーザーIDを指定します
  • lastUpdateAtMillis: ロック用。更新する直前に取得した updateAtMillis を指定します。そのときからデータが変わってなければ、更新できます。
  • input: 更新したい内容を指定します。

このスキーマに対する、AWS CDK でのリゾルバは以下のようにかけます。

entry-service-stack.ts

    const entryTableDataSource = graphApi.addDynamoDbDataSource(
        'EntryTableDataSource',
        entryTable,
    );

    entryTableDataSource.createResolver({
        typeName: 'Query',
        fieldName: 'getEntry',
        requestMappingTemplate: MappingTemplate.dynamoDbGetItem('id', 'userId'), // ①
        responseMappingTemplate: MappingTemplate.dynamoDbResultItem(),
    });

    entryTableDataSource.createResolver({
        typeName: 'Mutation',
        fieldName: 'updateEntry',
        requestMappingTemplate: MappingTemplate.fromString( // ②
            updateItemMappingTemplate({
                partitionKey: {
                    keyName: 'id',
                    attributePath: 'userId',
                },
                version: {
                    keyName: 'updateAtMillis',
                    attributePath: 'lastUpdateAtMillis',
                },
                inputPath: 'input',
            }),
        ),
        responseMappingTemplate: MappingTemplate.dynamoDbResultItem(),
    });
  • ①: 冒頭で示したように、取得処理はユーティリティ関数を利用してシンプルにかけます
  • ②: ユーティリティはないので、マッピングテンプレートを文字列から生成するよう、指定しています

では本題です。外部メソッドにしている、UpdateItem のマッピングテンプレートがこちらです。

update-item-template.ts

export function updateItemMappingTemplate(options: {
    partitionKey: {
        keyName: string;
        attributePath: string;
    };
    // update可否を決めるためのバージョン情報をどの属性にするか
    version: {
        keyName: string;
        attributePath: string;
    };
    // Item が入っているベースキーを何にするか
    // 例:GraphQL Mutation が
    // update(input: { id: 2q35, name: "Bob" }) なら inputPath: 'input' になる
    inputPath: string;
}): string {
    const { partitionKey, version, inputPath } = options;

    // undefined => ''
    // 'input' => 'input.'
    const attributeContextPath = (inputPath ? [inputPath] : [])
        .concat([''])
        .join('.');
    return `
{
    "version" : "2017-02-28",
    "operation" : "UpdateItem",
    "key" : {
        "${partitionKey.keyName}" : $util.dynamodb.toDynamoDBJson($context.arguments.${partitionKey.attributePath})
    },

    ## Set up some space to keep track of things you're updating **
    #set( $expNames  = {} )
    #set( $expValues = {} )
    #set( $expRemove = [] )

    ## fixed versionId     
    $!{expNames.put("#updateAtMillis", "updateAtMillis")}
    $!{expValues.put(":updateAtMillis", $util.parseJson($util.dynamodb.toDynamoDBJson($util.time.nowEpochMilliSeconds())))}
    $!{expSet.put("#updateAtMillis", ":updateAtMillis")}

    ## Iterate through each argument, skipping "id" **
    #foreach( $entry in $context.arguments.${attributeContextPath}entrySet() )
        #if( $entry.key != "${partitionKey.keyName}" )
            #if( (!$entry.value) && ("$!{entry.value}" == "") )
                ## If the argument is set to "null", then remove that attribute from the item in DynamoDB **

                #set( $discard = \${expRemove.add("#\${entry.key}")} )
                $!{expNames.put("#\${entry.key}", "$entry.key")}
            #else
                ## Otherwise set (or update) the attribute on the item in DynamoDB **

                $!{expSet.put("#\${entry.key}", ":\${entry.key}")}
                $!{expNames.put("#\${entry.key}", "$entry.key")}
                $!{expValues.put(":\${entry.key}", $util.parseJson($util.dynamodb.toDynamoDBJson(\${entry.value})))}
            #end
        #end
    #end

    ## Start building the update expression, starting with attributes you're going to SET **
    #set( $expression = "" )
    #if( !\${expSet.isEmpty()} )
        #set( $expression = "SET" )
        #foreach( $entry in $expSet.entrySet() )
            #set( $expression = "\${expression} \${entry.key} = \${entry.value}" )
            #if ( $foreach.hasNext )
                #set( $expression = "\${expression}," )
            #end
        #end
    #end

    ## Continue building the update expression, adding attributes you're going to REMOVE **
    #if( !\${expRemove.isEmpty()} )
        #set( $expression = "\${expression} REMOVE" )

        #foreach( $entry in $expRemove )
            #set( $expression = "\${expression} \${entry}" )
            #if ( $foreach.hasNext )
                #set( $expression = "\${expression}," )
            #end
        #end
    #end

    ## Finally, write the update expression into the document, along with any expressionNames and expressionValues **
    "update" : {
        "expression" : "\${expression}"
        #if( !\${expNames.isEmpty()} )
            ,"expressionNames" : $utils.toJson($expNames)
        #end
        #if( !\${expValues.isEmpty()} )
            ,"expressionValues" : $utils.toJson($expValues)
        #end
    },

    "condition" : {
        "expression"       : "updateAtMillis = :lastUpdateAtMillis OR attribute_not_exists(updateAtMillis)",
        "expressionValues" : {
            ":lastUpdateAtMillis" : $util.dynamodb.toDynamoDBJson($context.arguments.${version.attributePath})
        }
    }
}
    `;
}

分解します。まずは関数パラメータ部分。

export function updateItemMappingTemplate(options: {
    partitionKey: {
        keyName: string;
        attributePath: string;
    };
    version: {
        keyName: string;
        attributePath: string;
    };
    inputPath: string;
}): string {

最終的にリゾルバのマッピングテンプレートを作りたい状況です。AppSyncのマッピングテンプレートでは、$context.arguments.userId のように書くことで GraphQL APIの入力パラメータをテンプレートの中で利用できます。ですので、テンプレートを作るためには、入力パラメータの名前は何で、をどう使うかという情報が必要です。すなわち attributePath は、入力パラメータ名かネストしている場合はパスを渡すことになります。先程、CDK側では

requestMappingTemplate: MappingTemplate.fromString( // ②
    updateItemMappingTemplate({
        partitionKey: {
            keyName: 'id',
            attributePath: 'userId',
        },
        version: {
            keyName: 'updateAtMillis',
            attributePath: 'lastUpdateAtMillis',
        },
        inputPath: 'input',
    }),
),

このように使っていました。partitionKey:は、パーティションキーのDynamoDB側属性名はkeyで、GraphQL側の入力パラメータ名はuserIdです と言っています。

    return `
{
    "version" : "2017-02-28",
    "operation" : "UpdateItem",
    "key" : {
        "${partitionKey.keyName}" : $util.dynamodb.toDynamoDBJson($context.arguments.${partitionKey.attributePath})
    },

    ## Set up some space to keep track of things you're updating **
    #set( $expNames  = {} )
    #set( $expValues = {} )
    #set( $expSet = {} )
    #set( $expRemove = [] )

ここからマッピングテンプレートに突入します。定義している変数から、更新SETと削除REMOVEをやろうとしているよくばりセットであることがみてとれます。逆に、数値型・List型に対するADDや、Set型から要素を除去するDELETEはサポートしていないことがわかります。

    ## fixed versionId     
    $!{expNames.put("#updateAtMillis", "updateAtMillis")}
    $!{expValues.put(":updateAtMillis", $util.parseJson($util.dynamodb.toDynamoDBJson($util.time.nowEpochMilliSeconds())))}
    $!{expSet.put("#updateAtMillis", ":updateAtMillis")}

DynamoDB Item のロックを updateAtMillis で行うと決めました。この記述はAPIをコールするクライアントが特に何もしなくても、更新に成功したときは必ず updateAtMillis を現在日時 util.time.nowEpochMilliSeconds() で置き換える動きになります。

    ## Iterate through each argument, skipping "id" **
    #foreach( $entry in $context.arguments.${attributeContextPath}entrySet() )
        #if( $entry.key != "${partitionKey.keyName}" )
            #if( (!$entry.value) && ("$!{entry.value}" == "") )
                ## If the argument is set to "null", then remove that attribute from the item in DynamoDB **

                #set( $discard = \${expRemove.add("#\${entry.key}")} )
                $!{expNames.put("#\${entry.key}", "$entry.key")}
            #else
                ## Otherwise set (or update) the attribute on the item in DynamoDB **

                $!{expSet.put("#\${entry.key}", ":\${entry.key}")}
                $!{expNames.put("#\${entry.key}", "$entry.key")}
                $!{expValues.put(":\${entry.key}", $util.parseJson($util.dynamodb.toDynamoDBJson(\${entry.value})))}
            #end
        #end
    #end

入力されたJSONを走査して、パーティションキー以外の項目を更新候補/削除候補いずれかに振り分けます。

    ## Start building the update expression, starting with attributes you're going to SET **
    // 中略...

    ## Continue building the update expression, adding attributes you're going to REMOVE **
    // 中略...

    ## Finally, write the update expression into the document, along with any expressionNames and expressionValues **
    "update" : {
        "expression" : "\${expression}"
        #if( !\${expNames.isEmpty()} )
            ,"expressionNames" : $utils.toJson($expNames)
        #end
        #if( !\${expValues.isEmpty()} )
            ,"expressionValues" : $utils.toJson($expValues)
        #end
    },

振り分けられた変数から抽出していき、SET ***, REMOVE *** の更新式を組み立てていきます。

    "condition" : {
        "expression"       : "updateAtMillis = :lastUpdateAtMillis OR attribute_not_exists(updateAtMillis)",
        "expressionValues" : {
            ":lastUpdateAtMillis" : $util.dynamodb.toDynamoDBJson($context.arguments.${version.attributePath})
        }
    }

最後に Condition です。updateAtMillisが存在しないか(初回作成時)、他の誰にも更新されていないときだけ、UpdateItem が実行できるようにします。

デプロイして、動作確認していきます。

作成、更新、削除を実行する

AWSコンソール で実際に AppSync のGraphQLクエリを叩きます。

作成

create.png

input で指定したオブジェクトをもとに DynamoDB のデータが生成されます。データがない場合は updateAtMillis はチェックされないので、lastUpdateAtMillis は何でも良いです。 0 などを指定します。先のDynamoDB のアップデートで、空文字列も登録できるようになったのは嬉しいですね。

更新 - ロックされている

update_locked.png

自分が更新する前に、誰かが更新してしまった状況です。この場合は lastUpdateAtMillis が保存されている updateAtMillis と一致しないので、コンディションチェックエラーになります。

更新 - ロック通過

update_ok.png

lastUpdateAtMillis を正しく指定した場合です。入力した値で更新されていることがわかります。レスポンスの年齢ageに注目してください。入力には何も指定していません。このときはスルーされ、DynamoDB に保存されている値が維持されます。

削除

remove.png

明示的にnullを指定すると属性を削除します。

まとめ

状況を限定した上で、ある程度使いまわしの効く DynamoDB UpdateItem のテンプレートを用意し、動かしてみました。繰り返しになりますが以下のようにユースケースを絞っている点にご注意ください。

  • 更新時のロックに更新日時updateAtMillisを使う
  • ADDDELETEはあきらめ、SETREMOVEのみ考慮する

これらをみたすような個人開発、検証、動作確認においては、Lambda Function を通さずに、直接UpdateItemを行えるテンプレートが使えます。是非活用してみてください。もし不審な動作をみつけたり、ADDDELETEについてもサポートできる可能性がありそうならば、イシューで教えていただけると助かります。皆様のサーバーレスアプリケーションに少しでも貢献できれば幸いです。

ソースコード

cm-wada-yusuke/aws-serverless-monorepo-starter at feature/dynamodb-direct

テンプレートメソッドはこちら

update-item-template.ts