【サーバーレス】18,570本のブログの形態素分析をやってみた

こんにちは!コンサルティング部のキムです。

私は韓国で昔から「晴れ男」によく呼ばれましたけど、東京来てからはあまり晴れた日が少なかった気がします。 ですが、直近の天気は良さすぎて毎日ハイパーになっています。さすがに東京の空は綺麗ですね!(韓国の空は微細ホコリのせいで綺麗な日が少ないです)

あ、「晴れ男」や「晴れ女」という表現は韓国語ではないものですが、面白いし、とても良い表現だと思います :D

さて、今日は先日小規模で試してみた「Developers.IO ブログで日本語の辞書を作ってみた」の内容をDevelopersIOブログの全体の規模で実装してみましたので、そのお話をしたいと思いました。

さすがにある程度の規模では小規模とは異なり、考えすべきなことがたくさんありました。こんなに簡単な作業(1つのLambda関数実行)を大規模で実装しながら私がハマったこと・わかったことを本記事にて纏めてみました。

それでは早速、現実的であり実装できるサーバーレスアーキテクチャーのために考慮すべきことが何かを私と一緒にみてみませんか!?

目次

背景(スキップしても構いません)

先日、私が書いたブログ「Developers.IO ブログで日本語の辞書を作ってみた」の内容のアウトプットを高度化して、実際に有用なものを作りたかったです。最初は形態素分析で集中してデーターの保存はDynamoDBに適当に格納していたのですが、それだけでは検索機能が足りなかったです。

なので、ローカルでのDocker基盤のElasticsearch clusterを動かせて、分析結果のデータをそのElasticsearch clusterに格納しました。そこまでには大丈夫でしたが、もっと有用なツールになるためには、サンプルの日本語の文章がより多く必要でした。せっかくなので、全ブログの規模の分析結果を入れてみようと思いました。

早速全ブログのURLリストを取集するクローラーを作ってテキストファイルにてローカルで保存しました。ターゲットになるブログURLは18570個でした。(ほぼ十日前の結果でしたので今は19000ほどあると思います!) そして既存の形態素分析のためのpythonコードでmulti threading機能を追加して起動してみました。それにも関わらず、時間も10時間以上掛かってしまい、実行途中で原因が分からないエラーが出てきてローカルではダメだと感じました。

ちなみにElasticsearchの検索エンジンには既に形態素分析の機能がありますが、私が将来最終的にやりたいことはこの形態素分析の結果が必要になるため、どうしてもやりたかったです。

以上、Developers.IOの全ブログの規模の分析作業はAWS上でやってみよう!というモチベーションの背景でした。

一体何がやりたいのか

ローカルでもう既に url_list.txt というファイルが用意されていました。このテキストの内容としては以下のようなただのURLリストのテキストファイルです。

url_list.txt

このファイルをインプットとして、URL毎に分け、形態素分析を行い、その結果をjson形式にてS3上に保存したかったです。

つまりINPUTとOUTPUTは以下のようにまとまれます。

  • INPUT : url_list.txt
  • OUTPUT : URL毎の形態素分析の結果JSONファイル * URLの数(=18,570)

ハマった話 - その1「Lambdaの上で実行できる仕様なのか」

ビックデータの処理をサーバーレスで構築するパターンに関しては、もう多くの公開事例もありますが、私はその規模の構築をやったことがなかったので正直何を考慮すべきかについてはイメージがあまりついてなかったです。(今でも当然足りないですが)

私がやりたいことを実装するだけの目的であれば、多分大きいサイズのEC2何個で十分だと思いますが、せっかくなのでサーバーレスで実装してみたかったです。 私はフリーランサーとしてサーバーレスAPIを何回か実装した経験がありましたので、サーバーレスの概念自体は問題なかったです。その経験のおかげで一番最初に検証したのが、既存のコードがLambda上で動くかどうかを確認することでした。

既存のコードはpython3.7基盤であり、spaCyというNLPライブラリーの日本語バージョンである GINZAを使っていました。 GINZAはプラットフォーム特殊なライブラリーを使っているのではないかと思ってEC2の上でGINZAライブラリーをインストールしてみたら、インストールが失敗しました。そういうことであれば、Lambda上のインストールも失敗する可能性が高い、っていうかLambdaがAmazon Linux基盤で実行されるからインストールが失敗することに決まっているという意味でした。

予想通りLambdaでもインストールがダメでした。もったいなかったですが、形態素分析のライブラリーを変えるしかなさそうでした。

色々試して見た結果、Janomeというライブラリーが良さそうに見えて、EC2(Amazon Linux 2)でインストールができるかどうかを確認してみました。結果は成功! 次はパッケージのサイズでした。Lambdaは実行に必要な全てのファイルを含めて unzipped deployment package size250MB 以下 という制限があって、このようなかなりヘビーなライブラリーは対象外になる可能性が高かいです。そこまでには昔の経験のためある程度予想ができました。Janome のライブラリー自体は120MBほどでしたが、他のライブラリーと一緒にデプロイすれば250MBが超えてました。

どのライブラリーが一番デカイのかを確認したところ、boto3 ライブラリーがかなり大きなサイズを握っていました。Boto3 ライブラリーはLambda環境で基本的に提供してくれるライブラリーであるため requirements.txt から boto3 ライブラリーを外したらちょうどいい感じのデプロイサイズになりました!

unzipped-size

Node.js 基盤で開発すれば devDependencies というものがあり、リリースする時は除外してくれる機能がありますが、python基盤だとこの機能に関してはネイティブなサポートが足りないと感じました。 今回のように boto3 は開発環境でも必須なライブラリーなので良い方法が無いか調べてみました。英語ですが、このブログがいろんな方法を教えてくれるので参考になりました。

ハマった話 - その2「Step Functionsのタスクのインプット・アウトプットのデータサイズに気をつけよう」

既存コードを少し修正してLambda上で実行できることを確認してからは、どのような構成で実装するかを悩みました。EMRはオーバースペックだと思って(知識も経験も無いので)とりあえずStep Functionsでやってみようと思いました。実はStep Functions一度もやったことなかったので、何も分からなかったです。

とりあえず最近発表された動的並列処理の機能を確認した上(ナイスタイミングでした。笑)、URL毎に分けて処理可能なのかを確認しました。S3の上 url_list.txt ファイルを配置しておいて、以下のような構成でできると思いました。

state-machine

  • Read URL List ステートではS3で url_list.txt を読み込んで、次のマップステートに url を array の形で伝達する関数でした。
  • Analyze Blog Post はハマった話 - その1で、私が作った形態素分析のためのコードでした。

ですが、結果はダメでした。

原因は Step Functions の制限である「タスク、状態(ステート)、実行の最大の入力または結果データサイズ」でした。 Read URL List ステートで結果として渡す URL array のサイズはMB単位のため、32KBのデータサイズの制限に比べたらありえないサイズでした。

step-functions-data-size-limit

他の方法を考えざるを得なかったです。

ハマった話 - その3「Lambdaの同時実行の数には制限がある」

Step Functions の制限の為、今回は url_list.txt 自体を url毎にファイルを分けてS3に保存し、Mapにて実行しようという考えを思いました。URL毎にファイルを分けると一つのファイルの中身は以下の様になりました。

{
   "url": "https://dev.classmethod.jp/cloud/aws/how-to-create-japanese-dictionary-with-devlopersio-blogs/"
}

Step Functions は以下の構成になりました。

state-machine-2

Step Functions の Mapにて実行されるLambda関数で Index の参照ができるので問題ないと思いました。 (参考として、Step Functionsのドキュメントは日本語版はかなり古い感がありますので、英語のドキュメントを読んだ方が良いと思います。)

これは必ず出来ると思っていたですが、隣に座っていた坂巻さんが「キムさん、Lambdaの同時実行の数の制限もあるよー」と教えてくれました。

そうです。私はこんなに基本的なLambdaの制限も知らずに今までサーバーレスの構成を考えていたのでした。(パニック)

確かに昔Lambdaの制限を検討した時(何年前の話です)、同時 実行の数が 1000 以上超える場合に備えて Concurrency コントロールが必要なんて、普通のサービスだとこの制限はあまりにも甘いのでは?と思ったことがありました。(その時はBackendのAPIとしての使い方しか考えられなかったです)

しかし、並列処理だと1000の制限はかなり厳しい制限だと感じました。さすがに Step Functionsではそういうところに備えて当然 Retry 等の設定が可能でしたが、それもそれなりに複雑度を増すし、URLファイルを分けることも綺麗じゃないと思ったので、より簡単な他の方法を調べようと思いました。

ハマった話 - その4「多数の並列処理の起動トリガーとして、DynamoDB Streamsは?」

※ 追記(2019/10/03):以下の内容は私が間違えていたので、改めて再検証を行いました。この再検証した記事をご参考ください。

他の良い構成を調べていたところ、DynamoDB Streams を活用すれば良いでは?という考えを思いつきました。

DynamoDB Streams は Lambdaが失敗してもリトライする(1日間の制限あり)ので Step Functions と異なり、Concurrencyコントロールの為に別の作業は要らないです。そしてURLリストのファイルが無くても DynamoDB だけで、すごく綺麗な実装が出来そうに見えました。

早速ローカルで url_list.txt ファイルを読み込んで、URL毎に DynamoDB にアップロードするコードを書きました。ストリームイベントでトリガーされるLambda関数を作って、この関数を形態素分析コードに紐つけて形態素分析を実行させるコードも用意しました。検証の為、100個のURLをアップロードしてみました。

結果は失敗でした。

100個のURLは確実にDynamoDBの中に格納されたんですが、何故か40個ほどのストリームイベントだけがトリガーされて、残りのストリームイベントはトリガーされてなかったです。他の作業をやりながら1時間以上を待っていても結果は同じでした。

いろいろ調べてみた結果、正確な原因はまだわからないですが、そもそもDynamoDB Streams の使い方として大規模のイベント並列処理はオススメではなかったということでした。大規模でストリーム処理をやりたい場合は Kinesis Streams を!ということでした。Kinesis Streams は並列処理の規模に従って shard の数のコントロールが可能ですが、DynamoDB Streams は shard 数のコントロールが出来ないので、Partition Key だけで暗示的に Shard 数をコントロールするしかないです。UUID を Partition Key として使ってもほぼ同じでした。

このところについて社内の優秀なエンジニアたちにもらったコメントは以下の通りでした。

「そもそもDynamoDB Streamsの使い方は、並列に動きすぎない、同期先の性能低いDBにあわせ、あえてゆっくり動かしたりするにはいい仕組み。」

ということでした。確かに今回私の作業はこのユースケースをよく知らずにやっていたので、あえて他の方法を探すようになりました。

実装出来たアーキテクチャー

考えていた構成案の中で残った案は以下の二つでした。

  • SQS -> Lambda
  • Kinesis Streams -> Lambda

18,000ほどの規模だとSQSを使っても十分実装出来ると思いましたので、とりあえずSQSで実装してみました。

結果は成功でした!!!

import boto3

bucket_name = 'XXXXXXXX'
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)
size = sum(1 for _ in bucket.objects.all())

print(size)

s3-ls-count-result

が、なんか2個のファイルが足りなかったです。SQSのMessages In Flightもずっと2に固定されてありました。

sqs-messages-in-flight-2

Lambdaのマネコンで Recent Invocations を確認してみたら実行時間(Duration)が想定した時間より短いことを発見しました。ログで確認してみると問題の二つの対象URLが分かりました。

lambda-recent-invocations

一つ目は https://dev.classmethod.jp/server-side/language/python-scope-and-decorator/ でした。 クローリングしてURLを取集した際は存在していたURLだと思いますが、今は404になっていることを確認しましたので問題なさそうです。

二つ目は https://dev.classmethod.jp/etc/mesoko-joined-classmethod/ でした。 ちょっと面白かったですが、めそ子の記事のみページの構成が異なり、私が書いた html parse コードがエラーを出すようでした。

ちなみに「めそ子」は弊社で活躍している二次元社員です(笑)

このような問題を起こす記事がこれ一つのみで本当に良かったです。これも特に問題ではありませんので、成功だと思われます!!!!

SQSにメッセージを送る時、注意したこと

私は以下のコードをローカルで実行してSQSにメッセージを送りました。

import boto3
import time

def send_messages(url_dict_list):
	sqs = boto3.resource('sqs')
	queue = sqs.get_queue_by_name(QueueName='XXXXXXX')

	i = 0
	while i < len(url_dict_list):
		res = queue.send_messages(Entries=url_dict_list[i:i+10])
		i += 10
		time.sleep(1) # make 10 requests per second (18,000 requests for 30 mins)
		print(i)


	if len(url_dict_list[i:i+10]) > 0:
		res = queue.send_messages(Entries=url_dict_list[i:i+10])

def read_url_list():
    filepath = 'url_list.txt'

    url_list = []
    with open(filepath, 'r') as text_file:
        raw_content = text_file.read()
        url_list = raw_content.split('\n')
        url_list = url_list[:-1]

    return url_list


if __name__ == '__main__':
	
	url_list = read_url_list()
	
	url_dict_list = [ { 'Id': str(i), 'MessageBody': url } for i, url in enumerate(url_list) ]
	send_messages(url_dict_list)
	
	print('done')

ここで send_messages関数を見ると

while i < len(url_dict_list):
	res = queue.send_messages(Entries=url_dict_list[i:i+10])
	i += 10
	time.sleep(1) # make 10 requests per second (18,000 requests for 30 mins)

ということが書いてあります。boto3のqueue.send_messages()は最大10個のメッセージを送ることが出来ますので、10個づつ分けて送りました。

あと、time.sleep(1) ですが、SQS側でLambdaのConcurrencyのコントロールをやってくれますが、1000個のURLリストで検証した時、Lambdaのエラーが出すぎて、Queueのメッセージ数をコントロールする方が良いという印象を受けました。なので1秒に10個のURLづづ送るコード(1分に600個・LambdaのTimeout設定は1分にしました)になりました。理論的にLambdaの同時実行の数の制限にも問題ないように設定しました。

Lambdaが並列処理をやっている間のモニタリング

最初はLambdaのモニタリングはやるつもりではなかったんですが、実装の失敗を重ねながらちょっと気になっていた為、何が起こっているのかをLambdaのマネコンのCloudWatch メトリクスを通じて観察してみました。

Lambda-metrics

メトリクスの値は5分毎に集まるので、5分より細かい単位でメトリクスを分析するためにはある程度の推測が必要です。

先ずは Invocations を見て、この作業は約30分間行われていたのが分かります。5分間最大3,000回実行されていたので1分ではおそらく最大約600回の実行が行なっていたのが分かります。 これはメッセージを送る際の意図と一致することでした。

実行時間のDurationですが、タイムアウトを1分で設定したので最大1分の実行時間があったのが分かりました。タイムアウトを正確な数はこのグラフだけ見ると分からないところが残念でした。

エラーと成功のグラフを見ていた時は正直ちょっと怖かったです。エラーがゼロではなく、それも260回まで上がってきて何か問題あるのかなと心配してたんですが、正常に全てのURLの分析結果が出来上がりましたので、DevelopersIO のサーバーでレスがなかった時とエラーではないのかということを推測してみました。多分、その中には↑の問題の二つのURLのため、無限に繰り返して失敗した結果も含まれているはずですが、その割合が分からないところは仕方がなかったです。

特に Throttles のグラフは素晴らしいものでした。Throttles メトリクスとは公式ドキュメントを見ると以下のように記載されております。

「お客様の同時オペレーションを超える呼び出しレートのために調整された Lambda 関数の呼び出し試行の回数を測定します (エラーコード 429)。失敗した呼び出しは、成功する再試行のトリガーとなる場合があります。」

つまり、Throttles が実行されていた間ずっとゼロになったことはLambdaの最大同時実行数には届かなかったという意味です。 これは私が最初 time.sleep(1) 無しで、一発で全てのURLをSQSに送った時も同じでした。SQSの方でLambdaの最大実行数をコントロールしてくれるってことがグラフを通じても分かりました。

費用について

この作業のためかかった費用は以下の通りです。検証も含めた全ての費用です。

Lambda

lambda-cost

SQS

sqs-cost

S3

s3-cost

ポイントまとめ

今回の作業をやってみて、サーバーレスの構成を考える時に注意することについて分かったことは以下の通りです。

  • Lambdaで実行できる仕様なのか確認する。
  • AWSサービス毎の制限事項を必ず確認してから作業する。
  • Lambda、もしくはサーバーレスは万能ではない。特にThrottlingを常に考えないといけない。
  • サービスや機能毎のユースケースに注意する。
  • クラスメソッドのエンジニアたちは素晴らしい。AWSのことならばクラメソのエンジニアに何でも聞いてほしい。(本記事の問題点に関する解決策はだいぶ優秀なクラメソのエンジニアたちに教えてもらいました)

最後に

本記事の目標作業について本記事の構成が完璧な構成だ!!とは絶対に言えません。ただ、一つの有効な構成を考えて実装するまでの経験を共有したいと思って本記事を書きました。

今後も私の日本語の勉強をより加速化してくれるこのツールのアップデート記事はガンガン書くつもりですので、お楽しみにしてください!:D