SFTPコネクタの転送完了時に自動的にAthenaによるクエリを実行し転送結果の集計をメールで受信してみる

2024.03.02

初めに

先日Amazon Transfer Familyの各種イベントがEventBridgeに発行できるようになり、こちらでSFTPコネクタの転送完了を検知し後続の処理を自動的に解することができるようになりました。

SFTPコネクタはコマンドを実行することでAWS側のマネージドなSFTPクライアントがSFTPサーバとS3間のファイル転送を行う機能となりますが、
転送処理は非同期的に実行されるためコマンドの完了=ファイルの転送完了とならないため別の手段で完了を把握する必要があります。

例えば以下では1GBほどのtext1.txtというファイルを作成し実行しそのままs3 lsを実行していますがこのタイミングではまだ完了していないことが確認できます。

$ aws transfer start-file-transfer --connector-id c-xxxxxx --retrieve-file-paths /home/sftpuser/test1.txt /home/sftpuser/text1.txt  --local-directory-path /bucketname/receive && aws s3 ls s3://bucketname/receive/
{
    "TransferId": "ff8b19a3-5d42-4f85-bbc8-fadba2355e3b"
}
2024-02-26 13:09:23         16 test1.txt
2024-02-26 12:50:35          0 test2.txt

この関係で以前は定期的に状態を確認する必要があるため実行時間や基盤によっては確認回数が増え余計なコストがかかってしまったり、ポーリング間隔を長くする場合は余分な待ち時間が生まれてしまいましたが、今回のアップデートによりイベント駆動に持ち込み実行を最適化できる余地が生まれました。

一応正常系であればSFTPサーバからAmazon S3方面の転送であればAmazon S3に対してのPutObjectイベント駆動でも正常系は処理できますが、転送に失敗した場合は失敗しているのかまだ最中であるのかは別の手段が必要となります。

今回はコチラのイベントを利用しデイリーバッチ等でCSVがSFTPサーバからS3に送られるような環境でその転送完了時にデータを集計しユーザに通知する仕組みを作ってみます

全体像

大枠の構成は以下の通りです。

詳細は後述しますが転送の初期化および実行をするステートマシン、転送状態を確認し必要に応じAthenaを起動するステートマシン、Athenaの完了を通知するステートマシンの3層で構成されています。

実のところ構想時はもっとシンプルになる予定だったのですが思ったのとちょっと違う、このパターンまずいみたいのがあったりしてややリッチになりました。
この構成でも正常パターンのみを考えてもイベントの発生タイミング次第でAthenaの起動が抜ける等まだ足りないとは思いますがひとまず大枠は見れるのでそういったパターンは今回見逃しています。

各ステートマシンについて

転送の初期化・実行

こちらのステートマシンは以下のようなフローで構成されています。

今回は実行の入力のファイルについては実行時に手で入力する形にしています。

後述のフェーズに一部関わるのですがSFTPコネクタによる完了イベントは1コマンド=1イベントでまとまっているわけではなくファイル単位で管理されているようです。

実際にtest1.txtおよびtest2.txtを同時に指定した場合の完了イベントをSNS経由で受信したところ以下の通りとなっておりました。(test2.txtに対し同等のものが別途もう1つ)

{
  "version": "0",
  "id": "xxxxx",
  "detail-type": "SFTP Connector File Retrieve Completed",
  "source": "aws.transfer",
  "account": "xxxxx",
  "time": "2024-02-26T04:09:23Z",
  "region": "ap-northeast-1",
  "resources": [
    "arn:aws:transfer:ap-northeast-1:xxxxx:connector/c-xxxxx"
  ],
  "detail": {
    "operation": "RETRIEVE",
    "connector-id": "c-xxxxx",
    "transfer-id": "xxxxx",
    "file-transfer-id": "xxxxx",
    "url": "sftp://xxx.xxx.xxx.xxx",
    "file-path": "/home/sftpuser/test1.txt",
    "status-code": "COMPLETED",
    "local-directory-path": "/xxxxxx/receive",
    "bytes": 16,
    "start-timestamp": "2024-02-26T04:09:22.931482Z",
    "end-timestamp": "2024-02-26T04:09:22.941393Z",
    "local-file-location": {
      "domain": "S3",
      "bucket": "xxxxxx",
      "key": "receive/test1.txt"
    }
  }
}

そのため複数のファイルを転送する場合はこれらの通知を集約し状態を管理する何かを用意した上で全て出揃った後にクエリを開始する必要が出てきました。

今回は費用面と利便性からDynamoDBを利用し実行前に全てそちらに未開始ステータスでデータを投入し参照することで管理するようにしております。

なお気時点ではStartTransferFile1コマンドで転送可能なファイル数は5つかつSFTPコネクタの同時の実行数制限自体も10となっております

(2023/03/05追記)理解に誤りがありサーバあたりのコネクションは1000まで行けるようです。SSH側の仕組みで1コネクションで複数接続が張れる?機能があるらしくそちらが最大10で実行数自体は1000まで行けるのでしょうか。

今回はその制限を超えることないように実行していますが、実際のワークロードであれば超えるケースの方が多いかと思いますのでそういった場合は一旦SQSに投入し完了イベントで呼び出されるステートマシンで次の処理を始めるような構成が必要となってきます。

Athena起動

こちらのステートマシンは以下のようなフローで構成されています。

完了に応じ先ほどDynamoDBに投入したデータを更新し未完了のものが1つも得られなくなれば(Count=0)Athenaが実行されるようになっております。 今回はシンプルにCountでデータ数を取るのみの処理です。 細かい同時実行制御をしていないので実行タイミングと結果整合性の特性が合わさるともしかするとAthena実行をせずに終了するパターンが出そうな気はします。

後処理としてDynamoDBのデータを消すようなことを今回していないのでどこかでクリーンアップを行わないと無限にデータが溜まり続けます。

通知

こちらのステートマシンは以下のようなフローで構成されています。

StartQueryExecutionStartTransferFile同様非同期的に実行されるAPIとなっております。
今回はせっかくSFTPコネクタ側をイベント駆動にしてこちらが定期的に確認というのも違和感がありますので完了をトリガーに起動するステートマシンを別に作成し分離します。

結果についてはAthena側でGetQueryResultsを実行することで別途S3にアクセスせずとも取得は可能でしたがJSONで階層化されており何となく見栄えが好きではなかったのでイベント内に含まれる設置先の情報を利用し別途S3側から取得し通知しています。

ちなみにGetQueryResultsで得られた結果のデータ部は以下のような形となります。

{
...
        "Rows": [
            {
                "Data": [
                    {
                        "VarCharValue": "DataCount"
                    }
                ]
            },
            {
                "Data": [
                    {
                        "VarCharValue": "0"
                    }
                ]
            }
        ],
...
}

テンプレート

今回はSAMを利用して構築しており、テンプレートは以下に格納しております。

SFTPコネクタ自体とSFTPサーバ部分は含まれていないため別途構築が必要となります。

実行

準備として転送用のTSVファイルをSFTPサーバ上に設置しておきます。
(中央の値は対応するシーケンス番号のハッシュ値です)

$ pwd
/home/sftpuser
$ cat sec1.csv
1       c4ca4238a0b923820dcc509a6f75849b        2022-11-25 08:58:23.151356
2       c81e728d9d4c2f636f067f89cc14862c        2022-11-25 08:58:23.151356
3       eccbc87e4b5ce2fe28308fd9f2a7baf3        2022-11-25 08:58:23.151356
4       a87ff679a2f3e71d9181a67b7542122c        2022-11-25 08:58:23.151356
5       e4da3b7fbbce2345d7772b0674a318d5        2022-11-25 08:58:23.151356
6       1679091c5a880faf6fb5e6087eb1b2dc        2022-11-25 08:58:23.151356
7       8f14e45fceea167a5a36dedd4bea2543        2022-11-25 08:58:23.151356
8       c9f0f895fb98ab9159f51fd0297e236d        2022-11-25 08:58:23.151356
9       45c48cce2e2d7fbdea1afc51c7c6ad26        2022-11-25 08:58:23.151356
10      d3d9446802a44259755d38e6d163e820        2022-11-26 09:00:24.923556
$ cat sec2.csv
11      6512bd43d9caa6e02c990b0a82652dca        2022-11-25 08:58:23.151356
12      c20ad4d76fe97759aa27a0c99bff6710        2022-11-25 08:58:23.151356
13      c51ce410c124a10e0db5e4b97fc2af39        2022-11-25 08:58:23.151356
14      aab3238922bcc25a6f606eb525ffdc56        2022-11-25 08:58:23.151356
15      9bf31c7ff062936a96d3c8bd1f8f2ff3        2022-11-25 08:58:23.151356
16      c74d97b01eae257e44aa9d5bade97baf        2022-11-25 08:58:23.151356
17      70efdf2ec9b086079795c442636b55fb        2022-11-25 08:58:23.151356
18      6f4922f45568161a8cdf4ad2299f6d23        2022-11-25 08:58:23.151356
19      1f0e3dad99908345f7439f8ffabdffc4        2022-11-25 08:58:23.151356
20      98f13708210194c475687be6106a3b84        2022-11-25 08:58:23.151356

最初のステートマシンをマネジメントコンソールから起動します。
引数として先ほどの2つのファイルのフルパスを指定し起動します。

起動して少し待つとデータの転送が完了し以下のように今回実行したSQL(SELECT COUNT(*) AS DataCount FROM sample_table)の結果がBODYに格納されたJSONがSNSで通知されます。
フォーマットの関係上見やすさに関してはGetQueryResultsとそこまで大差はないですが改行文字を実際の改行にしてしまえば通常のTSVのみで見やすいはずです。
(見やすさより安全のためにbase64でエンコードした方が良い気もしますが)

{"AcceptRanges":"bytes","ContentLength":17,"ContentType":"application/octet-stream","ETag":"\"8e3409564bab85e98b6829cbde2b4d0e\"","LastModified":"2024-03-01T14:23:42Z","Metadata":{},"ServerSideEncryption":"AES256","Body":"\"DataCount\"\n\"20\"\n"}

経過を見るとsec2.csvが先に完了しその後にsec1.csvが完了しそちらの処理でAthenaを実行しているようです。

終わりに

今回はSFTPコネクタの転送完了イベントを活用した簡単な(?)サンプルを作ってみました。

実際組み上げてみるとSFTPサーバを除けばマネージドでサーバの管理が不要、Lambda関数等でプログラムを書く必要はなくEventBridgeやStepFunctionsの利用で所謂ローコード的に構築が可能、イベント駆動のため余分なチェック処理も走らず従量課金のサービスは最低限のみの請求
と非常にクラウドネイティブな良い感じのサービスで完結することができました。

サイズが不定で大容量のものも混じってると定期的なチェックがのオーバーヘッドの削減、(今回は組み込んでいませんが)イベント駆動を活かしエラー発生時に即時に通知やリカバリ処理の復帰などそう言った面でメリットは感じられてくるかと思います。

逆に一定のファイルサイズで各ファイル自体も小容量であれば定期的なチェックの方が呼び出し回数が結果として減り、制御もシンプルになるかと思いますで一概にこの方式が良いとはいえずケースバイケースとはなります。

今回のアップデートでSFTPコネクタ周辺でイベント駆動の処理が可能とはなりましたが状況により最適な処理方式は変わってきますのでこの場合は何が良いのだろうかという選択肢の一つとしてぜひ色々考えていただければと思います。