Amazon Transcribeのジョブ実行結果をS3 Events/SNS/Lambdaでイベントドリブンに処理する

2018.07.20

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

従来の Amazon Transcribe は、文字起こし結果を AWS の管理する S3 バケットに出力しました。今後は、ユーザーの管理する S3 バケットにも出力可能になりました。

Amazon Transcribeが文字起こし結果をユーザーのS3バケットに出力できるようになりました

ユーザーの管理する S3 バケットに出力することにより

  • 文字起こし結果の権限・ライフサイクルの管理
  • S3 への書き込みイベントをもとに処理を呼び出し

といったことが可能になります。

本ブログでは、Transcribe ジョブ結果がユーザーの管理する S3 バケットに書き込まれたあと、 S3 → SNS → Lambda と連携し、ジョブ結果ファイルを Lambda で処理できるように実装します。

文字起こしパイプラインイメージ図

イベントドリブンに

  • S3 バケットに音源ファイルが書き込まれたら、文字起こしジョブを開始
  • S3 バケットに文字起こし結果が出力されたら(=ジョブが完了)、文字起こし結果を処理

のようなことを行うと、次のようなパイプラインが考えられます。

S3 イベントから直接 Lambda を呼び出さず、SNS を挟んでいるのは

  • S3 のイベント呼び出し条件はオーバーラップできないという制約がある(バケット全体と特定のprefixなど)
  • Fanout しやすい

ためです。

1. transcription 出力用 S3 バケットの作成

Amazon Transcribe と同じリージョンに S3 バケットを作成します。

Amazon Transcribe(transcribe.amazonaws.com) が この S3 バケットにオブジェクトを更新(s3:PutObject)できるようにするためのバケットポリシーは不要です。

ジョブ完了時にメッセージ送信する SNS トピックの作成

SNS トピックを作成します。

また、S3 がこのトピックにメッセージ送信(SNS:Publish)できるように、トピックポリシーを設定します。

{
  "Version": "2008-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": "SNS:Publish",
      "Resource": "arn:aws:sns:us-east-1:205974338614:TOPIC-NAME",
      "Condition": {
        "ArnLike": {
          "aws:SourceArn": "arn:aws:s3:*:*:TRANSCRIPTION-BUCKET-NAME"
        }
      }
    }
  ]
}

S3 Events で SNS を呼び出す

ジョブ実行結果が S3 バケットに書き込まれたたときに、この SNS トピックにメッセージを送信するように S3 イベント設定します。

今回はバケット全体に対してObjectCreated:Put をトリガーにしイベントを発火します。

ジョブ完了時に呼び出される Lambda 関数を作成

S3 イベント情報をもとに、S3 にあるジョブ実行結果を取得し、この結果をダンプするだけの Lambda 関数を用意します。

実サービスでは、ジョブ実行結果ファイルから文字起こしされたテキストを抽出し、要件に合わせた処理を行うことになるかと思います。

'''
Lambda function to retrieve transcription data from S3

Invoked by S3 events -> SNS -> Lambda
'''
import json
from logging import getLogger, DEBUG
import boto3

logger = getLogger(__name__)
logger.setLevel(DEBUG)

s3_client = boto3.client('s3')

def lambda_handler(event, context):
    logger.debug(event)

    for sns_record in event['Records']:
        sns_message = json.loads(sns_record['Sns']['Message'])

        for s3_record in sns_message['Records']:
            s3 = s3_record['s3']
            logger.info(s3)

            # ジョブ結果ファイルを S3 から取得
            response = s3_client.get_object(
                         Bucket=s3['bucket']['name'],
                         Key=s3['object']['key'])
            logger.debug('transcription data')
            logger.debug(response['Body'].read())

この Lambda 関数に対して、先程作成した SNS トピックを購読させます。

文字起こしパイプラインを実行

ジョブの作成

今回は管理コンソールからジョブを作成します。

ジョブ作成画面の最下部に ”Choose output locationInfo” という入力欄が追加されています。

デフォルトでは「Amazon default bucket」が選択されていますが、「My own bucket」を選択すると、バケット名を入力できるようになります。

S3 バケットは、Amazon Transcribe ジョブと同じリージョンにある必要があります。 また、現時点ではバケット名のみ指定可能で、バケット内でのパスは指定できません。

ジョブ完了後の処理を確認

CloudWatch Logs にある Lambda の実行ログから、 SNS トピックに送信されたメッセージを確認します。

...
[DEBUG]	2018-07-19T09:31:03.242Z	740d17d2-8b36-11e8-831d-f9c26748560e	transcription data
[DEBUG]	2018-07-19T09:31:03.264Z	740d17d2-8b36-11e8-831d-f9c26748560e	b'{"jobName":"YOUR-JOB-NAME","accountId":"123456789012","results":{"transcripts":[{"transcript":"...
...

無事、Amazon Transcribe のジョブ実行結果を取得できています。

補足:S3 Events のメッセージ詳細

S3 Events → SNS → Lambda と連携したときに Lambda に渡るメッセージのデータ構造を確認します。

ジョブが完了し、S3 に PutObject されると、以下の様なメッセージが Lambda に渡ります。

{
  "Records": [
    {
      "EventSource": "aws:sns",
      "EventVersion": "1.0",
      "EventSubscriptionArn": "arn:aws:sns:us-east-1:123456789012:SNS-TOPIC-NAME:e7ac2ee1-b4b4-4155-a292-95464c676c6f",
      "Sns": {
        "Type": "Notification",
        "MessageId": "0733f0c2-6c59-5a45-9607-a30ab5c16d29",
        "TopicArn": "arn:aws:sns:us-east-1:123456789012:SNS-TOPIC-NAME",
        "Subject": "Amazon S3 Notification",
        "Message": "...",
        ...
      }
    }
  ]
}

12 行目から Amazon S3 イベントであることがわかります。 13 行目の Message がイベント発火の実態です。

この Message の値を抜粋したのが以下です。

{
  "Records": [
    {
      "eventVersion": "2.0",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-1",
      "eventTime": "2018-07-18T05:59:52.123Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {...},
      "requestParameters": {...},
      "responseElements": {...},
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "S3-EVENT-NAME",
        "bucket": {
          "name": "TRANSCRIPTION-BUCKET-NAME",
          "ownerIdentity": {
            "principalId": "XXX"
          },
          "arn": "arn:aws:s3:::TRANSCRIPTION-BUCKET-NAME"
        },
        "object": {
          "key": "TRANSCRIPTION-JOB-NAME.json",
          "size": 94004,
          "eTag": "5c0a6bc6b063e27d9058d5ff6b206af2",
          "sequencer": "005B4ED757C1EDA88B"
        }
      }
    }
  ]
}

S3 バケットに対する ObjectCreated:Put のためにイベントが呼び出され、Put されたバケット名、オブジェクトキーもイベント情報に含まれています。

最後に

Amazon Transcribe の文字起こし結果をユーザーの S3 バケットに出力できるようになりました。

この機能を S3 Events と連携し、ジョブ完了後は即座にジョブ実行結果ファイルを Lambda 関数で処理してみました。

ジョブ完了時にイベント・ドリブンに処理したい場合、今回紹介した方法とは別に CloudWatch Events を利用するアプローチもあります。

次のブログを参照ください。

Amazon TranscribeがAmazon CloudWatch Eventsに対応しました

参考