読者です 読者をやめる 読者になる 読者になる

新人SEの学習記録

14年度入社SEの学習記録用に始めたブログです。気づけば社会人3年目に突入。

学習記録:Scala関数型デザイン 第7章

第7章:純粋関数型の並列処理(続き)

表現の選択

ここまで考えてきた結果,APIの大まかなイメージは次のようになった。

// 直ちにa値が得られる計算を作成
def unit[A](a: A): Par[A]
// 2つの並列計算の結果を2項関数で結合
def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C]
// runによる並列計算の対象としてマーク
def fork[A](a: => Par[A]): Par[A]
// 式aをrunによる並列評価のためにラッピング
def lazyUnit[A](a: => A): Par[A] = fork(unit(a))
// 与えられたParを完全に評価し,forkによって要求される並列計算を生成して結果の値を取得
def run[A](a: Par[A]): A

これらの関数には,大まかな意味も割り当てられている。

  • unit: 定数値を並列計算に昇格させる。
  • map2: 2つの並列計算の結果を2項関数で結合する。
  • fork: 計算を並列評価の対象としてマークする。この評価はrunによって強制されるまで実際には発生しない。
  • lazyUnit: 評価されていない引数をParでラッピングし,並列評価の対象としてマークする。
  • run: 実際に計算を行うことで,Parから値を取得する。
Exercise 7.2
  • これらのAPIの関数を実装できるParの表現を考え出せ。

runが非同期タスクを実行する必要がある。
低レベルAPIを自分で書くことも可能だが,Javaの標準ライブラリには既にjava.util.concurrent.ExecutorServiceがある。

※ExecutorServiceについて
Threadオブジェクトをそのまま使うのではなく,処理を小さなタスクに分割して,それらをExecutorによって実行する。
タスクの定義はCallableインタフェースを実装するだけでよく,型パラメータがcallされたときの返り値になる。
参考:Java で簡単マルチスレッドプログラミング - にょきにょきブログ

public class Task implements Callable<String> {
  @Override
  public String call() throws Exception {
    Thread.sleep(1000);
    System.out.println("execute!");
    return "success";
  }
}

Executorによって処理されるタスクの状態には,created, submitted, started, completedの4つが存在する。
createdが作成されたばかりの状態で,submitまたはexecuteメソッドでタスクをサブミットするとsubmittedになる。
その後実行が開始されるとstartedに,完了するとcompletedに変化する。これらの変化は不可逆である。

public static void main(String[] args) throws InterruptedException, ExecutionException {
  ExecutorService service = Executors.newCachedThreadPool();

  System.out.println("task start");

  // タスクの実行依頼
  service.submit(new Task());

  System.out.println("task end");

  service.shutdown();
}
task start
# タスクが別スレッドで実行され,1000ミリ秒sleepに入る
task end
execute!

Thread + Runnableの組み合わせでは実現できなかったことの一つに,タスクの戻り値の取得がある。
submitメソッドはFutureクラスのインスタンスを返し,そのgetメソッドを呼ぶことで戻り値を得ることができる。
submitメソッドが直接タスクの返り値を返さないのは,タスクが別スレッドで実行されるので,
一度別のオブジェクトを介さなければ同期的に返り値が取得できないためである。
なお,Futureオブジェクト取得後にgetメソッドを呼ぶと,タスクの処理が終わるまでメインスレッドをブロックする。

public static void main(String[] args) throws InterruptedException, ExecutionException {
  ExecutorService service = Executors.newCachedThreadPool();

  System.out.println("task start");

  // タスクの実行依頼
  Future<String> future = service.submit(new Task());
  System.out.println(future.get());

  System.out.println("task end");

  service.shutdown();
}
task start
# タスクが別スレッドで実行され,1000ミリ秒sleepに入る
# future.getメソッドを呼んだので,タスクの終了までメインスレッドがブロックされる
execute!
# タスクの戻り値を取得して出力
success
task end

また,ExecutorServiceを使うと,スレッドプールも簡単に作成することができる。

public static void main(String[] args) throws InterruptedException, ExecutionException {
  // 3つのスレッドを作成
  ExecutorService service = Executors.newFixedThreadPool(3);

  System.out.println("task start");

  // タスクを4つ実行する
  service.submit(new Task());
  service.submit(new Task());
  service.submit(new Task());
  Future<String> future = service.submit(new Task());
  // 4つ目のタスクの実行結果を取得
  System.out.println(future.get());

  System.out.println("task end");

  service.shutdown();
}
task start
# 3つのタスクが別スレッドで実行され,それぞれ1000ミリ秒sleepする
# 4つ目のタスクはsubmitted状態でキューに入る
# メインスレッドはfuture.getでブロックされる
execute!
execute!
execute!
# スレッドに空きが出たので4つ目のタスクがstartedになり,1000ミリ秒sleepする
execute!
# 4つ目のタスクが実行終了したので返り値を取得して出力
success
task end

さて,ExecutorServiceの表現をScalaに直すと以下のようになる。

class ExecutorService {
  def submit[A](a: Callable[A]): Future[A]
}

trait Callable[A] { def call: A } // 実質的にはAの遅延
trait Future[A] {
  def get: A
  def get(timeout: Long, unit: TimeUnit): A
  def cancel(evenIfRunning: Boolean): Boolean
  def isDone: Boolean
  def isCancelled: Boolean
}

ScalaでもExecutorServiceを使ってCallable値を渡し,対応するFutureを受け取ることができる。
Futureから値を取り出すにはgetメソッドを使用し,値が取得可能になるまでカレントスレッドをブロックする。
また,ブロック状態のまま一定時間が過ぎると例外をスローするなど,処理を取り消すための追加の機能がある。

run関数がExecutorServiceにアクセスできると過程した場合,それによりParの表現が決定されるだろうか。

def run[A](s: ExecutorService)(a: Par[A]): A

Par[A]の最も単純なモデルはExecutorService => A,ExecutorServiceを引数にA型の値を返すものになる。
しかし,計算をどれくらい待つか,あるいは中止させるかについてはrunの呼び出し元で決定させたい。
したがって,Par[A]はExecutorService => Future[A],runはFutureを返すだけにした方が良い。

type Par[A] = ExecutorService => Future[A]

def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)