asyncioでPythonの非同期処理を書いてみる

こんにちは、CX事業本部の夏目です。

LambdaでAWSリソースへアクセスする際に直列的に処理するのではなく、並列的に処理したいことは度々あります。

NodeだとPromiseを使えば簡単にできるのですが、Pythonではどうやるのか気になったので調べてみました。

注意

ここではPython3.5以降を使うことを想定して書いてます。

asyncio モジュール

Pythonではasyncioモジュールを使って並列的に処理を書くことができます。

超ざっくり言うと、イベントループを使ってコルーチンを実行しているようです。

それ以上の説明は下記記事を見てください。

Pythonの非同期通信(asyncioモジュール)入門を書きました - ゆくゆくは有へと

非同期処理のことから知らない人向けに Python くらいしかろくに知らない人間が書きました。せっかくキーワードが文法に組み込まれたんだから理解したいじゃんか! asyncioモジュールを使うための基本的な概念が理解できるようになってるはず、多分。 環境としては Python3.5 以上を想定しています。つまり、 await や async キーワードを使っていきます。 ...

簡単に使ってみる

コード

import asyncio
import time


async def sleeping(sec):
    loop = asyncio.get_event_loop()
    print(f'start:  {sec}秒待つよ')
    await loop.run_in_executor(None, time.sleep, sec)
    print(f'finish: {sec}秒待つよ')


def main():
    array = [5, 1, 8, 3, 4]

    loop = asyncio.get_event_loop()

    print('=== 一つだけ実行してみよう ===')
    loop.run_until_complete(sleeping(2))

    print('\n=== 5つ並列的に動かしてみよう')
    gather = asyncio.gather(
        sleeping(5),
        sleeping(1),
        sleeping(8),
        sleeping(3),
        sleeping(4)
    )
    loop.run_until_complete(gather)


if __name__ == '__main__':
    main()

実行結果

=== 一つだけ実行してみよう ===
start:  2秒待つよ
finish: 2秒待つよ

=== 5つ並列的に動かしてみよう
start:  4秒待つよ
start:  1秒待つよ
start:  8秒待つよ
start:  5秒待つよ
start:  3秒待つよ
finish: 1秒待つよ
finish: 3秒待つよ
finish: 4秒待つよ
finish: 5秒待つよ
finish: 8秒待つよ

説明

基本的には、

  • asyncio.get_event_loop()でイベントループを取得
  • 並列的に動かしたい関数はasyncをつけて定義
  • 時間がかかる処理は、awaitと宣言してからイベントループのrun_in_executorで呼び出す
  • イベントループのrun_until_completeで並列的に実行しつつ終わるまで待つ
  • 複数同時に処理したい場合はgatherでくくってから、run_until_completeにわたす

といった感じです。

並列数を制限しながら実行する

コード

import asyncio
import functools
import time


async def sleeping(sec):
    loop = asyncio.get_event_loop()
    func = functools.partial(time.sleep, sec)
    print(f'start:  {sec}秒待つよ')
    await loop.run_in_executor(None, func)
    print(f'finish: {sec}秒待ったよ')


async def limited_parallel_call(sec_list, limit):
    sem = asyncio.Semaphore(limit)

    async def call(sec):
        with await sem:
            return await sleeping(sec)

    return await asyncio.gather(*[call(x) for x in sec_list])


def main():
    loop = asyncio.get_event_loop()
    options = [5, 1, 8, 3, 4]

    print('=== 並列実行数制限なし ===')
    loop.run_until_complete(asyncio.gather(*[sleeping(x) for x in options]))

    print('=== 2並列に制限 ===')
    loop.run_until_complete(limited_parallel_call(options, 2))
    print('=== finish ===')


if __name__ == '__main__':
    main()

実行結果

=== 並列実行数制限なし ===
start:  3秒待つよ
start:  4秒待つよ
start:  1秒待つよ
start:  8秒待つよ
start:  5秒待つよ
finish: 1秒待ったよ
finish: 3秒待ったよ
finish: 4秒待ったよ
finish: 5秒待ったよ
finish: 8秒待ったよ
=== 2並列に制限 ===
start:  4秒待つよ
start:  8秒待つよ
finish: 4秒待ったよ
start:  3秒待つよ
finish: 3秒待ったよ
start:  5秒待つよ
finish: 8秒待ったよ
start:  1秒待つよ
finish: 1秒待ったよ
finish: 5秒待ったよ
=== finish ===

説明

並列的に処理できるからって全部一度に実行するばかりだと困ることがあります。
なので、今回は並列数を制限してやってみます。

並列数の制限にはSemaphoreを使います。
これはオブジェクトを作成する際に最大並列数を指定して、同時に実行できる数を制限できます。

並列に実行した処理の結果を受け取る

コード

import asyncio
import time
import functools
import requests


def sleep(sec):
    time.sleep(sec)
    return sec


async def get_global_ip():
    loop = asyncio.get_event_loop()
    resp = await loop.run_in_executor(None, requests.get, 'http://inet-ip.info/ip')
    print('get_ip')
    return resp.text


async def parallel_sleep(seconds):
    loop = asyncio.get_event_loop()
    resp = await loop.run_in_executor(None, sleep, seconds)
    print(f'sleep {resp}sec')
    return resp


def main():
    loop = asyncio.get_event_loop()
    gather = asyncio.gather(
        parallel_sleep(10),
        get_global_ip(),
        parallel_sleep(1)
    )
    results = loop.run_until_complete(gather)
    print(results)


if __name__ == '__main__':
    main()

実行結果

get_ip
sleep 1sec
sleep 10sec
[10, '121.101.70.247', 1]

説明

並列的に処理した結果を使いたいことは多々あると思います。

取得する方法は簡単で

  • await loop.run_in_executorの返り値は渡した関数の返り値
  • run_until_completeで一つだけ実行する場合は、実行したものの返り値をそのまま貰える
  • run_until_completeに複数の処理を渡した場合は、渡した順番で結果が格納された配列を貰える

になります。

関数の引数の指定の方法

コード

import asyncio
import time
import functools


def delayed_print(mes, sec):
    time.sleep(sec)
    print(mes)


async def call_1(mes, sec):
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, delayed_print, mes, sec)


async def call_2(mes, sec):
    loop = asyncio.get_event_loop()
    func = functools.partial(delayed_print, mes, sec)
    await loop.run_in_executor(None, func)


async def call_3(message, seconds):
    loop = asyncio.get_event_loop()
    func = functools.partial(delayed_print, mes=message, sec=seconds)
    await loop.run_in_executor(None, func)


def main():
    loop = asyncio.get_event_loop()
    gather = asyncio.gather(
        call_1('333', 3),
        call_2('222', 2),
        call_1('111', 1)
    )
    loop.run_until_complete(gather)


if __name__ == '__main__':
    main()

実行結果

111
222
333

説明

引数を指定する際、いくつか方法があります。

  • 順番に引数を渡す場合、run_in_executorに関数を渡したあと順番に引数書く
  • 順番に引数を渡す場合、functoolモジュールのpartialに関数を渡したあと順番に引数を書き、partialの返り値をrun_in_executorに渡す
  • 名前付き引数を使う場合、functoolモジュールのpartialに関数を渡したあと名前付き引数を書いて、partialの返り値をrun_in_executorに渡す

順番に引数を渡すだけならrun_in_executorだけで事足りるのですが、名前付き引数を使おうとするとfunctoolモジュールのpartialを使用する必要があります。

まとめ

Pythonで非同期処理を行う方法について簡単にまとめてみました。

ぜひ使ってみてください。