ReactiveXのFlatMapを理解する
FlatMapについて
FlatMapは「ストリームを生成するような関数を一つのストリームにするような操作」です。
例としてリストで考えてみます。 今回はPythonでサンプルコードを書きますが、基本的にはどの言語でもRxを使っていれば近い書き方になると思います。
ストリームを生成するような関数
これはリストを生成するような関数です。 例として整数を受け取って0からN-1までの値が入ったリストを返す関数を考えます。
def to_n(n): return [i for i in range(n)]
一つのストリームにするような操作です
これは先ほどの関数によって生成されたリストのリストを一つにする操作です。 実装は省きますが次のような機能を期待します。
>>> flat_map([1, 2, 3], to_n) -> [0, 0, 1, 0, 1, 2]
第1引数はリストでこの各要素を第2引数の関数に適用します。 基本はmapと同じです。
mapの場合は下のようになります。 mapはPython組み込みの関数と同じでリストと関数を受け取って、リストの要素を関数に適用させていきます。
>>> map([1, 2, 3], to_n) -> [[0], [0, 1], [0, 1, 2]]
mergeという関数を考えてみます。 リストを次々に連結していくような操作です
>>> merge([[0, 1], [2, 3], [4]]) -> [0, 1, 2, 3, 4]
flatmapではリスト内のリストが連結され最終的には1つのリストとして出力されています。 flatmapはmapとmergeの合成関数です。
def flat_map(l, f): return merge(map(l, f))
つまり、flatmapはリストを生成するような関数を適用しつつ、その連結も行ってくれます。
Rxの話に戻ります。
from rx import from_ from rx import operators as op def to_n(n): return [i for i in range(n)] from_([1, 2, 3])\ .pipe( op.flat_map(to_n) ).subscribe( on_next=print )
これを実行すると以下のような結果が得られます。
$ python3 flatmap.py 0 0 1 0 1 2
flat_map
ではなくmap
を使ってみます。
from rx import from_ from rx import operators as op def to_n(n): return [i for i in range(n)] from_([1, 2, 3])\ .pipe( op.map(to_n) ).subscribe( on_next=print )
実行結果は以下のようになります。
$ python3 flatmap.py [0] [0, 1] [0, 1, 2]
リストが返ってきています。
先ほどのflat_map
ではリストが平坦化されて続々と流れてきていました。
リストを用いた例はこのくらいにしてもう少し本質的な話をします。
MergeなのになぜFlatten?
Mapは省略して、Mergeについて一度整理します。 Mergeはマーブル図で言うと次のように表されます。 マーブル図の説明についてはこちらの記事をどうぞ
複数のストリームから1つのストリームを生成します。 この時、元のストリームのイベントは新しいストリームの同じ時点で生成されています。 つまり、Mergeは複数のストリームの射影(Projection)を生成しているわけです。
簡単なイメージとしては複数のストリームを重ねる感じです。
これは自分の推測ですが多次元のストリームを1次元に潰しているのでFlattenなんだと思います。
ちなみに、RxJSではmergeMap
とも呼ばれています。
こちらは、各ストリームのイベントの生成されている時間が重なっていないので、あたかも連結(Concat)しているかのように振る舞っています。
公式のコード例のtimerを例に説明します。
import concurrent.futures import time import rx from rx import operators as ops seconds = [5, 1, 2, 4, 3] def sleep(tm): time.sleep(tm) return tm def output(result): print('%d seconds' % result) with concurrent.futures.ProcessPoolExecutor(5) as executor: rx.from_(seconds).pipe( ops.flat_map(lambda s: executor.submit(sleep, s)) ).subscribe(output)
実行結果は以下のようです。
$ python timer.py 1 seconds 2 seconds 3 seconds 4 seconds 5 seconds
マルチプロセスでsleep
が実行され、値が小さい順から出力されています。(元々のリストは順序がバラバラ)
このとき、マーブル図は次のようになります。
先ほどのリストの例と異なり、新しく生成されるストリーム上のイベントの順番が変わりました。
Mergeは複数のストリームの中で早くきた順から新しいストリームに流すような操作ともいえます。
ちなみに、switchMap
やconcatMap
はこの1次元への潰し方が異なります。
ストリームを返すような関数
リストを返すような関数を平坦化するような方法はわかりました。
実際、Rxを使用する上でflat_map
を使用するのはネストしたリストだけではありません。
ストリームを返すような関数を実装する局面について考えてみます。
- 多重リストなどのネストしたイテレーション
- 非同期処理
- GroupByなどの集約系
他にも使える局面はあると思いますが、今回はこのくらいにします。 リストは先ほど説明したので残りの2つを順に説明していきます。
非同期処理
先ほどのスリープの例からもわかるようにFuture/Promiseをストリームとして扱うことができます。 これによって非同期的に解決された処理が早い順に次の処理へ移るといったことが可能になります。
GroupByなどの集約系
GroupByやWindowなどは変換に際してストリームを生成します。 以下では0ー9までの整数を3で割ったあまりでグループに分けています。
from rx import range from rx import operators as op range(10)\ .pipe( op.group_by(lambda x: x % 3), op.do_action(print), op.flat_map(lambda x: x.pipe(op.to_list())) ).subscribe(print)
$ python3 group.py <rx.core.observable.groupedobservable.GroupedObservable object at 0x102e586a0> <rx.core.observable.groupedobservable.GroupedObservable object at 0x102e585b0> <rx.core.observable.groupedobservable.GroupedObservable object at 0x102e588b0> [0, 3, 6, 9] [1, 4, 7] [2, 5, 8]
一度目の出力からわかるように、GroupByはObservable(ストリーム)を返します。
その後、flat_map
でこれらをリストにしてから一つのストリームに変換しています。
まとめ
- FlatMapを使うことでRx上でより複雑な処理が可能になります。
- Rxではリストだけでなく、Observableというより広い対象に対して同様の処理を行うこと可能です。