[Kotlin]コルーチンのChannelのハマり所

コルーチンに Channel があります。Queueのようなもので、メッセージの受け渡しができます。

とっても便利なんですが、今までと違うパラダイムなのでハマる可能性があります。

例:

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>() // <- receiveされていないメッセージが何個でもOK
    launch(CommonPool) {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}

実行結果

1
4
9
16
25
Done!

Kotlin/kotlinx.coroutinesより

launch(CommonPool)で別スレッドで channel.send でメッセージを送っています。sendされたメッセージは、 channel.receive() で受信しています。

注意:非同期ではなく中断であること

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>() 
    repeat(5) { println(channel.receive()) } //sendとreceiveを反対にした
    launch(CommonPool) {
        for (x in 1..5) channel.send(x * x)
    }

    println("Done!")
}

実行結果

(ずっと処理中で返ってこない)

実行結果は、何も起こらず止まっていることでしょう。

その原因は repeat(5) { println(channel.receive()) } にあります。5つメッセージくるまで処理が中断されるためです。つまり、それ以降処理がされないためずっと待ち続けます。

非同期の場合は、その時5つなければ、無視したり、今ある分だけ処理したり、いずれにせよ処理が途中で中断することはありません。channelはsendされるまでそのスレッド(context)を中断し、sendの後、処理が再開しreceiveが5つできた時にforから抜けます。

中断非同期 を同じように考えているとハマります。

これを解決するために、receive()するcontextをメインから別のスレッド(context)にします

fun main(args: Array<String>) = runBlocking<Unit> {
        val channel = Channel<Int>(1)
        launch(CommonPool) { //別のスレッド(context)で待つ
            repeat(5) {
                println(channel.receive())
                delay(200) //200msの待ちをいれる(重たい処理)
            }
        }
        launch(CommonPool) {
            for (x in 1..5) channel.send(x * x)
        }
        println("Done!")
}

実行結果

Done!

receive() ができるまでの間中断されますが CommonPool (newスレッド)で中断するため runBlocking が中断されることはありません。めでたし?

しかし、残念なことに実行結果はDoneだけです、今度は receive() がrunBlockingのスレッドで実行していないので、中断してDoneまでいってしまいました。

処理が終わるまで待ってあげる必要があります。

fun main(args: Array<String>) = runBlocking<Unit> {
        val channel = Channel<Int>(1)
        val job = launch(CommonPool) { //Jobの返り値をもらう
            repeat(5) {
                println(channel.receive())
                delay(200) //200msの待ちをいれる
            }
        }
        launch(CommonPool) {
            for (x in 1..5) channel.send(x * x)
        }
        job.join() //receive()が終わるまで待つ(中断)
        println("Done!")
}

実行結果

1
4
9
16
25
Done!

参考文献