Amazon Connectの通話記録をKinesis Data Streamsに配信時、Step Functionsでデータの重複対策を実装する
はじめに
Amazon Connectの問い合わせレコードをKinesis Data Streams(以降、KDS)経由で処理する際に発生する重複データの問題に対して、Step FunctionsとDynamoDBを活用した対策を実装しました。
以前の記事「Amazon Connectの通話記録を自動的にBacklogチケット化するシステムの構築」では、Amazon Connectで受けた問い合わせを自動的にBacklogのチケットとして起票する仕組みを紹介しました。しかし、運用を進める中で以下の2種類の重複配信が課題となりました。
-
問い合わせレコードの更新時の重複配信
問い合わせレコードが更新されると、同じコンタクトIDの問い合わせレコードがKDSに再送信される -
同一データの重複配信
Amazon ConnectからKDSへのデータ配信は「少なくとも1回」の配信保証であり、同一データが複数回配信される可能性がある
これらの問題により、同一通話に対して複数のBacklogチケットが作成されてしまうという状況が発生していました。
本記事では、DynamoDBをデータ重複チェックのためのストアとして活用し、Step Functionsのワークフローを拡張することで、この問題を解決する方法を解説します。
なお、本記事は前回の記事で構築したリソースを前提としています。システムの全体構成は以下の通りで、今回は新たにDynamoDBを追加します。
DynamoDBテーブルの作成
重複データを管理するためのDynamoDBテーブルを作成します。このテーブルは処理済みのコンタクトIDを記録し、同一コンタクトIDの重複処理を防止する役割を果たします。
以下の設定でDynamoDBテーブルを作成します。
- テーブル名:cm-hirai-connect-backlog-tracker
- パーティションキー:contact_id
- キャパシティモード:オンデマンド(デフォルト)
TTL(Time To Live)の設定
データを自動的に期限切れにするためにTTL機能を有効化します。これにより、古いコンタクト情報が自動的に削除され、テーブルのサイズとコストを管理できます。
-
テーブル作成後、[アクション]メニューから[Turn on TTL]を選択します
-
TTL属性名として
ttl
を指定し、[Turn on]ボタンをクリックして有効化します
この設定により、Step Functionsのワークフロー内で各レコードに処理時のタイムスタンプから24時間後を示すTTL値を設定すると、その時間経過後に自動的に削除されます。
Step Functionsの修正
重複データを検出・排除するために、Step Functionsのステートマシンを拡張します。前回のフローと今回の改良版フローを比較します。
前回実装したステートマシンは、データの受信から処理、Backlogチケット作成までをシンプルに行う構成でした。
処理の流れは次のようになります。
- Amazon Connectで顧客との通話が行われる
- エージェントがアフターコールワークを完了すると、問い合わせレコードがAmazon Kinesis Data Streams(以下、KDS)にストリーミングされる
- EventBridge Pipesがそのデータを検知し、Step Functionsのステートマシンを起動
- Step Functionsが問い合わせデータを整形し、Backlog APIを呼び出して新規チケットを自動作成
今回は重複データ対策として、DynamoDBを活用した検証ステップを追加しています。
重複データを管理するために、DynamoDBの条件付き書き込み機能を活用した対策を実装しました。この方法では、以下のステートを追加しています(赤枠内)
-
RecordContactInDynamoDB:DynamoDBにコンタクトIDを記録する際に、
attribute_not_exists(contact_id)
という条件を指定- この条件により、同じコンタクトIDが既に存在する場合は書き込みが失敗し、
ConditionalCheckFailedException
が発生 - 例外処理で重複データを検出し、処理を終了
- この条件により、同じコンタクトIDが既に存在する場合は書き込みが失敗し、
-
DuplicateContactEnd:重複データと判定された場合に処理を正常終了させるステート
-
Fail:その他のエラー時のステート(DynamoDBテーブルが存在しないなど)
- 同時書き込みの場合: DynamoDBの書き込み操作は常に強力な整合性を持ちます。
attribute_not_exists
条件と組み合わせることで、複数のプロセスが同時に同じコンタクトIDに対して書き込もうとしても、最初の1つだけが成功し、他はすべて失敗します。 - 別タイミングでの書き込みの場合: 既に処理済みのコンタクトIDに対して後から書き込みが試みられた場合も、
attribute_not_exists
条件によって書き込みは失敗します。
これにより、同一コンタクトIDに対する重複処理を、タイミングに関わらず効果的に防止できるようになります。
最後にUpdateDynamoDBWithBacklogTicketIdステートを追加し、作成したBacklogチケットIDをDynamoDBに記録するようにしています。これは重複対策とは関係ありません。
ステートマシン定義
以下がJSONata形式で記述した完全なステートマシン定義です。各環境に合わせてプレースホルダーを置き換えて使用してください。
{
"Comment": "通話記録内容をBacklogチケットに起票する",
"StartAt": "Base64Decode",
"States": {
"Base64Decode": {
"Type": "Pass",
"Next": "ChannelTypeCheck",
"Assign": {
"decodedData": "{% $base64decode($states.input.data) %}"
}
},
"ChannelTypeCheck": {
"Type": "Choice",
"Choices": [
{
"Next": "RecordContactInDynamoDB",
"Condition": "{% $parse($decodedData).Channel = \"VOICE\" %}",
"Assign": {
"data": "{% $parse($decodedData) %}"
}
}
],
"Default": "NonVoiceChannelEnd"
},
"RecordContactInDynamoDB": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:putItem",
"Arguments": {
"TableName": "cm-hirai-connect-backlog-tracker",
"Item": {
"contact_id": {
"S": "{% $data.ContactId %}"
},
"ttl": {
"N": "{% $string($floor($toMillis($now()) / 1000 + 86400)) %}"
}
},
"ConditionExpression": "attribute_not_exists(contact_id)"
},
"Next": "InquiryTypeCheck",
"Catch": [
{
"ErrorEquals": [
"DynamoDB.ConditionalCheckFailedException"
],
"Next": "DuplicateContactEnd",
"Comment": "既に同じコンタクトIDが存在する場合は重複処理と判断して終了する"
},
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Fail",
"Comment": "その他のエラーが発生した場合は処理を失敗させる"
}
]
},
"DuplicateContactEnd": {
"Type": "Succeed",
"Comment": "既に処理済みのコンタクトIDなので処理を終了"
},
"InquiryTypeCheck": {
"Type": "Choice",
"Choices": [
{
"Next": "ExtractContactData",
"Condition": "{% $data.Attributes.inquiry_type = \"product\" %}",
"Assign": {
"categoryId": "<categoryId_product>"
}
},
{
"Next": "ExtractContactData",
"Assign": {
"categoryId": "<categoryId_contract>"
},
"Condition": "{% $data.Attributes.inquiry_type = \"contract\" %}"
}
],
"Default": "ExtractContactData",
"Assign": {
"categoryId": "<categoryId_other>"
}
},
"NonVoiceChannelEnd": {
"Type": "Succeed"
},
"ExtractContactData": {
"Type": "Pass",
"Assign": {
"InitialContactId": "{% $data.ContactId %}",
"InitiationTimestamp": "{% $data.InitiationTimestamp %}",
"DisconnectTimestamp": "{% $data.DisconnectTimestamp %}",
"AgentName": "{% $data.Agent.Username %}",
"CallerPhoneNumber": "{% $data.SystemEndpoint.Address %}"
},
"Next": "FormatContactData"
},
"FormatContactData": {
"Type": "Pass",
"Assign": {
"FormattedInitiationTimestamp": "{% $fromMillis($toMillis($InitiationTimestamp), '[Y0001]年[M]月[D]日 [H]時[m01]分[s01]秒', '+0900') %}",
"FormattedDisconnectTimestamp": "{% $fromMillis($toMillis($DisconnectTimestamp), '[Y0001]年[M]月[D]日 [H]時[m01]分[s01]秒', '+0900') %}",
"FormattedPhoneNumber": "{% $substring($CallerPhoneNumber, 0, 3) = '+81' ? '0' & $substring($CallerPhoneNumber, 3) : $CallerPhoneNumber %}",
"ContactDetailsURL": "{% 'https://<Connectドメイン>/connect/contact-trace-records/details/' & $InitialContactId & '?tz=Asia/Tokyo' %}"
},
"Next": "CreateBacklogTicket"
},
"CreateBacklogTicket": {
"Type": "Task",
"Resource": "arn:aws:states:::http:invoke",
"Arguments": {
"ApiEndpoint": "https://<xxx.backlog.jp>/api/v2/issues",
"InvocationConfig": {
"ConnectionArn": "<ConnectionArn>"
},
"Headers": {
"Content-Type": "application/x-www-form-urlencoded"
},
"Transform": {
"RequestBodyEncoding": "URL_ENCODED",
"RequestEncodingOptions": {
"ArrayFormat": "INDICES"
}
},
"Method": "POST",
"RequestBody": {
"projectId": "<projectId>",
"summary": "通話記録",
"description": "{% '|項目|内容|備考|\n|---|---|---|\n|通話開始時間|' & $FormattedInitiationTimestamp & '| |\n|通話終了時間|' & $FormattedDisconnectTimestamp & '| |\n|コンタクトID|' & $InitialContactId & '| [コンタクト詳細URL](' & $ContactDetailsURL & ')|\n|対応者|' & $AgentName & '| |\n|発信者電話番号|' & $FormattedPhoneNumber & '| |' %}",
"issueTypeId": "<issueTypeId>",
"categoryId[]": "{% $categoryId %}",
"priorityId": "3"
}
},
"Next": "UpdateDynamoDBWithBacklogTicketId",
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"BackoffRate": 2,
"IntervalSeconds": 1,
"MaxAttempts": 3,
"JitterStrategy": "FULL"
}
],
"Assign": {
"keyId": "{% $string($states.result.ResponseBody.keyId) %}"
}
},
"UpdateDynamoDBWithBacklogTicketId": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Arguments": {
"TableName": "cm-hirai-connect-backlog-tracker",
"Key": {
"contact_id": {
"S": "{% $data.ContactId %}"
}
},
"UpdateExpression": "SET backlog_ticket_id = :keyId",
"ExpressionAttributeValues": {
":keyId": {
"N": "{% $keyId %}"
}
}
},
"End": true,
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Fail"
}
]
},
"Fail": {
"Type": "Fail"
}
},
"QueryLanguage": "JSONata"
}
Step FunctionsのIAMロールには、AmazonDynamoDBFullAccess
をアタッチします。
以下のプレースホルダーを実際の値に置き換えてください
<projectId>
:BacklogのプロジェクトID<xxx.backlog.jp>
:Backlogドメイン<issueTypeId>
:Backlog「電話」種別のID<categoryId_product>
:Backlog「商品関連」カテゴリーのID<categoryId_contract>
:Backlog「契約関連」カテゴリーのID<categoryId_other>
:Backlog「その他」カテゴリーのID<Connectドメイン>
:Amazon ConnectインスタンスのURL<ConnectionArn>
:作成したEventBridge ConnectionのARN
動作確認
構築したシステムが実際に期待通りに動作するか、実環境でテストを行いました。
テストには以下のようなAmazon Connectフローを使用しました。
このフローでは、IVRで問い合わせカテゴリを選択した後、エージェントに接続される流れになっています。
実際の動作確認手順は以下の通りです
- Amazon Connectの電話番号に発信
- IVRの案内に従って「商品関連」を選択
- エージェントと通話
- 通話終了後、エージェントがアフターコールワークを完了
テストの結果、以下のような流れで自動化が正常に動作することを確認できました
- エージェントがアフターコールワークを完了
- 問い合わせレコードがKDSにストリーミング
- EventBridge PipesがStep Functionsを起動
- Step Functionsが通話データを処理
- Backlogにチケットが自動起票
通話データがKDSにストリーミングされるタイミングは、通話終了時ではなくアフターコールワーク完了後です。
重複対策の効果
実際のテストでは、同一コンタクトIDのデータが複数回KDSに配信されることを確認しましたが、実装した重複対策により、Backlogチケットの重複起票は発生しませんでした。
Step Functionsの実行履歴を確認すると、複数の実行のうち1つだけがチケット作成まで進み、他の実行は重複チェックの段階で早期に処理が終了していることがわかります。ランダム待機時間の設定により、どの実行がチケット作成まで進むかは実行ごとに異なります。
重複実行された実行記録
チケット作成した実行記録
チケットしなかった実行記録
DynamoDBの状態
DynamoDBには以下のように処理済みのコンタクト情報が記録されています。
各レコードには、以下が記録されており、これにより重複データの検出と処理済みデータの管理が適切に行われています。
- コンタクトID
- TTL値(24時間後に自動削除)
- 作成されたBacklogチケットID