Pythonでリアクティブプログラミングしてみる

Reactive ExtensionsのPython実装であるRxPyを使ってリアクティブプログラミングに入門してみます。
2021.08.18

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

リアクティブプログラミング

WikipediaによればReactive Programmingとは以下のようなものらしいです。

reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change.

リアクティブプログラミングはデータストリームと変更の伝播に重きをおいた宣言的なプログラミングパラダイムのことです。

こんな説明もあります。

この「リアクティブプログラミング」(以下、RP)とは、“時間とともに変化していくデータ”(ストリーム)同士の関連性と操作を、“宣言”的に記述するプログラミングの手法です。

この記事ではデータ列の処理に重点を置いたプログラミングの流派の一つだと思ってもらえれば大丈夫です。
この手法をとることでモジュール性が高くなり、また状態の管理が楽になると考えられています。

今回紹介するReactive Extensionsはこれを実現しやすくする便利なライブラリです。

Reactive Extensions とは

Reactive Extensions, ReactiveX, Rx と呼ばれるような物についてまずは解説します。(以下 Rx) Reactive Extensions とはこのページによれば以下のようなものです。

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

Rx はオブザーバブル列を用いて非同期でイベントベースなプログラミングをするためのライブラリのです。

これだけの説明だと何を行っているのか分からないと思うので、もう少し噛み砕いて説明します。

前提としてあるのはRxはライブラリということです。 データフローをコントロールするのに便利な物が集まったライブラリで、様々な言語で実装されています。 関数の抽象度が絶妙な塩梅で、様々な用途に使うことができます。 クリックやボタンなどのイベントを扱うこともできますし、HTTP サーバーなども作れると思います。 基本的な関数の使い方を覚えてしまえばどの言語でも似たようなコードで処理を書くことが可能です。

Rx を用いることでデータストリームの取り扱いが行いやすくなります。 様々なイベントをデータストリームとして扱うことができ、それを変換したりフィルタしたりすることでプログラミングを行うことができます。 イメージとしては List に対する Map や Fillter を用いてプログラミングをしていく感じです。 時間に沿って発生しうるクリックなどのイベントをリストのように扱い、関数のチェーンで処理をしていく感じです。

また、オブザーバーパターンを用いているので非同期処理がしやすくなっています。 基本的には非同期なイベントが発火した際にストリームにデータが流れ、処理が行われます。 非同期処理の完了後に呼ばれるコールバック関数のようなイメージです。 これにより非同期処理を扱いやすくなります。

Rx はパイプラインの用にデータを処理していくための便利系ライブラリって感じの理解でこの記事では問題ありません。

Python でつかってみる

Rx の Python 実装であるRxPyを軽く使ってみます。

Pipなら以下のコマンドでインストールできます。

インストール

pip3 install rx

偶数を数える

10未満の整数のうち2で割り切れるものの個数を数えてみます。

count.py

from rx import range
from rx import operators as op

stream = range(10)

stream.pipe(
    op.filter(lambda x: x % 2 == 0),
    op.count(),
).subscribe(
    on_next=print,
    on_completed=lambda: print('Done!')
)

関数のチェーンで処理を書くことができました。 pipeで関数をチェーンさせることができます。

データの変換の流れがわかりやすく記述できていると思います。 For文などのループで書くこともできますが、処理が関数になることでテストしやすいコードになると思います。 特に複雑な処理ではこの恩恵は大きいと思います。

処理の流れは以下のようです。

  1. 0-9までの整数を生成
  2. 2で割ったあまりが0になるものだけをフィルタリング
  3. 個数を数える

実行結果

$ python count.py 
5
Done!

マルチスレッド

マルチスレッドで郵便番号から都道府県を調べてみます。

zipcode.py

import json
from urllib.request import urlopen
from rx import from_
from rx import operators as op
from rx.scheduler import ThreadPoolScheduler

ENDPOINT = "http://zipcloud.ibsnet.co.jp/api"
code_list = [
    "0700001",
    "7070403",
    "9051308",
]

stream = from_(code_list)
pool_scheduler = ThreadPoolScheduler(3)

stream.pipe(
    op.do_action(lambda x: print(f'郵便番号: {x}')),
    op.map(lambda r: f"{ENDPOINT}/search?zipcode={r}"),
    op.observe_on(pool_scheduler),
    op.map(urlopen),
    op.map(json.load),
    op.map(lambda r: r["results"][0]["address1"]),
).subscribe(
    on_next=lambda x: print(f"都道府県: {x}"),
)

input()

最後のinput()は完了待ちのために入れています。(これ無しでもかけますが簡略化のために今回はこれで)
ハイライト部分のコードがマルチスレッド化しています。これより後の処理は全て別スレッドで行われます。 処理に1行入れるだけでマルチスレッド化することが可能です。
各スレッドでの実行もスケジューラ(ThreadPoolScheduler)が管理してくれます。 他にも様々なスケジューラがあり、実行の方法を柔軟に変えることができます。

処理の流れは以下のようです。

  1. 郵便番号3つが流れるストリームを作成
  2. URLに変換
  3. マルチスレッドで実行するようにする(これ以下は別のスレッドで行われる)
  4. urlopenでリクエストを送る
  5. レスポンスをJSONとしてデコード
  6. 都道府県のプロパティを取得

mapを連続して使用することで
郵便番号->URL->HTTP レスポンス->dict->都道府県
という流れが分かりやすく記述できていると思います。

実行結果

郵便番号: 0700001
郵便番号: 7070403
郵便番号: 9051308
都道府県: 北海道
都道府県: 岡山県
都道府県: 沖縄県

最初に郵便番号が全て表示されてから、リクエストが送られています。 リクエストは別スレッドで行われているのでその間にURLへの変換などが行われていることがわかります。

以下はメインスレッドのみで実行した結果です。 20行目のop.observe_on(pool_scheduler),をコメントアウトするとシングルスレッドになります。

シングルスレッド

郵便番号: 0700001
都道府県: 北海道
郵便番号: 7070403
都道府県: 岡山県
郵便番号: 9051308
都道府県: 沖縄県

郵便番号と都道府県が交互に表示されています。ストリームはスレッド内で逐次処理さていくのでこういった結果になります。

おわりに

RxPyを使うことでデータフローをわかりやすくコントロールできます。 同時性の複雑な処理についても比較的低コストで実装できると思います。 今回はごく浅い部分しか紹介できませんでしたが、機会があればより詳しく紹介できればと思います。