ReactiveXのFlatMapを理解する

ReactiveXの中でも少し複雑なオペレータFlatMapについて説明をしていきます。 これを利用すればより複雑な処理が可能になります。 どんな局面で有効なのか具体例を交えつつ説明します。
2021.08.31

FlatMapについて

FlatMapは「ストリームを生成するような関数を一つのストリームにするような操作」です。

例としてリストで考えてみます。 今回はPythonでサンプルコードを書きますが、基本的にはどの言語でもRxを使っていれば近い書き方になると思います。

ストリームを生成するような関数

これはリストを生成するような関数です。 例として整数を受け取って0からN-1までの値が入ったリストを返す関数を考えます。

to_n

def to_n(n):
    return [i for i in range(n)]

一つのストリームにするような操作です

これは先ほどの関数によって生成されたリストのリストを一つにする操作です。 実装は省きますが次のような機能を期待します。

flatmap

>>> flat_map([1, 2, 3], to_n) 
-> [0, 0, 1, 0, 1, 2]

第1引数はリストでこの各要素を第2引数の関数に適用します。 基本はmapと同じです。

mapの場合は下のようになります。 mapはPython組み込みの関数と同じでリストと関数を受け取って、リストの要素を関数に適用させていきます。

map

>>> map([1, 2, 3], to_n) 
-> [[0], [0, 1], [0, 1, 2]]

mergeという関数を考えてみます。 リストを次々に連結していくような操作です

merge

>>> merge([[0, 1], [2, 3], [4]])
-> [0, 1, 2, 3, 4]

flatmapではリスト内のリストが連結され最終的には1つのリストとして出力されています。 flatmapはmapとmergeの合成関数です。

flatmap

def flat_map(l, f):
    return merge(map(l, f))

つまり、flatmapはリストを生成するような関数を適用しつつ、その連結も行ってくれます。

Rxの話に戻ります。

flatmap.py

from rx import from_
from rx import operators as op


def to_n(n):
    return [i for i in range(n)]


from_([1, 2, 3])\
    .pipe(
        op.flat_map(to_n)
).subscribe(
    on_next=print
)

これを実行すると以下のような結果が得られます。

実行結果

$ python3 flatmap.py 
0
0
1
0
1
2

flat_mapではなくmapを使ってみます。

map.py

from rx import from_
from rx import operators as op


def to_n(n):
    return [i for i in range(n)]


from_([1, 2, 3])\
    .pipe(
        op.map(to_n)
).subscribe(
    on_next=print
)

実行結果は以下のようになります。

実行結果

$ python3 flatmap.py 
[0]
[0, 1]
[0, 1, 2]

リストが返ってきています。 先ほどのflat_mapではリストが平坦化されて続々と流れてきていました。

リストを用いた例はこのくらいにしてもう少し本質的な話をします。

MergeなのになぜFlatten?

Mapは省略して、Mergeについて一度整理します。 Mergeはマーブル図で言うと次のように表されます。 マーブル図の説明についてはこちらの記事をどうぞ

複数のストリームから1つのストリームを生成します。 この時、元のストリームのイベントは新しいストリームの同じ時点で生成されています。 つまり、Mergeは複数のストリームの射影(Projection)を生成しているわけです。

簡単なイメージとしては複数のストリームを重ねる感じです。 これは自分の推測ですが多次元のストリームを1次元に潰しているのでFlattenなんだと思います。 ちなみに、RxJSではmergeMapとも呼ばれています。

先ほどのリストの例のマーブル図を見てみます。

こちらは、各ストリームのイベントの生成されている時間が重なっていないので、あたかも連結(Concat)しているかのように振る舞っています。

公式のコード例のtimerを例に説明します。

timer.py

import concurrent.futures
import time

import rx
from rx import operators as ops

seconds = [5, 1, 2, 4, 3]


def sleep(tm):
    time.sleep(tm)
    return tm


def output(result):
    print('%d seconds' % result)

with concurrent.futures.ProcessPoolExecutor(5) as executor:
    rx.from_(seconds).pipe(
        ops.flat_map(lambda s: executor.submit(sleep, s))
    ).subscribe(output)

実行結果は以下のようです。

実行結果

$ python timer.py 
1 seconds
2 seconds
3 seconds
4 seconds
5 seconds

マルチプロセスでsleepが実行され、値が小さい順から出力されています。(元々のリストは順序がバラバラ) このとき、マーブル図は次のようになります。 先ほどのリストの例と異なり、新しく生成されるストリーム上のイベントの順番が変わりました。 Mergeは複数のストリームの中で早くきた順から新しいストリームに流すような操作ともいえます。

ちなみに、switchMapconcatMapはこの1次元への潰し方が異なります。

ストリームを返すような関数

リストを返すような関数を平坦化するような方法はわかりました。 実際、Rxを使用する上でflat_mapを使用するのはネストしたリストだけではありません。 ストリームを返すような関数を実装する局面について考えてみます。

  • 多重リストなどのネストしたイテレーション
  • 非同期処理
  • GroupByなどの集約系

他にも使える局面はあると思いますが、今回はこのくらいにします。 リストは先ほど説明したので残りの2つを順に説明していきます。

非同期処理

先ほどのスリープの例からもわかるようにFuture/Promiseをストリームとして扱うことができます。 これによって非同期的に解決された処理が早い順に次の処理へ移るといったことが可能になります。

GroupByなどの集約系

GroupByやWindowなどは変換に際してストリームを生成します。 以下では0ー9までの整数を3で割ったあまりでグループに分けています。

groupby.py

from rx import range
from rx import operators as op

range(10)\
    .pipe(
        op.group_by(lambda x: x % 3),
        op.do_action(print),
        op.flat_map(lambda x: x.pipe(op.to_list()))
).subscribe(print)

実行結果

$ python3 group.py 
<rx.core.observable.groupedobservable.GroupedObservable object at 0x102e586a0>
<rx.core.observable.groupedobservable.GroupedObservable object at 0x102e585b0>
<rx.core.observable.groupedobservable.GroupedObservable object at 0x102e588b0>
[0, 3, 6, 9]
[1, 4, 7]
[2, 5, 8]

一度目の出力からわかるように、GroupByはObservable(ストリーム)を返します。 その後、flat_mapでこれらをリストにしてから一つのストリームに変換しています。

まとめ

  • FlatMapを使うことでRx上でより複雑な処理が可能になります。
  • Rxではリストだけでなく、Observableというより広い対象に対して同様の処理を行うこと可能です。

参考