この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
サーモン大好き横山です。
python 3.2から入った concurrent.futures を使って並行処理を書いたときのはまったことを書いて行こうと思います。
検証環境
$ sw_vers
ProductName: Mac OS X
ProductVersion: 10.14.6
BuildVersion: 18G2022
$ python3 -V
Python 3.7.5
$ pwd
/path/to
$ python3 -mvenv venv; . venv/bin/activate
概要
大きいタスクを concurrent.futures.ProcessPoolExecutor
を利用し、大きいタスク内の小さなタスクを concurrent.futures.ThreadPoolExecutor
を使用して、並行作業しようとしました。
処理イメージ
コード
実行結果
$ python main.py
num = 0
num = 1
num = 2
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 205, in _sendback_result
exception=exception))
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 358, in put
obj = _ForkingPickler.dumps(obj)
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.RLock objects
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "main.py", line 26, in <module>
main()
File "main.py", line 21, in main
for inner_f in outer_f.result():
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/_base.py", line 428, in result
return self.__get_result()
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
TypeError: can't pickle _thread.RLock objects
原因
concurrent.futures.ProcessPoolExecutor
の結果を受け取るときに、 threading.RLock
が pickle
化できず、エラーになります。
concurrent/futures/process.py#L201-L208
def _sendback_result(result_queue, work_id, result=None, exception=None):
"""Safely send back the given result or exception"""
try:
result_queue.put(_ResultItem(work_id, result=result,
exception=exception))
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
result_queue.put(_ResultItem(work_id, exception=exc))
これは、conccurent.fetures.Future
クラスのメンバー変数 self._condition
が持っています。
concurrent/futures/_base.py#L309-L319
class Future(object):
"""Represents the result of an asynchronous computation."""
def __init__(self):
"""Initializes the future. Should not be called by clients."""
self._condition = threading.Condition()
self._state = PENDING
self._result = None
self._exception = None
self._waiters = []
self._done_callbacks = []
対策
大きい方のタスクの ProcessPoolExecutor
が pickle
化できない 返り値を受け取ろうしているが問題なので、 outer_concurrent
の戻り値を変更する。もしくは、multiprocessにこだわりがなければ、 大きい方のタスクのexecutorをconcurrent.futures.ThreadPoolExecutor
に変更すると解決します。
戻り値を細工する場合
def outer_concurrent():
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
futures = [executor.submit(inner_concurrent, i) for i in range(3)]
return [f.result() for f in futures] # resultの内容だけ返すように修正
def main():
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
futures = [executor.submit(outer_concurrent) for _ in range(1)]
for outer_f in futures:
for inner_f in outer_f.result():
print(f"result = {inner_f}") # resultの内容だけになったので、 「.result()」を削除
大きいタスクのexecutorをThreadPoolExecutorにする場合
def main():
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
futures = [executor.submit(outer_concurrent) for _ in range(1)]
for outer_f in futures:
for inner_f in outer_f.result():
print(f"result = {inner_f.result()}")
変更後の実行結果
$ python main.py
num = 0
num = 1
num = 2
result = 0
result = 1
result = 4
まとめ
multiprocessing、threadingを意識せず使用できる concurrent.futres
ですが、便利な分エラーが起こったときの調査が意外と大変になりがちです。
そんな人達の助力になれば幸いです。
事業開発部ではソフトウェアエンジニアを募集中です
現在私はクラスメソッドの事業開発部で prismatix というサービスの開発に携わっています。 事業開発部ではソフトウェアエンジニアを募集しています。
もし興味のある方がいましたら、こちらのページ を見ていただけますと幸いです。