ちょっと話題の記事

サーバーレス入門!英語の音声ファイルをTranscribeで文字起こししてTranslateで日本語に翻訳する #jawsug_bgnr

JAWS-UG 初心者支部#24 サーバレスハンズオン勉強会 の宿題である、文字起こし + 翻訳パイプラインに挑戦しました。

コンバンハ、千葉(幸)です。

先日JAWS-UG 初心者支部#24 サーバレスハンズオン勉強会にリモートで参加し、いくつかの構成をハンズオンで学びました。その時の記事がこちら。

その中で、ハンズオンでは全てを行わず、各自でチャレンジしてみてね!と出された宿題がありました。せっかくなので実際にやってみます。やりたいことの構成は以下のようになっています。

ちなみに自分の現状としては、以下の通りです。

  • S3 : 分かる
  • Lambda : 分かる
  • Transcribe : 聞いたことはある
  • Translate : 聞いたことはある
  • コード : ほぼ書いたことがない
  • Python : 入門書みたいなものを何冊も持っているがどれも本棚で眠っている。お前はいつもそうだ。

目次

取り掛かる前の準備

前提として、以下の構成までは準備できているものとします。

  • 音声ファイルをアップロードするインプット用S3バケット
  • 上記のS3バケットへのputをトリガーに実行されるLambda関数
    • 実行に必要なロールの設定含む
    • Amazon Transcribeによる文字起こし処理を行うコード含む
  • Lambda関数による文字起こし結果ファイルのアウトプット用S3バケット

上記の前提が整っていない場合、以下のハンズオン資料を見ながらであると簡単に作れます。マネジメントコンソールの操作に慣れている方なら、10分程度で構築できるかと思います。興味がある方はぜひ手を動かしてみてください。

追加でやりたいこと

構築済みの環境から追加でやりたいことを大まかに分解すると、以下の通りです。

  • アウトプット用のS3バケットへのputをトリガーに、Lambdaを実行する
  • アウトプットされたファイルの内、文字起こしの結果のテキストを取得する
  • 取得したテキストをTranslateに渡して翻訳してもらう

手元にある材料

ハンズオンの結果から、流用できそうなものがいくつかあります。

Transcribeされた結果のファイル

元のファイルは改行なしの1行で出力されていますが、整形すると以下のようになります。7行目の部分をうまく抜き出して、これをTranslateできれば良さそうです。

{
	"jobName": "20200219112729_Transcription",
	"accountId": "xxxxxxxxxxxx",
	"results": {
		"transcripts": [
			{
				"transcript": "Hello. Do you speak a foreign language? One language is never enough."
			}
		],
		"items": [
			{
				"start_time": "0.04",
				"end_time": "0.65",
				"alternatives": [
					{
						"confidence": "0.9139",
						"content": "Hello"
					}
				],
				"type": "pronunciation"
			},
      ================= 中略 =================
			}
		]
	},
	"status": "COMPLETED"
}

Translateに翻訳処理を実行してもらうコード

ハンズオンの2番目で使った、「API Gateway経由で渡された日本語のテキストを英語に翻訳する」ためのコードがあります。

ハイライト部分では、API GatewayをトリガーにLambdaを実行する際に受け渡されるイベントの中から翻訳対象のテキストを抜き出しています。ここを、どうにかして文字起こし結果ファイルの中のテキストを当て込められればほぼ踏襲して使えそうです。

import json
import boto3

def lambda_handler(event, context):

    translate = boto3.client('translate')
    input_text = event['queryStringParameters']['input_text']

    response = translate.translate_text(
        Text=input_text,
        SourceLanguageCode='ja',
        TargetLanguageCode='en'
    )

    output_text = response.get('TranslatedText')

    return {
        'statusCode': 200,
        'body': json.dumps({
            'output_text': output_text
        })
    }

あとは、上記のままだと日→英の翻訳なので、そこは入れ替えてあげる必要があります。

S3へのputをトリガーに実行されるコード

音声ファイルの文字起こしをやっているLambdaのコードです。 S3から送られてくるイベントから、S3バケット名とオブジェクトのキーを取得している部分が参考になりそうです。

import json
import urllib.parse
import boto3
import datetime

s3 = boto3.client('s3')
transcribe = boto3.client('transcribe')

def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    try:
        transcribe.start_transcription_job(
            TranscriptionJobName= datetime.datetime.now().strftime("%Y%m%d%H%M%S") + '_Transcription',
            LanguageCode='en-US',
            Media={
                'MediaFileUri': 'https://s3.ap-northeast-1.amazonaws.com/' + bucket + '/' + key
            },
            OutputBucketName='yyyymmdd-transcribe-output-yourname'
        )
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e

S3から送られてくるイベントの中身

ハンズオンで触れられた部分ではないですが、前回の記事を書くときに気になったので調べていました。

上のコードと組み合わせて、JSON形式のここの値を取りたいときはこう書けばいいんだな、という参考になりそうです。ここでファイルの中身に関する情報が含まれていれば話は単純でしたが、そうは行かないようです。

Amazon S3 イベントでの AWS Lambda の使用

{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-2",
      "eventTime": "2019-09-03T19:37:27.192Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "AWS:AIDAINPONIXQXHT3IKHL2"
      },
      "requestParameters": {
        "sourceIPAddress": "205.255.255.255"
      },
      "responseElements": {
        "x-amz-request-id": "D82B88E5F771F645",
        "x-amz-id-2": "vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo="
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "828aa6fc-f7b5-4305-8584-487c791949c1",
        "bucket": {
          "name": "lambda-artifacts-deafc19498e3f2df",
          "ownerIdentity": {
            "principalId": "A3I5XTEXAMAI3E"
          },
          "arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df"
        },
        "object": {
          "key": "b21b84d653bb07b05b1e6b33684dc11b",
          "size": 1305107,
          "eTag": "b21b84d653bb07b05b1e6b33684dc11b",
          "sequencer": "0C0F6F405D6ED209E1"
        }
      }
    }
  ]
}

足りないもの

ここまで集めた情報だと、文字起こしされてアウトプットS3バケットにputされたオブジェクトのキーを取得するところまでは問題なく行けそうです。その後の、該当するオブジェクトの中身を開いてお目当てのテキストだけを抜き出す、という処理をどう実現するかが、ハンズオンで習った知識だけでは足りません。

コードの中で実施している処理について詳しく知りたいときはリファレンス見る、という話が出ていたので、今回のコードで使用しているPython用AWS SDKであるboto3のリファレンスを見てみます。S3の中身を見たいので、S3に関する部分を参照します。

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html

S3.Client.get_objectのレスポンスを確認

オブジェクトの中身を見たいので、やっぱりget_objectかなと思い、以下を参照することにします。 S3.Client.get_object

Request Syntaxを見ると、リクエスト時に指定が必須なのはBucketKeyだけのようです。

response = client.get_object(
    Bucket='string',
    IfMatch='string',
    IfModifiedSince=datetime(2015, 1, 1),
    IfNoneMatch='string',
    IfUnmodifiedSince=datetime(2015, 1, 1),
    Key='string',
    Range='string',
    ResponseCacheControl='string',
    ResponseContentDisposition='string',
    ResponseContentEncoding='string',
    ResponseContentLanguage='string',
    ResponseContentType='string',
    ResponseExpires=datetime(2015, 1, 1),
    VersionId='string',
    SSECustomerAlgorithm='string',
    SSECustomerKey='string',
    RequestPayer='requester',
    PartNumber=123
)

Response Syntaxを見ると、レスポンスは以下のような形式で返ってくるようです。Bodyがまさにオブジェクトの中身そのものな気がするので、これを参照してあげるのが第一歩になりそうです。

{
    'Body': StreamingBody(),  # StreamingBodyとは??
    'DeleteMarker': True|False,
    'AcceptRanges': 'string',
    'Expiration': 'string',
    'Restore': 'string',
    'LastModified': datetime(2015, 1, 1),
    'ContentLength': 123,
    'ETag': 'string',
    'MissingMeta': 123,
    'VersionId': 'string',
========= 以下略 =========
}

StreamingBody()という見慣れぬ記述が気になりますが、いったんそのまま進みます。

Lambdaでレスポンス確認その1

S3.Client.get_objectのレスポンスの中身を確認するために、多分こんな感じだろうと最低限のコードを書いてLambdaをテスト実行してみます。このLambdaは以下を設定済みです。

  • ロールにS3へのアクセス権限を持つポリシーをアタッチ
  • 環境変数BucketName,KeyNameにそれぞれバケット名、オブジェクトキーを設定
    • 指定しているオブジェクトは、ハンズオンで実施した時の文字起こし結果のファイル
import os
import boto3
 
def lambda_handler(event, context):
 
    s3 = boto3.client('s3')
 
    response = s3.get_object(
        Bucket=os.environ['BucketName'],
        Key=os.environ['KeyName']
    )
 
    return {
        response['Body']
    }

実行するとエラーが出ました。 "errorMessage": "{<botocore.response.StreamingBody object at 0x7fd6f9fc9cc0>} is not JSON serializable",

Response Syntaxのところで見ぬふりをしたStreamingBody()なるものと向き合わざるをえなくなったので調べると、botocore.response.StreamingBodyというものを意識してあげないといけないようです。

https://botocore.amazonaws.com/v1/documentation/api/latest/reference/response.html#botocore-response

Lambdaでレスポンス確認その2

ハイライト部を修正して再チャレンジです。.read()を付与しました。

import os
import boto3
 
def lambda_handler(event, context):
 
    s3 = boto3.client('s3')
 
    response = s3.get_object(
        Bucket=os.environ['BucketName'],
        Key=os.environ['KeyName']
    )
 
    return {
        response['Body'].read()
    }

まだエラーが出ました。 "errorMessage": "{b'{中略}'} is not JSON serializable"

概ねオブジェクトの中身は取れている気がするのですが、先頭についているbってなんだ?と思ってtype()で調べてみると、結果はbytesでバイト型と呼ばれるもののようでした。

このあたりでひたすらトライ&エラーを試みたのですが、以下の設定を入れてあげるのが良さそうということに落ち着きました。

  • .decode("utf-8")でデコードしてバイト型を文字列型にする
  • json.loads()で文字列型の結果をJSONデコーディングする

Lambdaでレスポンス確認その3

ハイライト部分が先述の修正部分です。

import os
import boto3
import json
 
def lambda_handler(event, context):
 
    s3 = boto3.client('s3')
 
    response = s3.get_object(
        Bucket=os.environ['BucketName'],
        Key=os.environ['KeyName']
    )
 
    return json.loads(response['Body'].read().decode("utf-8"))

ここでようやくオブジェクトの中身がレスポンスとして返ってきました。後はお目当てのテキストを抜き出すだけです。

Lambdaでのレスポンス確認その4

S3イベントからバケット名やオブジェクトのキーを取得していた際の書き方を参考に、以下のように書きました。

import os
import boto3
import json
 
def lambda_handler(event, context):
 
    s3 = boto3.client('s3')
 
    response = s3.get_object(
        Bucket=os.environ['BucketName'],
        Key=os.environ['KeyName']
    )
 
    object = json.loads(response['Body'].read().decode("utf-8"))
 
    return object['results']['transcripts'][0]['transcript']

ようやくテキストを抽出することができました。

追加で作成するLambda関数のセットアップ

最終的に以下のコードにしました。ハイライト部に上記で試行錯誤した結果を取り込んだ以外は、ほぼありもののコードの流用で行けました。

翻訳された結果はLambdaのログが出力されるCloud Watch Logs上で見れれば十分かなと思ったですが、せっかくs3.get_objectを学んだので、putもしてみるか、ということでS3に出力する処理も入れました。

import json
import boto3
import urllib.parse
import os
 
def lambda_handler(event, context):
 
    translate = boto3.client('translate')
    s3 = boto3.client('s3')

    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')

    file = s3.get_object(
        Bucket=bucket,
        Key=key
    )

    filecontents = json.loads(file['Body'].read().decode("utf-8"))
    input_text = filecontents['results']['transcripts'][0]['transcript']
    
    response = translate.translate_text(
        Text=input_text,
        SourceLanguageCode='en',
        TargetLanguageCode='ja'
    )
 
    output_text = response.get('TranslatedText')

    # Cloud Watch Logsでの確認用
    print(output_text)  

    # 翻訳結果をS3に出力
    put = s3.put_object(
        Body=output_text.encode('utf-8'),  # Bodyに渡す際にはバイト型である必要があるためエンコードしている
        Bucket=os.environ['BucketName'],  # インプットともアウトプットとも違うS3バケットを環境変数で指定
        Key='japanese_output'
    )

Lambda関数を作成し、上記のコードを設定します。Lアタッチするロールには、Cloud Watch Logs、S3、Translateに対する十分なポリシーを割り当てています。

Lambdaの実行トリガーとして、文字起こし結果のアウトプットS3バケットを指定します。

実行してみる

コードの準備ができたので早速一連の流れで確認してみます。改めて、やりたいことは以下です。

  • インプットS3バケットに音声ファイルをアップロードする
  • 上記をトリガーにLambdaが実行され、Transcribeのジョブが実行される
  • Transcribeのジョブが完了すると、文字起こし結果を含むファイルがアウトプットS3バケットに出力される
  • 上記をトリガーにLambdaが実行され、Translateが実行される
  • Translaeteの結果がCloud Watch LogsとS3バケットに出力される

今回はハンズオンとはまた別の音声ファイルを使用してみます。以下のリンクからダウンロードしました。(アクセスすると音声が流れます。)

https://d1.awsstatic.com/tmt/create-audio-transcript-transcribe/transcribe-sample.5fc2109bb28268d10fbc677e64b7e59256783d3c.mp3

ダウンロードしたmp3ファイルをインプットS3バケットにアップロードします。

putをトリガーに、Transcribe用のLambda関数がInvocateされていることが確認できます。

しばらくするとTransribeのジョブが完了します。

文字起こしされた結果のファイルがアウトプットS3バケットに出力されました。

中身のうち、文字起こし結果の部分のみ抽出すると、以下のようになっています。

"machine learning is employed in a range of computing tasks where designing and program explicit algorithms with good performance is difficult or infeasible. Example. Applications include email filtering, detection of network intruders and computervision. Machine learning is closely related to computational statistics, which also focuses on predictions making through the use of computer. It has strong ties to mathematical optimization, which delivers methods, theory and application domains to the field."

Translateを実行するLambda関数の実行ログを見ると、きちんと翻訳されていることが分かります。

機械学習は、優れたパフォーマンスで明示的なアルゴリズムの設計とプログラミングが困難または実行不可能である幅広いコンピューティングタスクで採用されています。 例。 アプリケーションには、電子メールフィルタリング、ネットワーク侵入者の検出、およびコンピュータビジョンが含まれます。 機械学習は密接にも、コンピュータの使用を介して行う予測に焦点を当てて計算統計、に関連しています。 数学的最適化と強固な関係を持ち、方法、理論、応用分野を現場に提供します。

S3バケットにも、指定したファイル名(キー名)で、結果が出力されています。中身は上記のテキストと同一です。

これで一連の動作確認が完了しました!

完璧ですね!!

なんかエラー出てなかった?

出ていました。

Cloud Watch Logs上での実行ログを見ると、きちんと日本語訳された結果の前後に、いくつかエラーが出力されています。

エラーの内容は以下のようになっています。

[ERROR] TypeError: list indices must be integers or slices, not str
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 20, in lambda_handler
    input_text = filecontents['results']['transcripts'][0]['transcript']

調べてみると、.write_access_check_file.tempが原因であることが分かりました。

Transcribeのジョブが実行される際に、.write_access_check_file.tempという名称のファイルをアウトプットS3バケットにputしています。文字起こし結果を出力する前に、きちんとS3バケットへのアクセス権を持っているかを確認するプロセスが走るようですね。そのputをトリガーにTranslate用のLambdaが実行され、中身がfilecontents['results']['transcripts'][0]['transcript']の形式に対応していないため、エラーが発生していました。

というわけで、チェックファイルがputされた場合は後続処理を中断する仕様にしてみます。

import json
import boto3
import urllib.parse
import os
 
def lambda_handler(event, context):
 
    translate = boto3.client('translate')
    s3 = boto3.client('s3')

    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')

    if key == '.write_access_check_file.temp':
        return print(key)
    
    else:
        file = s3.get_object(
            Bucket=bucket,
            Key=key
        )
    
        filecontents = json.loads(file['Body'].read().decode('utf-8'))
        input_text = filecontents['results']['transcripts'][0]['transcript']
    
        response = translate.translate_text(
            Text=input_text,
            SourceLanguageCode='en',
            TargetLanguageCode='ja'
        )
    
        output_text = response.get('TranslatedText')
    
        print(output_text)
    
        put = s3.put_object(
            Body=output_text.encode('utf-8'),
            Bucket=os.environ['BucketName'],
            Key='japanese_output'
        )

エラーが出なくなりました!

Lambdaのリトライ処理

改修前のコードでは、エラーが計3回発生していました。.write_access_check_file.tempが3回putされていたのかな?と思いましたが、改修後のコードをみる限り1回のみのようです。

残りの2回のエラーは何によって発生していたのかというと、以下の仕様によるものでした。

非同期呼び出し – Lambda は、関数エラーを 2 回再試行します。関数にすべての受信リクエストを処理する十分なキャパシティがない場合、関数に送信されるまで、イベントはキューの中に数時間または数日間保持される可能性があります。正常に処理できなかったイベントを把握するために、デッドレターキューを設定できます。
エラー処理と AWS Lambda での自動再試行

S3からのイベントによってLambda関数を実行する場合は、非同期呼び出しというパターンに該当します。非同期呼び出しによって実行されたLambdaでエラーが発生した場合、2回再試行するため、計3回のエラーとなっていたというわけです。勉強になります。

終わりに

ほぼ材料あるしすぐできるだろーと思って取り組んでみたら、コードの書き方が分からずにひたすらハマりました。普段やらないことをやるといろいろな気づきがありますね。きっかけを与えてくれたハンズオンには感謝です。似たようたハンズオンが以下から実施できますので、興味がある方は試してみては以下がでしょうか。

東京オフィスの千葉(埼玉出身)がお送りしました!