新人SEの学習記録

14年度入社SEの学習記録用に始めたブログです。もう新人じゃないかも…

学習記録:Scala関数型デザイン 第7章

第7章:純粋関数型の並列処理(続き)

データ型と関数の選択(続き)

並列計算の結合

unitとgetの組み合わせには落とし穴があることがわかったが,次にこれをどうすれば回避できるか考えてみる。
getを呼び出さないということは,sum関数からPar[Int]を返さなければいけないということになる。

def sum(ints: IndexedSeq[Int]): Par[Int] =
  if (ints.size <= 1)
    Par.unit(ints.headOption getOrElse 0)
  else {
    val (l, r) = ints.splitAt(ints.length/2)
    Par.map2(sum(l), sum(r))(_ + _)
  }
Exercise 7.1
  • 上記のsum関数に書かれたmap2関数のシグネチャはどのようなものになるか。

sum(l)とsum(r)に対して関数fを適用するので,以下のようになる。

def map2(a: Par[A], b: Par[B])(f: (A, B) => C): Par[C]

単純なテストケースとして,map2が両方の引数に対して正格である場合について考えてみる。

sum(indexedSeq(1,2,3,4))

map2(
  sum(Indexed(Seq(1,2)),
  sum(Indexed(Seq(3,4)))(_ + _)

map2(
  map2(
    sum(IndexedSeq(1)),
    sum(IndexedSeq(2)))(_ + _),
  sum(IndexedSeq(3,4)))(_ + _)

map2(
  map2(
    unit(1),
    unit(2))(_ + _),
  sum(IndexedSeq(3,4)))(_ + _)

map2(
  map2(
    unit(1),
    unit(2))(_ + _),
  map2(
    sum(IndexedSeq(3)),
    sum(IndexedSeq(4)))(_ + _))(_ + _)

...

map2は正格であり,Scalaでは引数が左から右へと評価されるため,
map2(sum(x), sum(y))(_ + _)が検出されたらsum(x)などを再帰的に評価する必要がある。
このため,総和ツリーの右半分の(正格な)生成に進む前に,
左半分全愛を(正格に)生成するという殘念な結果になってしまう。
上記のテストケースでは,sum(IndexedSeq(1,2))がsum(IndexedSeq(3,4))を検討する前に完全に展開されている。
map2が引数を並行して評価する場合,計算の右半分がまだ生成もされていない段階で左半分の計算が実行されることになる。

map2の正格性を保ちつつ,実行をすぐに開始させないようにしたらどうか。
Par値は平行処理の対象となるものの記述を生成するだけで,getのような関数で評価するまで何も起こらないようにする。
ここで問題となるのは,正格な記述を生成するとかなり大きなオブジェクトになってしまうことである。

map2(
  map2(
    unit(1),
    unit(2))(_ + _),
  map2(
    unit(3),
    unit(4))(_ + _))(_ + _)

この記述の格納にどのようなデータ構造を使ったとしても,元のリストより多くのスペースを占めてしまう。
map2を先送りし,両半分の計算をすぐに同時に開始した方が良さそうである。

明示的なフォーク

map2の2つの引数を常に同時に評価したいかというと,おそらくそうではないはずである。
以下の例について考えてみる。

Par.map2(Par.unit(1), Par.unit(1))(_ + _)

この場合,結合の対象となる2つの計算が非常に素早く実行されるため,
それらを評価するために別の論理スレッドを生成しても余り意味がないことがなんとなくわかる。
しかし,このAPIにはこの種の情報を提供する手段がなく,計算をメインスレッドからフォークさせるタイミングが曖昧である。
このため,フォークさせるタイミングをプログラマがうまく計れない。

フォークを明示的にするため,def fork[A](a: => Par[A]): Par[A]という別の関数を作成してみる。
そして,特定のParを別の論理スレッドで実行すべきであるという意味で,この関数を利用する。

def sum(ints: IndexedSeq[Int]): Par[Int] =
  if (ints.length <= 1)
    Par.unit(ints.headOption getOrElse 0)
  else {
    val (l, r) = ints.splitAt(ints.length/2)

    Par.map2(Par.fork(sum(l)), Par.fork(sum(r)))(_ + _)
  }

forkを利用すれば,map2を正格にし,引数のラッピングをプログラマに委ねることが可能になる。
ここでは,2つの問題に対処する。
まず,2つの並列タスクの結果を結合すべきであることを指定する方法が必要である。
また,特定のタスクを非同期で実行すべきかどうかの選択がある。

unitを正格にするか遅延にするのかの問題に戻る。
forkを利用すれば,表現力を失うことなくunitを正格にすることが可能である。
非正格バージョンであるlazyUnitは,unitとforkを使って実装できる。

def unit[A](a: A): Par[A]
def lazyUnit[A](a: => A): Par[A] = for(unit(a))

forkを使ってその引数が別の論理スレッドで評価されるようにしたいわけだが,問題がまだ残っている。
呼び出された時点ですぐに評価するべきか,それとも後で強制的に評価されたときに評価するようにすべきなのか。
要するに,評価をforkとgetのどちらに任せるべきか,ということになる。

forkが引数の評価を直ちに開始するとしたら,スレッドを作成する方法やタスクを何らかのスレッドプールに渡す方法について,
実装が直接または間接的に知っていなければならないことは明白である。
さらに,スレッドプールなどの並列化を実装するためのリソースにアクセスできなければならず,
forkを呼び出したい場所でそうしたリソースが正しく初期化されていなければいけない。
これは,プログラムの様々な部分で使用される並列化戦略を制御する能力が失われることを意味する。

また,並列タスクの実行にグローバルリソースを利用することは本質的に悪いことではないが,
どの実装がどこで使用されるのかをより細かく制御できるに越したことはない。
どうやら,スレッドの作成と実行タスクの受け渡しはgetに任せた方が良さそうである。

これに対し,forkがその引数を抱えたまま評価を先送りするとしたら,
並列化を実装するためのメカニズムにアクセスできる必要はない。
未評価のParに並列評価のマークを付けておけばよいだけである。
このモデルでは,Par自体が並列処理の実装方法を知っている必要はなく,
あとからget関数などによって解釈される並列計算の記述である。

ここまでは,利用可能になった時点で取得すればよい値の入れ物としてみなされていたParだが,
今や実行可能なファーストクラスのプログラムに昇格したことになる。
ここで,get関数の名前をrunに変更し,ここが並列化を実装する場所であることを指定する。

def run[A](a: Par[A]): A

Parが純粋なデータ構造になったところで,新しいスレッドを生成するのか,他アスクをスレッドプールにデリゲートするのか,
あるいはその他のメカニズムを利用するのかはともかく,並列化を実装するための手段がrunには必要である。