【運用自動化】Amazon Connect の通話内容を Slack に自動通知してみた

【運用自動化】Amazon Connect の通話内容を Slack に自動通知してみた

2025.12.24

Amazon Connect アドベントカレンダー 2025、24日目の記事です!

クラスメソッドとギークフィードさん、AWS Japanさんの有志が募ってチャレンジしている企画になります。

(アドベントカレンダーのカレンダー一覧はこちら↓)

https://qiita.com/advent-calendar/2025/amazon-connect

はじめに

こんにちは、フニです。
みなさん、電話の内容を上司に報告したり、同僚に周知する時どうしていますか?
周知すること自体が手間と感じたり、内容に漏れはないか不安を感じたりする場合もあるかと思います。

今回は Amazon Connect の電話内容を Slack に通知する仕組みを作ってみました。
同僚にその都度周知する手間が省け、内容の漏れなどもなくなるのでとても便利だと思ったので、ご紹介します。

前提

  • Amazon Connect インスタンス構築済み
  • Slack が利用可能な環境
  • 東京リージョン

リソース設定

Amazon Connect フロー

以下のように設定しています。

CleanShot 2025-12-24 at 19.31.10@2x.png

  1. ログ記録動作の設定

CleanShot 2025-12-24 at 19.31.57@2x.png

  1. 記録と分析の動作を設定

CleanShot 2025-12-24 at 19.33.09@2x.png

  1. 音声の設定

CleanShot 2025-12-24 at 19.34.06@2x.png

  1. プロンプトの再生

CleanShot 2025-12-24 at 19.34.32@2x.png

  1. 作業キューの設定

CleanShot 2025-12-24 at 19.34.58@2x.png

  1. キューへ転送

CleanShot 2025-12-24 at 19.35.19@2x.png

  1. 切断

CleanShot 2025-12-24 at 19.35.56@2x.png

Amazon Connect Agent

電話対応を想定して、以下のように Agent を作成しました。

CleanShot 2025-12-24 at 20.15.31@2x.png

Kinesis DataStreams

以下のように作成しました。名前以外はデフォルト値です。

CleanShot 2025-12-24 at 19.36.59@2x.png

Amazon Connect インスタンスのデータストリーミングで連携します。
また、Cloudshell などで以下のコマンドも入力して適用する必要があります。

CleanShot 2025-12-24 at 20.01.14@2x.png

aws connect associate-instance-storage-config \
    --instance-id <AmazonConnectインスタンスID> \
    --resource-type REAL_TIME_CONTACT_ANALYSIS_VOICE_SEGMENTS \
    --storage-config '{
      "StorageType": "KINESIS_STREAM",
      "KinesisStreamConfig": {
        "StreamArn": "arn:aws:kinesis:ap-northeast-1:<AWSアカウントID>:stream/<KinesisDataStreams名>"
      }
    }' \
    --region ap-northeast-1

Slack Bot

Webhook の発行と、chat:write 権限を付与しました。

CleanShot 2025-12-24 at 20.04.43@2x.png

CleanShot 2025-12-24 at 20.05.41@2x.png

Lambda

以下のように作成しました。

CleanShot 2025-12-24 at 19.39.09@2x.png

Lambda ロールには以下のポリシーと権限を付与しました。
本番環境では最小権限の原則に従い、必要な操作のみに限定したカスタムポリシーの使用を推奨します。

  • AmazonConnect_FullAccess
  • AmazonKinesisFullAccess
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:ap-northeast-1:<AWSアカウントID>:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:ap-northeast-1:<AWSアカウントID>:log-group:*:*"
            ]
        }
    ]
}

Lambda のトリガーに Kinesis DataStreams を登録します。

CleanShot 2025-12-24 at 20.10.21@2x.png

CleanShot 2025-12-24 at 20.10.55@2x.png

CleanShot 2025-12-24 at 20.11.21@2x.png

Lambda は以下のサンプルコードを利用しました。
環境変数は必ず登録してください。

CleanShot 2025-12-24 at 20.08.10@2x.png

import json
import urllib.request
import os
import base64
import boto3
from datetime import datetime, timezone, timedelta

# 日本標準時 (JST = UTC+9)
JST = timezone(timedelta(hours=9))

# Connect Contact Lens クライアント
contact_lens_client = boto3.client('connect-contact-lens', region_name='ap-northeast-1')

# Connect インスタンスID
INSTANCE_ID = os.environ.get('CONNECT_INSTANCE_ID')


def lambda_handler(event, context):
    """
    Kinesis Data StreamからContact Lens分析結果を受け取り
    COMPLETEDイベント時に全文トランスクリプトをSlackに送信
    """
    print(f"受信レコード数: {len(event.get('Records', []))}")

    for record in event.get('Records', []):
        try:
            # Kinesisレコードをデコード
            payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            data = json.loads(payload)

            event_type = data.get('EventType')
            print(f"イベントタイプ: {event_type}")

            # COMPLETEDイベントで全文トランスクリプトを取得して通知
            if event_type == 'COMPLETED':
                contact_id = data.get('ContactId')
                print(f"分析完了: ContactId={contact_id}")

                # 全文トランスクリプトを取得してSlack送信
                process_completed_contact(contact_id)

        except Exception as e:
            print(f"レコード処理エラー: {e}")
            import traceback
            traceback.print_exc()
            continue

    return {'statusCode': 200}


def process_completed_contact(contact_id: str) -> None:
    """
    完了したコンタクトの全文トランスクリプトを取得してSlackに送信

    Args:
        contact_id: Amazon ConnectコンタクトID
    """
    if not INSTANCE_ID:
        print("エラー: CONNECT_INSTANCE_ID が設定されていません")
        return

    try:
        # Contact Lens APIで全セグメントを取得
        response = contact_lens_client.list_realtime_contact_analysis_segments(
            InstanceId=INSTANCE_ID,
            ContactId=contact_id
        )

        segments = response.get('Segments', [])
        print(f"取得セグメント数: {len(segments)}")

        # トランスクリプトと感情分析を収集
        transcripts = []
        customer_sentiments = []
        agent_sentiments = []

        for segment in segments:
            if 'Transcript' in segment:
                trans = segment['Transcript']
                participant = trans.get('ParticipantRole', '')
                content = trans.get('Content', '')
                sentiment = trans.get('Sentiment', 'NEUTRAL')

                # 参加者の日本語表記
                if participant == 'CUSTOMER':
                    participant_jp = 'お客様'
                    customer_sentiments.append(sentiment)
                else:
                    participant_jp = 'エージェント'
                    agent_sentiments.append(sentiment)

                transcripts.append({
                    'participant': participant_jp,
                    'content': content,
                    'sentiment': sentiment
                })

        if not transcripts:
            print("トランスクリプトが見つかりませんでした")
            return

        # 感情スコア計算
        customer_sentiment = calculate_sentiment(customer_sentiments)
        agent_sentiment = calculate_sentiment(agent_sentiments)

        # Slack通知送信
        send_slack_notification(
            contact_id=contact_id,
            transcripts=transcripts,
            customer_sentiment=customer_sentiment,
            agent_sentiment=agent_sentiment
        )

    except Exception as e:
        print(f"トランスクリプト取得エラー: {e}")
        import traceback
        traceback.print_exc()


def calculate_sentiment(sentiments: list) -> dict:
    """
    感情スコアを計算

    Args:
        sentiments: 感情のリスト

    Returns:
        感情ラベルとスコアの辞書
    """
    if not sentiments:
        return {'label': '中立', 'score': 0.0}

    pos = sentiments.count('POSITIVE')
    neg = sentiments.count('NEGATIVE')
    total = len(sentiments)

    score = ((pos - neg) / total) * 100

    if score > 20:
        label = 'ポジティブ'
    elif score < -20:
        label = 'ネガティブ'
    else:
        label = '中立'

    return {'label': label, 'score': round(score, 1)}


def send_slack_notification(
    contact_id: str,
    transcripts: list,
    customer_sentiment: dict,
    agent_sentiment: dict
) -> bool:
    """
    Slackに会話全文を送信

    Args:
        contact_id: Amazon ConnectコンタクトID
        transcripts: 会話トランスクリプトのリスト
        customer_sentiment: 顧客の感情分析結果
        agent_sentiment: エージェントの感情分析結果

    Returns:
        送信成功ならTrue
    """
    webhook_url = os.environ.get('SLACK_WEBHOOK_URL')

    if not webhook_url:
        print("エラー: SLACK_WEBHOOK_URL が設定されていません")
        return False

    # 会話全文を整形
    conversation_text = ""
    for t in transcripts:
        conversation_text += f"{t['participant']}: {t['content']}\n"

    # Slackメッセージ構築
    blocks = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": "通話分析レポート",
                "emoji": True
            }
        },
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": f"*コンタクトID:*\n`{contact_id[:24]}...`"
                },
                {
                    "type": "mrkdwn",
                    "text": f"*発話数:*\n{len(transcripts)}件"
                }
            ]
        },
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": f"*お客様感情:*\n{customer_sentiment['label']} ({customer_sentiment['score']})"
                },
                {
                    "type": "mrkdwn",
                    "text": f"*エージェント感情:*\n{agent_sentiment['label']} ({agent_sentiment['score']})"
                }
            ]
        },
        {"type": "divider"},
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*会話全文:*\n```{conversation_text[:2900]}```"
            }
        }
    ]

    # 会話が長い場合の警告
    if len(conversation_text) > 2900:
        blocks.append({
            "type": "context",
            "elements": [{
                "type": "mrkdwn",
                "text": "会話が長いため一部省略されています"
            }]
        })

    # タイムスタンプ
    blocks.append({
        "type": "context",
        "elements": [{
            "type": "mrkdwn",
            "text": f"分析完了: {datetime.now(JST).strftime('%Y-%m-%d %H:%M:%S JST')}"
        }]
    })

    message = {"blocks": blocks}

    try:
        req = urllib.request.Request(
            webhook_url,
            data=json.dumps(message).encode('utf-8'),
            headers={'Content-Type': 'application/json'}
        )
        response = urllib.request.urlopen(req, timeout=10)
        print(f"Slack送信成功: ContactId={contact_id}")
        return True
    except Exception as e:
        print(f"Slack送信エラー: {e}")
        return False

結果

実際に電話をかけて、オペレーターにオフィスの位置を聞く会話をしてみました。
サンプルではございますが、会話全文が文字起こしされて通知が届きました。

CleanShot 2025-12-24 at 20.42.34@2x.png

さいごに

今回は Amazon Connect の電話内容を Slack に通知する仕組みを作ってみました。
会話全文が文字起こしされて通知されますので、内容の漏れや同僚への周知の手間なども軽減できそうで嬉しいですね!

この記事をシェアする

FacebookHatena blogX

関連記事