新人SEの学習記録

14年度入社SEの学習記録用に始めたブログです。気づけば社会人3年目に突入。

学習記録:ドワンゴ 新人向けScala研修テキスト

17. Future/Promiseについて

非同期プログラミングにおいて,終了しているかどうかわからない処理結果を抽象化した型で,Futureは未来の結果を表し,Promiseは一度だけ,成功あるいは失敗を表す処理または値を設定することでFutureに変換できる型になる。

Futureとは

非同期に処理される結果が入ったOption型のようなもので,その処理が終わっているか(isCompleted)や正しく終わった時/例外が起こった時の処理を適用する(onSuccess/onFailure)などが可能。

scala> import scala.concurrent.Future
import scala.concurrent.Future

scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global

scala> object FutureSample extends App {
     |   val s = "Hello"
     |   val f: Future[String] = Future {
     |     Thread.sleep(1000)
     |     s + " future!"
     |   }
     |   f.onSuccess { case s: String => println(s) }
     |   println(f.isCompleted)
     |   Thread.sleep(5000)
     |   println(f.isCompleted)
     | }
defined object FutureSample

scala> FutureSample.main(null)
false
Hello future!
true

Futureシングルトンは,関数を与えるとその関数を非同期に与えるFuture[T]を返す。
上記の実装例では,まず1000ミリ秒待機して"Hello" と "future!"を文字列結合という処理を非同期に処理している。
そして成功時の処理を定義した後,Futureの処理が終わっているか確認し,5000ミリ秒待った後,再度結果を確認している。

このように5000ミリ秒待たなくても,そのFuture自体の処理を待つこともできる。Await.ready(f, 5000 millisecond)とすることで,Futureが終わるまで最大5000ミリ秒待つという書き方になる。
ただし,以下のインポート文が必要。

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps

では,これらを使ってコードを書いてみる。

scala> object FutureSample extends App {
     | 
     |   val s = "Hello"
     |   val f: Future[String] = Future {
     |     Thread.sleep(1000)
     |     println(s"[ThreadName] In Future: ${Thread.currentThread.getName}")
     |     s + " future!"
     | 
     |   }
     |   f.onSuccess { case s: String =>
     |     println(s"[ThreadName] In onSuccess: ${Thread.currentThread.getName}")
     |     println(s)
     |   }
     |   println(f.isCompleted) // false
     |   Await.ready(f, 5000 millisecond) // Hello future!
     |   println(s"[ThreadName] In App: ${Thread.currentThread.getName}")
     |   println(f.isCompleted) // true
     | }
defined object FutureSample

scala> FutureSample.main(null)
false
[ThreadName] In Future: ForkJoinPool-1-worker-5
[ThreadName] In App: run-main-0
true
[ThreadName] In onSuccess: ForkJoinPool-1-worker-5
Hello future!

上記のコードではスレッド名を各所で出力してみている。FutureとonSuccessに渡した関数については,mainスレッドとは異なるスレッドで実行されている。つまり,Futureを使うことで知らないうちにマルチスレッドのプログラミングが実行されていたということになる
また,Await.readyを使うことで,isCompleteの処理(true)の方が"Hello future!"よりも早く出力されているが,これは文字列結合の方が値参照よりコストが高いためである。

ForkJoinPoolに関しては,Javaの並行プログラミングをサポートするExecutorServiceというインタフェースを被ったクラスになる。内部的にスレッドプールを持っており,スレッドを使い回すことでスレッド生成のコストを低減している。

では続いて,FutureがOptionのように扱えることを説明する。

scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global

scala> import scala.concurrent.Future
import scala.concurrent.Future

scala> import scala.util.{Failure, Random, Success}
import scala.util.{Failure, Random, Success}

scala> object FutureOptionUsageSample extends App {
     |   val random = new Random()
     |   val waitMaxMilliSec = 3000
     | 
     |   val futureMilliSec: Future[Int] = Future {
     |     val waitMilliSec = random.nextInt(waitMaxMilliSec);
     |     if(waitMilliSec < 1000) throw new RuntimeException(s"waitMilliSec is ${waitMilliSec}" )
     |     Thread.sleep(waitMilliSec)
     |     waitMilliSec
     |   }
     |   val futureSec: Future[Double] = futureMilliSec.map(i => i.toDouble / 1000)
     |   futureSec onComplete {
     |     case Success(waitSec) => println(s"Success! ${waitSec} sec")
     |     case Failure(t) => println(s"Failure: ${t.getMessage}")
     |   }
     |   Thread.sleep(3000)
     |  }
defined object FutureOptionUsageSample

scala> FutureOptionUsageSample.main(null)
Success! 1.667 sec

scala> FutureOptionUsageSample.main(null)
Success! 1.225 sec

scala> FutureOptionUsageSample.main(null)
Failure: waitMilliSec is 952

この処理では,3000以下のランダムな値を生成し,生成した値が1000未満であれば失敗とみなして例外を投げ,そうでなければ生成した値のミリ秒だけ待ってその値を返すFutureを定義している。
この最初に得られるFutureをfutureMilliSecとしてFuture[Int]型にしているが,その後,map型を利用してIntのミリ秒をDoubleの秒に変換している(futureSec)。
なお,先ほどと異なり,onSuccessではなくonCompleteを利用して成功と失敗の両方の処理を記述している。

Futureを使って非同期に取れてくる複数の結果を利用して結果を作る

さて,FutureにはflatMapやfor式が利用できる。これらはよく複数のFutureを組み合わせて新しいFutureを作るのに用いられる。

scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global

scala> import scala.concurrent.Future
import scala.concurrent.Future

scala> import scala.language.postfixOps
import scala.language.postfixOps

scala> import scala.util.{Failure, Success, Random}
import scala.util.{Failure, Success, Random}

scala> object CompositeFutureSample extends App {
     |   val random = new Random()
     |   val waitMaxMilliSec = 3000
     |   def waitRandom(futureName: String): Int = {
     |     val waitMilliSec = random.nextInt(waitMaxMilliSec);
     |     if(waitMilliSec < 500) throw new RuntimeException(s"${futureName} waitMilliSec is ${waitMilliSec}" )
     |     Thread.sleep(waitMilliSec)
     |     waitMilliSec
     |   }
     |   val futureFirst: Future[Int] = Future { waitRandom("first") }
     |   val futureSecond: Future[Int] = Future { waitRandom("second") }
     | 
     |   val compositeFuture: Future[(Int, Int)] = for { 
     |     first: Int <- futureFirst
     |     second: Int <- futureSecond
     |   } yield (first, second)
     | 
     |   compositeFuture onComplete  {
     |     case Success((first, second)) => println(s"Success! first:${first} second:${second}")
     |     case Failure(t) => println(s"Failure: ${t.getMessage}")
     |   }
     | 
     |   Thread.sleep(5000)
     |  }
defined object CompositeFutureSample

scala> CompositeFutureSample.main(null)
Success! first:2085 second:1574

scala> CompositeFutureSample.main(null)
Failure: first waitMilliSec is 349

scala> CompositeFutureSample.main(null)
Failure: second waitMilliSec is 390

ランダムで最大3秒待つ関数を用意し,500ミリ秒未満の場合は失敗とみなしている。その関数を実行する関数をFutureとして2つ用意し,それらをfor式で畳み込んで新しいFutureを作っている。そして最終的に,新しいFutureに対して成功した場合と失敗した場合を出力する。

Promiseとは

成功あるいは失敗を表す処理または値を設定することにより,Futureに変換することのできるクラスである。

scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global

scala> import scala.concurrent.{Promise, Future}
import scala.concurrent.{Promise, Future}

scala> import scala.util.{Success, Failure, Random}
import scala.util.{Success, Failure, Random}

scala> object PromiseSample extends App {
     |   val random = new Random()
     |   val promiseGetInt: Promise[Int] = Promise[Int]
     |   val futureGetInt: Future[Int] = promiseGetInt.success(random.nextInt(1000)).future
     |   futureGetInt.onComplete {
     |     case Success(i) => println(s"Success! i: ${i}")
     |     case Failure(t) => println(s"Failure! t: ${t.getMessage}")
     |   }
     | 
     |   Thread.sleep(1000)
     | }
defined object PromiseSample

scala> PromiseSample.main(null)
Success! i: 785

scala> PromiseSample.main(null)
Success! i: 269

Promiseに成功を表すsuccess(random.nextInt())を与えることによって,この処理は必ずSuccess! i: <ランダムな値> という値を返す。

この1度だけしか結果ができようされないという特性を活かして,以下のような実装が可能である。なお,複数successが定義される場合,trySuccessというPromiseのメソッドを利用する。successを利用して複数回成功した値を定義した場合,IllegalStateExceptionが投げられる。

scala> object PromiseFutureCompositionSample extends App {
     |   val promiseGetInt: Promise[Int] = Promise[Int]
     | 
     |   val firstFuture: Future[Int] = Future {
     |     Thread.sleep(100)
     |     1
     |   }
     |   firstFuture.onSuccess{ case i => promiseGetInt.trySuccess(i)}
     | 
     |   val secondFuture: Future[Int] = Future {
     |     Thread.sleep(200)
     |     2
     |   }
     |   secondFuture.onSuccess{ case i => promiseGetInt.trySuccess(i)}
     | 
     |   val futureGetInt: Future[Int] = promiseGetInt.future
     | 
     |   futureGetInt.onComplete {
     |     case Success(i) => println(s"Success! i: ${i}")
     |     case Failure(t) => println(s"Failure! t: ${t.getMessage}")
     |   }
     | 
     |   Thread.sleep(1000)
     | }
defined object PromiseFutureCompositionSample

scala> PromiseFutureCompositionSample.main(null)
Success! i: 1

scala> PromiseFutureCompositionSample.main(null)
Success! i: 1

結果は必ず1になる。100ミリ秒待って1を返すfirstFutureと,200ミリ秒待って2を返すsecondFutureが定義されているが,時系列的にfirstFutureがpromiseのfutureを完成させる役割をするため,必ず1が変える仕組みになっている。