この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
AWSCLIのS3コマンドは自動でデータを分割してアップロードダウンロードするマルチパート処理に対応しているが、この閾値とマルチパート時に分割された1パートのサイズは幾つなのか? という話題が社内でありました。結論から先に書くと以下でした。
- マルチパートになる閾値 8MiB
- 1データのサイズ(チャンクサイズ) 7MiB
- 最大並列度(スレッド数) 10
これでブログが終わってしまうのもなんなので、調べた方法についてまとめます。
コードの取得
awscliのコードはgithubにて公開されています。
ですので、こちらよりgitコマンドでを取得します。取得後lsコマンドを実行するとaws-cliディレクトリがあることがわかります。
$ git clone https://github.com/aws/aws-cli.git
Cloning into 'aws-cli'...
remote: Counting objects: 12610, done.
remote: Compressing objects: 100% (3/3), done.
remote: Total 12610 (delta 0), reused 0 (delta 0)
Receiving objects: 100% (12610/12610), 4.41 MiB | 868.00 KiB/s, done.
Resolving deltas: 100% (7180/7180), done.
Checking connectivity... done.
$ ls
aws-cli
S3マルチパート処理ファイルを探す
aws-cliディレクトリを確認するといろいろファイルがあることがわかります。
$ cd aws-cli/
$ ls
CHANGELOG.rst bin setup.py
LICENSE.txt doc tests
MANIFEST.in examples tox.ini
README.rst requirements.txt
awscli scripts
$
ここでよいツールと紹介できるとよいのですが、そういうものは用意していないのでfindとgrepをつかいます。
$ find . -name '*.py' | xargs grep -il 'multipart'
./awscli/customizations/s3/executor.py
./awscli/customizations/s3/fileinfo.py
./awscli/customizations/s3/s3handler.py
./awscli/customizations/s3/tasks.py
./awscli/customizations/s3/utils.py
./tests/integration/customizations/s3/test_plugin.py
./tests/integration/customizations/s3/test_s3handler.py
./tests/unit/customizations/s3/fake_session.py
./tests/unit/customizations/s3/test_s3handler.py
./tests/unit/customizations/s3/test_tasks.py
最近のOSであれば、もっと手短なコマンドがあるとおもいますが、20年くらい前の人なので、パイプをつかってこんな感じで処理しています。
- find . -name '*.py' で現在のディレクトリから下にあるPythonコード(.pyで終わるファイル)を列挙
- xargs コマンドで次のコマンドにfindの出力結果を引数として渡す。
- grep -il 'multipart'で multipartという文字列(大文字小文字関係なし)がはいっているファイルを列挙
出力結果をみるとcustomizations/s3というディレクトリにコードの実体があるようです。testsというフォルダもあるのですね。どんなテストしているのあ気になりますが、本題とずれるので次にゆきます
S3マルチパート処理を探す
さきほど処理のあるディレクトリが./awscli/customizations/s3/とあたりを付けられたので そちらのファイル一覧を確認します。ファイル名とファイルサイズからおおまかな当たりをつけます。後ろのコメントは私がつけています。
$ ls -l
total 360
-rw-r--r--+ 1 t s 565 7 30 13:42 __init__.py
-rw-r--r--+ 1 t s 8834 7 30 13:42 comparator.py #比較処理?
-rw-r--r--+ 1 t s 782 7 30 13:42 constants.py #定数設定?
-rw-r--r--+ 1 t s 6660 7 30 13:42 description.py #?
-rw-r--r--+ 1 t s 3277 7 30 13:42 dochandler.py #?
-rw-r--r--+ 1 t s 11799 7 30 13:42 executor.py #なにか実行している?
-rw-r--r--+ 1 t s 6029 7 30 13:42 fileformat.py #出力フォーマット関係?
-rw-r--r--+ 1 t s 10011 7 30 13:42 filegenerator.py #出力フォーマット関係?
-rw-r--r--+ 1 t s 11151 7 30 13:42 fileinfo.py #?
-rw-r--r--+ 1 t s 5814 7 30 13:42 filters.py #ファイルのフィルタ処理?
-rw-r--r--+ 1 t s 39343 7 30 13:42 s3.py #本体っぽい
-rw-r--r--+ 1 t s 14700 7 30 13:42 s3handler.py #?
-rw-r--r--+ 1 t s 28134 7 30 13:42 tasks.py #タスク、処理の実体?
-rw-r--r--+ 1 t s 11885 7 30 13:42 utils.py #ユーティリティクラス
以下があやしい気がします。
- s3.py
- s3handler.py
- tasks.py
この中で先程のgrepの時にはs3handler.py、tasks.pyが出てきたので、このファイルをエディタで開いて 確認します。multipartで検索すると以下のクラスがみつかりました。
class UploadPartTask(OrderableTask):
"""
This is a task used to upload a part of a multipart upload.
This task pulls from a ``part_queue`` which represents the
queue for a specific multipart upload. This pulling from a
``part_queue`` is necessary in order to keep track and
complete the multipart upload initiated by the ``FileInfo``
object.
"""
part_queueというキューにマルチパートのデータがあるのでそれを取得してアップロードするというこを行なうクラスとのことです。FileInfoが分割されたファイルの情報ということもわかりました。
ここでこのクラスは深く終わずに、もう一つのs3hander.pyも確認します。multiで検索するといかにもという変数がでてきました。
from awscli.customizations.s3.constants import MULTI_THRESHOLD, CHUNKSIZE, \
NUM_THREADS, MAX_UPLOAD_SIZE, MAX_QUEUE_SIZE
定数定義
awscli.customizations.s3.constantsクラスは同じディレクトリのconstraints.pyです。 早速みてみます
MULTI_THRESHOLD = 8 * (1024 ** 2) #マルチパートの閾値?
CHUNKSIZE = 7 * (1024 ** 2) #マルチパート分割の単位?
NUM_THREADS = 10 #最大スレッド数?
QUEUE_TIMEOUT_WAIT = 0.2 #?
MAX_PARTS = 950 #最大パート数?
MAX_SINGLE_UPLOAD_SIZE = 5 * (1024 ** 3) #単一ファイルの最大アップロードサイズ?
MAX_UPLOAD_SIZE = 5 * (1024 ** 4) #最大アップロードサイズ?
MAX_QUEUE_SIZE = 1000 #最大キューサイズ?
なんとなくそれっぽい値がわかったかとおもいます。ただし処理を追った訳ではないのでコメントの最後に?がついています。私がコードを追うときは、まずはキーワードからデータを探し、そこからそのデータを使う処理をさがすようにしています。
処理を追う
これらの定数が本当につかわれているのか? 処理を追う必要があります。 MULTI_THRESHOLDでgrepしてみます。
takekawatsutomu-2:s3 takekawa.tsutomu$ grep -in 'MULTI_THRESHOLD' *
constants.py:13:MULTI_THRESHOLD = 8 * (1024 ** 2)
s3handler.py:18:from awscli.customizations.s3.constants import MULTI_THRESHOLD, CHUNKSIZE, \
s3handler.py:36: def __init__(self, session, params, multi_threshold=MULTI_THRESHOLD,
s3handler.py:55: self.multi_threshold = multi_threshold
s3handler.py:186: above_multipart_threshold = filename.size > self.multi_threshold
grep -nは行番号表示です。 s3handlerの__init__処理でmulti_thresholdに値を設定していますので、この部分を確認します。
def __init__(self, session, params, multi_threshold=MULTI_THRESHOLD,
chunksize=CHUNKSIZE):
self.session = session
self.result_queue = queue.Queue()
# The write_queue has potential for optimizations, so the constant
# for maxsize is scoped to this class (as opposed to constants.py)
# so we have the ability to change this value later.
self.write_queue = queue.Queue(maxsize=self.MAX_IO_QUEUE_SIZE)
self.params = {'dryrun': False, 'quiet': False, 'acl': None,
'guess_mime_type': True, 'sse': False,
'storage_class': None, 'website_redirect': None,
'content_type': None, 'cache_control': None,
'content_disposition': None, 'content_encoding': None,
'content_language': None, 'expires': None,
'grants': None}
self.params['region'] = params['region']
for key in self.params.keys():
if key in params:
self.params[key] = params[key]
self.multi_threshold = multi_threshold
self.chunksize = chunksize
self.executor = Executor(
num_threads=NUM_THREADS, result_queue=self.result_queue,
quiet=self.params['quiet'], max_queue_size=MAX_QUEUE_SIZE,
write_queue=self.write_queue
)
self._multipart_uploads = []
self._multipart_downloads = []
pythonはコードの行数が短かくてよいですね。キューやスレッドの初期化をしているようです。以下の二行でフィールドとして設定していますね。
self.multi_threshold = multi_threshold
self.chunksize = chunksize
また、こちらでexecutor呼ばれるスレッドを作っているようです。
self.executor = Executor(
num_threads=NUM_THREADS, result_queue=self.result_queue,
quiet=self.params['quiet'], max_queue_size=MAX_QUEUE_SIZE,
write_queue=self.write_queue
)
さらにmulti_threasholdでおっていくとマルチパートの判別ロジックがありました。
def _is_multipart_task(self, filename):
# First we need to determine if it's an operation that even
# qualifies for multipart upload.
if hasattr(filename, 'size'):
above_multipart_threshold = filename.size > self.multi_threshold
if above_multipart_threshold:
if filename.operation_name in ('upload', 'download',
'move', 'copy'):
return True
else:
return False
else:
return False
ということで、マルチパートになる閾値は8GiBというのがわかりました。残りはchunksizeとnum_threadですが、num_thread はExecutorクラスだと予想しているので先にchunksizeを探します。ここまでで処理の流れとしては、キューデータをにいれて、それをExecutorで並列実行という方式が見えているので、それっぽいところをchunksizeで探します
こちらのコードでダウンロードのタスクをキューに入れているようです
def _enqueue_range_download_tasks(self, filename, remove_remote_file=False):
chunksize = find_chunksize(filename.size, self.chunksize)
num_downloads = int(filename.size / chunksize)
context = tasks.MultipartDownloadContext(num_downloads)
create_file_task = tasks.CreateLocalFileTask(context=context,
filename=filename)
self.executor.submit(create_file_task)
for i in range(num_downloads):
task = tasks.DownloadPartTask(
part_number=i, chunk_size=chunksize,
result_queue=self.result_queue, service=filename.service,
filename=filename, context=context, io_queue=self.write_queue)
self.executor.submit(task)
complete_file_task = tasks.CompleteDownloadTask(
context=context, filename=filename, result_queue=self.result_queue,
params=self.params, io_queue=self.write_queue)
self.executor.submit(complete_file_task)
self._multipart_downloads.append((context, filename.dest))
if remove_remote_file:
remove_task = tasks.RemoveRemoteObjectTask(
filename=filename, context=context)
self.executor.submit(remove_task)
return num_downloads
ざっくりとした理解ですが、ファイルの作成、ダウンロード、ファイルの結合というタスクを並行して実行しています。何個のダウンロード数に分けるか? ということで、以下の計算をしているので1ファイルのサイズはchunksizeであることがわかります。
num_downloads = int(filename.size / chunksize)
個々のタスクについてはtasks.pyにコードが存在するのでそちらを見て確認しています。長くなるので端折っていますが、理解するのに時間がかかりました。
最後にExecutorですが、num_threadsをキーワードに検索すると以下のメソッドが見つかります。
def start(self):
self.io_thread.start()
# Note that we're *not* adding the IO thread to the threads_list.
# There's a specific shutdown order we need and we're going to be
# explicit about it rather than relying on the threads_list order.
# See .join() for more info.
self.print_thread.start()
for i in range(self.num_threads):
worker = Worker(queue=self.queue)
worker.setDaemon(True)
self.threads_list.append(worker)
worker.start()
こちらでワーカースレッドをnum_threads数個作成して、各ワーカースレッドはキューにある処理を随時実行するという形になってることがわかりました。
おわりに
awscliについてのマルチパート処理の閾値、データサイズについてどのように調べたか書いてみました。awscliは比較的おいやすいコードだと感じました。各サービス用のコードがそれほど大きくない点が理由です。正直とても恥ずかしいのですが、知るは一時の恥、知らぬは一生の恥ということで、こういう方法がいいよ、ここ間違っているということがあれば教えてもらえるとても嬉しいです。ちなみにエディタはEmacsをつかっておりますが、お好きなものを使うとよいと思います。
今回は、特別なツール等はつかっていないので、pythonに限らず、他でも応用は効くとおもいます。道具があるとは限らない困った状況においては少しは役に立つかもしれません。ではでは。