サーバーレスパターンから学ぶデータ分析基盤構築 #devio2022

DevelopersIO 2022のビデオセッションにて、「サーバーレスパターンから学ぶデータ分析基盤構築」というテーマでお話しました。その動画と関連資料をご紹介します。
2022.07.27

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部の笠原です。

Developers.IO 2022 〜技術で心を揺さぶる3日間〜 にて 「サーバーレスパターンから学ぶデータ分析基盤構築」というテーマでお話ししましたので、内容を簡単にご紹介します。

セッション動画

セッションスライド

内容

AWSが公開している「サーバーレスパターン」の中から、データ分析に使えるパターンをいくつか取り上げて紹介し、実際の構築例を交えながら簡単に紹介しています。

また、Step Functionsを用いたサーバーレスデータ分析処理について、実際に構築した際にハマったポイントを紹介しています。

サーバーレスパターン

サーバーレスパターン

AWSが「サーバーレスパターン」として、公開しています。 ユースケース別に16パターンが公開されていますので、やりたいことベースでパターンを組み合わせて利用することができます。

今回は、サーバーレスパターンの中から、データ分析に使えるパターンをいくつか取り上げています。

シンプルなデータ加工

画像処理やシンプルなデータ加工で使えるパターンです。 S3バケットにデータファイルを格納したら、Lambda関数を起動してファイルを加工し、バケットに格納するというものです。 Lambdaでは、我々はよくPythonランタイムでPandasライブラリを使えるようにして、データファイルをPandasのデータフレームとして取り込んで処理しています。 S3からのファイルの取得と格納では、Lambda内の /tmp 領域に一時保存して処理しています。 この際、 /tmp に格納したファイルは処理後に明示的に削除することをオススメします。

また、データ整形後は同じバケットの別のパスに格納することもOKですし、別のバケットに格納することもOKです。 データ取得したファイルの元の場所と同じ場所に格納してしまうと、Lambdaの起動が無限ループで発生して止まらない状態になってしまうことがありますので、 Lambda起動イベントの設定に注意しましょう。

分散並列処理

S3バケットにデータファイルを格納したら、処理するデータ毎にLambda関数を複数起動するパターンです。 データ毎に分散処理できるので、トータルの実行時間が短く済みます。

例えば、年度毎のデータファイルを用意して、年度毎にLambda関数を起動し各々処理するパターンが考えられます。 データ毎に処理内容が異なる場合は都度関数を定義する必要がありますが、これによってトータルの実行時間が短くなるので、 Lambdaの15分制限を回避する利点も生まれます。

イベント駆動の業務処理連携

SNSとSQSを組み合わせて処理を連携するパターンです。

SNSでファンアウトできますので、並列実行できますし、単一実行でもデバッグ用途にイベントメッセージ内容の確認にも使えます。 SQLのキューにイベントメッセージを格納できますので、Lambda実行でのエラーハンドリングなんかにも使えます。 また、FIFO SNS/FIFO SQSで順序性を考慮することも可能です。 より信頼性のある連携方法を構築したい場合に使えるパターンです。

SQSキューの前にSNSトピックを挟むことについては、以下の記事が詳しいです。

【AWS】SQSキューの前には難しいこと考えずにSNSトピックを挟むと良いよ、という話 | DevelopersIO

アプリケーションフロー処理

データファイル数が多かったり、複雑なデータ加工処理が必要な場合は、Step Functionsを使ってデータ処理フローを構築することをオススメします。

Step Functionsのステートマシンを構築し、ワークフローを制御します。 ParallelタスクやMapタスクを活用してLambda関数を並列実行できたり、並列実行した結果を待ってから次の処理を行うといったことも簡単に行えます。

データファイルサイズが小さくて大量にあるケースで有効です。

実際に構築したステートマシンでは、「データ取得部」「データレイク加工部」「データマート加工部」と行った形で、ステートマシン上で処理のフェーズを分けて構築しており、 日次処理/毎時処理といった時間駆動でバッチ処理しています。 各タスクでLambda関数を起動し、Lambda関数の中で「S3バケットからデータ取得」⇒「データ加工」⇒「S3バケットに格納」を行なっています。 データ取得先毎やデータの種類毎にParallelタスクで並列処理したり、 データファイル毎にMapタスクで動的並列処理したりして、処理時間が短くなるように工夫しています。

毎時処理での例では、ファイル数が非常に多くて1つのステートマシンでは捌ききれないため、 ステートマシンを入れ子構造にすることで、対応しています。

非常に便利なStep Functionsですが、ハマった点がいくつかありましたので、紹介します。

Step Functionsのクォータ

Step Functionsのイベント履歴数の上限25,000イベントに簡単に到達してしまいます。 上限緩和不可の項目なので、上限に引っかからないように回避する必要があります。

対処としては、ステートマシンを入れ子構造にすることで回避しました。 入れ子構造にすることで、Mapタスクで動的並列にステートマシンを呼び出すこともできるため、 うまく設定するとLambdaの同時実行数も簡単に増やせます。 そのため、Lambdaの同時実行数1,000を超える場合はLambdaの上限緩和申請も併せて検討してみましょう。 また、Mapタスクの最大同時実行数を設定することも検討してみましょう。

Athenaのクォータ

Step Functionsのステートマシンの最後に「データマート加工部」があり、 データマート用に複雑な加工処理を行なっている部分を構築していた時に遭遇しました。

Athenaは基本的にデータ分析用途で使用するサービスですが、今回の例ではデータ取得用途で使ったものです。 データ分析用途のサービスですので、同時にクエリ実行できる数はデフォルトで 20 となっています。(バーストで 80 まで一時的に増えます)

上限緩和申請可能な項目ですが、今回は上限緩和申請をせずに対応しました。 データマート用のデータファイル毎にMapタスクで動的並列実行している部分に、最大同時実行数を設定することで対応しました。 これでAthenaの start-query-execution APIコールを抑えることで回避してます。

ちなみに、lambdaからAthenaのクエリを使用してデータ取得する方法については、以下のAKIBA.AWSの登壇資料をご参考ください。

[AKIBA.AWS ONLINE #07]S3にあるデータをAthenaのクエリで取得してLambda ( Pandas ) で加工してみた[資料公開] | DevelopersIO

Lambdaのスケールが追いつかない

LambdaのRateLimitエラーに遭遇しました。 ただし、同時実行数の上限緩和申請済の状態で、各関数のモニタリング内容を見る限り、その同時実行数に達していないことがわかりました。

調べてみると、バースト同時実行クォータは東京リージョンでは1,000となっていますが、 最初のバーストの後は、関数の同時実行数は1分毎にさらに500インスタンス増加とドキュメントに記載がありました。 そのため、バースト後のスケールが追いついていないケースが考えられました。

この場合は、Mapタスクの最大同時実行数の見直しと共に、Retry設定を見直して、 Lambda.TooManyRequestsException 発生時でリトライするようにステートマシンを設定しました。

データレイク周りのデータ加工

規模の大きいデータ処理にはGlueを使うパターンを考えましょう。

Lambda関数の最大実行時間(15分)を超えるようなデータ加工の場合はGlueを使用します。 最近では、Glue Databrewも出ましたので、Step FunctionsからGlue Databrewのジョブを起動する組み合わせもありますね。

処理するファイル数が多く、各ファイル容量が小さい場合は、Lambdaの起動の速さ、同時実行数の多さの利点が活きてきます。 ファイル容量が大きく、処理時間が多くかかるデータを処理する場合はGlueを使った処理が良いでしょう。 ケースに応じてうまく使い分けていきましょう。

まとめ

サーバーレスパターンの中から、データ分析基盤に使えるパターンを中心にご説明しました。 また、Step Functionsを使ったワークフロー制御で色々ハマったので、その内容をご紹介しました。

まずは、シンプルなデータ加工の基盤を構築してみましょう。 そして用途に応じてサービスをうまく使っていきましょう。