Turning Amazon Connect Call Analysis Results into Word Documents with Batch Processing - Amazon Connect Advent Calendar 2022

2022.12.15

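Hello! This is Morita.

This article is the Day 15 entry of the "Amazon Connect Advent Calendar 2022".

The Amazon Connect Advent Calendar 2022 is a joint project between Classmethod and GeekFeed Co., Ltd. It includes many other articles about Amazon Connect, so please take a look at those as well.

This article shows how to analyze Amazon Connect call data in a batch process and turn the results into a Word document.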

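What we want to do

An AWS Lambda function converts the analysis results of Amazon Connect voice data into a Word document, and the file path is added to the contact attributes so that it can be checked from Amazon Connect.

In Amazon Connect, Contact Lens can analyze and visualize voice data, but the speech analysis and the visualization themselves cannot be customized. In the flow used here, Amazon Transcribe is called as an API, so the language model can be customized, and the visualization can be customized by writing code in Lambda.

To convert the speech analysis results into a Word document, we use amazon-transcribe-output-word, which was introduced on the AWS blog.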

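Trying it out

Creating the Amazon Connect resources

First, prepare the basic set of resources, such as an Amazon Connect instance and a contact flow.

In the contact flow, turn on call recording in the "Set recording and analytics behavior" block so that the voice data can be analyzed.

Preparing AWS Lambda

Prepare two Lambda functions: one that starts the Amazon Transcribe job and one that converts the job results into a Word document.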

Lambda for starting the Amazon Transcribe job

The Lambda's IAM role needs permission to start Amazon Transcribe jobs, plus Amazon Connect read permission in order to look up the Connect instance ID.

IAM policy for the Lambda execution role
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "transcribe:StartCallAnalyticsJob",
                "connect:ListInstances",
                "iam:PassRole",
                "ds:DescribeDirectories"
            ],
            "Resource": "*"
        }
    ]
}

 

In addition, running the Amazon Transcribe job requires a separate IAM role that has read access to the S3 bucket where the Amazon Connect recordings are stored.

IAM policy for the job execution role
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::BUCKET_NAME/*"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::BUCKET_NAME"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "kms:Decrypt"
            ],
            "Resource": [
                "arn:aws:kms:ap-northeast-1:ACCOUNT_ID:key/*"
            ],
            "Condition": {
                "StringLike": {
                    "kms:ViaService": [
                        "s3.*.amazonaws.com"
                    ]
                }
            },
            "Effect": "Allow"
        }
    ]
}
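
The policy above only covers what the job role may read. For Amazon Transcribe to use the role passed as DataAccessRoleArn, the role's trust policy also has to allow the Transcribe service to assume it. The article does not show its own trust policy, so the following is only a minimal sketch of what one could look like; the function name is an assumption made for illustration.

```python
import json


def build_transcribe_trust_policy():
    """Return a trust policy that lets Amazon Transcribe assume the job role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # Service principal of Amazon Transcribe
                "Principal": {"Service": "transcribe.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


if __name__ == "__main__":
    # Print the JSON so it can be pasted into the role's trust relationship
    print(json.dumps(build_transcribe_trust_policy(), indent=4))
```

The printed JSON can be set as the trust relationship of the job execution role, with the permission policy above attached to the same role.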

The Lambda source code is shown below.

Set DataAccessRoleArn to the ARN of the IAM role for job execution.

lambda_function.py
import json
import boto3
import urllib.parse

DataAccessRoleArn = "ARN of the IAM role for job execution"
JobDirPath = "connect/morita-demo/transcribe"

def get_instance_id(alias):
    client = boto3.client('connect')

    # Follow NextToken so that every page of instances is collected
    res = client.list_instances()
    instances = res.get('InstanceSummaryList')
    while res.get("NextToken"):
        res = client.list_instances(NextToken=res["NextToken"])
        instances += res.get('InstanceSummaryList')

    for i in instances:
        if i.get("InstanceAlias") == alias:
            return i['Id']
    return None

def lambda_handler(event, context):
    # The function is invoked by an S3 event for a newly stored recording
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    client = boto3.client('transcribe')
    file_uri = 's3://{}/{}'.format(bucket, key)
    # Key layout: connect/<alias>/CallRecordings/.../<job name>_<rest of file name>
    alias = key.split("/")[1]
    instance_id = get_instance_id(alias)
    if instance_id is None:
        raise ValueError("No Amazon Connect instance found for alias: " + alias)
    job_name = key.split("/")[-1].split("_")[0]

    print(job_name, file_uri)
    res = client.start_call_analytics_job(
        CallAnalyticsJobName=job_name + "_" + instance_id,
        DataAccessRoleArn=DataAccessRoleArn,
        OutputLocation="s3://{}/{}/{}_{}.json".format(bucket, JobDirPath, job_name, instance_id),
        Media={
            'MediaFileUri': file_uri
        },
        Settings={
            'LanguageOptions': ["ja-JP"]
        },
        ChannelDefinitions=[
            {
                'ChannelId': 1,
                'ParticipantRole': 'AGENT'
            },
            {
                'ChannelId': 0,
                'ParticipantRole': 'CUSTOMER'
            }
        ]
    )
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

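The code above assumes that Amazon Connect stores recordings in S3 under a path like the following (the storage setting is explained later).

BUCKET_NAME/connect/ALIAS/CallRecordings/

The job results are saved under a path like the following.

BUCKET_NAME/connect/ALIAS/transcribe/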
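
To make the path convention concrete, here is a small sketch of how the Lambda derives the alias, the job name, and the output location from an S3 object key. The helper names and the sample key, bucket, and IDs are hypothetical. The only assumption about the recording file name is the one the Lambda code itself makes: the part before the first underscore is used as the job name.

```python
import urllib.parse


def parse_recording_key(raw_key):
    """Split an S3 event object key into the parts lambda_function.py relies on."""
    # S3 event notifications URL-encode the key (for example ":" arrives as "%3A")
    key = urllib.parse.unquote_plus(raw_key, encoding="utf-8")
    parts = key.split("/")
    alias = parts[1]                     # connect/<alias>/CallRecordings/...
    job_name = parts[-1].split("_")[0]   # text before the first "_" in the file name
    return key, alias, job_name


def build_output_location(bucket, job_dir_path, job_name, instance_id):
    """Build the OutputLocation URI in the same shape as the Lambda code."""
    return "s3://{}/{}/{}_{}.json".format(bucket, job_dir_path, job_name, instance_id)


# Hypothetical key, bucket and IDs, for illustration only
raw_key = "connect/my-alias/CallRecordings/2022/12/15/1111-aaaa_20221215T01%3A23_UTC.wav"
key, alias, job_name = parse_recording_key(raw_key)
output = build_output_location("my-bucket", "connect/my-alias/transcribe", job_name, "2222-bbbb")
print(alias, job_name, output)
```

Because the instance ID is appended to the job name, the second Lambda can recover both values from the result file name alone.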

Lambda for converting the job results into a Word document

Give this Lambda's IAM role read and write access to the S3 bucket and permission to write Amazon Connect contact attributes.
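
The Connect write permission is needed for the step that records the Word file path in the contact attributes. A minimal sketch of that step follows. It assumes the result file is named `<job name>_<instance ID>.json` as produced by the first Lambda, that the job name is the contact ID, and that the job name contains no underscore. The attribute name `wordFilePath` and both helper names are hypothetical. `update_contact_attributes` is the boto3 Amazon Connect client call that accepts `InitialContactId`, `InstanceId`, and `Attributes`.

```python
import os


def build_attribute_request(result_key, word_file_path):
    """Derive the update_contact_attributes arguments from a Transcribe result key."""
    file_name = os.path.basename(result_key)       # "<contact>_<instance>.json"
    stem = file_name.rsplit(".", 1)[0]              # drop the ".json" extension
    contact_id, instance_id = stem.split("_", 1)    # split at the first underscore
    return {
        "InitialContactId": contact_id,
        "InstanceId": instance_id,
        "Attributes": {"wordFilePath": word_file_path},  # hypothetical attribute name
    }


def save_word_path(result_key, word_file_path):
    """Write the Word file path to the contact attributes (needs AWS credentials)."""
    import boto3  # imported here so the helper above can be tried without boto3

    client = boto3.client("connect")
    return client.update_contact_attributes(**build_attribute_request(result_key, word_file_path))


# Hypothetical values, for illustration only
request = build_attribute_request(
    "connect/my-alias/transcribe/1111-aaaa_2222-bbbb.json",
    "s3://my-bucket/connect/my-alias/word/1111-aaaa.docx",
)
print(request)
```

Keeping the argument building separate from the API call makes the parsing easy to check locally before the function is deployed.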

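Several libraries are used, so prepare Lambda layers for python-docx and matplotlib. (I also tried to add scipy to a layer, but it exceeded the size limit, so it is installed at runtime instead.)

For the source code, take the amazon-transcribe-output-word script, modify it so that it can be called as a function, and place it as a separate file (ts_to_word.py) in the same directory as lambda_function.py.

ts_to_word.py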
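import subprocess, sys

def run_command(name):
    # scipy did not fit in a Lambda layer, so install it into /tmp at runtime
    sys.path.insert(1, '/tmp/packages')
    subprocess.check_call([sys.executable, "-m", "pip", "install", '--target', '/tmp/packages', name])
run_command("scipy")

class DictWrapper:
    # Exposes dict keys as attributes (missing keys return None), e.g. cli_arguments.sentiment
    def __init__(self, d):
        self.dict = d

    def __getattr__(self, name):
        return self.dict.get(name)
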
from docx import Document
from docx.shared import Cm, Mm, Pt, Inches, RGBColor
from docx.enum.text import WD_ALIGN_PARAGRAPH, WD_COLOR_INDEX, WD_BREAK
from docx.enum.style import WD_STYLE_TYPE
from docx.enum.section import WD_SECTION
from docx.oxml.shared import OxmlElement, qn
from docx.oxml.ns import nsdecls
from docx.oxml import parse_xml
from pathlib import Path
from time import perf_counter
from scipy.interpolate import make_interp_spline
import urllib.request
import json
import datetime
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
import statistics
import os
import boto3
import argparse
from io import BytesIO


# Common formats and styles
CUSTOM_STYLE_HEADER = "CustomHeader"
TABLE_STYLE_STANDARD = "Light List"
CATEGORY_TRANSCRIPT_BG_COLOUR = "EEFFFF"
CATEGORY_TRANSCRIPT_FG_COLOUR = RGBColor(0, 128, 255)
ALTERNATE_ROW_COLOUR = "F0F0F0"
BAR_CHART_WIDTH = 1.0

# Column offsets in Transcribe output document table
COL_STARTTIME = 0
COL_ENDTIME = 1
COL_SPEAKER = 2
COL_SENTIMENT = 3
COL_CONTENT = 4

# Comprehend Sentiment helpers - note, if a language code in Comprehend has multiple suffixed versions
# then the suffixed versions MUST be defined in the language list BEFORE the base one; e.g. "zh-TW" before "zh"
MIN_SENTIMENT_LENGTH = 16
MIN_SENTIMENT_NEGATIVE = 0.4
MIN_SENTIMENT_POSITIVE = 0.6
SENTIMENT_LANGUAGES = ["en", "es", "fr", "de", "it", "pt", "ar", "hi", "ja", "ko", "zh-TW", "zh"]

# Image download URLS
IMAGE_URL_BANNER = "https://raw.githubusercontent.com/aws-samples/amazon-transcribe-output-word-document/main/images/banner.png"
IMAGE_URL_SMILE = "https://raw.githubusercontent.com/aws-samples/amazon-transcribe-output-word-document/main/images/smile.png"
IMAGE_URL_FROWN = "https://raw.githubusercontent.com/aws-samples/amazon-transcribe-output-word-document/main/images/frown.png"
IMAGE_URL_NEUTRAL = "https://raw.githubusercontent.com/aws-samples/amazon-transcribe-output-word-document/main/images/neutral.png"

# Definitions to use whilst scanning summarisation data
CALL_SUMMARY_MAP = [
    {"Field": "segmentIssuesDetected", "Title": "Issues Detected", "Color": "FF3333"},
    {"Field": "segmentActionItemsDetected", "Title": "Action Items Detected", "Color": "FFB266"},
    {"Field": "segmentOutcomesDetected", "Title": "Outcomes Detected", "Color": "66CC00"}
]

# Additional Constants
START_NEW_SEGMENT_DELAY = 2.0       # After n seconds pause by one speaker, put next speech in new segment


class SpeechSegment:
    """ Class to hold information about a single speech segment """
    def __init__(self):
        self.segmentStartTime = 0.0
        self.segmentEndTime = 0.0
        self.segmentSpeaker = ""
        self.segmentText = ""
        self.segmentConfidence = []
        self.segmentSentimentScore = -1.0    # -1.0 => no sentiment calculated
        self.segmentPositive = 0.0
        self.segmentNegative = 0.0
        self.segmentIsPositive = False
        self.segmentIsNegative = False
        self.segmentAllSentiments = []
        self.segmentLoudnessScores = []
        self.segmentInterruption = False
        self.segmentIssuesDetected = []
        self.segmentActionItemsDetected = []
        self.segmentOutcomesDetected = []


def convert_timestamp(time_in_seconds):
    """
    Function to help convert timestamps from s to H:M:S.hh (hundredths of a second)

    :param time_in_seconds: Time in seconds to be displayed
    :return: Formatted string for this timestamp value
    """
    timeDelta = datetime.timedelta(seconds=float(time_in_seconds))
    tsFront = timeDelta - datetime.timedelta(microseconds=timeDelta.microseconds)
    # Zero-pad the hundredths so that e.g. 1.05s is not shown as "0:00:01.5"
    tsSmall = timeDelta.microseconds // 10000
    return f"{tsFront}.{tsSmall:02d}"


def get_text_colour_analytics_sentiment(score):
    """
    Returns RGB code text to represent the strength of negative or positive sentiment

    :param score: Sentiment score in range +/- 5.0
    :return: Background RGB colour text string to use in sentiment text
    """
    # Get our score into the range [0..4], which is our shade 'strength' - higher => brighter shade
    truncated = min(abs(int(score)), 4)
    col_shade = (4 - truncated) * 51

    if score >= 0:
        # Positive sentiment => Green shade
        background_colour = "{0:0>2X}{1:0>2X}{2:0>2X}".format(col_shade, 255, col_shade)
    else:
        # Negative sentiment => Red shade
        background_colour = "{0:0>2X}{1:0>2X}{2:0>2X}".format(255, col_shade, col_shade)

    return background_colour


def set_table_row_bold(row, bold):
    for cell in row.cells:
        for paragraph in cell.paragraphs:
            for run in paragraph.runs:
                run.font.bold = bold

def set_transcript_text_style(run, force_highlight, confidence=0.0, rgb_color=None):
    """
    Sets the colour and potentially the style of a given run of text in a transcript.  You can either
    supply the hex-code, or base it upon the confidence score in the transcript.

    :param run: DOCX paragraph run to be modified
    :param force_highlight: Indicates that we're going to forcibly set the background colour
    :param confidence: Confidence score for this word, used to dynamically set the colour
    :param rgb_color: Specific colour for the text
    """

    # If we have an RGB colour then use it
    if rgb_color is not None:
        run.font.color.rgb = rgb_color
    else:
        # Set the colour based upon the supplied confidence score
        if confidence >= 0.90:
            run.font.color.rgb = RGBColor(0, 0, 0)
        elif confidence >= 0.5:
            run.font.color.rgb = RGBColor(102, 51, 0)
        else:
            run.font.color.rgb = RGBColor(255, 0, 0)

    # Apply any other styles wanted
    if confidence == 0.0:
        # Call out any total disasters in bold
        run.font.bold = True

    # Force the background colour if required
    if force_highlight:
        run.font.highlight_color = WD_COLOR_INDEX.YELLOW


def write_transcribe_text(output_table, sentiment_enabled, analytics_mode, speech_segments, keyed_categories):
    """
    Writes out each line of the transcript in the Word table structure, optionally including sentiments

    :param output_table: Word document structure to write the table into
    :param sentiment_enabled: Flag to indicate we need to show some sentiment
    :param analytics_mode: Flag to indicate we're in Analytics mode, not Standard
    :param speech_segments: Turn-by-turn speech list
    :param keyed_categories: List of categories identified at any timestamps
    """

    # Load our image files if we have sentiment enabled
    if sentiment_enabled:
        png_smile = load_image(IMAGE_URL_SMILE)
        png_frown = load_image(IMAGE_URL_FROWN)
        png_neutral = load_image(IMAGE_URL_NEUTRAL)
        content_col_offset = 0
    else:
        # Ensure we offset the CONTENT column correctly due to no sentiment
        content_col_offset = -1

    # Create a row and populate it for each segment that we have
    shading_reqd = False
    for segment in speech_segments:
        # Before we start, does an analytics category start at this time?
        start_in_millis = segment.segmentStartTime * 1000.0
        end_in_millis = segment.segmentEndTime * 1000.0
        if start_in_millis in keyed_categories:
            insert_category_row(content_col_offset, keyed_categories, output_table, start_in_millis)
            keyed_categories.pop(start_in_millis)

        # Start with the easy stuff
        row_cells = output_table.add_row().cells
        row_cells[COL_STARTTIME].text = convert_timestamp(segment.segmentStartTime)
        row_cells[COL_ENDTIME].text = f"{(segment.segmentEndTime - segment.segmentStartTime):.1f}s"
        row_cells[COL_SPEAKER].text = segment.segmentSpeaker

        # Mark the start of the turn as INTERRUPTED if that's the case
        if segment.segmentInterruption:
            run = row_cells[COL_CONTENT + content_col_offset].paragraphs[0].add_run("[INTERRUPTION]")
            set_transcript_text_style(run, True, confidence=0.0)
            row_cells[COL_CONTENT + content_col_offset].paragraphs[0].add_run(" ")

        # Summarised data blocks are in order - pick out the first for each of our
        # types, as well as getting list of the remaining ones for this segment
        issues, next_issue = setup_summarised_data(segment.segmentIssuesDetected)
        actions, next_action = setup_summarised_data(segment.segmentActionItemsDetected)
        outcomes, next_outcome = setup_summarised_data(segment.segmentOutcomesDetected)

        # Then do each word with confidence-level colouring
        text_index = 1
        live_issue = False
        live_action = False
        live_outcome = False
        for eachWord in segment.segmentConfidence:
            # Look to start a new summary block if needed, in strict priority order - issues, actions, then outcomes.
            # We cannot start a new one until an existing one finishes, so if 2 overlap (unlikely) we skip the second
            live_issue = start_summary_run_highlight(content_col_offset, live_issue, live_action or live_outcome,
                                                     next_issue, row_cells, text_index, "[ISSUE]")
            live_action = start_summary_run_highlight(content_col_offset, live_action, live_issue or live_outcome,
                                                      next_action, row_cells, text_index, "[ACTION]")
            live_outcome = start_summary_run_highlight(content_col_offset, live_outcome, live_issue or live_action,
                                                       next_outcome, row_cells, text_index, "[OUTCOME]")

            # Output the next word, with the correct confidence styling and forced background
            run = row_cells[COL_CONTENT + content_col_offset].paragraphs[0].add_run(eachWord["text"])
            text_index += len(eachWord["text"])
            confLevel = eachWord["confidence"]
            set_transcript_text_style(run, live_issue or live_outcome or live_action, confidence=confLevel)

            # Has any in-progress summarisation block now finished?  Check each one
            live_issue, next_issue = stop_summary_run_highlight(issues, live_issue, next_issue, text_index)
            live_action, next_action = stop_summary_run_highlight(actions, live_action, next_action, text_index)
            live_outcome, next_outcome = stop_summary_run_highlight(outcomes, live_outcome, next_outcome, text_index)

        # If enabled, finish with the base sentiment for the segment - don't write out
        # score if it turns out that this segment is neither Negative nor Positive
        if sentiment_enabled:
            if segment.segmentIsPositive or segment.segmentIsNegative:
                paragraph = row_cells[COL_SENTIMENT].paragraphs[0]
                img_run = paragraph.add_run()
                if segment.segmentIsPositive:
                    img_run.add_picture(png_smile, width=Mm(4))
                else:
                    img_run.add_picture(png_frown, width=Mm(4))

                # We only have turn-by-turn sentiment score values in non-analytics mode
                if not analytics_mode:
                    text_run = paragraph.add_run(' (' + str(segment.segmentSentimentScore)[:4] + ')')
                    text_run.font.size = Pt(7)
                    text_run.font.italic = True
            else:
                row_cells[COL_SENTIMENT].paragraphs[0].add_run().add_picture(png_neutral, width=Mm(4))

        # Add highlighting to the row if required
        if shading_reqd:
            for column in range(0, COL_CONTENT + content_col_offset + 1):
                set_table_cell_background_colour(row_cells[column], ALTERNATE_ROW_COLOUR)
        shading_reqd = not shading_reqd

        # Check if a category occurs in the middle of a segment - put it after the segment, as timestamp is "later"
        for category_start in keyed_categories.copy().keys():
            if (start_in_millis < category_start) and (category_start < end_in_millis):
                insert_category_row(content_col_offset, keyed_categories, output_table, category_start)
                keyed_categories.pop(category_start)

        # Before we end, does an analytics category start with this line's end time?
        if end_in_millis in keyed_categories:
            # If so, write out the line after this
            insert_category_row(content_col_offset, keyed_categories, output_table, end_in_millis)
            keyed_categories.pop(end_in_millis)


def stop_summary_run_highlight(summaries, live_summary, next_summary, text_index):
    """
    Checks the supplied flags to see if that particular type of call summary - e.g. issues or actions - has
    reached the end of its final word.  If so then it resets the flags and shifts the structures to
    the next summary item of that type in this segment (there most-likely aren't any more)

    :param summaries: List of remaining summary data items to be fully-processed
    :param live_summary: Flag to indicate if this type of call summary data is currently running
    :param next_summary: Start/end word offset information for the current/next summary data item
    :param text_index: Text offset position for this segment that we've rendered up to
    """

    if live_summary and next_summary["End"] <= text_index:
        # Yes - stop highlighting, and pick up any pending summary left on this line of this type
        live_summary = False
        if len(summaries) > 0:
            next_summary = summaries.pop()
        else:
            next_summary = {}
    return live_summary, next_summary


def start_summary_run_highlight(content_col_offset, this_summary, other_summaries, next_summ_item, row_cells,
                                text_index, output_phrase):
    """
    This looks at a call summary data block to see if it has started - if it has then we output a
    message with a highlight and set the text-run highlighting to continue.  If a summary block of
    any other type is currently in-progress then we skip displaying this one, as in a Word document
    the highlighting would be confusing and hard to do.

    :param content_col_offset: Offset into the Word table so we can skip non-existent sentiment columns
    :param this_summary: Flag indicating if a highlighting run for this type is already in progress
    :param other_summaries: Flag indicating if a highlighting run for any other type is already in progress
    :param next_summ_item: The next summary item to be considered for highlighting
    :param row_cells: Cell reference in the Word table for the current speech segment
    :param text_index: Text offset position for this segment that we've rendered up to
    :param output_phrase: Phrase to use in the transcript to mark the start of this highlighting run
    """

    new_summary = this_summary

    if len(next_summ_item) > 0 and not this_summary and not other_summaries:
        if (next_summ_item["Begin"] == 0 and text_index == 1) or (next_summ_item["Begin"] == text_index):
            # If so, start the highlighting run, tagging on a leading/trailing
            # highlight space depending on where we are in the segment
            if text_index == 1:
                next_phrase = output_phrase + " "
            else:
                next_phrase = " " + output_phrase
            run = row_cells[COL_CONTENT + content_col_offset].paragraphs[0].add_run(next_phrase)
            set_transcript_text_style(run, True, confidence=0.0)
            new_summary = True

    return new_summary


def setup_summarised_data(summary_block):
    """
    Creates a copy of specified call-summary data block in preparation for writing out the transcription.  This is
    used for each of the supported summary data types.  Returns the first item in the block, or {} if there
    aren't any items, as well as the copy of the block minus the header item

    :param summary_block: The summarised block of data that we're interested in
    """
    summary_data = summary_block.copy()
    if len(summary_data) > 0:
        next_data_item = summary_data.pop()
    else:
        next_data_item = {}
    return summary_data, next_data_item


def insert_category_row(content_col_offset, keyed_categories, output_table, timestamp_millis):
    """
    When writing out the transcript table this method will add in an additional row based
    upon the found entry in the time-keyed category list

    :param content_col_offset: Offset into the Word table so we can skip non-existent sentiment columns
    :param keyed_categories: List of categories identified at any timestamps
    :param output_table: Word document structure to write the table into
    :param timestamp_millis: Timestamp key whose data we have to write out (in milliseconds)
    """

    # Create a new row with the timestamp leading cell, then merge the other cells together
    row_cells = output_table.add_row().cells
    row_cells[COL_STARTTIME].text = convert_timestamp(timestamp_millis / 1000.0)
    merged_cells = row_cells[COL_ENDTIME].merge(row_cells[COL_CONTENT + content_col_offset])

    # Insert the text for each found category
    run = merged_cells.paragraphs[0].add_run("[CATEGORY]")
    set_transcript_text_style(run, False, rgb_color=CATEGORY_TRANSCRIPT_FG_COLOUR)
    run = merged_cells.paragraphs[0].add_run(" " + " ".join(keyed_categories[timestamp_millis]))
    set_transcript_text_style(run, False, confidence=0.5)

    # Give this row a special colour so that it stands out when scrolling
    set_table_cell_background_colour(row_cells[COL_STARTTIME], CATEGORY_TRANSCRIPT_BG_COLOUR)
    set_table_cell_background_colour(merged_cells, CATEGORY_TRANSCRIPT_BG_COLOUR)


def merge_speaker_segments(input_segment_list):
    """
    Merges together consecutive speaker segments unless:
    (a) There is a speaker change, or
    (b) The gap between segments is greater than our acceptable level of delay

    :param input_segment_list: Full time-sorted list of speaker segments
    :return: An updated segment list
    """
    outputSegmentList = []
    lastSpeaker = ""
    lastSegment = None

    # Step through each of our defined speaker segments
    for segment in input_segment_list:
        if (segment.segmentSpeaker != lastSpeaker) or \
                ((segment.segmentStartTime - lastSegment.segmentEndTime) >= START_NEW_SEGMENT_DELAY):
            # Simple case - speaker change or > n-second gap means new output segment
            outputSegmentList.append(segment)

            # This is now our base segment moving forward
            lastSpeaker = segment.segmentSpeaker
            lastSegment = segment
        else:
            # Same speaker, short time, need to copy this info to the last one
            lastSegment.segmentEndTime = segment.segmentEndTime
            lastSegment.segmentText += " " + segment.segmentText
            segment.segmentConfidence[0]["text"] = " " + segment.segmentConfidence[0]["text"]
            for wordConfidence in segment.segmentConfidence:
                lastSegment.segmentConfidence.append(wordConfidence)

    return outputSegmentList


def generate_sentiment(segment_list, language_code):
    """
    Generates sentiment per speech segment, inserting the results into the input list.  This will use
    Amazon Comprehend, but we need to map the job language code to one that Comprehend understands

    :param segment_list: List of speech segments
    :param language_code: Language code to use for the Comprehend job
    """
    # Get our boto3 client, then go through each segment
    client = boto3.client("comprehend")
    for nextSegment in segment_list:
        if len(nextSegment.segmentText) >= MIN_SENTIMENT_LENGTH:
            nextText = nextSegment.segmentText
            response = client.detect_sentiment(Text=nextText, LanguageCode=language_code)
            positiveBase = response["SentimentScore"]["Positive"]
            negativeBase = response["SentimentScore"]["Negative"]

            # If we're over the NEGATIVE threshold then we're negative
            if negativeBase >= MIN_SENTIMENT_NEGATIVE:
                nextSegment.segmentIsNegative = True
                nextSegment.segmentSentimentScore = negativeBase
            # Else if we're over the POSITIVE threshold then we're positive,
            # otherwise we're either MIXED or NEUTRAL and we don't really care
            elif positiveBase >= MIN_SENTIMENT_POSITIVE:
                nextSegment.segmentIsPositive = True
                nextSegment.segmentSentimentScore = positiveBase

            # Store all of the original sentiments for future use
            nextSegment.segmentAllSentiments = response["SentimentScore"]
            nextSegment.segmentPositive = positiveBase
            nextSegment.segmentNegative = negativeBase


def set_repeat_table_header(row):
    """
    Set Word repeat table row on every new page
    """
    row_pointer = row._tr.get_or_add_trPr()
    table_header = OxmlElement('w:tblHeader')
    table_header.set(qn('w:val'), "true")
    row_pointer.append(table_header)
    return row


def load_image(url):
    """
    Loads binary image data from a URL for later embedding into a docx document
    :param url: URL of image to be downloaded
    :return: BytesIO object that can be added as a docx image
    """
    image_url = urllib.request.urlopen(url)
    io_url = BytesIO()
    io_url.write(image_url.read())
    io_url.seek(0)
    return io_url


def write_small_header_text(document, text, confidence):
    """
    Helper function to write out small header entries, where the text colour matches the
    colour of the transcript text for a given confidence value

    :param document: Document to write the text to
    :param text: Text to be output
    :param confidence: Confidence score, which changes the text colour
    """
    run = document.paragraphs[-1].add_run(text)
    set_transcript_text_style(run, False, confidence=confidence)
    run.font.size = Pt(7)
    run.font.italic = True


def write(cli_arguments, speech_segments, job_status, summaries_detected, data):
    """
    Write a transcript from the .json transcription file and other data generated
    by the results parser, putting it all into a human-readable Word document

    :param cli_arguments: CLI arguments used for this processing run
    :param speech_segments: List of call speech segments
    :param job_status: Status of the Transcribe job
    :param summaries_detected: Flag to indicate presence of call summary data
    :param data: Parsed JSON results of the Transcribe job
    """

    sentimentEnabled = (cli_arguments.sentiment == 'on')
    tempFiles = []

    # Initiate Document, orientation and margins
    document = Document()
    document.sections[0].left_margin = Mm(19.1)
    document.sections[0].right_margin = Mm(19.1)
    document.sections[0].top_margin = Mm(19.1)
    document.sections[0].bottom_margin = Mm(19.1)
    document.sections[0].page_width = Mm(210)
    document.sections[0].page_height = Mm(297)

    # Set the base font and document title
    font = document.styles["Normal"].font
    font.name = "Calibri"
    font.size = Pt(10)

    # Create our custom text header style
    custom_style = document.styles.add_style(CUSTOM_STYLE_HEADER, WD_STYLE_TYPE.PARAGRAPH)
    custom_style.paragraph_format.widow_control = True
    custom_style.paragraph_format.keep_with_next = True
    custom_style.paragraph_format.space_after = Pt(0)
    custom_style.font.size = font.size
    custom_style.font.name = font.name
    custom_style.font.bold = True
    custom_style.font.italic = True

    # Intro banner header
    document.add_picture(load_image(IMAGE_URL_BANNER), width=Mm(171))

    # Pull out header information - some from the JSON, but most only exists in the Transcribe job status
    if cli_arguments.analyticsMode:
        # We need 2 columns only if we're in analytics mode, as we put the charts on the right of the table
        document.add_section(WD_SECTION.CONTINUOUS)
        section_ptr = document.sections[-1]._sectPr
        cols = section_ptr.xpath('./w:cols')[0]
        cols.set(qn('w:num'), '2')

    # Write out the call summary table - depending on the mode that Transcribe was used in, and
    # if the request is being run on a JSON results file rather than reading the job info from Transcribe,
    # not all of the information is available.
    # -- Media information
    # -- Amazon Transcribe job information
    # -- Average transcript word-confidence scores
    write_custom_text_header(document, "Amazon Transcribe Audio Source")
    table = document.add_table(rows=1, cols=2)
    table.style = document.styles[TABLE_STYLE_STANDARD]
    table.alignment = WD_ALIGN_PARAGRAPH.LEFT
    hdr_cells = table.rows[0].cells
    hdr_cells[0].text = "Job Name"
    if cli_arguments.analyticsMode:
        hdr_cells[1].text = data["JobName"]
    else:
        hdr_cells[1].text = data["jobName"]
    job_data = []
    # Audio duration is the end-time of the final voice segment, which might be shorter than the actual file duration
    if len(speech_segments) > 0:
        audio_duration = speech_segments[-1].segmentEndTime
        dur_text = str(int(audio_duration / 60)) + "m " + str(round(audio_duration % 60, 2)) + "s"
        job_data.append({"name": "Audio Duration", "value": dur_text})
    # We can infer diarization mode from the JSON results data structure
    if cli_arguments.analyticsMode:
        job_data.append({"name": "Audio Ident", "value": "Call Analytics"})
    elif "speaker_labels" in data["results"]:
        job_data.append({"name": "Audio Ident", "value": "Speaker-separated"})
    else:
        job_data.append({"name": "Audio Ident", "value": "Channel-separated"})

    # Some information is only in the job status
    if job_status is not None:
        job_data.append({"name": "Language", "value": job_status["LanguageCode"]})
        job_data.append({"name": "File Format", "value": job_status["MediaFormat"]})
        job_data.append({"name": "Sample Rate", "value": str(job_status["MediaSampleRateHertz"]) + " Hz"})
        job_data.append({"name": "Job Created", "value": job_status["CreationTime"].strftime("%a %d %b '%y at %X")})
        if "ContentRedaction" in job_status["Settings"]:
            redact_type = job_status["Settings"]["ContentRedaction"]["RedactionType"]
            redact_output = job_status["Settings"]["ContentRedaction"]["RedactionOutput"]
            job_data.append({"name": "Redaction Mode", "value": redact_type + " [" + redact_output + "]"})
        if "VocabularyFilterName" in job_status["Settings"]:
            vocab_filter = job_status["Settings"]["VocabularyFilterName"]
            vocab_method = job_status["Settings"]["VocabularyFilterMethod"]
            job_data.append({"name": "Vocabulary Filter", "value": vocab_filter + " [" + vocab_method + "]"})
        if "VocabularyName" in job_status["Settings"]:
            job_data.append({"name": "Custom Vocabulary", "value": job_status["Settings"]["VocabularyName"]})

    # Finish with the confidence scores (if we have any)
    stats = generate_confidence_stats(speech_segments)
    if len(stats["accuracy"]) > 0:
        job_data.append({"name": "Avg. Confidence", "value": str(round(statistics.mean(stats["accuracy"]), 2)) + "%"})

    # Place all of our job-summary fields into the Table, one row at a time
    for next_row in job_data:
        row_cells = table.add_row().cells
        row_cells[0].text = next_row["name"]
        row_cells[1].text = next_row["value"]

    # Formatting transcript table widths
    widths = (Cm(3.44), Cm(4.89))
    for row in table.rows:
        for idx, width in enumerate(widths):
            row.cells[idx].width = width

    # Spacer paragraph
    document.add_paragraph()

    # Conversational Analytics (other column) if enabled
    # -- Caller sentiment graph
    # -- Talk time split
    if cli_arguments.analyticsMode:
        write_header_graphs(data, document, tempFiles)

    # At this point, if we have no transcript then we need to quickly exit
    if len(speech_segments) == 0:
        document.add_section(WD_SECTION.CONTINUOUS)
        section_ptr = document.sections[-1]._sectPr
        cols = section_ptr.xpath('./w:cols')[0]
        cols.set(qn('w:num'), '1')
        write_custom_text_header(document, "This call had no audible speech to transcribe.")
    else:
        # Conversational Analytics (new Section)
        # -- Show speaker loudness graph, with sentiment, interrupts and non-talk time highlighted
        # -- Show a summary of any call analytics categories detected
        # -- Show a summary of any issues detected in the transcript
        # -- Process and display speaker sentiment by period
        if cli_arguments.analyticsMode:
            build_call_loudness_charts(document, speech_segments, data["ConversationCharacteristics"]["Interruptions"],
                                       data["ConversationCharacteristics"]["NonTalkTime"],
                                       data["ConversationCharacteristics"]["TalkTime"], tempFiles)
            keyed_categories = write_detected_categories(document, data["Categories"]["MatchedDetails"])
            write_analytics_sentiment(data, document)

            # Write out any call summarisation data
            if summaries_detected:
                write_detected_summaries(document, speech_segments)
        else:
            # No analytics => no categories
            keyed_categories = {}

        # Process and display transcript by speaker segments (new section)
        # -- Conversation "turn" start time and duration
        # -- Speaker identification
        # -- Sentiment type (if enabled) and sentiment score (if available)
        # -- Transcribed text with (if available) Call Analytics markers
        document.add_section(WD_SECTION.CONTINUOUS)
        section_ptr = document.sections[-1]._sectPr
        cols = section_ptr.xpath('./w:cols')[0]
        cols.set(qn('w:num'), '1')
        write_custom_text_header(document, "Call Transcription")
        document.add_paragraph()  # Spacing
        write_small_header_text(document, "WORD CONFIDENCE: >= 90% in black, ", 0.9)
        write_small_header_text(document, ">= 50% in brown, ", 0.5)
        write_small_header_text(document, "< 50% in red", 0.49)
        table_cols = 4
        if sentimentEnabled or cli_arguments.analyticsMode:
            # Ensure that we add space for the sentiment column
            table_cols += 1
            content_col_offset = 0
        else:
            # Will need to shift the content column to the left, as Sentiment isn't there now
            content_col_offset = -1
        table = document.add_table(rows=1, cols=table_cols)
        table.style = document.styles[TABLE_STYLE_STANDARD]
        hdr_cells = table.rows[0].cells
        hdr_cells[COL_STARTTIME].text = "Start"
        hdr_cells[COL_ENDTIME].text = "Dur."
        hdr_cells[COL_SPEAKER].text = "Speaker"
        hdr_cells[COL_CONTENT + content_col_offset].text = "Transcription"

        # Based upon our segment list, write out the transcription table
        write_transcribe_text(table, sentimentEnabled or cli_arguments.analyticsMode,
                              cli_arguments.analyticsMode, speech_segments, keyed_categories)
        document.add_paragraph()

        # Formatting transcript table widths - we need to add sentiment
        # column if needed, and it and the content width accordingly
        widths = [Inches(0.8), Inches(0.5), Inches(0.5), 0]
        if sentimentEnabled:
            # Comprehend sentiment needs space for the icon and % score
            widths.append(0)
            widths[COL_CONTENT + content_col_offset] = Inches(7)
            widths[COL_SENTIMENT] = Inches(0.7)
        elif cli_arguments.analyticsMode:
            # Analytics sentiment just needs an icon
            widths.append(0)
            widths[COL_CONTENT + content_col_offset] = Inches(7.4)
            widths[COL_SENTIMENT] = Inches(0.3)
        else:
            widths[COL_CONTENT + content_col_offset] = Inches(7.7)
        for row in table.rows:
            for idx, width in enumerate(widths):
                row.cells[idx].width = width

        # Setup the repeating header
        set_repeat_table_header(table.rows[0])

        # Display confidence count table, if requested (new section)
        # -- Summary table of confidence scores into "bins"
        # -- Scatter plot of confidence scores over the whole transcript
        if cli_arguments.confidence == 'on':
            write_confidence_scores(document, stats, tempFiles)
            document.add_section(WD_SECTION.CONTINUOUS)

        # Generate our raw data for the Comprehend sentiment graph (if requested)
        if sentimentEnabled:
            write_comprehend_sentiment(document, speech_segments, tempFiles)

    # Save the whole document
    document.save("/tmp/"+cli_arguments.outputFile)
    s3 = boto3.resource('s3')
    s3.Bucket(cli_arguments.BUCKET_NAME).upload_file("/tmp/"+cli_arguments.outputFile,
                                                     'docsOutput/'+cli_arguments.outputFile)
    return "s3://"+cli_arguments.BUCKET_NAME+'/docsOutput/'+cli_arguments.outputFile


def write_header_graphs(data, document, temp_files):
    """
    Writes out the two header-level graphs for caller sentiment and talk-time split

    :param data: JSON result data from Transcribe
    :param document: Word document structure to write the table into
    :param temp_files: List of temporary files for later deletion
    """
    characteristics = data["ConversationCharacteristics"]
    # Caller sentiment graph
    fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(12.5 / 2.54, 8 / 2.54), gridspec_kw={'width_ratios': [4, 3]})
    period_sentiment = characteristics["Sentiment"]["SentimentByPeriod"]["QUARTER"]
    # Graph configuration
    ax[0].set_xlim(xmin=1, xmax=4)
    ax[0].set_ylim(ymax=5, ymin=-5)
    ax[0].yaxis.set_major_locator(ticker.MultipleLocator(5.0))
    ax[0].spines['bottom'].set_position('zero')
    ax[0].spines['top'].set_color('none')
    ax[0].spines['right'].set_color('none')
    ax[0].set_xticks([])
    ax[0].set_title("Customer sentiment", fontsize=10, fontweight="bold", pad="12.0")
    # Only draw the sentiment line if we actually have a Customer that talked
    if "CUSTOMER" in period_sentiment:
        # Setup our data holders, then extract it all
        x_sentiment = np.array([])
        y_sentiment = np.array([])
        period_index = 1
        for score in period_sentiment["CUSTOMER"]:
            x_sentiment = np.append(x_sentiment, period_index)
            y_sentiment = np.append(y_sentiment, score["Score"])
            period_index += 1

        # Set the line colour to match the overall sentiment
        if characteristics["Sentiment"]["OverallSentiment"]["CUSTOMER"] >= 0.0:
            line_colour = "darkgreen"
        else:
            line_colour = "red"

        # Now draw out the simple line plot
        x_new = np.linspace(1, 4, 200)
        spline = make_interp_spline(x_sentiment, y_sentiment)
        y_smooth = spline(x_new)
        ax[0].plot(x_new, y_smooth, linewidth=3, color=line_colour)
    # Talk time calculations and ratios
    non_talk = characteristics["NonTalkTime"]["Instances"]
    quiet_time = 0
    for quiet in non_talk:
        quiet_time += quiet["DurationMillis"]
    if "AGENT" in characteristics["TalkTime"]["DetailsByParticipant"]:
        agent_talk_time = characteristics["TalkTime"]["DetailsByParticipant"]["AGENT"]["TotalTimeMillis"]
    else:
        agent_talk_time = 0
    if "CUSTOMER" in characteristics["TalkTime"]["DetailsByParticipant"]:
        caller_talk_time = characteristics["TalkTime"]["DetailsByParticipant"]["CUSTOMER"]["TotalTimeMillis"]
    else:
        caller_talk_time = 0
    total_time = agent_talk_time + caller_talk_time + quiet_time
    if total_time > 0:
        quiet_ratio = quiet_time / total_time * 100.0
        agent_ratio = agent_talk_time / total_time * 100.0
        caller_ratio = caller_talk_time / total_time * 100.0
    else:
        quiet_ratio = 0.0
        agent_ratio = 0.0
        caller_ratio = 0.0
    ratio_format = "{speaker} ({ratio:.1f}%)"
    # Additional configuration
    ax[1].set_xticks([])
    ax[1].set_yticks([])
    ax[1].set_title("Talk time", fontsize=10, fontweight="bold", pad="10.0")
    ax[1].spines['top'].set_color('none')
    ax[1].spines['bottom'].set_color('none')
    ax[1].spines['left'].set_color('none')
    ax[1].spines['right'].set_color('none')
    # Now draw out the plot
    labels = ["time"]
    width = 1.0
    ax[1].bar(labels, [quiet_time], width, label=ratio_format.format(ratio=quiet_ratio, speaker="Non-Talk"),
              bottom=[agent_talk_time + caller_talk_time])
    ax[1].bar(labels, [caller_talk_time], width, label=ratio_format.format(ratio=caller_ratio, speaker="Customer"),
              bottom=[agent_talk_time])
    ax[1].bar(labels, [agent_talk_time], width, label=ratio_format.format(ratio=agent_ratio, speaker="Agent"))
    box = ax[1].get_position()
    ax[1].set_position([box.x0, box.y0 + box.height * 0.25, box.width, box.height * 0.75])
    ax[1].legend(loc="upper center", bbox_to_anchor=(0.5, -0.05), ncol=1)
    chart_file_name = "/tmp/" + "talk-time.png"
    plt.savefig(chart_file_name, facecolor="aliceblue")
    temp_files.append(chart_file_name)
    document.add_picture(chart_file_name, width=Cm(7.5))
    plt.clf()


def generate_confidence_stats(speech_segments):
    """
    Creates a map of timestamps and confidence scores to allow for both summarising and graphing in the document.
    We also need to bucket the stats for summarising into bucket ranges that feel important (but are easily changed)
    
    :param speech_segments: List of call speech segments
    :return: Confidence and timestamp structures for graphing
    """

    # Stats dictionary
    stats = {
        "timestamps": [],
        "accuracy": [],
        "9.8": 0, "9": 0, "8": 0, "7": 0, "6": 0, "5": 0, "4": 0, "3": 0, "2": 0, "1": 0, "0": 0,
        "parsedWords": 0}

    # Confidence count - we need the average confidence score regardless
    for line in speech_segments:
        for word in line.segmentConfidence:
            stats["timestamps"].append(word["start_time"])
            conf_value = word["confidence"]
            stats["accuracy"].append(int(conf_value * 100))
            if conf_value >= 0.98:
                stats["9.8"] += 1
            elif conf_value >= 0.9:
                stats["9"] += 1
            elif conf_value >= 0.8:
                stats["8"] += 1
            elif conf_value >= 0.7:
                stats["7"] += 1
            elif conf_value >= 0.6:
                stats["6"] += 1
            elif conf_value >= 0.5:
                stats["5"] += 1
            elif conf_value >= 0.4:
                stats["4"] += 1
            elif conf_value >= 0.3:
                stats["3"] += 1
            elif conf_value >= 0.2:
                stats["2"] += 1
            elif conf_value >= 0.1:
                stats["1"] += 1
            else:
                stats["0"] += 1
            stats["parsedWords"] += 1
    return stats


def write_custom_text_header(document, text_label):
    """
    Adds a run of text to the document with the given text label, but using our custom text-header style

    :param document: Word document structure to write the table into
    :param text_label: Header text to write out
    :return:
    """
    paragraph = document.add_paragraph(text_label)
    paragraph.style = CUSTOM_STYLE_HEADER


def write_confidence_scores(document, stats, temp_files):
    """
    Using the pre-built confidence stats list, create a summary table of confidence score
    spreads, as well as a scatter-plot showing each word against the overall mean

    :param document: Word document structure to write the table into
    :param stats: Statistics for the confidence scores in the conversation
    :param temp_files: List of temporary files for later deletion
    :return:
    """
    document.add_section(WD_SECTION.CONTINUOUS)
    section_ptr = document.sections[-1]._sectPr
    cols = section_ptr.xpath('./w:cols')[0]
    cols.set(qn('w:num'), '2')
    write_custom_text_header(document, "Word Confidence Scores")
    # Start with the fixed headers
    table = document.add_table(rows=1, cols=3)
    table.style = document.styles[TABLE_STYLE_STANDARD]
    table.alignment = WD_ALIGN_PARAGRAPH.LEFT
    hdr_cells = table.rows[0].cells
    hdr_cells[0].text = "Confidence"
    hdr_cells[1].text = "Count"
    hdr_cells[2].text = "Percentage"
    parsedWords = stats["parsedWords"]
    confidenceRanges = ["98% - 100%", "90% - 97%", "80% - 89%", "70% - 79%", "60% - 69%", "50% - 59%", "40% - 49%",
                        "30% - 39%", "20% - 29%", "10% - 19%", "0% - 9%"]
    confidenceRangeStats = ["9.8", "9", "8", "7", "6", "5", "4", "3", "2", "1", "0"]
    # Add on each row
    shading_reqd = False
    for confRange, rangeStats in zip(confidenceRanges, confidenceRangeStats):
        row_cells = table.add_row().cells
        row_cells[0].text = confRange
        row_cells[1].text = str(stats[rangeStats])
        row_cells[2].text = str(round(stats[rangeStats] / parsedWords * 100, 2)) + "%"

        # Add highlighting to the row if required
        if shading_reqd:
            for column in range(0, 3):
                set_table_cell_background_colour(row_cells[column], ALTERNATE_ROW_COLOUR)
        shading_reqd = not shading_reqd

    # Formatting transcript table widths, then move to the next column
    widths = (Inches(1.2), Inches(0.8), Inches(0.8))
    for row in table.rows:
        for idx, width in enumerate(widths):
            row.cells[idx].width = width
    # Confidence of each word as scatter graph, and the mean as a line across
    fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(6, 4))
    ax.scatter(stats["timestamps"], stats["accuracy"])
    ax.plot([stats["timestamps"][0], stats["timestamps"][-1]], [statistics.mean(stats["accuracy"]),
                                                                statistics.mean(stats["accuracy"])], "r")
    # Formatting
    ax.set_xlabel("Time (seconds)")
    ax.set_ylabel("Word Confidence (percent)")
    ax.set_yticks(range(0, 101, 10))
    fig.suptitle("Word Confidence During Transcription", fontsize=11, fontweight="bold")
    ax.legend(["Word Confidence Mean", "Individual words"], loc="lower center")
    # Write out the chart
    chart_file_name = "/tmp/" + "chart.png"
    plt.savefig(chart_file_name, facecolor="aliceblue")
    temp_files.append(chart_file_name)
    plt.clf()
    document.add_picture(chart_file_name, width=Cm(8))
    document.paragraphs[-1].alignment = WD_ALIGN_PARAGRAPH.LEFT
    document.add_paragraph()


def insert_line_and_col_break(document):
    """
    Inserts a line break and column break into the document

    :param document: Word document structure to write the breaks into
    """
    # Blank line followed by column break
    document.add_paragraph()  # Spacing
    run = document.paragraphs[-1].add_run()
    run.add_break(WD_BREAK.LINE)
    run.add_break(WD_BREAK.COLUMN)


def write_detected_categories(document, category_list):
    """
    If there are any detected categories then write out a simple list

    :param document: Word document structure to write the table into
    :param category_list: Details of detected categories
    :return: A timestamp-keyed list of detected categories, which we'll use later when writing out the transcript
    """
    timed_categories = {}
    if category_list != {}:
        # Start with a new single-column section
        document.add_section(WD_SECTION.CONTINUOUS)
        section_ptr = document.sections[-1]._sectPr
        cols = section_ptr.xpath('./w:cols')[0]
        cols.set(qn('w:num'), '1')
        write_custom_text_header(document, "Categories Detected")

        # Table header information
        table = document.add_table(rows=1, cols=3)
        table.style = document.styles[TABLE_STYLE_STANDARD]
        hdr_cells = table.rows[0].cells
        hdr_cells[0].text = "Category"
        hdr_cells[1].text = "#"
        hdr_cells[1].paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER
        hdr_cells[2].text = "Timestamps found at"

        # Go through each detected category
        for next_cat in category_list.keys():
            row_cells = table.add_row().cells
            row_cells[0].text = next_cat

            # Instances and timestamps for the category do not exist for "negative" categories
            if category_list[next_cat]["PointsOfInterest"] != []:
                row_cells[1].text = str(len(category_list[next_cat]["PointsOfInterest"]))
                row_cells[1].paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER

                # Now go through each instance of it
                instance = 0
                for next_timestamp in category_list[next_cat]["PointsOfInterest"]:
                    # Add the next timestamp to the document row, with separating punctuation if needed
                    start_time_millis = next_timestamp["BeginOffsetMillis"]
                    start_time_text = convert_timestamp(start_time_millis / 1000.0)
                    if instance > 0:
                        row_cells[2].paragraphs[0].add_run(", ")
                    row_cells[2].paragraphs[0].add_run(start_time_text)
                    instance += 1

                    # Now add this to our time-keyed category list
                    if start_time_millis not in timed_categories:
                        timed_categories[start_time_millis] = [next_cat]
                    else:
                        timed_categories[start_time_millis].append(next_cat)

        # Formatting transcript table widths
        widths = (Cm(4.0), Cm(1.0), Cm(12.2))
        shading_reqd = False
        for row in table.rows:
            for idx, width in enumerate(widths):
                row.cells[idx].width = width
                if shading_reqd:
                    set_table_cell_background_colour(row.cells[idx], ALTERNATE_ROW_COLOUR)
            shading_reqd = not shading_reqd

        # Finish with some spacing
        document.add_paragraph()

    # Return our time-keyed category list
    return timed_categories


def write_detected_summaries(document, speech_segments):
    """
    Scans the speech segments for any detected summaries of the requested type, and if there are any
    then a new table is added to the document.  This assumes that we do have some summaries, as if
    not we'll just output a table header on its own

    :param document: Word document structure to write the table into
    :param speech_segments: Call transcript structures
    """

    # Start with a new single-column section
    document.add_section(WD_SECTION.CONTINUOUS)
    section_ptr = document.sections[-1]._sectPr
    cols = section_ptr.xpath('./w:cols')[0]
    cols.set(qn('w:num'), '1')
    table = document.add_table(rows=1, cols=3)
    table.style = document.styles[TABLE_STYLE_STANDARD]
    hdr_cells = table.rows[0].cells
    hdr_cells[0].text = "Call Summary Highlights"
    hdr_cells[0].merge(hdr_cells[2])

    # Loop through each of our summary types
    for summary_map in CALL_SUMMARY_MAP:
        # Scan through the segments and extract the issues
        summary_detected = []
        for turn in speech_segments:
            summary_block = getattr(turn, summary_map["Field"])
            # for issue in turn.myVar:
            for issue in summary_block:
                new_summary = {"Speaker": turn.segmentSpeaker}
                new_summary["Timestamp"] = turn.segmentStartTime
                new_summary["Text"] = turn.segmentText[issue["Begin"]:issue["End"]]
                # May need a prefix or suffix for partial text
                if issue["Begin"] > 0:
                    new_summary["Text"] = "..." + new_summary["Text"]
                if issue["End"] < len(turn.segmentText):
                    new_summary["Text"] = new_summary["Text"] + "..."
                summary_detected.append(new_summary)

        # If we found some of this type then write out a table
        if summary_detected:
            # Header section for this block
            row_cells = table.add_row().cells
            row_cells[0].text = summary_map["Title"]
            set_table_cell_background_colour(row_cells[0], summary_map["Color"])
            row_cells[0].merge(row_cells[2])

            # Column header section for this block
            next_row = table.add_row()
            row_cells = next_row.cells
            row_cells[0].text = "Speaker"
            row_cells[1].text = "Turn Time"
            row_cells[2].text = "Detected Text"
            set_table_row_bold(next_row, True)
            shading_reqd = False

            # Output each row
            for issue in summary_detected:
                # First column is the speaker
                next_row = table.add_row()
                row_cells = next_row.cells
                row_cells[0].text = issue["Speaker"]
                row_cells[1].text = convert_timestamp(issue["Timestamp"])
                row_cells[2].text = issue["Text"]
                set_table_row_bold(next_row, False)

                # Add highlighting to the row if required; e.g. every 2nd row
                if shading_reqd:
                    for column in range(0, 3):
                        set_table_cell_background_colour(row_cells[column], ALTERNATE_ROW_COLOUR)
                shading_reqd = not shading_reqd

    # Formatting transcript table widths
    widths = (Cm(2.2), Cm(2.2), Cm(12.8))
    for row in table.rows:
        for idx, width in enumerate(widths):
            row.cells[idx].width = width

    # Finish with some spacing
    document.add_paragraph()


def build_call_loudness_charts(document, speech_segments, interruptions, quiet_time, talk_time, temp_files):
    """
    Creates the call loudness charts for each caller, which we also overlay sentiment on

    :param document: Word document structure to write the graphics into
    :param speech_segments: Call transcript structures
    :param interruptions: Call speaker interruption structures
    :param quiet_time: Call non-talk time structures
    :param talk_time: Call talk time structures
    :param temp_files: List of temporary files for later deletion (includes our graph)
    """

    # Start with a new single-column section
    document.add_section(WD_SECTION.CONTINUOUS)
    section_ptr = document.sections[-1]._sectPr
    cols = section_ptr.xpath('./w:cols')[0]
    cols.set(qn('w:num'), '1')
    document.add_paragraph()
    write_custom_text_header(document, "Conversation Volume Levels with Sentiment and Interruptions")

    # Initialise our loudness structures
    secsLoudAgent = []
    dbLoudAgent = []
    secsLoudCaller = []
    dbLoudCaller = []

    # Work through each conversation turn, extracting timestamp/decibel values as we go
    for segment in speech_segments:
        this_second = int(segment.segmentStartTime)
        # Each segment has a loudness score per second or part second
        for score in segment.segmentLoudnessScores:
            # This can be set to NONE, which causes errors later
            if score is None:
                score = 0.0
            # Track the Agent loudness
            if segment.segmentSpeaker == "Agent":
                secsLoudAgent.append(this_second)
                dbLoudAgent.append(score)
            # Track the Caller loudness
            else:
                secsLoudCaller.append(this_second)
                dbLoudCaller.append(score)
            this_second += 1
    agentLoudness = {"Seconds": secsLoudAgent, "dB": dbLoudAgent}
    callerLoudness = {"Seconds": secsLoudCaller, "dB": dbLoudCaller}

    # Work out our final talk "second", as we need both charts to line up, but
    # be careful as there may just be one speaker in the Call Analytics output
    if talk_time["DetailsByParticipant"]["AGENT"]["TotalTimeMillis"] == 0:
        final_second = max(secsLoudCaller)
        max_decibel = max(dbLoudCaller)
        haveAgent = False
        haveCaller = True
        plotRows = 1
    elif talk_time["DetailsByParticipant"]["CUSTOMER"]["TotalTimeMillis"] == 0:
        final_second = max(secsLoudAgent)
        max_decibel = max(dbLoudAgent)
        haveAgent = True
        haveCaller = False
        plotRows = 1
    else:
        final_second = max(max(secsLoudAgent), max(secsLoudCaller))
        max_decibel = max(max(dbLoudAgent), max(dbLoudCaller))
        haveAgent = True
        haveCaller = True
        plotRows = 2

    # Add some headroom to our decibel limit to give space for "interruption" markers
    max_decibel_headroom = (int(max_decibel / 10) + 2) * 10

    # Create a dataset for interruptions, which needs to be in the background on both charts
    intSecs = []
    intDb = []
    for speaker in interruptions["InterruptionsByInterrupter"]:
        for entry in interruptions["InterruptionsByInterrupter"][speaker]:
            start = int(entry["BeginOffsetMillis"] / 1000)
            end = int(entry["EndOffsetMillis"] / 1000)
            for second in range(start, end+1):
                intSecs.append(second)
                intDb.append(max_decibel_headroom)
    intSegments = {"Seconds": intSecs, "dB": intDb}

    # Create a dataset for non-talk time, which needs to be in the background on both charts
    quietSecs = []
    quietdB = []
    for quiet_period in quiet_time["Instances"]:
        start = int(quiet_period["BeginOffsetMillis"] / 1000)
        end = int(quiet_period["EndOffsetMillis"] / 1000)
        for second in range(start, end + 1):
            quietSecs.append(second)
            quietdB.append(max_decibel_headroom)
    quietSegments = {"Seconds": quietSecs, "dB": quietdB}

    # Either speaker may be missing, so we cannot assume this is a 2-row or 1-row plot
    # We want a 2-row figure, one row per speaker, but with the interruptions on the background
    fig, ax = plt.subplots(nrows=plotRows, ncols=1, figsize=(12, 2.5 * plotRows))
    if haveAgent:
        if haveCaller:
            build_single_loudness_chart(ax[0], agentLoudness, intSegments, quietSegments, speech_segments,
                                        final_second, max_decibel_headroom, "Agent", False, True)
            build_single_loudness_chart(ax[1], callerLoudness, intSegments, quietSegments, speech_segments,
                                        final_second, max_decibel_headroom, "Customer", True, False)
        else:
            build_single_loudness_chart(ax, agentLoudness, intSegments, quietSegments, speech_segments,
                                        final_second, max_decibel_headroom, "Agent", True, True)
    elif haveCaller:
        build_single_loudness_chart(ax, callerLoudness, intSegments, quietSegments, speech_segments,
                                    final_second, max_decibel_headroom, "Customer", True, True)

    # Add the chart to our document
    chart_file_name = "/tmp/" + "volume.png"
    fig.savefig(chart_file_name, facecolor="aliceblue")
    temp_files.append(chart_file_name)
    document.add_picture(chart_file_name, width=Cm(17))
    document.paragraphs[-1].alignment = WD_ALIGN_PARAGRAPH.LEFT
    plt.clf()


def build_single_loudness_chart(axes, loudness, interrupts, quiet_time, speech_segments, xaxis_max, yaxis_max,
                                caller, show_x_legend, show_chart_legend):
    """
    Builds a single loudness/sentiment chart using the given data

    :param axes: Axis to use for the chart in our larger table
    :param loudness: Data series for the speakers loudness levels
    :param interrupts: Data series for marking interrupts on the chart
    :param quiet_time: Data series for marking non-talk time on the chart
    :param speech_segments: Call transcript structures
    :param xaxis_max: Second for the last speech entry in the call, which may not have been this speaker
    :param yaxis_max: Max decibel level in the call, which may not have been this speaker
    :param caller: Name of the caller to check for in the transcript
    :param show_x_legend: Flag to show/hide the x-axis legend
    :param show_chart_legend: Flag to show/hide the top-right graph legend
    """

    # Draw the main loudness data bar-chart
    seconds = loudness["Seconds"]
    decibels = loudness["dB"]
    axes.bar(seconds, decibels, label="Speaker volume", width=BAR_CHART_WIDTH)
    axes.set_xlim(xmin=0, xmax=xaxis_max)
    axes.set_ylim(ymax=yaxis_max)
    if show_x_legend:
        axes.set_xlabel("Time (in seconds)")
    axes.set_ylabel("decibels")

    # Build up sentiment data series for positive and negative, plotting it at the bottom
    x = np.linspace(0, max(seconds), endpoint=True, num=(max(seconds) + 1))
    ypos = np.linspace(0, 0, endpoint=True, num=(max(seconds) + 1))
    yneg = np.linspace(0, 0, endpoint=True, num=(max(seconds) + 1))
    yneut = np.linspace(0, 0, endpoint=True, num=(max(seconds) + 1))
    for segment in speech_segments:
        this_second = int(segment.segmentStartTime)
        if segment.segmentSpeaker == caller:
            if segment.segmentIsPositive:
                for score in segment.segmentLoudnessScores:
                    ypos[this_second] = 10
                    this_second += 1
            elif segment.segmentNegative:
                for score in segment.segmentLoudnessScores:
                    yneg[this_second] = 10
                    this_second += 1
            else:
                for score in segment.segmentLoudnessScores:
                    yneut[this_second] = 10
                    this_second += 1
    axes.bar(x, ypos, label="Positive sentiment", color="limegreen", width=BAR_CHART_WIDTH)
    axes.bar(x, yneg, label="Negative sentiment", color="orangered", width=BAR_CHART_WIDTH)
    axes.bar(x, yneut, label="Neutral sentiment", color="cadetblue", width=BAR_CHART_WIDTH)

    # Finish with the non-talk and interrupt overlays (if there are any)
    if len(quiet_time["Seconds"]) > 0:
        axes.bar(quiet_time["Seconds"], quiet_time["dB"], label="Non-talk time", color="lightcyan", width=BAR_CHART_WIDTH)
    if len(interrupts["Seconds"]) > 0:
        axes.bar(interrupts["Seconds"], interrupts["dB"], label="Interruptions", color="goldenrod", width=BAR_CHART_WIDTH, alpha=0.5, bottom=10)

    # Only show the legend for the top graph if requested
    box = axes.get_position()
    axes.set_position([0.055, box.y0, box.width, box.height])
    axes.text(5, yaxis_max-5, caller, style='normal', color='black', bbox={'facecolor': 'white', 'pad': 5})
    if show_chart_legend:
        axes.legend(loc="upper right", bbox_to_anchor=(1.21, 1.0), ncol=1, borderaxespad=0)


def write_comprehend_sentiment(document, speech_segments, temp_files):
    """
    Writes out a graph of per-speaker sentiment over time, built from the turn-by-turn
    Amazon Comprehend sentiment scores (positive and negative turns only)

    :param document: Docx document to add the sentiment graph to
    :param speech_segments: Process transcript text holding turn-by-turn sentiment
    :param temp_files: List of temp files to be deleted later
    :return:
    """
    # Initialise our base structures
    speaker0labels = ['ch_0', 'spk_0']
    speaker1labels = ['ch_1', 'spk_1']
    speaker0timestamps = []
    speaker0data = []
    speaker1timestamps = []
    speaker1data = []

    # Start with some spacing and a new sub-header
    document.add_paragraph()
    write_custom_text_header(document, "Amazon Comprehend Sentiment")
    # Now step through and process each speech segment's sentiment
    for segment in speech_segments:
        if segment.segmentIsPositive or segment.segmentIsNegative:
            # Only interested in actual sentiment entries
            score = segment.segmentSentimentScore
            timestamp = segment.segmentStartTime

            # Positive re-calculation
            if segment.segmentIsPositive:
                score = 2 * ((1 - (1 - score) / (1 - MIN_SENTIMENT_POSITIVE)) * 0.5)
            # Negative re-calculation
            else:
                score = 2 * ((1 - score) / (1 - MIN_SENTIMENT_NEGATIVE) * 0.5 - 0.5)

            if segment.segmentSpeaker in speaker1labels:
                speaker1data.append(score)
                speaker1timestamps.append(timestamp)
            elif segment.segmentSpeaker in speaker0labels:
                speaker0data.append(score)
                speaker0timestamps.append(timestamp)

    # Spline fit needs at least 4 points for k=3, but 5 works better
    speaker1k = 3
    speaker0k = 3
    if len(speaker1data) < 5:
        speaker1k = 1
    if len(speaker0data) < 5:
        speaker0k = 1

    # Create Speaker-0 graph
    plt.figure(figsize=(8, 5))
    speaker0xnew = np.linspace(speaker0timestamps[0], speaker0timestamps[-1],
                               int((speaker0timestamps[-1] - speaker0timestamps[0]) + 1.0))
    speaker0spl = make_interp_spline(speaker0timestamps, speaker0data, k=speaker0k)
    speaker0powerSmooth = speaker0spl(speaker0xnew)
    plt.plot(speaker0timestamps, speaker0data, "ro")
    plt.plot(speaker0xnew, speaker0powerSmooth, "r", label="Speaker 1")

    # Create Speaker-1 graph
    speaker1xnew = np.linspace(speaker1timestamps[0], speaker1timestamps[-1],
                               int((speaker1timestamps[-1] - speaker1timestamps[0]) + 1.0))
    speaker1spl = make_interp_spline(speaker1timestamps, speaker1data, k=speaker1k)
    speaker1powerSmooth = speaker1spl(speaker1xnew)
    plt.plot(speaker1timestamps, speaker1data, "bo")
    plt.plot(speaker1xnew, speaker1powerSmooth, "b", label="Speaker 2")

    # Draw it out
    plt.title("Call Sentiment - Pos/Neg Only")
    plt.xlabel("Time (seconds)")
    plt.axis([0, max(speaker0timestamps[-1], speaker1timestamps[-1]), -1.5, 1.5])
    plt.legend()
    plt.axhline(y=0, color='k')
    plt.axvline(x=0, color='k')
    plt.grid(True)
    plt.xticks(np.arange(0, max(speaker0timestamps[-1], speaker1timestamps[-1]), 60))
    plt.yticks(np.arange(-1, 1.01, 0.25))

    # Write out the chart
    chart_file_name = "/tmp/" + "sentiment.png"
    plt.savefig(chart_file_name)
    temp_files.append(chart_file_name)
    plt.clf()
    document.add_picture(chart_file_name, width=Cm(14.64))
    document.paragraphs[-1].alignment = WD_ALIGN_PARAGRAPH.LEFT


def set_table_cell_background_colour(cell, rgb_hex):
    """
    Modifies the background color of the given table cell to the given RGB hex value.  This currently isn't
    supported by the DOCX module, and the only option is to modify the underlying Word document XML

    :param cell: Table cell to be changed
    :param rgb_hex: RGB hex string for the background color
    """
    parsed_xml = parse_xml(r'<w:shd {0} w:fill="{1}"/>'.format(nsdecls('w'), rgb_hex))
    cell._tc.get_or_add_tcPr().append(parsed_xml)


def write_analytics_sentiment(data, document):
    """
    Writes out tables for per-period, per-speaker sentiment from the analytics mode, as well as
    the overall sentiment for a speaker

    :param data: Transcribe results data
    :param document: Docx document to add the tables to
    """

    # Start with a new 2-column section
    document.add_section(WD_SECTION.CONTINUOUS)
    section_ptr = document.sections[-1]._sectPr
    cols = section_ptr.xpath('./w:cols')[0]
    cols.set(qn('w:num'), '2')

    # Table 1 - Period sentiment per speaker
    write_custom_text_header(document, "Call Sentiment per Quarter of the call")
    table = document.add_table(rows=1, cols=5)
    table.style = document.styles[TABLE_STYLE_STANDARD]
    hdr_cells = table.rows[0].cells
    hdr_cells[0].text = "Speaker"
    hdr_cells[1].text = "Q1"
    hdr_cells[2].text = "Q2"
    hdr_cells[3].text = "Q3"
    hdr_cells[4].text = "Q4"
    for col in range(1, 5):
        hdr_cells[col].paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER

    # Work through our sentiment period data
    period_sentiment = data["ConversationCharacteristics"]["Sentiment"]["SentimentByPeriod"]["QUARTER"]
    for caller in period_sentiment:
        # First column is the speaker
        row_cells = table.add_row().cells
        row_cells[0].text = caller.title()
        col_offset = 1
        # Further columns on that row hold the value for one period on the call
        for period in period_sentiment[caller]:
            row_cells[col_offset].text = str(period["Score"])
            row_cells[col_offset].paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER
            cell_colour = get_text_colour_analytics_sentiment(period["Score"])
            set_table_cell_background_colour(row_cells[col_offset], cell_colour)
            col_offset += 1

    # Put in a short table footer, then move to the next column
    document.add_paragraph()  # Spacing
    write_small_header_text(document, "SENTIMENT: Range from +5 (Positive) to -5 (Negative)", 0.9)

    # Table 2 - Overall speaker sentiment
    write_custom_text_header(document, "Overall Speaker Sentiment")
    table = document.add_table(rows=1, cols=2)
    table.style = document.styles[TABLE_STYLE_STANDARD]
    hdr_cells = table.rows[0].cells
    hdr_cells[0].text = "Speaker"
    hdr_cells[1].text = "Sentiment"
    hdr_cells[1].paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER
    speaker_sentiment = data["ConversationCharacteristics"]["Sentiment"]["OverallSentiment"]
    for caller in speaker_sentiment:
        row_cells = table.add_row().cells
        row_cells[0].text = caller.title()
        row_cells[1].text = str(speaker_sentiment[caller])
        row_cells[1].paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER
        cell_colour = get_text_colour_analytics_sentiment(speaker_sentiment[caller])
        set_table_cell_background_colour(row_cells[1], cell_colour)

    # Keep the columns narrow for the 2nd table
    widths = (Cm(2.2), Cm(1.5))
    for row in table.rows:
        for idx, width in enumerate(widths):
            row.cells[idx].width = width
    document.add_paragraph()  # Spacing


def create_turn_by_turn_segments(data, cli_args):
    """
    This creates a list of per-turn speech segments based upon the transcript data.  It has to work in three
    slightly different ways, as each operational mode from Transcribe outputs slightly different JSON structures.
    These modes are (a) Speaker-separated audio, (b) Channel-separated audio, and (c) Call Analytics audio

    :param data: JSON result data from Transcribe
    :param cli_args: CLI arguments used for this processing run
    :return: List of transcription speech segments
    :return: Flag to indicate the presence of call summary data
    """
    speechSegmentList = []
    summaries_detected = False

    # Decide on our operational mode - it's in the job-status or, if necessary, infer it from the data file
    # STANDARD => speaker separated, channel separated;  ANALYTICS => different format
    isAnalyticsMode = cli_args.analyticsMode
    if isAnalyticsMode:
        # We know if it's analytics mode, as it's defined in the job-status and file
        isChannelMode = False
        isSpeakerMode = False
    else:
        # Channel/Speaker-mode only relevant if not using analytics
        isChannelMode = "channel_labels" in data["results"]
        isSpeakerMode = not isChannelMode

    lastSpeaker = ""
    lastEndTime = 0.0
    skipLeadingSpace = False
    confidenceList = []
    nextSpeechSegment = None

    # Process a Speaker-separated non-analytics file
    if isSpeakerMode:
        # A segment is a blob of pronunciation and punctuation by an individual speaker
        for segment in data["results"]["speaker_labels"]["segments"]:

            # If there is content in the segment then pick out the time and speaker
            if len(segment["items"]) > 0:
                # Pick out our next data
                nextStartTime = float(segment["start_time"])
                nextEndTime = float(segment["end_time"])
                nextSpeaker = str(segment["speaker_label"])

                # If we've changed speaker, or there's a gap, create a new row
                if (nextSpeaker != lastSpeaker) or ((nextStartTime - lastEndTime) >= START_NEW_SEGMENT_DELAY):
                    nextSpeechSegment = SpeechSegment()
                    speechSegmentList.append(nextSpeechSegment)
                    nextSpeechSegment.segmentStartTime = nextStartTime
                    nextSpeechSegment.segmentSpeaker = nextSpeaker
                    skipLeadingSpace = True
                    confidenceList = []
                    nextSpeechSegment.segmentConfidence = confidenceList
                nextSpeechSegment.segmentEndTime = nextEndTime

                # Note the speaker and end time of this segment for the next iteration
                lastSpeaker = nextSpeaker
                lastEndTime = nextEndTime

                # For each word in the segment...
                for word in segment["items"]:

                    # Get the word with the highest confidence
                    pronunciations = list(filter(lambda x: x["type"] == "pronunciation", data["results"]["items"]))
                    word_result = list(filter(lambda x: x["start_time"] == word["start_time"] and x["end_time"] == word["end_time"], pronunciations))
                    try:
                        result = sorted(word_result[-1]["alternatives"], key=lambda x: x["confidence"])[-1]
                        confidence = float(result["confidence"])
                    except:
                        result = word_result[-1]["alternatives"][0]
                        confidence = float(result["redactions"][0]["confidence"])

                    # Write the word, and a leading space if this isn't the start of the segment
                    if skipLeadingSpace:
                        skipLeadingSpace = False
                        wordToAdd = result["content"]
                    else:
                        wordToAdd = " " + result["content"]

                    # If the next item is punctuation, add it to the current word
                    try:
                        word_result_index = data["results"]["items"].index(word_result[0])
                        next_item = data["results"]["items"][word_result_index + 1]
                        if next_item["type"] == "punctuation":
                            wordToAdd += next_item["alternatives"][0]["content"]
                    except IndexError:
                        pass

                    nextSpeechSegment.segmentText += wordToAdd
                    confidenceList.append({"text": wordToAdd,
                                           "confidence": confidence,
                                           "start_time": float(word["start_time"]),
                                           "end_time": float(word["end_time"])})

    # Process a Channel-separated non-analytics file
    elif isChannelMode:

        # A channel contains all pronunciation and punctuation from a single speaker
        for channel in data["results"]["channel_labels"]["channels"]:

            # If there is content in the channel then start processing it
            if len(channel["items"]) > 0:

                # We have the same speaker all the way through this channel
                nextSpeaker = str(channel["channel_label"])
                for word in channel["items"]:
                    # Pick out our next data from a 'pronunciation'
                    if word["type"] == "pronunciation":
                        nextStartTime = float(word["start_time"])
                        nextEndTime = float(word["end_time"])

                        # If we've changed speaker, or we haven't but there was a
                        # pause of more than 0.1 seconds, then start a new text segment
                        if (nextSpeaker != lastSpeaker) or\
                                ((nextSpeaker == lastSpeaker) and ((nextStartTime - lastEndTime) > 0.1)):
                            nextSpeechSegment = SpeechSegment()
                            speechSegmentList.append(nextSpeechSegment)
                            nextSpeechSegment.segmentStartTime = nextStartTime
                            nextSpeechSegment.segmentSpeaker = nextSpeaker
                            skipLeadingSpace = True
                            confidenceList = []
                            nextSpeechSegment.segmentConfidence = confidenceList
                        nextSpeechSegment.segmentEndTime = nextEndTime

                        # Note the speaker and end time of this segment for the next iteration
                        lastSpeaker = nextSpeaker
                        lastEndTime = nextEndTime

                        # Get the word with the highest confidence
                        pronunciations = list(filter(lambda x: x["type"] == "pronunciation", channel["items"]))
                        word_result = list(filter(lambda x: x["start_time"] == word["start_time"] and x["end_time"] == word["end_time"], pronunciations))
                        try:
                            result = sorted(word_result[-1]["alternatives"], key=lambda x: x["confidence"])[-1]
                            confidence = float(result["confidence"])
                        except:
                            result = word_result[-1]["alternatives"][0]
                            confidence = float(result["redactions"][0]["confidence"])
                        # result = sorted(word_result[-1]["alternatives"], key=lambda x: x["confidence"])[-1]

                        # Write the word, and a leading space if this isn't the start of the segment
                        if skipLeadingSpace:
                            skipLeadingSpace = False
                            wordToAdd = result["content"]
                        else:
                            wordToAdd = " " + result["content"]

                        # If the next item is punctuation, add it to the current word
                        try:
                            word_result_index = channel["items"].index(word_result[0])
                            next_item = channel["items"][word_result_index + 1]
                            if next_item["type"] == "punctuation":
                                wordToAdd += next_item["alternatives"][0]["content"]
                        except IndexError:
                            pass

                        # Finally, add the word and confidence to this segment's list
                        nextSpeechSegment.segmentText += wordToAdd
                        confidenceList.append({"text": wordToAdd,
                                               "confidence": confidence,
                                               "start_time": float(word["start_time"]),
                                               "end_time": float(word["end_time"])})

        # Sort the segments, as they are in channel-order and not speaker-order, then
        # merge together turns from the same speaker that are very close together
        speechSegmentList = sorted(speechSegmentList, key=lambda segment: segment.segmentStartTime)
        speechSegmentList = merge_speaker_segments(speechSegmentList)

    # Process a Call Analytics file
    elif isAnalyticsMode:

        # Lookup shortcuts
        interrupts = data["ConversationCharacteristics"]["Interruptions"]

        # Each turn has already been processed by Transcribe, so the outputs are in order
        for turn in data["Transcript"]:

            # Setup the next speaker block
            nextSpeechSegment = SpeechSegment()
            speechSegmentList.append(nextSpeechSegment)
            nextSpeechSegment.segmentStartTime = float(turn["BeginOffsetMillis"]) / 1000.0
            nextSpeechSegment.segmentEndTime = float(turn["EndOffsetMillis"]) / 1000.0
            nextSpeechSegment.segmentSpeaker = turn["ParticipantRole"].title()
            nextSpeechSegment.segmentText = turn["Content"]
            nextSpeechSegment.segmentLoudnessScores = turn["LoudnessScores"]
            confidenceList = []
            nextSpeechSegment.segmentConfidence = confidenceList
            skipLeadingSpace = True

            # Check if this block is within an interruption block for the speaker
            if turn["ParticipantRole"] in interrupts["InterruptionsByInterrupter"]:
                for entry in interrupts["InterruptionsByInterrupter"][turn["ParticipantRole"]]:
                    if turn["BeginOffsetMillis"] == entry["BeginOffsetMillis"]:
                        nextSpeechSegment.segmentInterruption = True

            # Record any issues detected
            if "IssuesDetected" in turn:
                summaries_detected = True
                for issue in turn["IssuesDetected"]:
                    # Grab the transcript offsets for the issue text
                    nextSpeechSegment.segmentIssuesDetected.append(issue["CharacterOffsets"])

            # Record any actions detected
            if "ActionItemsDetected" in turn:
                summaries_detected = True
                for action in turn["ActionItemsDetected"]:
                    # Grab the transcript offsets for the action item text
                    nextSpeechSegment.segmentActionItemsDetected.append(action["CharacterOffsets"])

            # Record any outcomes detected
            if "OutcomesDetected" in turn:
                summaries_detected = True
                for outcome in turn["OutcomesDetected"]:
                    # Grab the transcript offsets for the outcome text
                    nextSpeechSegment.segmentOutcomesDetected.append(outcome["CharacterOffsets"])

            # Process each word in this turn
            for word in turn["Items"]:
                # Pick out our next data from a 'pronunciation'
                if word["Type"] == "pronunciation":
                    # Write the word, and a leading space if this isn't the start of the segment
                    if skipLeadingSpace:
                        skipLeadingSpace = False
                        wordToAdd = word["Content"]
                    else:
                        wordToAdd = " " + word["Content"]

                    # If the word is redacted then the word confidence is a bit more buried
                    if "Confidence" in word:
                        conf_score = float(word["Confidence"])
                    elif "Redaction" in word:
                        conf_score = float(word["Redaction"][0]["Confidence"])

                    # Add the word and confidence to this segment's list
                    confidenceList.append({"text": wordToAdd,
                                           "confidence": conf_score,
                                           "start_time": float(word["BeginOffsetMillis"]) / 1000.0,
                                           "end_time": float(word["EndOffsetMillis"]) / 1000.0})
                else:
                    # Punctuation, needs to be added to the previous word
                    last_word = nextSpeechSegment.segmentConfidence[-1]
                    last_word["text"] = last_word["text"] + word["Content"]

            # Tag on the sentiment - analytics has no per-turn numbers
            turn_sentiment = turn["Sentiment"]
            if turn_sentiment == "POSITIVE":
                nextSpeechSegment.segmentIsPositive = True
                nextSpeechSegment.segmentPositive = 1.0
                nextSpeechSegment.segmentSentimentScore = 1.0
            elif turn_sentiment == "NEGATIVE":
                nextSpeechSegment.segmentIsNegative = True
                nextSpeechSegment.segmentNegative = 1.0
                nextSpeechSegment.segmentSentimentScore = 1.0

    # Return our full turn-by-turn speaker segment list with sentiment,
    # along with a flag to indicate the presence of call summary data
    return speechSegmentList, summaries_detected


def load_transcribe_job_status(cli_args):
    """
    Loads in the job status for the job named in cli_args.inputJob.  This will try both the standard Transcribe API
    as well as the Analytics API, as the customer may not know which one their job relates to

    :param cli_args: CLI arguments used for this processing run
    :return: The job status structure (different between standard/analytics), and the job's status string
    """
    transcribe_client = boto3.client("transcribe")

    try:
        # Extract the standard Transcribe job status
        job_status = transcribe_client.get_transcription_job(TranscriptionJobName=cli_args.inputJob)["TranscriptionJob"]
        cli_args.analyticsMode = False
        completed = job_status["TranscriptionJobStatus"]
    except:
        # That job doesn't exist, but it may have been an analytics job
        job_status = transcribe_client.get_call_analytics_job(CallAnalyticsJobName=cli_args.inputJob)["CallAnalyticsJob"]
        cli_args.analyticsMode = True
        completed = job_status["CallAnalyticsJobStatus"]

    return job_status, completed


def generate_document(cli_args):
    """
    Entrypoint for the command-line interface.
    """
    cli_args = DictWrapper(cli_args)
    # Load in the JSON file for processing
    # json_filepath = Path(cli_args.inputFile)
    # if json_filepath.is_file():
    #     json_data = json.load(open(json_filepath.absolute(), "r", encoding="utf-8"))
    # else:
    #     print("FAIL: Specified JSON file '{0}' does not exists.".format(cli_args.inputFile))
    #     exit(-1)
    # s3 file load
    bucket = boto3.resource('s3').Bucket(cli_args.BUCKET_NAME)
    obj = bucket.Object(cli_args.OBJECT_KEY_NAME)
    response = obj.get()    
    body = response['Body'].read()
    json_data = json.loads(body.decode('utf-8'))

    # If this is a file-input run then try and load the job status (which may no longer exist)
    if cli_args.inputJob is None:
        try:
            # Ensure we don't delete our JSON later, reset our output file to match the job-name if it's currently blank
            cli_args.keep = True
            if cli_args.outputFile is None:
                if "results" in json_data:
                    cli_args.outputFile = json_data["jobName"] + ".docx"
                    cli_args.inputJob = json_data["jobName"]
                else:
                    cli_args.outputFile = json_data["JobName"] + ".docx"
                    cli_args.inputJob = json_data["JobName"]
            job_info, job_status = load_transcribe_job_status(cli_args)
        except:
            # No job status - need to quickly work out what mode we're in,
            # as standard job results look different from analytical ones
            cli_args.inputJob = None
            cli_args.outputFile = cli_args.inputFile + ".docx"
            cli_args.analyticsMode = "results" not in json_data
            job_info = None

    # Disable Comprehend's sentiment if we're in Analytics mode
    if cli_args.analyticsMode:
        cli_args.sentiment = 'off'

    # Generate the core transcript
    start = perf_counter()
    speech_segments, summaries_detected = create_turn_by_turn_segments(json_data, cli_args)

    # Inject Comprehend-based sentiments into the segment list if required
    if cli_args.sentiment == 'on':
        # Work out the mapped language code, as Transcribe supports more languages than Comprehend.  Just
        # see if the Transcribe language code starts with any of those that Comprehend supports and use that
        sentiment_lang_code = None
        for comprehend_code in SENTIMENT_LANGUAGES:
            if job_info["LanguageCode"].startswith(comprehend_code):
                sentiment_lang_code = comprehend_code
                break

        # If we have no match then we cannot perform sentiment analysis
        if sentiment_lang_code is not None:
            generate_sentiment(speech_segments, sentiment_lang_code)
        else:
            cli_args.sentiment = 'off'

    # Write out our file and the performance statistics
    s3_path = write(cli_args, speech_segments, job_info, summaries_detected, json_data)
    finish = perf_counter()
    duration = round(finish - start, 2)
    print(f"> Transcript {cli_args.outputFile} written in {duration} seconds.")
    return s3_path
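
The positive/negative re-calculation in the charting code above is easier to follow in isolation. Here is a minimal sketch, assuming both thresholds are 0.4; the real `MIN_SENTIMENT_POSITIVE` and `MIN_SENTIMENT_NEGATIVE` are defined elsewhere in the file and may differ. `rescale_sentiment` is a hypothetical helper for illustration and is not part of the script.

```python
# Assumed threshold values, for illustration only
MIN_SENTIMENT_POSITIVE = 0.4
MIN_SENTIMENT_NEGATIVE = 0.4


def rescale_sentiment(score, is_positive):
    """Map a sentiment confidence in [threshold, 1.0] onto the chart's y-axis.

    Positive segments land in [0, 1] and negative segments in [-1, 0],
    using the same arithmetic as the charting code.
    """
    if is_positive:
        return 2 * ((1 - (1 - score) / (1 - MIN_SENTIMENT_POSITIVE)) * 0.5)
    return 2 * ((1 - score) / (1 - MIN_SENTIMENT_NEGATIVE) * 0.5 - 0.5)


if __name__ == "__main__":
    for s in (0.4, 0.7, 1.0):
        print(s, rescale_sentiment(s, True), rescale_sentiment(s, False))
```

With these assumed thresholds, a score right at the threshold maps to 0, and a score of 1.0 maps to +1 for a positive segment or -1 for a negative one.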


Next, call the function defined in the file above from inside the lambda_handler function.

lambda_function.py
import json
import urllib.parse

import boto3
from ts_to_word import generate_document

def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    # Object keys in S3 event notifications are URL-encoded, so decode first
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    # The file name is expected to look like "<contact ID>_<instance ID>.json"
    job_name = key.split("/")[-1].split("_")[0]
    instance_id = key.split("/")[-1].split("_")[1].replace(".json", "")
    data = {
        "inputFile":job_name,
        "BUCKET_NAME":bucket,
        "OBJECT_KEY_NAME":key
    }
    # Generate the Word document and get back its S3 path
    s3_path = generate_document(data)
    # Record the S3 path on the contact as the "docs" contact attribute
    boto3.client('connect').update_contact_attributes(
        InitialContactId=job_name,
        InstanceId=instance_id,
        Attributes={
            "docs": s3_path
        }
    )
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
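
The handler above depends on the result file being named so that the contact ID and the instance ID can be recovered from the object key. Below is a minimal sketch of that parsing with some validation added, assuming a key layout of `<prefix>/<contact ID>_<instance ID>.json` (inferred from the split logic above). `parse_result_key` is a hypothetical helper.

```python
import urllib.parse


def parse_result_key(raw_key):
    """Split a URL-encoded S3 object key into (contact_id, instance_id)."""
    # S3 event notifications URL-encode the key, so decode it first
    key = urllib.parse.unquote_plus(raw_key, encoding="utf-8")
    file_name = key.split("/")[-1]
    if not file_name.endswith(".json") or "_" not in file_name:
        raise ValueError(f"Unexpected result file name: {file_name}")
    # Drop the extension, then split on the first underscore only
    contact_id, instance_id = file_name[:-len(".json")].split("_", 1)
    return contact_id, instance_id
```

Failing early on an unexpected file name would keep the handler from calling `update_contact_attributes` with a malformed ID.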


Set the memory to 256 MB and, because the function takes a while to run, set the timeout to 1 minute 30 seconds.
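
If you would rather apply these settings from code than from the console, something like the following should work. It is a sketch using boto3's `update_function_configuration`; the function name passed in is a placeholder, and both helper names are hypothetical.

```python
def build_function_settings(function_name):
    """Return keyword arguments for Lambda's UpdateFunctionConfiguration API."""
    return {
        "FunctionName": function_name,
        "MemorySize": 256,  # MB
        "Timeout": 90,      # seconds (1 minute 30 seconds)
    }


def apply_function_settings(function_name):
    """Apply the memory and timeout settings to an existing Lambda function."""
    # Imported here so the settings helper can be used without boto3 installed
    import boto3

    client = boto3.client("lambda")
    return client.update_function_configuration(**build_function_settings(function_name))
```

Calling `apply_function_settings("<your function name>")` requires credentials that are allowed to perform `lambda:UpdateFunctionConfiguration`.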

S3 bucket event notification settings

Next, configure event notifications on the S3 bucket.

First, in the Amazon Connect console under Data storage, check which S3 bucket and folder the files are uploaded to.

Trigger on wav file uploads to a specific folder

Configure the notification so that the job-execution Lambda is invoked when a wav file is uploaded to the folder identified above.

Trigger on json file uploads to a specific folder

Configure a json file upload to the S3 folder where the Transcribe job results are stored as the trigger for the Word conversion Lambda.
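
The two triggers can also be set up with boto3 instead of the console. The sketch below builds one Lambda trigger per prefix/suffix pair and passes them to `put_bucket_notification_configuration`. The helper names are hypothetical, and the ARNs and prefixes you pass in are your own values. It also assumes both folders live in the same bucket.

```python
def lambda_trigger(function_arn, prefix, suffix):
    """Build one Lambda trigger for objects created under prefix with suffix."""
    return {
        "LambdaFunctionArn": function_arn,
        "Events": ["s3:ObjectCreated:*"],
        "Filter": {"Key": {"FilterRules": [
            {"Name": "prefix", "Value": prefix},
            {"Name": "suffix", "Value": suffix},
        ]}},
    }


def put_triggers(bucket_name, triggers):
    """Attach the given Lambda triggers to the bucket."""
    # Imported here so lambda_trigger can be used without boto3 installed
    import boto3

    # Note: this call replaces the bucket's entire notification configuration,
    # so every trigger for the bucket has to be passed in a single call
    boto3.client("s3").put_bucket_notification_configuration(
        Bucket=bucket_name,
        NotificationConfiguration={"LambdaFunctionConfigurations": triggers},
    )
```

For example, `put_triggers(bucket, [lambda_trigger(job_arn, recordings_prefix, ".wav"), lambda_trigger(word_arn, results_prefix, ".json")])` would register both. When configuring through the API rather than the console, each function also needs a resource-based policy that allows S3 to invoke it.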

Verification

Finally, let's check that everything works.

Place an actual call and wait a while; the contact then becomes visible.

I confirmed that the S3 bucket path appears in the contact attributes.
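
The same check can be done from code. This sketch reads the `docs` attribute back with the Amazon Connect `get_contact_attributes` API. `get_docs_attribute` is a hypothetical helper, and the client is passed in so it can be swapped for a stub.

```python
def get_docs_attribute(connect_client, instance_id, contact_id):
    """Return the value of the "docs" contact attribute, or None if it is absent."""
    response = connect_client.get_contact_attributes(
        InstanceId=instance_id,
        InitialContactId=contact_id,
    )
    return response.get("Attributes", {}).get("docs")
```

In practice you would call it as `get_docs_attribute(boto3.client("connect"), "<instance ID>", "<contact ID>")`.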

Next, go to the S3 bucket, download the Word document, and check its contents.

If it contains a report of the analysis results, everything is working correctly.

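Conclusion

I was able to generate and check the Word document, but the Lambda side leaves room for improvement (for example, how to handle installing scipy).

That said, being able to convert the results to a Word document so easily with the amazon-transcribe-output-word script is appealing, so give it a try if you are interested.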

 


© Classmethod, Inc. All rights reserved.