view slides/2018/01/30/slide.md @ 52:73b27e5c1d79 default tip

auto-Update generated slides by script
author Takahiro SHIMIZU <anatofuz@cr.ie.u-ryukyu.ac.jp>
date Tue, 16 Apr 2019 18:58:24 +0900
parents 2e1724369e51
children
line wrap: on
line source

title: 分散フレームワークakkaの調査
author: Takahiro Shimizu
profile:
lang: Japanese


# 調査目的
* 先輩の修論の比較材料の為に行う
* 分散フレームワークの1つであるakkaがどのような書き方、及び処理なのかを調査する


# 今週の進捗
* ゼミ資料良い感じに作れる便利スクリプト作ってました
* akkaでインクリメトを行うプログラムを書いてました
* コンパイラ構成論も進めてました

# akkaのインクリメント

* akkaのインスタンスはprops経由で作成する
* 今回は1対1の通信なのでprops内には出力用のみ用意しました
* `system.terminate()` しないとsystemが動いたままになってしまう

```scala
package IncrementSample


import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, TypedActor}


object Incrementer {
  def props(printerActor:ActorRef): Props = Props(new Incrementer(printerActor))

  final case class SendData(data:Int)
  final case class SetActor(actor:ActorRef)
}

class Incrementer(printerActor: ActorRef) extends Actor {
  import  Incrementer._
  import  Printer._


  var otherActor = ActorRef.noSender
  val MAX_SIZE = 10

  def receive = {
    case SendData(data) =>
        printerActor ! Println(data.toString)
        if (data < MAX_SIZE) {
          val send_data = data + 1
          otherActor ! SendData(send_data)
        }
    case SetActor(actor :ActorRef) =>
      otherActor = actor
  }
}

object Printer {
  def props: Props = Props[Printer]

  final case class Println(message: String)
}

class Printer extends  Actor with ActorLogging {
  import Printer._

  def receive = {
    case Println(message) =>
      log.info(s"Printer received (from ${sender()} ): $message")
  }
}

object IncrementSample extends App {

  import Incrementer._


  val system: ActorSystem = ActorSystem("incrementSample")

  val printer: ActorRef = system.actorOf(Printer.props,"printerActor")

  val actorOne: ActorRef = system.actorOf(Incrementer.props(printer),"ActorOne")
  val actorTwo: ActorRef = system.actorOf(Incrementer.props(printer),"ActorTwo")

  actorOne ! SetActor(actorTwo)
  actorTwo ! SetActor(actorOne)

  val FIRST_DATA = 1

  actorOne ! SendData(FIRST_DATA)
  system.terminate()
}

```


# Incrementer

* ActorRefの型の初期値が不明なので `ActorRef.noSender` でごまかす
* 別Actorに処理を投げる際は `otherActor ! Method` な書き方をする
    * この際に `otherActor ! receive(Method)` と書くと自分自身を呼び出してしまう

```scala
object Incrementer {
  def props(printerActor:ActorRef): Props = Props(new Incrementer(printerActor))

  final case class SendData(data:Int)
  final case class SetActor(actor:ActorRef)
}

class Incrementer(printerActor: ActorRef) extends Actor {
  import  Incrementer._
  import  Printer._


  var otherActor = ActorRef.noSender
  val MAX_SIZE = 10

  def receive = {
    case SendData(data) =>
        printerActor ! Println(data.toString)
        if (data < MAX_SIZE) {
          val send_data = data + 1
          otherActor ! SendData(send_data)
        }
    case SetActor(actor :ActorRef) =>
      otherActor = actor
  }
}
```

# 実行結果

```
/Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=65202:/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Users/e155730/Canossa/working/cr/akka/increment-sample/target/scala-2.12/classes:/Users/e155730/.sbt/boot/scala-2.12.4/lib/scala-library.jar:/Users/e155730/.ivy2/cache/org.scala-lang.modules/scala-java8-compat_2.12/bundles/scala-java8-compat_2.12-0.8.0.jar:/Users/e155730/.ivy2/cache/com.typesafe.akka/akka-testkit_2.12/jars/akka-testkit_2.12-2.5.3.jar:/Users/e155730/.ivy2/cache/com.typesafe.akka/akka-actor_2.12/jars/akka-actor_2.12-2.5.3.jar:/Users/e155730/.ivy2/cache/com.typesafe/config/bundles/config-1.3.1.jar IncrementSample.IncrementSample
[INFO] [01/29/2018 16:45:05.275] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/HogeActor#-884834315] ): 3
[INFO] [01/29/2018 16:45:05.276] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/FooActor#-1325429432] ): 2
[INFO] [01/29/2018 16:45:05.276] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/FooActor#-1325429432] ): 4
[INFO] [01/29/2018 16:45:05.277] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/HogeActor#-884834315] ): 5
[INFO] [01/29/2018 16:45:05.277] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/FooActor#-1325429432] ): 6
[INFO] [01/29/2018 16:45:05.277] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/HogeActor#-884834315] ): 7
[INFO] [01/29/2018 16:45:05.277] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/FooActor#-1325429432] ): 8
[INFO] [01/29/2018 16:45:05.278] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/HogeActor#-884834315] ): 9
[INFO] [01/29/2018 16:45:05.278] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/FooActor#-1325429432] ): 10
[INFO] [01/29/2018 16:45:05.278] [incrementSample-akka.actor.default-dispatcher-5] [akka://incrementSample/user/printerActor] Printer received (from Actor[akka://incrementSample/user/HogeActor#-884834315] ): fin
```


# テスト

* Akkaの提供するTestKitライブラリを主に利用する
* actorを管理する `system` はTestKitがラップしたものを利用する
* またTestKitを利用するとシングルスレッドでakkaが処理を行うようになる
* Scalaテストは慣例でファイル名に `Specs`を含める(Scalaのテストフレームワークに由来)
* 出力テストを行う場合、出力を受け取るようのアクターを `TestPropbe()` で置き換えれば可能

```
  val printer: ActorRef = system.actorOf(Printer.props,"printerActor")
```

```
val testProbe = TestProbe()
```

# Actorテスト

* Actorの内部にアクセスしたい場合 `system.actorOf(Actor.props,"name)` の書き方ではなく
`TestActorRef()` にする
* TestActorでラップするとActor内部の値を参照できるらしい(出来なかった)



```
  val actorOne: ActorRef = system.actorOf(Incrementer.props(printer),"ActorOne")
  val actorTwo: ActorRef = system.actorOf(Incrementer.props(printer),"ActorTwo")
```

```
    val actorOne = TestActorRef(Incrementer.props(testProbe.ref),"ActorOne")
    val actorTwo = TestActorRef(Incrementer.props(testProbe.ref),"ActorTwo")
```

# akkaのトポロジーサポート

- 大城くんに投げてます
- Graphと呼ばれる構造なら出来そうな気はしますが、StackOverflowでは無理と言われていた

# akkaのデータ圧縮

* akkaのstreamの[akka.stream.scaladsl.Compression](https://doc.akka.io/docs/akka/current/stream/stream-cookbook.html?language=scala#dealing-with-compressed-data-streams)でデータ圧縮が可能
* scala版の[API](https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Compression$.html)