Mercurial > hg > Members > anatofuz > slides
diff slides/2018/20180130/slide.md @ 23:c0ec001d8a28
update
author | Takahiro SHIMIZU <anatofuz@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 05 Apr 2018 11:34:39 +0900 |
parents | slides/20180130/slide.md@80767afba59c |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/slides/2018/20180130/slide.md Thu Apr 05 11:34:39 2018 +0900 @@ -0,0 +1,190 @@ +title: 分散フレームワークakkaの調査 +author: Takahiro Shimizu +profile: +lang: Japanese + + +# 調査目的 +* 先輩の修論の比較材料の為に行う +* 分散フレームワークの1つであるakkaがどのような書き方、及び処理なのかを調査する + + +# 今週の進捗 +* ゼミ資料良い感じに作れる便利スクリプト作ってました +* akkaでインクリメトを行うプログラムを書いてました +* コンパイラ構成論も進めてました + +# akkaのインクリメント + +* akkaのインスタンスはprops経由で作成する +* 今回は1対1の通信なのでprops内には出力用のみ用意しました +* `system.terminate()` しないとsystemが動いたままになってしまう + +```scala +package IncrementSample + + +import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, TypedActor} + + +object Incrementer { + def props(printerActor:ActorRef): Props = Props(new Incrementer(printerActor)) + + final case class SendData(data:Int) + final case class SetActor(actor:ActorRef) +} + +class Incrementer(printerActor: ActorRef) extends Actor { + import Incrementer._ + import Printer._ + + + var otherActor = ActorRef.noSender + val MAX_SIZE = 10 + + def receive = { + case SendData(data) => + printerActor ! Println(data.toString) + if (data < MAX_SIZE) { + val send_data = data + 1 + otherActor ! SendData(send_data) + } + case SetActor(actor :ActorRef) => + otherActor = actor + } +} + +object Printer { + def props: Props = Props[Printer] + + final case class Println(message: String) +} + +class Printer extends Actor with ActorLogging { + import Printer._ + + def receive = { + case Println(message) => + log.info(s"Printer received (from ${sender()} ): $message") + } +} + +object IncrementSample extends App { + + import Incrementer._ + + + val system: ActorSystem = ActorSystem("incrementSample") + + val printer: ActorRef = system.actorOf(Printer.props,"printerActor") + + val actorOne: ActorRef = system.actorOf(Incrementer.props(printer),"ActorOne") + val actorTwo: ActorRef = system.actorOf(Incrementer.props(printer),"ActorTwo") + + actorOne ! SetActor(actorTwo) + actorTwo ! SetActor(actorOne) + + val FIRST_DATA = 1 + + actorOne ! SendData(FIRST_DATA) + system.terminate() +} + +``` + + +# Incrementer + +* ActorRefの型の初期値が不明なので `ActorRef.noSender` でごまかす +* 別Actorに処理を投げる際は `otherActor ! Method` な書き方をする + * この際に `otherActor ! receive(Method)` と書くと自分自身を呼び出してしまう + +```scala +object Incrementer { + def props(printerActor:ActorRef): Props = Props(new Incrementer(printerActor)) + + final case class SendData(data:Int) + final case class SetActor(actor:ActorRef) +} + +class Incrementer(printerActor: ActorRef) extends Actor { + import Incrementer._ + import Printer._ + + + var otherActor = ActorRef.noSender + val MAX_SIZE = 10 + + def receive = { + case SendData(data) => + printerActor ! Println(data.toString) + if (data < MAX_SIZE) { + val send_data = data + 1 + otherActor ! SendData(send_data) + } + case SetActor(actor :ActorRef) => + otherActor = actor + } +} +``` + +# 実行結果 + +``` +/Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=65202:/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Users/e155730/Canossa/working/cr/akka/increment-sample/target/scala-2.12/classes:/Users/e155730/.sbt/boot/scala-2.12.4/lib/scala-library.jar:/Users/e155730/.ivy2/cache/org.scala-lang.modules/scala-java8-compat_2.12/bundles/scala-java8-compat_2.12-0.8.0.jar:/Users/e155730/.ivy2/cache/com.typesafe.akka/akka-testkit_2.12/jars/akka-testkit_2.12-2.5.3.jar:/Users/e155730/.ivy2/cache/com.typesafe.akka/akka-actor_2.12/jars/akka-actor_2.12-2.5.3.jar:/Users/e155730/.ivy2/cache/com.typesafe/config/bundles/config-1.3.1.jar IncrementSample.IncrementSample +[INFO] [01/29/2018 16:45:05.275] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/HogeActor#-884834315] ): 3 +[INFO] [01/29/2018 16:45:05.276] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/FooActor#-1325429432] ): 2 +[INFO] [01/29/2018 16:45:05.276] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/FooActor#-1325429432] ): 4 +[INFO] [01/29/2018 16:45:05.277] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/HogeActor#-884834315] ): 5 +[INFO] [01/29/2018 16:45:05.277] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/FooActor#-1325429432] ): 6 +[INFO] [01/29/2018 16:45:05.277] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/HogeActor#-884834315] ): 7 +[INFO] [01/29/2018 16:45:05.277] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/FooActor#-1325429432] ): 8 +[INFO] [01/29/2018 16:45:05.278] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/HogeActor#-884834315] ): 9 +[INFO] [01/29/2018 16:45:05.278] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/FooActor#-1325429432] ): 10 +[INFO] [01/29/2018 16:45:05.278] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/HogeActor#-884834315] ): fin +``` + + +# テスト + +* Akkaの提供するTestKitライブラリを主に利用する +* actorを管理する `system` はTestKitがラップしたものを利用する +* またTestKitを利用するとシングルスレッドでakkaが処理を行うようになる +* Scalaテストは慣例でファイル名に `Specs`を含める(Scalaのテストフレームワークに由来) +* 出力テストを行う場合、出力を受け取るようのアクターを `TestPropbe()` で置き換えれば可能 + +``` + val printer: ActorRef = system.actorOf(Printer.props,"printerActor") +``` + +``` +val testProbe = TestProbe() +``` + +# Actorテスト + +* Actorの内部にアクセスしたい場合 `system.actorOf(Actor.props,"name)` の書き方ではなく +`TestActorRef()` にする +* TestActorでラップするとActor内部の値を参照できるらしい(出来なかった) + + + +``` + val actorOne: ActorRef = system.actorOf(Incrementer.props(printer),"ActorOne") + val actorTwo: ActorRef = system.actorOf(Incrementer.props(printer),"ActorTwo") +``` + +``` + val actorOne = TestActorRef(Incrementer.props(testProbe.ref),"ActorOne") + val actorTwo = TestActorRef(Incrementer.props(testProbe.ref),"ActorTwo") +``` + +# akkaのトポロジーサポート + +- 大城くんに投げてます +- Graphと呼ばれる構造なら出来そうな気はしますが、StackOverflowでは無理と言われていた + +# akkaのデータ圧縮 + +* akkaのstreamの[akka.stream.scaladsl.Compression](https://doc.akka.io/docs/akka/current/stream/stream-cookbook.html?language=scala#dealing-with-compressed-data-streams)でデータ圧縮が可能 +* scala版の[API](https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Compression$.html)