Amazon S3でjava.util.concurrentを使って高速アップロードを実現する

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Amazon S3で大量のファイルを高速にアップロードしたい

前回の記事で、Amazon S3でオブジェクトをまとめて削除する機能が追加されたことをご紹介しました。その際に作ったプログラムではファイルのアップロードに時間が掛かっていました。今回は、このアップロードを高速に行います。

AWSのAPIは処理が独立している

まず始めにどのような方法で高速にアップロードしようか考えました。まずはAPIリファレンスを見て、「bulk upload」とか「batch upload」とか「multi upload」などのキーワードを探しました。結果、Multipartファイルアップロードが見つかりましたが、これは1つの大きなファイルを分割してアップロードする高レベルの機能でした。そこで、APIとして提供されていないならば、呼び方を工夫すればよいのかなということで、並行処理に行き着いたわけです。

以下は完成系のイメージです

java.util.concurrentパッケージとは?

java.util.concurrentパッケージは、JDK 5で追加されたマルチスレッドプログラミング(並行処理)を行うためのクラス群です。教科書に載っているのはThreadクラスやrunnableインタフェースなどの基本的なものが中心かと思いますが、concurrentパッケージはより実用的なクラスが多く含まれています。

まずは基本的なアップロード時間を測定する

比較対象として普通に1つファイルをアップロードした場合の時間を測定します。以下はサンプルソースです。

import java.io.*;
import java.util.*;

import com.amazonaws.*;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.s3.*;
import com.amazonaws.services.s3.model.*;

public class PutS3Sample {
	
	private static final int COUNTS = 1;

	public static void main(String[] args) throws IOException {

		AmazonS3 s3 = new AmazonS3Client(new PropertiesCredentials(
				PutS3Sample.class
						.getResourceAsStream("AwsCredentials.properties")));

		String bucketName = "my-first-s3-bucket-" + UUID.randomUUID();
		String key = "MyObjectKey";

		try {
			System.out.println("Creating bucket " + bucketName + "\n");
			s3.createBucket(bucketName);

			long startSingle = System.currentTimeMillis();

			for (int i = 0; i < COUNTS; i++) {
				s3.putObject(new PutObjectRequest(bucketName, key
						+ UUID.randomUUID(), createSampleFile()));
			}

			long endSingle = System.currentTimeMillis();
			System.out.format("%d Uploaded : %d ms\n",COUNTS,
					(endSingle - startSingle));

		} catch (AmazonClientException ace) {
			System.out.println("Error Message: " + ace.getMessage());
		}
	}

	private static File createSampleFile() throws IOException {
		File file = File.createTempFile("aws-java-sdk-", ".txt");
		file.deleteOnExit();

		Writer writer = new OutputStreamWriter(new FileOutputStream(file));
		writer.write("abcdefghijklmnopqrstuvwxyz\n");
		writer.close();

		return file;
	}
}
[/java]

<p>結果は以下のようになりました。</p>


<p>次に100回ファイルをアップロードすると以下のような結果になりました。順次実行していますので概ね100倍(誤差あり)ですね。</p>


<p>以下は100回繰り返したイメージです。</p>
<p><a href="https://dev.classmethod.jp/cloud/amazon-s3-java-util-concurrent-file-upload/attachment/s3upload-001/" rel="attachment wp-att-17251"><img src="http://public-blog-dev.s3.amazonaws.com/wp-content/uploads/2012/01/s3upload-001.png" alt="" title="s3upload-001" width="584" height="413" class="alignnone size-full wp-image-17251" /></a></p>

<h2 id="toc-1">マルチスレッドでまとめてアップロードする</h2>
<p>今回行いたいのは、100個のスレッドを作成してそれぞれでAmazon S3にファイルをアップロードします。全てが終わったら時間を計測して報告という流れです。全てのスレッドが正常に終わったか知る方法って何気に実装難しいですよね。concurrentパッケージはここらへんをよろしくやってくれます。完成イメージで書いた通りにマルチスレッドプログラミングします。今回初めてFuture、Callable、ExecutorService、Executorなどを使いましたが、あまりに簡単にやりたい事が実現できてしまうのでビックリしました。</p>


import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.PutObjectRequest;

class S3UploadTask implements Callable<Boolean> {
	private String bucket = null;
	private String key = null;

	public S3UploadTask(String bucket, String key) {
		this.bucket = bucket;
		this.key = key;
	}

	@Override
	public Boolean call() {
		try {
			AmazonS3 s3 = new AmazonS3Client(new PropertiesCredentials(
					S3UploadTask.class
							.getResourceAsStream("AwsCredentials.properties")));

			s3.putObject(new PutObjectRequest(bucket, key, createSampleFile()));

			return true;
		} catch (Exception e) {
			return false;
		}
	}

	private static File createSampleFile() throws IOException {
		File file = File.createTempFile("aws-java-sdk-", ".txt");
		file.deleteOnExit();

		Writer writer = new OutputStreamWriter(new FileOutputStream(file));
		writer.write("abcdefghijklmnopqrstuvwxyz\n");
		writer.close();

		return file;
	}
}

public class ConcurrentUpload {

	private static final int CLIENTS = 100;
	private static final int THREADS = 100;

	private static final ExecutorService executorPool = Executors
			.newFixedThreadPool(THREADS);

	public static void main(String[] args) throws IOException {
		int success = 0;
		int failure = 0;

		AmazonS3 s3 = new AmazonS3Client(new PropertiesCredentials(
				ConcurrentUpload.class
						.getResourceAsStream("AwsCredentials.properties")));

		String bucket = "my-first-s3-bucket-" + UUID.randomUUID();
		String key = "MyObjectKey";

		s3.createBucket(bucket);

		Collection<S3UploadTask> collection = new ArrayList<S3UploadTask>();
		for (int i = 0; i < CLIENTS; i++) {
			S3UploadTask task = new S3UploadTask(bucket, key
					+ UUID.randomUUID());
			collection.add(task);
		}

		long startTime = System.currentTimeMillis();
		try {
			List<Future<Boolean>> list = executorPool.invokeAll(collection);
			for (Future<Boolean> fut : list) {
				int ignore = fut.get() ? success++ : failure++;
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			System.out.println("TOTAL SUCCESS - " + success);
			System.out.println("TOTAL FAILURE - " + failure);
			System.out.println("Total time - "
					+ (System.currentTimeMillis() - startTime) + " ms");
			executorPool.shutdown();
		}
	}
}

実行結果は以下となりました。100個のスレッドで同時にHTTPリクエストですw

TOTAL SUCCESS - 100
TOTAL FAILURE - 0
Total time - 2156 ms

比較対象として1スレッドで実行してみました。毎回AmazonS3Clientをインスタンス化して順次処理となっているので大変時間が掛かりますね。

TOTAL SUCCESS - 100
TOTAL FAILURE - 0
Total time - 140730 ms

スレッド数を増やし過ぎるとダメ

スレッド数さえ増やせばいくらでも同時にアップロードできるかなと思いましたが2つ問題があります。1つ目はクライアント側のメモリが足りなくなることです。2つ目はサーバー側つまりAWS側がリクエストを拒否する可能性があることです。ということで、アップロードするファイル数が数十万あったとしても、最大で100程度のスレッド数に押さえておきましょう。といっても、これをどのようにプログラムで実現するか悩みますよね。ご安心を、concurrentパッケージですよ!ExecutorServiceは、スレッドプールとしての機能を持っていますので、例えば100スレッドを上限値として空き次第次を実行するいう処理が可能です。

まとめ

java.util.concurrentパッケージのクラス群を使って、簡単に同時ファイルアップロードを実現することができました。今回は実験用に小さなファイルのアップロードのみを行いましたが、実用性を上げるために、例えば100MBを超えるファイルがあった場合にはMultipartアップロードにするなど処理を切り替えてあげてはいかがでしょうか。ちょっとした工夫で何十倍速になりますね。マルチスレッドの使い方をマスターして高速化マエストロになろう!