読者です 読者をやめる 読者になる 読者になる

新人SEの学習記録

14年度入社SEの学習記録用に始めたブログです。気づけば社会人3年目に突入。

学習記録:Scala関数型デザイン 第7章

第7章:純粋関数型の並列処理(続き)

APIの改良

APIの設計と表現の選択の境界線はそれほど明白ではなく,2つの始点を流動的に切り替えながら,
疑問点が生じたら実験を行い,プロトタイプを作成するといった流れになる。

ここでは,ここまで開発してきたAPIの機能を実装することから始める。
Parの表現は既に選択してあるので,選択したParの表現を使って,ごく単純な方法でParを実装する。

object Par {
  // unitはUnitFutureを返す関数として表される。                                     
  // unitはExecutorServiceを使用せず,常に完了し,途中で中止できない。              
  def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a)

  // UnitFutureは定数値をラッピングするだけのFutureの簡単な実装である。             
  // getメソッドは渡された値を返すだけ。                                            
  private case class UnitFuture[A](get: A) extends Future[A] {
    def isDone = true
    def get(timeout: Long, units: TimeUnit) = get
    def isCancelled = false
    def cancel(evenIfRunning: Boolean): Boolean = false
  }

  // map2はfの呼び出しを別の論理スレッドで評価しない。                              
  // 設計上の選択として,並列化を制御する唯一の関数をforkとしているため,           
  // fの評価を別スレッドで処理させたい場合はfork(map2(a,b)(f))を実行する。          
  def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] =
    (es: ExecutorService) => {
      val af = a(es)
      val bf = b(es)
      // このmap2の実装はタイムアウトを考慮に入れない。                             
      // 両方のPar値にExecutorServiceを渡し,afとbfのFutureの結果を待って,         
      // それからfを適用してUnitFutureでラッピングするだけである。                  
      UnitFuture(f(af.get, bf.get))
    }

  // forkの最も単純で自然な実装だが,問題点がいくつかある。                         
  // 1つは,外側のCollableが内側のタスクが完了するのを待つ間ブロックすること。      
  // このブロックにより何らかのリソースが消費されるため並列化の可能性を逃す。       
  // 実質的には,スレッド1つで十分なはずの場所でスレッドを2つ使用してしまっている。
  def fork[A](a: => Par[A]): Par[A] =
    es => es.submit(new Callable[A] {
      def call = a(es).get
    })

Futureのインタフェースは純粋関数型ではないので,ユーザにFutureを直接使わせるのは避けたいところである。
しかし,Futureのメソッドが副作用に依存するにも関わらず,ParのAPI全体が純粋である点は重要である。
Futureの仕組みを公開するExecutorServiceが実装に渡されるのは,ユーザがrunを呼び出した後になる。
つまり,ユーザは純粋なインタフェースに対してプログラミングを行うものの,その実装は副作用に依存している。
しかし,APIは純粋なままなので,それらは作用であって副作用ではないことになる。

Exercise 7.4
  • lazyUnitを使って,任意の関数A => Bからその結果を非同期で評価する関数asyncFを記述せよ。
  // A型の引数aをとり,Par[B]を返す関数を返り値とする。                             
  // ParBを返すには,lazyUnitを使ってf(a)を引数とする。                             
  def asyncF[A,B](f: A => B): A => Par[B] =
    a => lazyUnit(f(a))

List[Int]を生成する並列計算を表すPar[List[Int]]があるとする。
これを結果がソートされるPar[List[Int]]に変換する例を考える。

Parを実行し(run),結果として得られたリストをソートし,unitを使ってParにまとめるという方法もあるが,
runを呼び出すのは避けたいところである。
Parの値を操作できるコンビネータは今のところmap2だけなので,parListをmap2の一方に渡してソートを行う。
map2のもう一方には何を渡しても構わないので,NO-OPでも渡しておく。

def sortPar(parList: Par[List[Int]]: Par[List[Int]] =
  map2(parList, unit(()))((a,_) => a.sorted)

これで,リストをソートしたい胸をPar[List[int]]に伝えることができる。
これを更に一般化し,A => B型の任意の関数をリフトし,Par[A]を受け取ってPar[B]を返す関数にすることができる。

  def map[A,B](pa: Par[A])(f: A => B): Par[B] =
    map2(pa, unit(()))((a,_) => f(a))

このmap関数を使うことで,sortParは以下のように単純になる。

  // Par[List[Int]]をソートする。                                                   
  def sortPar(parList: Par[List[Int]]) = map(parList)(_.sorted)

こちらの方が単純にして明快である。
map2の引数として偽の値であるunit(())を渡すのがまずい方法だったということでは決してなく,
map2をベースとしてmapを実装することが可能で,その逆が不可能であるということは,
単にmap2の方がmapよりも強力であるというだけで,ライブラリを設計する際にはよくあることである。
プリミティブに思える関数が,それより強力なプリミティブを使って表現できることが判明するのは珍しいことではない。

このAPIを使って他に何を実装できるだろうか。
リストでのマッピングを同時に行うことは可能だろうか。
2つの並列計算を結合するmap2に対して,parMapではN個の並列計算を結合する必要がある。

def parMap[A,B](ps: List[A])(f: A => B): Par[List[B]]

既存のコンビネータをベースとして,parMapがどこまで実装できるか確かめてみる。

def parMap[A,B](ps: List[A])(f: A => B): Par[List[B]] = {
  val fbs: List[Par[B]] = ps.map(asyncF(f))
  ...
}

asyncFはA => BをA => Par[B]に変換する,並列計算をフォークさせることで結果を生成するメソッドである。
これによりN個の並列計算をフォークするのは簡単だが,それらの結果をまとめる方法が必要となる。
型を調べてみると,List[Par[B]]をparMapの戻り値として要求されるPar[List[B]]に変換する方法が必要である。

 def sequence[A](as: List[Par[A]]): Par[List[A]]

sequenceメソッドの実装の詳細については省くが,このList[Par[A]]をPar[List[A]]に変換するメソッドを使い,
parMapを以下のように実装できる。

def parMap[A,B](ps: List[A])(f: A => B): Par[List[B]] = fork {
  val fbs: List[Par[B]] = ps.map(asyncF(f))
  sequence(fbs)
}

この実装がforkの呼び出しで囲まれていることに注目してほしい。
この実装では,入力リストが巨大であっても,parMapからすぐに制御が戻る。
あとからrunを呼び出したときに,runによって非同期計算が1つフォークされ,そこからN個の並列計算が開始される。
そして,それらの計算が完了するのを待って,それらの結果がリストにまとめられる。