読者です 読者をやめる 読者になる 読者になる

新人SEの学習記録

14年度入社SEの学習記録用に始めたブログです。気づけば社会人3年目に突入。

学習記録:Scala関数型デザイン 第7章

第7章:純粋関数型の並列処理(続き)

APIの代数

前節で示したように,必要な演算の型シグネチャを書き出し,型に従って実装を進めるだけでほとんどの場合上手くいく。
map2とunitをベースにしてmapを実装したときのように,具体的な定義域を無視して型を並べることに専念できる。
これは自然なスタイルの推論であり,代数方程式を簡約するときに行う推論に相当する。
APIを代数(1つ以上の集合と,これらの集合のオブジェクトを操作する一連の関数)として扱い,
この代数が定めたルールに従って記号の操作を行うだけである。

ここまでやってきた便宜的な推論でも問題はないが,APIに適用される法則を明文化しておくと役立つことがある。
頭の中で作り上げた,期待するプロパティや法則のイメージを,実際に書き出して明確にしておくと設計上の選択肢が見えてくる。

マッピングの法則

例として,妥当に思える法則を作成してみる。

map(unit(1))(_ + 1) == unit(2)

このコードは,_ + 1関数によるunit(1)へのマッピングが,ある意味unit(2)と同等であることを宣言している。
多くの場合,このような有効であると想定される恒等の具体的な例が法則の出発点になる。
どういう意味で同等なのかは興味深い問題だが,とりあえずは有効な引数ExecutorServiceに対し,
それらの結果であるFutureが同じ値であるとしたら,2つのParオブジェクトは等しいということにしておこう。

これが特定のExecutorServiceに対して有効であることをチェックするには,以下のような関数を使用する。

def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean =
  p(e).get == p2(e).get

関数を一般化できるのと同様に,法則も一般化できる。

map(unit(x))(f) == unit(f(x))

先の法則を一般化すると上記のようになる。
このコードは,1と関数_ + 1だけでなく,xとfのあらゆる組み合わせに対してこれが有効でならなければならないことを示す。
fを恒等関数に置き換えると,方程式の両辺が単純化されはるかに単純な新しい法則が得られる。

※恒等関数:def id[A](a: A): A = aとして定義される関数。要は引数をそのまま返す関数。

map(unit(x))(f) == unit(f(x))
// fを恒等関数idで置換
map(unit(x))(id) == unit(id(x))
// id(x)はxに等しい
map(unit(x))(id) == unit(x)
// unit(x)をyで置換
map(y)(id) == y

この新しい法則が明文化しているのは,何とmapのことだけである。
この新しい法則の内容を理解するために,mapができないことについて考えてみる。
例えば,関数を結果に適用する前に例外をスローしたり,計算を途中でやめたりすることはできない。
mapは,構造を維持すること,つまり内部の値だけを計算して並列処理の構造を変化させないことを求められる。

フォークの法則

この法則による実装上の制約はそれほどたいしたものではない。
おそらく,これらの特性をそうとは知らずに想定してきたはずである――mapやunitの実装に特殊なケースがあったり,あるいはmapに例外をランダムにスローさせるのは合理的ではない。

もう少し強い特性として,forkが並列計算の結果に影響を及ぼすべきではないことについて考えてみる。

fork(x) == x

これが実装に当てはまるのは当然のことのように思える。
つまり,fork(x)はxと同じことを行うべきである。ただし,メインスレッドとは別の論理スレッドで,非同期に行われる。
しかし,この単純な特性によってforkの実装が大きく制約されることがわかる。

法則への違反:バグ

xのすべての候補と,ExecutorServiceの任意の候補に対して,fork(x) == xが成り立つものとする。
xの候補となるものは,fork, unit, map2, およびそこから派生するその他のコンビネータを利用する何らかの式である。

実は,forkのほとんどの実装で発生する,少し厄介な問題が存在する。
有界サイズのスレッドプールに関連付けられているExecutorServiceを使用すると,簡単にデッドロックに陥ってしまうのだ。
ExecutorServiceが最大数1のスレッドプールに関連付けられているとすると,

val a = lazyUnit(42 + 1)
val S = Executors.newFixedThreadPool(1)
println(Par.equal(S)(a, fork(a)))

このコードは,ほとんどの実装のforkでデッドロックになる。
forkの実装をもう一度見てみると,

  def fork[A](a: => Par[A]): Par[A] =
    es => es.submit(new Callable[A] {
      def call = a(es).get // Callableの結果を別のCallableで待機している
    })

最初にCallableを渡し,そのCallableの中で別のCallableをExecutorServiceに渡して,その結果が得られるまでブロックしている。
a(es)がExecutorServiceにCallableを渡し,Futureを受け取ることを思い出してほしい。
スレッドプールのサイズが1の場合,外側のCallableを唯一のスレッドが拾ってしまうため,
そのスレッドが完了するのは,そのスレッドの中で別のCallableが渡されてその結果をブロックした後である。
しかし,この2つ目のCallableを実行するためのスレッドはないので,どちらも相手が完了するのを待ってデッドロックになる。

このような反例を見つけた場合,法則に違反しないように実装を修正するか,法則を改良するかのどちらかをとることになる。
この場合の法則を改良する例としては,際限なく拡張できるスレッドプールが必要なことを明記するだけでよい。
それまで隠れていた不文律や前提を明文化することでも十分な効果がある。

では,固定サイズのスレッドプールに対応できるようにforkを修正できるだろうか。

def fork[A](fa: => Par[A]): Par[A] =
  es => fa(es)

これでデッドロックが回避されるのは明白だが,faを評価するためにスレッドをフォークしていないのは問題である。
結局,fork(hugeComputation)(es)ではhugeComputationがメインスレッドで実行されてしまう。
これではforkの意味がないが,これはこれで計算が必要になるまでインスタンス化を遅らせることができるので,
コンビネータとしては便利である。これをdelayと呼ぶことにする。

def delay[A](fa: => Par[A]): Par[A] =
  es => fa(es)

固定サイズのスレッドプールで任意の計算を実行できるようにするには,Parの別の表現を選択する必要があるようだ。