AWS Transfer Familyのワークフロー機能をつかってみた

2022.03.08

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

いわさです。

先日こんなアップデートがありました。
Transfer Familyのマネージドワークフローが強化されたそうです。

マネージドワークフローはここ半年ほどで登場した機能のようで、なぜかDevelopersIOでも見当たらないですね、たまたま忘れ去られてしまったか。
そこで、今日はマネージドワークフローの基本的なところから触ってみることにしました。

マネージドワークフロー

AWS Transfer Familyで転送されたファイルに対して何か処理を行いたい場合は、S3イベント通知からのLambda実行するケースが多かったと思います。
2021年9月ごろにTransfer Familyではマネージドワークフローという機能が登場し、転送されたファイルに対してTransfer Familyマネージドなワークフローを簡単に組み込むことが出来るようになっていました。

これは、最高なのでは。

Transfer Familyコンソール内のワークフローメニューから作成を行います。

正常時のステップと例外時のステップをそれぞれ複数設定することが出来ます。
ステップタイプは事前定義されたものが3つと、カスタムファイル処理ステップが選択出来て、1つのワークフローあたり最大8つまで設定が可能です。

一点注意点があって、作成したワークフローは編集が出来ません。
サーバーにワークフローが関連付けられている場合は、新しいワークフローを新規作成し、サーバーの設定で紐付けを変更する必要があります。

例外ステップは本日は使わないですが、ワークフローが失敗したときに実行するステップとなり、例えばファイルのアップロード後にファイルのチェックや加工を行う場合として失敗した場合に、失敗したものをディレクトリなどに移動させてのちほど処理したタグをつけたり、あるいは通知を行ったりすることが出来ます。

ポイントとして、ワークフローはアップロードが成功・終了したときのみトリガーされます。また、サイズが0の場合はトリガーされません。

事前定義されたステップ

事前定義されたステップは「ファイルをコピー」「ファイルにタグを付ける」「ファイルを削除」が用意されています。
ファイル削除以外のステップはEFSでは利用できずS3のみで利用が出来ます。

ここでは転送されたファイルをキープレフィックスを指定してコピーしてコピーさせてみます。
削除ステップも組み合わせると、移動が実現出来ますね。
移動先は同一リージョンである必要があるのでご注意ください。

サーバー設定にマネージドワークフローを設定する箇所があり、そこで作成したワークフローを設定します。
ここで設定するロールはワークフロー実行時のロールです。S3ファイルコピーを行うのであればコピーするための権限が必要で、タグ付けをするのであればタグを設定するための権限が必要です。
また、後述のカスタムファイル処理ステップではLambdaでカスタム処理を実行するので、Lambdaの呼び出し権限が必要となります。

sftp> put hoge.txt hogehoge.txt
Uploading hoge.txt to /20220308transfertest/hogehoge.txt
hoge.txt                                                                                                                                     100%    5     0.1KB/s   00:00    
sftp>

hoge.txtをアップロードしてみると...

$ aws s3 ls 20220308transfertest --recursive
2022-03-08 15:17:12          5 copied/hoge/hogehoge.txt
2022-03-08 15:17:11          5 hogehoge.txt

copied/hooge/直下にコピーされました。今回は1ファイルアップロードしただけで、タイムスタンプを見る限りタイムラグは1秒以内でしたが、複数のファイルや大容量ファイルを使った場合の検証を今度行ってみます。とても興味がある。

ちなみにワークフローに関するログは、通常のTransfer Familyのロググループ内に別のストリームで出力されます。
ストリームにワークフローIDが含まれているのでそこから判断しましょう。

カスタムファイル処理ステップ

次はカスタムファイル処理ステップです。
要は、ワークフローがLambdaを実行するので後は好きなようにしてくださいという感じです。

ここでは、Lambdaのイベント引数をSNSへ送信するLambda関数を作成しました。
注意点として、以下ハイライト部分で、ステップの結果(成功・失敗)をワークフローに送信する必要があります。
よってLambda関数には最低でも、Transfer FamilyへのSendWorkflowStepState権限が必要となります。

import json
import boto3

transfer = boto3.client('transfer')
sns = boto3.client('sns')

def lambda_handler(event, context):
    print(json.dumps(event))

    sns_response = sns.publish(
        TopicArn='arn:aws:sns:ap-northeast-1:123456789012:hoge-sns',
        Message=json.dumps(event),
        Subject='hoge'
    )

    transfer_response = transfer.send_workflow_step_state(
        WorkflowId=event['serviceMetadata']['executionDetails']['workflowId'],
        ExecutionId=event['serviceMetadata']['executionDetails']['executionId'],
        Token=event['token'],
        Status='SUCCESS'
    )

    print(json.dumps(response))

    return {
      'statusCode': 200,
      'body': json.dumps(response)
    }

事前定義ステップと異なる点としては、タイムアウトの設定が存在することです。
タイムアウトの最大値は30分のようです。長い。

以下がこのカスタムステップから送信されたSNS通知の内容です。
この内容からS3にアップロードされたファイルにアクセスするなり、ファイル名を確認するなり、するということです。

{
    "token": "NjBkZWMwNjUtMGZlMC00Y2MzLThmOWQtMDIxOTk3ZDVjOTg4",
    "serviceMetadata": {
        "executionDetails": {
            "workflowId": "w-9c9567b35e2566c4d",
            "executionId": "4c6e3adc-0b24-4f02-b58e-5ee985d39911"
        },
        "transferDetails": {
            "sessionId": "bcd3d2c36a9fb4e7",
            "userName": "hoge",
            "serverId": "s-4611c73ca8094ada8"
        }
    },
    "fileLocation": {
        "domain": "S3",
        "bucket": "20220308transfertest",
        "key": "hoge-after.txt",
        "eTag": "c59548c3c576228486a1f0037eb16a1b",
        "versionId": null
    }
}

さいごに

本日はTransfer Familyのマネージドワークフローを試してみました。
気になるのは大量のファイルや大きなサイズのときにどういう挙動をするかですね。ある程度並列実行されるのかキューイングされるのか。そのあたりは次回以降にもう少し検証したいと思います。

直近でTransfer Family経由でカスタムロジックを実行する予定があったのですが、ちょうどこの機能使えそうだなと思ったので検証する良い機会でした。