Pythonで並行作業を書こうとして、ProcessPoolExecutorではまったお話

サーモン大好き横山です。

python 3.2から入った concurrent.futures を使って並行処理を書いたときのはまったことを書いて行こうと思います。

検証環境

$ sw_vers
ProductName:	Mac OS X
ProductVersion:	10.14.6
BuildVersion:	18G2022
$ python3 -V
Python 3.7.5
$ pwd
/path/to
$ python3 -mvenv venv; . venv/bin/activate

概要

大きいタスクを concurrent.futures.ProcessPoolExecutor を利用し、大きいタスク内の小さなタスクを concurrent.futures.ThreadPoolExecutor を使用して、並行作業しようとしました。

処理イメージ


コード

gist

実行結果

$ python main.py
num = 0
num = 1
num = 2
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 205, in _sendback_result
    exception=exception))
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 358, in put
    obj = _ForkingPickler.dumps(obj)
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.RLock objects
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "main.py", line 26, in <module>
    main()
  File "main.py", line 21, in main
    for inner_f in outer_f.result():
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
TypeError: can't pickle _thread.RLock objects

原因

concurrent.futures.ProcessPoolExecutor の結果を受け取るときに、 threading.RLockpickle 化できず、エラーになります。

concurrent/futures/process.py#L201-L208

def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc))

これは、conccurent.fetures.Future クラスのメンバー変数 self._condition が持っています。

concurrent/futures/_base.py#L309-L319

class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        self._waiters = []
        self._done_callbacks = []

対策

大きい方のタスクの ProcessPoolExecutorpickle 化できない 返り値を受け取ろうしているが問題なので、 outer_concurrent の戻り値を変更する。もしくは、multiprocessにこだわりがなければ、 大きい方のタスクのexecutorをconcurrent.futures.ThreadPoolExecutor に変更すると解決します。

戻り値を細工する場合

def outer_concurrent():
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        futures = [executor.submit(inner_concurrent, i) for i in range(3)]
    return [f.result() for f in futures] # resultの内容だけ返すように修正


def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        futures = [executor.submit(outer_concurrent) for _ in range(1)]

    for outer_f in futures:
        for inner_f in outer_f.result():
            print(f"result = {inner_f}") # resultの内容だけになったので、 「.result()」を削除

大きいタスクのexecutorをThreadPoolExecutorにする場合

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        futures = [executor.submit(outer_concurrent) for _ in range(1)]

    for outer_f in futures:
        for inner_f in outer_f.result():
            print(f"result = {inner_f.result()}")

変更後の実行結果

$ python main.py
num = 0
num = 1
num = 2
result = 0
result = 1
result = 4

まとめ

multiprocessing、threadingを意識せず使用できる concurrent.futres ですが、便利な分エラーが起こったときの調査が意外と大変になりがちです。
そんな人達の助力になれば幸いです。

事業開発部ではソフトウェアエンジニアを募集中です

現在私はクラスメソッドの事業開発部で prismatix というサービスの開発に携わっています。 事業開発部ではソフトウェアエンジニアを募集しています。

もし興味のある方がいましたら、こちらのページ を見ていただけますと幸いです。