完了イベントとSQSを利用してStep Functionsで非同期な処理の最大同時呼び出し数を制御する

2024.03.05

初めに

先日以下の記事にてSFTPコネクタの完了通知を利用した実装例として結果を集計するシステムを作成しました。

MAPではMaxConcurrencyを指定することで同時実行数は制御可能ですが、上記記事で利用しているようなStartTransferFileStartQueryExecutionに関しては非同期処理となっており実際の実行状態に関わらず呼び出し事態に成功していればそのAPI自体は呼び出し完了として返却するためその同時実行数から外れてしまうものとなります。

今回は上記記事の構成を拡張しSQSを利用することで非同期処理の最大同時実行数に制御をかけてみます。

テンプレート

以下に格納しております。

拡張する要素

大枠の処理は上記の記事をご参照ください。

最初のキックするステートマシンでデプロイ時に指定した最大実行数を超える指定ファイルをキューに投入し、それ以外のファイルを前回同様に処理させるようにし、
完了イベントの際に呼び出される処理でキューをチェックし存在していればそれを新たに転送開始するようにします。

実際にはその他のリソースも含んでおりますが大枠としては完了イベントをトリガーに条件を満たすまで開始することで最初にキックした転送実行数を最大数として処理を続けます

具体的には以下のように変更します。Athena実行完了ステートマシンは特に変更していません。

転送実行ステートマシン

以下のようにDynamoDB登録後に投入後に指定した数を超えた場合はそれをSQSにSendMessageで送信するようにしています。

ConvertInputの部分は以下のように入力パラメータを変換する処理をしています(Choiceの入力時に変換できないので分離)。また取り扱いやすくするためにスライスで実行用のパス群とキューイング用のパス群を分離しています。

(ないとは思いますが)Parallelを利用して並列で処理させると実行完了と投入タイミング次第でキューのチェック処理の後にデータが投入されるケースがありそうなので一応直列で実行しておく方が安全そうです。

完了後処理

前完了チェックの前にキューからのデータの取り出しを追加し、それが存在していればStartFileTransferを呼ぶ形にしています。

転送完了イベントの処理で転送開始処理を呼んでいるため条件(キュー内にデータが存在)する限り繰り返していきます。

実行

最大実行数を3にした上でデプロイし、5つのファイルを指定して処理を開始します。

この状態で処理を開始すると配列変換部分で処理されるファイル3つと、それ以外のファイルに分離されます。

少しわかりづらいですがキューへの投入処理として超過した2つ分の処理がキューに投入された後、メインの転送処理では3つ分の転送が処理されます。

最初の1つ目の実行完了イベントで呼びされたステートマシンを確認してみるとキューに格納した値を取り出し再度転送処理を開始していることができます。

転送完了でキックされるステートマシンの4つ目の実行結果を見るとすでに5つ目の転送が始まっているため以下のようにキューのチェック終了後は何もせず終了しています。

最後の実行結果を見るとその他の処理が終了しているためここでAthenaのクエリを実行し終了します。

試してる中でやはりDynamoDB周りの同時実行制御が甘く複数通知が飛んでくることがあったので取り扱いが多くなればなるほどこの辺りについては注意が必要なものとなりそうです。

終わりに

キューをうまく使うことで非同期処理でMAP側で制御が難しい処理も最大実行数を制御し実行させることができました。

完了イベント内で開始処理を行うことで非同期的な処理であっても実質的にループ処理が可能とはなりますが、当然キューからのデータを取り出し後の削除抜け等本来止めるべきトリガーの部分に誤りがあると無限に呼び出されるため通知や別途最大実行回数をチェックするようなものがないと予期せぬ高額請求が発生する危険性はあるためその点は十分注意しましょう。