asyncioでPythonの非同期処理を書いてみる
こんにちは、CX事業本部の夏目です。
LambdaでAWSリソースへアクセスする際に直列的に処理するのではなく、並列的に処理したいことは度々あります。
NodeだとPromiseを使えば簡単にできるのですが、Pythonではどうやるのか気になったので調べてみました。
注意
ここではPython3.5以降を使うことを想定して書いてます。
asyncio モジュール
Pythonではasyncioモジュールを使って並列的に処理を書くことができます。
超ざっくり言うと、イベントループを使ってコルーチンを実行しているようです。
それ以上の説明は下記記事を見てください。
Pythonの非同期通信(asyncioモジュール)入門を書きました - ゆくゆくは有へと
非同期処理のことから知らない人向けに Python くらいしかろくに知らない人間が書きました。せっかくキーワードが文法に組み込まれたんだから理解したいじゃんか! asyncioモジュールを使うための基本的な概念が理解できるようになってるはず、多分。 環境としては Python3.5 以上を想定しています。つまり、 await や async キーワードを使っていきます。 ...
簡単に使ってみる
コード
import asyncio import time async def sleeping(sec): loop = asyncio.get_event_loop() print(f'start: {sec}秒待つよ') await loop.run_in_executor(None, time.sleep, sec) print(f'finish: {sec}秒待つよ') def main(): array = [5, 1, 8, 3, 4] loop = asyncio.get_event_loop() print('=== 一つだけ実行してみよう ===') loop.run_until_complete(sleeping(2)) print('\n=== 5つ並列的に動かしてみよう') gather = asyncio.gather( sleeping(5), sleeping(1), sleeping(8), sleeping(3), sleeping(4) ) loop.run_until_complete(gather) if __name__ == '__main__': main()
実行結果
=== 一つだけ実行してみよう === start: 2秒待つよ finish: 2秒待つよ === 5つ並列的に動かしてみよう start: 4秒待つよ start: 1秒待つよ start: 8秒待つよ start: 5秒待つよ start: 3秒待つよ finish: 1秒待つよ finish: 3秒待つよ finish: 4秒待つよ finish: 5秒待つよ finish: 8秒待つよ
説明
基本的には、
asyncio.get_event_loop()
でイベントループを取得- 並列的に動かしたい関数は
async
をつけて定義 - 時間がかかる処理は、
await
と宣言してからイベントループのrun_in_executor
で呼び出す - イベントループの
run_until_complete
で並列的に実行しつつ終わるまで待つ - 複数同時に処理したい場合は
gather
でくくってから、run_until_complete
にわたす
といった感じです。
並列数を制限しながら実行する
コード
import asyncio import functools import time async def sleeping(sec): loop = asyncio.get_event_loop() func = functools.partial(time.sleep, sec) print(f'start: {sec}秒待つよ') await loop.run_in_executor(None, func) print(f'finish: {sec}秒待ったよ') async def limited_parallel_call(sec_list, limit): sem = asyncio.Semaphore(limit) async def call(sec): with await sem: return await sleeping(sec) return await asyncio.gather(*[call(x) for x in sec_list]) def main(): loop = asyncio.get_event_loop() options = [5, 1, 8, 3, 4] print('=== 並列実行数制限なし ===') loop.run_until_complete(asyncio.gather(*[sleeping(x) for x in options])) print('=== 2並列に制限 ===') loop.run_until_complete(limited_parallel_call(options, 2)) print('=== finish ===') if __name__ == '__main__': main()
実行結果
=== 並列実行数制限なし === start: 3秒待つよ start: 4秒待つよ start: 1秒待つよ start: 8秒待つよ start: 5秒待つよ finish: 1秒待ったよ finish: 3秒待ったよ finish: 4秒待ったよ finish: 5秒待ったよ finish: 8秒待ったよ === 2並列に制限 === start: 4秒待つよ start: 8秒待つよ finish: 4秒待ったよ start: 3秒待つよ finish: 3秒待ったよ start: 5秒待つよ finish: 8秒待ったよ start: 1秒待つよ finish: 1秒待ったよ finish: 5秒待ったよ === finish ===
説明
並列的に処理できるからって全部一度に実行するばかりだと困ることがあります。
なので、今回は並列数を制限してやってみます。
並列数の制限にはSemaphore
を使います。
これはオブジェクトを作成する際に最大並列数を指定して、同時に実行できる数を制限できます。
並列に実行した処理の結果を受け取る
コード
import asyncio import time import functools import requests def sleep(sec): time.sleep(sec) return sec async def get_global_ip(): loop = asyncio.get_event_loop() resp = await loop.run_in_executor(None, requests.get, 'http://inet-ip.info/ip') print('get_ip') return resp.text async def parallel_sleep(seconds): loop = asyncio.get_event_loop() resp = await loop.run_in_executor(None, sleep, seconds) print(f'sleep {resp}sec') return resp def main(): loop = asyncio.get_event_loop() gather = asyncio.gather( parallel_sleep(10), get_global_ip(), parallel_sleep(1) ) results = loop.run_until_complete(gather) print(results) if __name__ == '__main__': main()
実行結果
get_ip sleep 1sec sleep 10sec [10, '121.101.70.247', 1]
説明
並列的に処理した結果を使いたいことは多々あると思います。
取得する方法は簡単で
await loop.run_in_executor
の返り値は渡した関数の返り値run_until_complete
で一つだけ実行する場合は、実行したものの返り値をそのまま貰えるrun_until_complete
に複数の処理を渡した場合は、渡した順番で結果が格納された配列を貰える
になります。
関数の引数の指定の方法
コード
import asyncio import time import functools def delayed_print(mes, sec): time.sleep(sec) print(mes) async def call_1(mes, sec): loop = asyncio.get_event_loop() await loop.run_in_executor(None, delayed_print, mes, sec) async def call_2(mes, sec): loop = asyncio.get_event_loop() func = functools.partial(delayed_print, mes, sec) await loop.run_in_executor(None, func) async def call_3(message, seconds): loop = asyncio.get_event_loop() func = functools.partial(delayed_print, mes=message, sec=seconds) await loop.run_in_executor(None, func) def main(): loop = asyncio.get_event_loop() gather = asyncio.gather( call_1('333', 3), call_2('222', 2), call_1('111', 1) ) loop.run_until_complete(gather) if __name__ == '__main__': main()
実行結果
111 222 333
説明
引数を指定する際、いくつか方法があります。
- 順番に引数を渡す場合、
run_in_executor
に関数を渡したあと順番に引数書く - 順番に引数を渡す場合、
functool
モジュールのpartial
に関数を渡したあと順番に引数を書き、partial
の返り値をrun_in_executor
に渡す - 名前付き引数を使う場合、
functool
モジュールのpartial
に関数を渡したあと名前付き引数を書いて、partial
の返り値をrun_in_executor
に渡す
順番に引数を渡すだけならrun_in_executor
だけで事足りるのですが、名前付き引数を使おうとするとfunctool
モジュールのpartial
を使用する必要があります。
まとめ
Pythonで非同期処理を行う方法について簡単にまとめてみました。
ぜひ使ってみてください。