title: 分散フレームワークakkaの調査 author: Takahiro Shimizu profile: lang: Japanese # 調査目的 * 先輩の修論の比較材料の為に行う * 分散フレームワークの1つであるakkaがどのような書き方、及び処理なのかを調査する # 今週の進捗 * ゼミ資料良い感じに作れる便利スクリプト作ってました * akkaでインクリメトを行うプログラムを書いてました * コンパイラ構成論も進めてました # akkaのインクリメント * akkaのインスタンスはprops経由で作成する * 今回は1対1の通信なのでprops内には出力用のみ用意しました * `system.terminate()` しないとsystemが動いたままになってしまう ```scala package IncrementSample import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, TypedActor} object Incrementer { def props(printerActor:ActorRef): Props = Props(new Incrementer(printerActor)) final case class SendData(data:Int) final case class SetActor(actor:ActorRef) } class Incrementer(printerActor: ActorRef) extends Actor { import Incrementer._ import Printer._ var otherActor = ActorRef.noSender val MAX_SIZE = 10 def receive = { case SendData(data) => printerActor ! Println(data.toString) if (data < MAX_SIZE) { val send_data = data + 1 otherActor ! SendData(send_data) } case SetActor(actor :ActorRef) => otherActor = actor } } object Printer { def props: Props = Props[Printer] final case class Println(message: String) } class Printer extends Actor with ActorLogging { import Printer._ def receive = { case Println(message) => log.info(s"Printer received (from ${sender()} ): $message") } } object IncrementSample extends App { import Incrementer._ val system: ActorSystem = ActorSystem("incrementSample") val printer: ActorRef = system.actorOf(Printer.props,"printerActor") val actorOne: ActorRef = system.actorOf(Incrementer.props(printer),"ActorOne") val actorTwo: ActorRef = system.actorOf(Incrementer.props(printer),"ActorTwo") actorOne ! SetActor(actorTwo) actorTwo ! SetActor(actorOne) val FIRST_DATA = 1 actorOne ! SendData(FIRST_DATA) system.terminate() } ``` # Incrementer * ActorRefの型の初期値が不明なので `ActorRef.noSender` でごまかす * 別Actorに処理を投げる際は `otherActor ! Method` な書き方をする * この際に `otherActor ! receive(Method)` と書くと自分自身を呼び出してしまう ```scala object Incrementer { def props(printerActor:ActorRef): Props = Props(new Incrementer(printerActor)) final case class SendData(data:Int) final case class SetActor(actor:ActorRef) } class Incrementer(printerActor: ActorRef) extends Actor { import Incrementer._ import Printer._ var otherActor = ActorRef.noSender val MAX_SIZE = 10 def receive = { case SendData(data) => printerActor ! Println(data.toString) if (data < MAX_SIZE) { val send_data = data + 1 otherActor ! SendData(send_data) } case SetActor(actor :ActorRef) => otherActor = actor } } ``` # 実行結果 ``` /Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=65202:/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Users/e155730/Canossa/working/cr/akka/increment-sample/target/scala-2.12/classes:/Users/e155730/.sbt/boot/scala-2.12.4/lib/scala-library.jar:/Users/e155730/.ivy2/cache/org.scala-lang.modules/scala-java8-compat_2.12/bundles/scala-java8-compat_2.12-0.8.0.jar:/Users/e155730/.ivy2/cache/com.typesafe.akka/akka-testkit_2.12/jars/akka-testkit_2.12-2.5.3.jar:/Users/e155730/.ivy2/cache/com.typesafe.akka/akka-actor_2.12/jars/akka-actor_2.12-2.5.3.jar:/Users/e155730/.ivy2/cache/com.typesafe/config/bundles/config-1.3.1.jar IncrementSample.IncrementSample [INFO] [01/29/2018 16:45:05.275] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/HogeActor#-884834315] ): 3 [INFO] [01/29/2018 16:45:05.276] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/FooActor#-1325429432] ): 2 [INFO] [01/29/2018 16:45:05.276] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/FooActor#-1325429432] ): 4 [INFO] [01/29/2018 16:45:05.277] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/HogeActor#-884834315] ): 5 [INFO] [01/29/2018 16:45:05.277] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/FooActor#-1325429432] ): 6 [INFO] [01/29/2018 16:45:05.277] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/HogeActor#-884834315] ): 7 [INFO] [01/29/2018 16:45:05.277] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/FooActor#-1325429432] ): 8 [INFO] [01/29/2018 16:45:05.278] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/HogeActor#-884834315] ): 9 [INFO] [01/29/2018 16:45:05.278] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/FooActor#-1325429432] ): 10 [INFO] [01/29/2018 16:45:05.278] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/HogeActor#-884834315] ): fin ``` # テスト * Akkaの提供するTestKitライブラリを主に利用する * actorを管理する `system` はTestKitがラップしたものを利用する * またTestKitを利用するとシングルスレッドでakkaが処理を行うようになる * Scalaテストは慣例でファイル名に `Specs`を含める(Scalaのテストフレームワークに由来) * 出力テストを行う場合、出力を受け取るようのアクターを `TestPropbe()` で置き換えれば可能 ``` val printer: ActorRef = system.actorOf(Printer.props,"printerActor") ``` ``` val testProbe = TestProbe() ``` # Actorテスト * Actorの内部にアクセスしたい場合 `system.actorOf(Actor.props,"name)` の書き方ではなく `TestActorRef()` にする * TestActorでラップするとActor内部の値を参照できるらしい(出来なかった) ``` val actorOne: ActorRef = system.actorOf(Incrementer.props(printer),"ActorOne") val actorTwo: ActorRef = system.actorOf(Incrementer.props(printer),"ActorTwo") ``` ``` val actorOne = TestActorRef(Incrementer.props(testProbe.ref),"ActorOne") val actorTwo = TestActorRef(Incrementer.props(testProbe.ref),"ActorTwo") ``` # akkaのトポロジーサポート - 大城くんに投げてます - Graphと呼ばれる構造なら出来そうな気はしますが、StackOverflowでは無理と言われていた # akkaのデータ圧縮 * akkaのstreamの[akka.stream.scaladsl.Compression](https://doc.akka.io/docs/akka/current/stream/stream-cookbook.html?language=scala#dealing-with-compressed-data-streams)でデータ圧縮が可能 * scala版の[API](https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Compression$.html)