S3上のオブジェクトを結合するnpmパッケージで結合順を設定できるようにした話

S3上のオブジェクトを結合するnpmパッケージで結合順を設定できるようにした話

Clock Icon2025.02.10

はじめに

昨年 5 月(2024 年 5 月)に以下のブログを書きました。本記事はそのアップデートブログとなります。

https://dev.classmethod.jp/articles/shuntaka-s3-concat/
https://www.npmjs.com/package/s3-concat

仕組みを簡単に説明すると、マルチパートアップロードで UploadPartCopyUploadPart を組み合わせて、複数のオブジェクトを 1 つに結合しています。

以前は、5 MiB 以上のオブジェクトとそれ未満のオブジェクトに分けて、まずは 5 MiB 以上の S3 オブジェクトを UploadPartCopy で送り、残りを UploadPart でストリーム転送していました。そのため、S3 オブジェクトの連結順序がサイズによって入れ替わる仕様になっていたのです。

当時のユースケースでは優先度が低かったため、この問題への対応を見送っていましたが、フィードバックをいただき、あらためて需要を感じたため対応することにしました。

https://github.com/shuntaka9576/s3-concat/issues/5

仕組み

最初に述べた方法を少し改善したものです。ざっくり説明すると、結合したい S3 オブジェクトを指定した順序で両端キュー(以下 Deque とします)に詰め込み、Deque が空になるまで以下を繰り返します。

  • 5 MiB 以上の場合:
    UploadPartCopy で 5 GiB ごとに始点・終点を指定したUploadPartCopy の元となるリクエストを作成し、5 MiB 未満の残り部分があれば Deque の先頭に追加する
  • 5 MiB 未満の場合:
    5 MiB になるまで複数の S3 オブジェクトを始点・終点を指定し、UploadPart の元となるリクエストを作成

実際のリクエスト生成部分がこのツールのコアで、150 行程度です。Deque を使わずに配列のみで構成した最小実装は Issue #5 のコメント にも掲載しています(初期の動作検証用のため、参考程度でお願いします)。Deque を使う理由は後述します。

https://github.com/shuntaka9576/s3-concat/blob/90b9be102d476ee0b2eb2f737c193d16e1697d79/lib/s3/task.ts#L114-L198

上記のリクエストを作成したら、非同期かつ並行でリクエストを送信します。
https://github.com/shuntaka9576/s3-concat/blob/90b9be102d476ee0b2eb2f737c193d16e1697d79/lib/s3-concat.ts#L239-L256
https://github.com/shuntaka9576/s3-concat/blob/90b9be102d476ee0b2eb2f737c193d16e1697d79/lib/s3/client.ts#L123-L176

最後に Etag をパート順にソートしてマルチパートアップロードを完了します。

https://github.com/shuntaka9576/s3-concat/blob/90b9be102d476ee0b2eb2f737c193d16e1697d79/lib/s3/client.ts#L178-L189

Deque を使った理由については後述します。

試してみる

準備

本ライブラリでは、オブジェクト名・サイズ・更新日時でソートが可能です。今回はオブジェクト名でソートした場合の例を紹介します。

今回は以下のような S3 オブジェクトを用意します。

サイズ 個数 内容 オブジェクト名
9 GiB 1 aaaa\n の繰り返し a_001.txt
100 MiB 9 bbbb\n の繰り返し b_001.txt ~ b_009.txt
1 MiB 124 cccc\n の繰り返し c_001.txt ~ c_124.txt

合計 134 個、合計サイズは 10 GiB になります。

期待する結果は下表のとおりです。ファイルはあくまでアルファベットのどれかが先頭/末尾にあることで確認とし、実際の内容が正しく繋がっていれば問題ありません。

ソート対象 昇順 (head) 昇順 (tail) 降順 (head) 降順 (tail)
S3 オブジェクト名 a c c a

テストファイルの作成

testdata.sh
#!/bin/bash
mkdir -p testdata

yes "aaaa" | head -c $((9 * 1024 * 1024 * 1024)) > testdata/a_001.txt

# ---- 100MiBファイル(9個)----
# 行数 = 100MiB / 5バイト = 20971520 行
for i in $(seq -w 1 9); do
  yes "bbbb" | head -n 20971520 > testdata/b_00${i}.txt
done

# ---- 1MiBファイル(124個)----
# サイズ指定で作るので最後は行の途中で切れます
for i in $(seq -w 1 124); do
  yes "cccc" | head -c $((1024 * 1024)) > testdata/c_${i}.txt
done
./testdata.sh

出力されたファイルを確認します。

$ cd testdata
$ ls
a_001.txt  b_005.txt  c_001.txt  c_006.txt  ...  c_122.txt
b_001.txt  b_006.txt  c_002.txt  c_007.txt  ...  c_123.txt
b_002.txt  b_007.txt  c_003.txt  c_008.txt  ...  c_124.txt
b_003.txt  b_008.txt  c_004.txt  c_009.txt  ...
b_004.txt  b_009.txt  c_005.txt  c_010.txt  ...
$ eza -l --binary | head -n 11
.rw-r--r--@ 9.0Gi ec2-user 10 Feb 02:17 a_001.txt
.rw-r--r--@ 100Mi ec2-user 10 Feb 02:17 b_001.txt
...
.rw-r--r--@ 1.0Mi ec2-user 10 Feb 02:17 c_001.txt

次に、S3 にアップロードします。

aws s3 sync . s3://test-s3-concat/testdata/

134 ファイル、合計 10 GB アップロードされていることを確認します。
S3 上で 134 ファイル 10 GB を確認

TypeScript(Node.js)の実行は CloudShell 上で実施します。CloudShell のスペックは以下ですが、本ライブラリは 5 GiB 以上は S3 同士の UploadPartCopy を使用し、5 MiB 以下はすべてストリームで流すため、メモリやストレージが少ない環境でも実行可能です。

  • 1 vCPU (仮想 CPU)
  • 2 GiB RAM
  • 1 GB の永続ストレージ

オブジェクト名昇順

import { S3Client } from '@aws-sdk/client-s3';
import { S3Concat } from 's3-concat';

const s3Client = new S3Client({});
const srcBucketName = 'test-s3-concat';
const dstBucketName = 'test-s3-concat';

const dstPrefix = 'destination';

const main = async () => {
  const now = Date.now();
  const s3Concat = new S3Concat({
    s3Client,
    srcBucketName: srcBucketName,
    dstBucketName: dstBucketName,
    dstPrefix,
    concatFileName: `concatedfile_${now}.txt`,
    pLimit: 10,
    joinOrder: 'keyNameAsc',
  });

  await s3Concat.addFiles('testdata');
  const result = await s3Concat.concat();

  console.log(JSON.stringify(result));
};

await main();

実行すると、約 1 分半ほどで完了しました。

[cloudshell-user@ip-xx-xxx-xx-xxx s3-concat-esm]$ time npx tsx ./src/index.ts 
{"kind":"concatenated","keys":[{"key":"destination/concatedfile_1739155059046.txt","size":10737418240}]}

real    1m25.758s
user    0m3.885s
sys     0m0.958s

S3 上に 10 GiB のファイルが作成されていることを確認します。
S3 に 10 GiB のファイルが作成

先頭に a_xxx.txt の内容、末尾に c_xxx.txt の内容が連結されていることを確認します。

$ head concatedfile_1739155059046.txt
aaaa
aaaa
aaaa
aaaa
aaaa
aaaa
aaaa
aaaa
aaaa
aaaa

$ tail concatedfile_1739155059046.txt
cccc
cccc
cccc
cccc
cccc
cccc
cccc
cccc
cccc

オブジェクト名降順

-    joinOrder: 'keyNameAsc',
+    joinOrder: 'keyNameDesc',
[cloudshell-user@ip-xx-xxx-xx-xxx s3-concat-esm]$ time npx tsx ./src/index.ts 
{"kind":"concatenated","keys":[{"key":"destination/concatedfile_1739155403235.txt","size":10737418240}]}

real    1m27.023s
user    0m3.850s
sys     0m0.839s

同様に S3 に 10 GiB ファイルが作成されており、先頭に c_xxx.txt、末尾に a_xxx.txt が連結されていることを確認します。

$ head concatedfile_1739155403235.txt
cccc
cccc
cccc
cccc
cccc
cccc
cccc
cccc
cccc
cccc

$ tail concatedfile_1739155403235.txt
aaaa
aaaa
aaaa
aaaa
aaaa
aaaa
aaaa
aaaa
aaaa

注意したところ

最終パート以外で 5 MiB 以下の S3 オブジェクトが見つかった場合

たとえば 5.0GiB + 0.1MiB のようなオブジェクトは、リクエスト上は 5.0 GiB 部分と 0.1 MiB 部分に分割して扱います。0.1 MiB 部分は最終的に GetObjectCommand のバイト指定でストリーム取得し、UploadPart で送信します。

5 MiB 未満のオブジェクト群は pipeline を使用してストリームを連結し、
https://github.com/shuntaka9576/s3-concat/blob/90b9be102d476ee0b2eb2f737c193d16e1697d79/lib/s3/client.ts#L192-L222

その連結ストリームをそのまま UploadPart で送信しています。

https://github.com/shuntaka9576/s3-concat/blob/90b9be102d476ee0b2eb2f737c193d16e1697d79/lib/s3/client.ts#L152-L167

連結対象オブジェクトが多い場合

Deque を採用した理由は、先頭への追加・取得が頻繁に発生するためです。以下は先頭への追加と取り出しを 50 万回ほど行った場合のベンチマークです。

---- 配列で unshift / shift ----
Array unshift: 21.085s
Array shift: 19.993s

---- Deque で pushFront / popFront ----
Deque pushFront: 6.598ms
Deque popFront: 2.736ms
ベンチマークソースコード
class Deque<T> {
  private capacity: number;
  private buffer: (T | undefined)[];
  private head: number;
  private tail: number;
  private _size: number;

  constructor(initialCapacity = 8) {
    this.capacity = initialCapacity;
    this.buffer = new Array<T | undefined>(this.capacity);
    this.head = 0;
    this.tail = 0;
    this._size = 0;
  }

  public get size(): number {
    return this._size;
  }

  public pushFront(item: T): void {
    if (this._size === this.capacity) {
      this.resize();
    }
    this.head = (this.head - 1 + this.capacity) % this.capacity;
    this.buffer[this.head] = item;
    this._size++;
  }

  public pushBack(item: T): void {
    if (this._size === this.capacity) {
      this.resize();
    }
    this.buffer[this.tail] = item;
    this.tail = (this.tail + 1) % this.capacity;
    this._size++;
  }

  public popFront(): T | undefined {
    if (this._size === 0) {
      return undefined;
    }
    const item = this.buffer[this.head];
    this.buffer[this.head] = undefined;
    this.head = (this.head + 1) % this.capacity;
    this._size--;
    return item;
  }

  public popBack(): T | undefined {
    if (this._size === 0) {
      return undefined;
    }
    this.tail = (this.tail - 1 + this.capacity) % this.capacity;
    const item = this.buffer[this.tail];
    this.buffer[this.tail] = undefined;
    this._size--;
    return item;
  }

  public front(): T | undefined {
    if (this._size === 0) {
      return undefined;
    }
    return this.buffer[this.head];
  }

  public back(): T | undefined {
    if (this._size === 0) {
      return undefined;
    }
    return this.buffer[(this.tail - 1 + this.capacity) % this.capacity];
  }

  private resize() {
    const newCapacity = this.capacity * 2;
    const newBuffer = new Array<T | undefined>(newCapacity);

    for (let i = 0; i < this._size; i++) {
      newBuffer[i] = this.buffer[(this.head + i) % this.capacity];
    }

    this.buffer = newBuffer;
    this.head = 0;
    this.tail = this._size;
    this.capacity = newCapacity;
  }
}

function performanceTest() {
  const testSize = 500000;
  const testArray: number[] = [];
  const testDeque = new Deque<number>();

  console.log(`\n---- 配列で unshift / shift ----`);
  console.time('Array unshift');
  for (let i = 0; i < testSize; i++) {
    testArray.unshift(i);
  }
  console.timeEnd('Array unshift');

  console.time('Array shift');
  while (testArray.length > 0) {
    testArray.shift();
  }
  console.timeEnd('Array shift');

  console.log(`\n---- Deque で pushFront / popFront ----`);
  console.time('Deque pushFront');
  for (let i = 0; i < testSize; i++) {
    testDeque.pushFront(i);
  }
  console.timeEnd('Deque pushFront');

  console.time('Deque popFront');
  while (testDeque.size > 0) {
    testDeque.popFront();
  }
  console.timeEnd('Deque popFront');
}

performanceTest();

そのほか、非同期リクエスト数を調整できるよう pLimit を設けています。
https://github.com/shuntaka9576/s3-concat/blob/90b9be102d476ee0b2eb2f737c193d16e1697d79/lib/s3-concat.ts#L72-L77

弱点とその対応策

ストリームは再送できないのでネットワークが不安定な環境だと安定しづらいです。メモリは消費しますが、ストリームをやめてリトライ可能にするオプションをつけても良いと思います。1リクエストは5MiBなためです。ただ並行になると重くなるので、非同期リクエスト数の調整は必要になりそうです。

さいごに

今回は新機能の簡易的な利用例を紹介しました。実際、結合対象を配列で直接指定できた方が便利な気がしてました。需要があれば対応を検討していきます。

また、この実装を Go や Rust に移植し、CLI として提供することも近いうちに取り組みたいと思います。

問題があればこちらまでご報告頂ければと思います。

https://github.com/shuntaka9576/s3-concat/issues

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.