Amazon S3のマルチパートアップロードの流れをコードを書いて理解した

2023.04.03

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

初めに

S3では巨大なファイルを分割して送信できるようにマルチパートアップロードと呼ばれる機能が存在しています。

AWS CLI を使用して、Amazon S3 にファイルをマルチパートアップロードする方法を教えてください。

これまで高レベルAPIでの呼び出しで細かく意識することがなかったのですが、
現在書いているfor RustのSDKでは高レベルAPIの機能が存在しないため具体的な処理と向き合う必要が出てきました。

今回は実際にコードを書いてそのフローを理解します。

※ 実際のコードを差し替えている関係で一部利用されない変数等が存在します。
このプログラムについてはそのうち記事を書くと思います。

マルチパートアップロードのプロセス

マルチパートアップロードのプロセス

マルチパートアップロードは以下の3つのステップで構成されます。

  1. マルチアップロードの開始
  2. パートデータのアップロード
  3. マルチアップロードの完了

注) 以降の図は個人的なイメージです。実際の内部処理は不明ですのでご注意ください。

マルチアップロードの開始

最初からいきなりデータアップロードではなく
まずはクライアント側から開始APIによりアップロードの識別しであるアップロードIDを発行します。

開始対象のファイルへのパートの追加やキャンセル、完了等の操作はこのIDを指定して実行します。

pub struct S3 {
    //aws_sdk_s3::new()の結果
    client: aws_sdk_s3::Client,
    //バケット名
    bucket: String
}
...

impl S3 {
    ...
    pub async fn upload(&self,key: &str, binary: Vec<u8>) ->  Result<(), Box<dyn std::error::Error + Send + Sync>> {
      let info = self.client.create_multipart_upload()
                      .bucket(&self.bucket)
                      .key(key)
                      .send().await?;

      println!("upload_id is {}", info.upload_id().unwrap_or_default());

      //リストから今回のアップロードIDの情報だけ出力
      let list = self.client.list_multipart_uploads()
                      .bucket(&self.bucket)
                      .send().await?;
      list.uploads().unwrap_or_default().iter().for_each(|upload| {
          if upload.upload_id().unwrap_or_default() == upload_id { println!("{:?}", upload); }
      });

      Ok(())
    }
...
}

出力結果は以下の通りです。
進行中のマルチパートの一覧からはバケット情報やオーナー情報等が得られます。

upload_id is mxQB7cuHa6fay.iPBgUzp97zS_VYFFZBmRthHVi6QJHyFvYDhcosIDjCxOL7sG65KMGKAKKnM27TnlqmttK7mg.PmiXVdjEsSZP1EyIpGM0nrg4PtNMJvgh4EhITIgrp
MultipartUpload { upload_id: Some("mxQB7cuHa6fay.iPBgUzp97zS_VYFFZBmRthHVi6QJHyFvYDhcosIDjCxOL7sG65KMGKAKKnM27TnlqmttK7mg.PmiXVdjEsSZP1EyIpGM0nrg4PtNMJvgh4EhITIgrp"), key: Some(key), initiated: Some(DateTime { seconds: 1680265029, subsecond_nanos: 0 }), storage_class: Some(Standard), owner: Some(Owner { display_name: Some("xxxxx"), id: Some("xxxxxxx") }), initiator: Some(Initiator { id: Some("arn:aws:iam::xxxxxx:user/xxxxx"), display_name: Some("xxxxx") }), checksum_algorithm: None }

なお、完了しないまま同名ファイルのマルチパートアップロードを開始した場合、既に存在するアップロードIDが返却される…という仕様はなく別のマルチパートアップロードが作成されます。

オブジェクトのアップロードの一時停止と再開
オブジェクトの複数のパートを徐々にアップロードできます。マルチパートアップロードを開始した後は終了期限がありません。マルチパートアップロードは明示的に完了または停止する必要があります。

また一度アップロードIDが作成された時点で明示的に処理しない限り不完全な状態でマルチアップロードは残り続けます。

現状マネジメントコンソールから確認できるものではないので意識しないと不完全のデータに気づくのは難しいものになりますが、
幸いライフタイムサイクルで削除可能なものですので必要に応じ設定しておきましょう。

パートデータのアップロード

データのアップロードとしてはこの部分が本体になります。 先ほどのステップで取得したアップロードIDに順番を判別するパート番号を付与してアップロードしていきます。

パート番号によって、アップロードするオブジェクトに含まれるパートとその位置が一意に識別されます。選択するパート番号は、連続している必要はありません (例えば、1、5、14など)。以前にアップロードしたパートと同じパート番号を使って新しいパートをアップロードした場合、以前のパートは上書きされます。

パート番号はクライアント側で1~10000の範囲で独自に採番します。
最終的にこの番号の順番でS3側でパートが結合されますが、必ずしも連番である必要はなく飛び番でのアップロードも可能です。

順序的にB→Aで送りたいけどA→Bの順番で結合したい! という時にAは1からBは1001から採番しようのような使い方も可能です。

ただし、番号が重複した場合は上書きされる点と番号は10000までになる点は注意しましょう。

以下は0xiのデータをループ処理でパートアップロードする場合の処理例です。

    //先ほどのcreateとlistの処理の間に追加
    let mut upload_result: Vec<UploadPartOutput> = Vec::new();
      for i in 1..10 {
          upload_result.push(
              self.client.upload_part()
                  .bucket(&self.bucket)
                  .upload_id(upload_id)
                  .part_number(i32::from(i))
                  .key(key)
                  .body(ByteStream::new(From::from(vec![i])))
                  .send().await?
          );
      }
    upload_result.iter().for_each(|result| {
        println!("{:?}", result);
    });

ちなみのコードはデータが原因で完了処理に失敗します(後述)。

upload_part()のレスポンスの出力は以下です。

UploadPartOutput { server_side_encryption: Some(Aes256), e_tag: Some("\"55a54008ad1ba589aa210d2629c1df41\""), checksum_crc32: None, checksum_crc32_c: None, checksum_sha1: None, checksum_sha256: None, sse_customer_algorithm: None, sse_customer_key_md5: None, ssekms_key_id: "*** Sensitive Data Redacted ***", bucket_key_enabled: false, request_charged: None }
UploadPartOutput { server_side_encryption: Some(Aes256), e_tag: Some("\"9e688c58a5487b8eaf69c9e1005ad0bf\""), checksum_crc32: None, checksum_crc32_c: None, checksum_sha1: None, checksum_sha256: None, sse_customer_algorithm: None, sse_customer_key_md5: None, ssekms_key_id: "*** Sensitive Data Redacted ***", bucket_key_enabled: false, request_charged: None }
UploadPartOutput { server_side_encryption: Some(Aes256), e_tag: Some("\"8666683506aacd900bbd5a74ac4edf68\""), checksum_crc32: None, checksum_crc32_c: None, checksum_sha1: None, checksum_sha256: None, sse_customer_algorithm: None, sse_customer_key_md5: None, ssekms_key_id: "*** Sensitive Data Redacted ***", bucket_key_enabled: false, request_charged: None }
UploadPartOutput { server_side_encryption: Some(Aes256), e_tag: Some("\"ec7f7e7bb43742ce868145f71d37b53c\""), checksum_crc32: None, checksum_crc32_c: None, checksum_sha1: None, checksum_sha256: None, sse_customer_algorithm: None, sse_customer_key_md5: None, ssekms_key_id: "*** Sensitive Data Redacted ***", bucket_key_enabled: false, request_charged: None }
UploadPartOutput { server_side_encryption: Some(Aes256), e_tag: Some("\"8bb6c17838643f9691cc6a4de6c51709\""), checksum_crc32: None, checksum_crc32_c: None, checksum_sha1: None, checksum_sha256: None, sse_customer_algorithm: None, sse_customer_key_md5: None, ssekms_key_id: "*** Sensitive Data Redacted ***", bucket_key_enabled: false, request_charged: None }
UploadPartOutput { server_side_encryption: Some(Aes256), e_tag: Some("\"06eca1b437c7904cc3ce6546c8110110\""), checksum_crc32: None, checksum_crc32_c: None, checksum_sha1: None, checksum_sha256: None, sse_customer_algorithm: None, sse_customer_key_md5: None, ssekms_key_id: "*** Sensitive Data Redacted ***", bucket_key_enabled: false, request_charged: None }
UploadPartOutput { server_side_encryption: Some(Aes256), e_tag: Some("\"89e74e640b8c46257a29de0616794d5d\""), checksum_crc32: None, checksum_crc32_c: None, checksum_sha1: None, checksum_sha256: None, sse_customer_algorithm: None, sse_customer_key_md5: None, ssekms_key_id: "*** Sensitive Data Redacted ***", bucket_key_enabled: false, request_charged: None }
UploadPartOutput { server_side_encryption: Some(Aes256), e_tag: Some("\"e2ba905bf306f46faca223d3cb20e2cf\""), checksum_crc32: None, checksum_crc32_c: None, checksum_sha1: None, checksum_sha256: None, sse_customer_algorithm: None, sse_customer_key_md5: None, ssekms_key_id: "*** Sensitive Data Redacted ***", bucket_key_enabled: false, request_charged: None }
UploadPartOutput { server_side_encryption: Some(Aes256), e_tag: Some("\"5e732a1878be2342dbfeff5fe3ca5aa3\""), checksum_crc32: None, checksum_crc32_c: None, checksum_sha1: None, checksum_sha256: None, sse_customer_algorithm: None, sse_customer_key_md5: None, ssekms_key_id: "*** Sensitive Data Redacted ***", bucket_key_enabled: false, request_charged: None }

注意点として最終パート以外はは全てのパートが5MiB以上である必要がありますが、
アップロード時点では下回っていてもエラーとならない点です(上記のコードのケース)。

後述の完了処理にて初めてエラーが返却されます。

マルチアップロードの完了

マルチパートアップロードは完了処理を持って進行中状態を抜け出しアップロードしたデータが通常のS3のオブジェクトして参照可能となります。

順序が異なっていてもパート番号順に結合されるかを確認したいので非同期処理で実行します。

非同期処理に対応するために全体的にコードを調整しているので全体像を再掲します。

//今回テストデータをvec<u8>で直接生成しているためbinaryの値は利用していません
pub async fn upload(&self,key: &str, binary: Vec<u8>) ->  Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let key = Arc::new(key.to_string());
    //マルチアップロードを開始
    let info = self.client.create_multipart_upload()
        .bucket(self.bucket.as_ref())
        .key(key.as_ref())
        .send().await?;

    let upload_id = Arc::new(
        info.upload_id().unwrap_or_default().to_string()
    );
    println!("upload_id is {}", &upload_id);

    let mut upload_tasks: Vec<JoinHandle<Result<CompletedPart, SdkError<UploadPartError>>>> = Vec::new();
    for i in 1..10 {
        let upload_id = upload_id.clone();
        let key = key.clone();
        let client = self.client.clone();
        let bucket = self.bucket.clone();
        //パートアップロードを実行
        upload_tasks.push(
            tokio::spawn(async move {
                let res = client.upload_part()
                    .bucket(bucket.as_ref())
                    .upload_id(upload_id.as_ref())
                    .part_number(i32::from(i))
                    .key(key.as_ref())
                    //各パートは0xiiを5MiB分埋めたデータ(i=1の場合0x0101)
                    .body(ByteStream::new(From::from(vec![i; 5 * 1024 * 1024])))
                    .send().await?;
                println!("[End] upload part {}", &i);
                Ok(
                    CompletedPart::builder()
                    .set_e_tag(res.e_tag)
                    .part_number(i32::from(i))
                    .build()
                )
            })
        );
    }
    let upload_results = future::try_join_all(upload_tasks).await?;

    //complete処理に引き渡すためにパートアップロードの結果を加工する
    let mut part_info = Vec::new();
    for result in upload_results.into_iter() {
        part_info.push(result?);
    };
    //完了処理
    let complete_result = self.client.complete_multipart_upload()
                    .bucket(self.bucket.as_ref())
                    .key(key.as_ref())
                    .multipart_upload(
                        CompletedMultipartUpload::builder().set_parts(Some(part_info)).build()
                    )
                    .upload_id(upload_id.as_ref())
                    .send()
                    .await.unwrap();
    
    println!("{:?}", complete_result);
    //進行中のマルチアップロードを出力
    let list = self.client.list_multipart_uploads()
                    .bucket(self.bucket.as_ref())
                    .send().await?;
    println!("-------part info-------");
    list.uploads().unwrap_or_default().iter().for_each(|upload| {
        if upload.upload_id().unwrap_or_default() == upload_id.as_ref() { println!("{:?}", upload); }
    });
    println!("-------part info-------");
    Ok(())
}

出力は以下のようになります。

upload_id is zpVbbXwyXjsagc1YsEYgfO5nTKdGEAWK0j5KVZY9U7donyXMr_RBbL1mAUUSDlfJxwgiKdqeI1ZT0WE_Pm9pd4z0nKz_YvqvGDhHtbySedmdIPU.7608bIzzIQeYDbwV
[End] upload part 1
[End] upload part 2
[End] upload part 6
[End] upload part 7
[End] upload part 9
[End] upload part 5
[End] upload part 8
[End] upload part 3
[End] upload part 4
CompleteMultipartUploadOutput { location: Some("https://xxxxx.s3.us-west-2.amazonaws.com/test%2Fdata"), bucket: Some("xxxxx"), key: Some("test/data"), expiration: None, e_tag: Some("\"14846d6ed5c60ec55b933af6c99181ce-9\""), checksum_crc32: None, checksum_crc32_c: None, checksum_sha1: None, checksum_sha256: None, server_side_encryption: Some(Aes256), version_id: None, ssekms_key_id: "*** Sensitive Data Redacted ***", bucket_key_enabled: false, request_charged: None }
-------part info-------
-------part info-------

list_multipart_uploads()は進行中のパートアップロードのみが表示されるため完了処理を行った今回のパートアップロードの情報は出力されなくなります。

1→2→6の順番にアップロードされていますが、
ファイルを確認すると境目となる10MiB目の部分で想定通り3番目のパートが結合されていることが確認できます。

非同期処理において非常に強力

マルチパートアップロードはよくネットワークのスループットの向上がメリットとして挙げられている印象が個人的にはあります。

ただ個人的には非同期処理を行うことによるリソースの有効活用の部分も非常に大きいのではないかなと感じました。

切り替えのオーバーヘッドもある為ここまで綺麗には行かないとは思いますが、
本来であればディスクのI/O+ネットワークのI/O分の時間がかかるところをネットワークのI/O中にディスクのI/O処理ができるだけでも短縮できます。

あくまで今回のケースで感じた点でありシステムの構成や環境等によるため何が一番良い部分かというのは変わるとは思います。

送信前にファイルが完成している必要はない

さらに言えば分割したパートデータは最終的にS3側で結合されるため、
送信元で完全なデータを持っている必要はなく部分的にも出来上がった側から送信することが可能です。

まずはシンプルに一度ZIPファイルを作成しアップロードする場合を考えてみます。

この場合アップロードするには一度ZIPファイルの完成を待つ必要がありますし、
処理元でこのZIPを保持する必要がなければその分のディスクのI/Oが勿体なく感じます。

ZIPファイルのフォーマットを確認してみると大雑把には以下のようになっています。

ファイルエントリ毎に各ファイルのデータ本体、ヘッダも同様にファイル毎で持って独立しているため個々に処理がが可能です。

結合処理はS3側で行えるため実のところZIPファイルを送信元で作る必要はなく、
各データの処理した結果を送信するだけでS3側で結合しZIPという形にしてもらうことができます。

このように一度ZIPを作るのではなく、
個別のデータを送信することでより処理時間を短縮することが可能となります。

ただし処理自体も複雑になりますし、
圧縮処理が終わるまでサイズが不明な中パート毎の5MiB制限をどうするかなど、
考慮時点も多くメンテナンスコストとのトレードになる可能性がある点は注意しましょう。

終わりに

今回実際に低レベルAPIを利用してマルチパートアップロードを実行することで、
処理フローと得られるメリットを以前より具体的なイメージに落とし込むことができました。

なんとなく使っていた頃は並列でアップロードするからなんか早くなるんだな
ほどの認識でしたが実際にはネットワークの部分のみにならず非同期に処理をすることでクライアント上のリソースもより有効活用できる機能であると感じました。

書き込めば書き込むほど答えてくれる部分ではありますがその分複雑化してしまうためどこまでやるかのバランスは重要となります。

それでもアップロードやファイルの生成処理部分でもう少し速度が欲しいような場合、是非一度処理の見直しの検討材料の1つとしてみてはいかがでしょうか。