新人SEの学習記録

14年度入社SEの学習記録用に始めたブログです。もう新人じゃないかも…

学習記録:Scala関数型デザイン 第7章

第7章:純粋関数型の並列処理

CPUごとに複数のコアが搭載され,CPU自体が複数搭載されることも珍しくない現代のコンピュータでは,
この並列処理能力を活用できるようにプログラムを設計することがこれまで以上に重要になっている。
しかし,同時実行されるプログラムの相互作用は複雑で,実行スレッド間でのやり取りを可能にする従来のメカニズム
(シェアードミュータブルメモリ)は推論が難しいことで有名である。
このため,油断すると競合状態やデッドロックを引き起こし,テストやスケールしにくいプログラムになってしまう。

本章では,並列処理と非同期処理を可能にする純粋関数型のライブラリを作成する。
並列プログラムを純粋関数だけで表現することで,複雑さに歯止めをかけ推論を単純化する。

また,並列処理のライブラリを作成する方法だけでなく,
純粋関数型のライブラリの設計に取り組む方法も学んでもらう。
本章の主なテーマは,合成可能性とモジュール性に優れたライブラリを作成することである。
例えば,本章の最後のほうではparMapというコンビネータを作成する。

 val outputList = parMap(inputList)(f)

このコンビネータは,コレクション内のすべての要素に関数fを簡単かつ同時に適用できる。
本章では,ここに到達するための作業を反復方式で進めていく。

データ型と関数の選択

関数型ライブラリの設計にとりかかる際には,通常は何をできるようにしたいかについての大まかなアイデアがある。
設計を進めるにあたって難しいのは,そうしたアイデアを練り直し,必要な機能を可能にするデータ型を見つけることである。
この場合は並列計算を可能にすることが目的だが,厳密にはどういうことだろうか。
並列化が可能な単純な計算として,整数リストの合計を調べながら,これを実現可能なものに改良していく。

def sum(ints: Seq[Int]): Int =
  ints.foldLeft(0)((a, b) => a + b)

Seqは標準ライブラリのリストや他のシーケンスのスーパクラスで,foldLeftメソッドが定義されている。
上記のような逐次的な畳込みの代わりに,分割統治アルゴリズムを使うこともできる。

// IndexedSeqはVectorなどのランダムアクセスシーケンスのスーパクラス。
// リストと異なり,特定のインデックスでシーケンスを2つに分割するための効率のよいsplitAtが用意されている。
def sum(ints: IndexedSeq[Int]): Int =
  if (ints.size <= 1)
    ints.headOption getOrElse 0  // 先頭要素を取り出す。Noneなら0。
  else {
    val (i,r) = ints.splitAt(ints.length/2)  // splitAt関数を使ってシーケンスを半分に分割
    sum(l) + sum(r) // 両半分を再帰的に合計し,結果を加算
  }

上記では,splitAt関数を使ってシーケンスを半分に分割し,それぞれを再帰的に合計した結果を合計をしている。
foldLeftベースの実装とはことなり,この実装では並列化が可能=両半分の合計を並行して求めることができる。
どのようなデータ型と関数を使用すればこの計算を並列化できるかを考えるときには,
この並列化が最終的にどのように実装されるのかに着目し,APIの実装を無理やり始めるのではなく,
サンプルによって明らかになる理想のAPIを設計し,そこから実装に向かって逆方法に作業を進めていく。

並列計算のためのデータ型

先ほどのsum関数の行sum(l) + sum(r)では,両半分のシーケンスでsumを再帰的に呼び出している。
この1行を見ただけで,並列計算を表すための選択肢となるデータ型が結果を保持できなければいけないことがわかる。
その結果は何らかの有意義な型(この場合はInt)の値となり,この結果を取得する方法も必要である。
この新たに発見された情報を設計に適用し,まずは結果を格納するためのコンテナ型Par[A]を作成する。

  • def unit[A](a: => A): Par[A]

評価されていないAを受け取り,それを別のスレッドで評価するための計算を返す。
unitという名前は,1つの値をラッピングするだけという1単位の並列化を実現することを意味する。

  • def get[A](a: Par[A]): A

並列計算から結果の値を取り出す。

さしあたり,他に必要な関数やParの内部表現については心配せず,簡単な例を詳しく調べていく。
まずはParデータ型を使ってsumを更新する。

def sum(ints: IndexedSeq[Int]): Int =
  if (ints.size <= 1)
    ints.headOption getOrElse 0
  else {
    val (i,r) = ints.splitAt(ints.length/2)
    val sumL: Par[Int] = Par.unit(sum(l))  // 左半分を並行して計算
    val sumR: Par[Int] = Par.unit(sum(r))  // 右半分を並行して計算    
    Par.get(sumL) + Par.get(sumR) // 両方の結果を取得して合計
  }

上記の例では,sumの2つの再帰呼び出しをunitの呼び出しで囲み,結果の取得にgetを使っている。
unitとgetの意味については選択の余地がある。
unitについては,その引数の評価を別の(論理)スレッドで直ちに開始することが可能である。
あるいは,getが呼び出されるまで引数をとっておき,それから評価を開始することもできる。
ただし並列性を少しでも獲得したいのであれば,unitがその引数の評価を同時に開始してすぐに制御を戻す必要がある。

Scalaでは,関数の引数は必ず左から右の順序で評価される。
このため,getが呼び出されるまでunitの実行を遅らせる場合は,
1つ目の並列計算が終わってから2つ目の並列計算が実行されるという,実質的に逐次実行になってしまう。

しかし,unitがその引数の評価を同時に開始した場合,getの呼び出しによって参照透過性が失われてしまう。
また,その評価が完了するのをgetが待機することになり,sumLとsumRの左辺と右辺が同時実行されなくなってしまう。

以上のことからunitに副作用があることは明らかだが,それはget限定の副作用である。
つまり,unitはPar[Int]を返すだけで非同期計算になるが,Parをgetに渡した途端にそれを明示的に待機することになり,
副作用が露呈してしまう。
従って,getの呼び出しを避けるか,少なくとも最後の最後まで呼び出しを遅らせるのが良さそうである。
非同期計算が終了するのを待たずに,それらを結合できるようにしたいところである。