Scalaで並行処理#1 – Scala標準のActorを使う

2012.01.19

はじめに

Javaの場合、並行処理を行うためにはThreadクラスを直接使用したりConcurrent APIを使用すると思います。
これに加えてScalaではActorモデル ※1を用いて並行処理を行うためのライブラリを標準でもっています。
簡単にいえばActorとは、メッセージを受けてそれに応じて処理するオブジェクトです。
今回はScalaのActorを使用してサンプルをつくってみます。

今回使用した動作環境は以下のとおりです。

  • OS : MacOS X 10.7.2
  • Java : 1.6.0_26
  • Scala : 2.9.1 final
  • Eclipse : 3.7

Actorの基本

基本的なActorを用いたプログラミングサンプルを作成してみます。

シンプルなActor

Scalaのアクターはscala.actors.Actorクラスを継承し、actメソッドを実装します。
下記コードをREPL上で入力してみてください。

import scala.actors.Actor
import scala.actors.Actor._

class MyAct extends Actor {
  def act = {
    println("Hello MyAct")
  }
}

クラスが定義されました。
Actorを開始するには、クラスをインスタンス化してstartメソッドを実行します。

scala> val ma = new MyAct
ma: MyAct = MyAct@4f35ecf1

scala> ma.start
Hello MyAct
res1: scala.actors.Actor = MyAct@ac13b15

actメソッドが実行され、コンソールに文字が出力されます。
Actorの生成はもうひとつ方法があります。それはscala.actors.Actorsのファクトリメソッドを使用する方法です。

val ma = actor {
   println("Hello MyAct by factory")
 }

上記コードをREPLで実行すると、そのままActorが開始されます。

Hello MyAct by factory
ma: scala.actors.Actor = scala.actors.Actor$$anon$1@50987e1e

メッセージパッシング

Actorの重要な機能として、メッセージパッシングがあります。テキストでも独自クラスでも、どんなオブジェクトでも受けとることが可能です。
Actorはそれぞれ受けとったメッセージをキューに入れておくためのmailboxをもっており、情報はすべてメッセージでやりとりします。
メッセージをうけとったときに処理を記述するためのメソッドは下記のものがあります。

  • receive:マッチした型のメッセージを受け取るまでブロックする。結果を返す事が可能
  • receiveWithin:receiveと同じだが、指定時間をすぎるとブロックを解除
  • react:結果は返さない。reveiveより速い
  • reactWithin:receiveと同じ。指定時間をすぎるとブロックを解除

いくつか試してみましょう。まずはreceiveです。receiveは常にスレッドを使用するのでパフォーマンスがいまいちですが、処理結果を返すことが可能です。REPL上で実行してみます。

//アクター定義
import scala.actors.Actor
import scala.actors.Actor._
val myActor = actor {
  val rRes = receive {
    case i:Int =>
      println("receive " + i)
      "Int:" + i
  }
  println("receive result " + rRes)
}
//!でメッセージ送信
scala> myActor !  1
receive 1
receive result Int:1

receiveでかこったブロックがメッセージ受信時に行う処理です。パターンマッチを使用してInt型が来た場合の処理を記述しています。
メッセージを送信するには「<アクター> ! <メッセージ> 」形式でメッセージパッシングを行なっています。
!は非同期でメッセージパッシングを行います。!?を使用すると同期メッセージを送ることができます。

次はreactです。これはメッセージ処理中のみスレッドを使用するため、receiveよりパフォーマンスが良いです。
しかし、結果を返せないという欠点があります。

import scala.actors.Actor
import scala.actors.Actor._
val myActor = actor {
  react {
    case i:Int => println("react " + i)
  }
}
scala> myActor !  1
react 1

loopを使えば受信を繰り返すことができます。

import scala.actors.Actor
import scala.actors.Actor._
val myActor = actor {
  loop {
    react {
      case i:Int => println("loop react " + i)
    }
  }
}
scala>myActor !  1
scala>myActor !  2

メッセージ返信

処理をうけとったら、メッセージ送信元へ返信することも可能です。その際にはreplyを使用します。

import scala.actors.Actor
import scala.actors.Actor._
val myActor = actor {
  loop {
    react {
      case i:Int =>
        println("react " + i)
        reply(i * 2)
    }
  }
}
scala> val result = myActor !?  1
react 1
result: Any = 2

メッセージ送信方法

先ほど!で非同期送信、!?で同期送信と説明しましたが、もう1つ、!!というメソッドがあります。
これは非同期で送信を行いますが、同時にコールバックをわたして処理を行うことができます。

scala> val x = myActor !! (10,{ case i:Int => println("res=" + i)}) 
x: myActor.Future[Unit] = <function0>
react 10
res=20

まとめ

Actorの基本のまとめです。

  • actorメソッドで処理内容を設定するとActorがインスタンス化され、startが実行される
  • ! メソッドで非同期メッセージ送信、!? メソッドでメッセージ送信後に結果の取得を待機(同期)
  • !!メソッドは非同期で送信し、コールバックを実行できる。
  • receive や react メソッドにメッセージ受信後の処理内容を設定
  • reply メソッドでメッセージの送信元に返信
  • loop メソッドで処理内容を繰り返す
  • receiveのかわりにreactを使用することでパフォーマンスが向上する
  • reactはメッセージを処理しても制御を返さない

今回はActorの基本的なところをご紹介しました。
これを踏まえた上で、次回Akkaについてご紹介する予定です。

参考サイトなど