Parallel クラス

Last Updated 2011/09/21


Parallel クラスは簡単に言うと、次の 5 つのメソッドしかありません。

Parallel.For メソッド

シーケンシャル処理

private void button1_Click(object sender, RoutedEventArgs e)
{
  int N = 10;

  for (int n = 0; n < N; ++n)
  {
    Debug.WriteLine(String.Format("{0}", n));
  }
}

並列処理

【メソッド構文】

private void button1_Click(object sender, RoutedEventArgs e)
{
  int N = 10;
  Parallel.For(0, N, Method1);
}

// For メソッドから呼び出すデリゲート
private void Method1(int n)
{
  Debug.WriteLine(String.Format("{0}", n));
}

【匿名メソッド構文】

private void button1_Click(object sender, RoutedEventArgs e)
{
  int N = 10;

  Parallel.For(0, N, delegate(int n)
  {
    Debug.WriteLine(String.Format("{0}", n));
  });
}

【ラムダ式構文】

private void button1_Click(object sender, RoutedEventArgs e)
{
  int N = 10;

  for (int n = 0; n < N; ++n)
  {
    Debug.WriteLine(String.Format("{0}", n));
  }
}

【注 意】

並列処理を実行する構文には上の例で示したとおり、3 種類があります。それぞれの構文の名前は私が適当に付けたものですが、どれが最適とは言えないと思います。ラムダ式は複雑な構文でも比較的簡単に(あくまで慣れればの話ですが)コードを書けるといえますが、ダラダラとやたらに長いラムダ式はどうかなとの思いもあります。

二重ループ

For ループの中に For ループを持つような二重ループをテストしてみました。button1 は Parallel.For の内側にも Parallel.For ループを仕込みます。button2 のほうは外側を Parallel.For ループを、内側をシーケンシャルな For ループにしてみました。テスト結果だけを示すと、button2 のほうが button1 の場合より 2 倍の速度になりました。ちなみに、外側と内側ともにシーケンシャルな For ループにすると、Button2 の 3 倍の時間になりました。

private void butto1_Click(object sender, RoutedEventArgs e)
{
  var stopWatch = Stopwatch.StartNew();

  Parallel.For(0, 10000, i =>
  {
    Parallel.For(0, 5000, j => Method(i, j));
  });

  stopWatch.Stop();
  textBox.Text = String.Format("{0}", stopWatch.ElapsedMilliseconds);
}

//---------------------------------------------------------------------
private void button2_Click(object sender, RoutedEventArgs e)
{
  var stopWatch = Stopwatch.StartNew();

  Parallel.For(0, 10000, i =>
  {
    for (int j = 0; j < 5000; ++j)
    {
      Method(i, j);
    }
  }); 

  stopWatch.Stop();
  textBox.Text = String.Format("{0}", stopWatch.ElapsedMilliseconds);
}

//---------------------------------------------------------------------
private void Method(int n1, int n2)
{
  double d;

  d = Math.Sqrt(n1 * n2);
}

Parallel.For<TLocal> メソッド

TLocal はスレッドローカル変数のデータ型です。スレッドローカルとは、スレッド固有と理解してください。まず、メソッド構文を使うこのメソッドの使い方を以下に示します。

// メンバ変数
// メソッド構文を使う場合、いろいろなメソッドからアクセス可能なようにメンバ変数として宣言する
private BlockingCollection<double> FBlock;

//---------------------------------------------------------------------
private void butto1_Click(object sender, RoutedEventArgs e)
{
  FBlock = new BlockingCollection<double>(10000000);
  double grandTotal = 0;

  var stopWatch = Stopwatch.StartNew();

  ParallelLoopResult result = Parallel.For(1, 10000000,
    LocalInit,       // スレッドローカル変数を初期化するデリゲートで、1 回だけ呼び出される
    Body,            // Body デリゲートで、ループごとに呼び出される
    LocalFinally     // Body デリゲート終了後に実行するデリゲートで、1 回だけ呼び出される
  );

  if (result.IsCompleted)
  {
    FBlock.CompleteAdding(); // 必須

    // ループごとの localTotal を合計する
    foreach (var item in FBlock.GetConsumingEnumerable())
    {
      grandTotal += item;
    }
  }

  stopWatch.Stop();
  textBox.Text = String.Format("GrandTotal={0}, Time={1}", grandTotal, stopWatch.ElapsedMilliseconds);
}

//---------------------------------------------------------------------
private double LocalInit()
{
  return 0.0; // この例では localTotal の初期値 0.0 である
}

//---------------------------------------------------------------------
private double Body(int i, ParallelLoopState state, double localTotal)
{
  return localTotal + Math.Sqrt(i);
}

//---------------------------------------------------------------------
private void LocalFinally(double localTotal)
{
  FBlock.Add(localTotal);
}

button1 と同じ処理をラムダ式で作ってみました。この方法のいいところはすべてローカル変数として扱える点です。この例では BlockingCollection 変数ですね。

private void button2_Click(object sender, RoutedEventArgs e)
{
  var block = new BlockingCollection<double>(10000000);
  double grandTotal = 0;

  var stopWatch = Stopwatch.StartNew();

  ParallelLoopResult result = Parallel.For(1, 10000000,
    () => 0.0,                                           // スレッドローカルデータの初期化
    (i, state, localTotal) => localTotal + Math.Sqrt(i), // body デリゲートで、localTotal を返す
    localTotal =>                                        // localTotal を合計する
    {
      block.Add(localTotal); // 1 〜 10000000 の値をコレクションに追加する
    }
  );

  if (result.IsCompleted)
  {
    block.CompleteAdding();

    foreach (var item in block.GetConsumingEnumerable())
    {
      grandTotal += item;
    }
  }

  stopWatch.Stop();
  textBox.Text = String.Format("GrandTotal={0}, Time={1}", grandTotal, stopWatch.ElapsedMilliseconds);
}

Parallel.ForEach<TSource> メソッド

シーケンシャル処理

private void button1_Click(object sender, RoutedEventArgs e)
{
  int N = 10;
  var intArray = Enumerable.Range(0, N);

  foreach (var n in intArray)
  {
    Debug.WriteLine(String.Format("{0}", n * 2));
  }
}

並列処理

private void button1_Click(object sender, RoutedEventArgs e)
{
  int N = 10;
  var intArray = Enumerable.Range(0, N);

  Parallel.ForEach(intArray,
    (n) => Debug.WriteLine(String.Format("{0}", n * 2))
  );
}

【注意】

ForEach メソッドも For メソッドの例と同様、3 種類の構文が可能ですが、ここでは省略します。


Parallel.ForEach<TLocal,TSource> メソッド

private int FTotal = 0;

private void button1_Click(object sender, RoutedEventArgs e)
{
  var array = Enumerable.Range(0, 10);

  Parallel.ForEach(array, LocalInit, Body, LocalFinally);

  textBox.Text = String.Format("Total= {0}", FTotal);
}

// スレッドローカル変数を初期化する
private int LocalInit()
{
  return 0;
}

// 演算の本体
private int Body(int n, ParallelLoopState state, int n1)
{
  Debug.WriteLine(String.Format("Body: n= {0}, n1= {1}, ThreadId= {2}",
      n, n1, Thread.CurrentThread.ManagedThreadId));

  int result = n * 2 + n1; // コレクション内の各要素を 2 倍にするだけ
  return result;
}

// 各スレッドの終わりに呼び出されるメソッド
private void LocalFinally(int n)
{
  FTotal += n;
  Debug.WriteLine(String.Format("LocalFinally: n= {0}, Total= {1}, ThreadId= {2}",
      n, FTotal, Thread.CurrentThread.ManagedThreadId));
}

実行結果:

Total= 90

Body: n= 0, n1= 0, ThreadId= 9
Body: n= 2, n1= 0, ThreadId= 10
Body: n= 1, n1= 0, ThreadId= 11
Body: n= 7, n1= 2, ThreadId= 11
Body: n= 6, n1= 4, ThreadId= 10
Body: n= 5, n1= 0, ThreadId= 12
Body: n= 4, n1= 0, ThreadId= 9
Body: n= 3, n1= 0, ThreadId= 8
LocalFinally: n= 6, Total= 24, ThreadId= 8
LocalFinally: n= 10, Total= 10, ThreadId= 12
Body: n= 8, n1= 16, ThreadId= 11
Body: n= 9, n1= 16, ThreadId= 10
LocalFinally: n= 34, Total= 90, ThreadId= 10
LocalFinally: n= 32, Total= 56, ThreadId= 11
LocalFinally: n= 8, Total= 18, ThreadId= 9
LocalFinally: n= 0, Total= 90, ThreadId= 11

LocalFinally は各スレッドごとに呼び出されている。n1 の値が変化するのはなぜでしょうか。


Parallel.Invoke メソッド

シーケンシャル処理

private void button1_Click(object sender, RoutedEventArgs e)
{
  Method1();
  Method2();
  Method3();
}

private void Method1()
{
  Debug.WriteLine("Method1 を起動した。");
}

private void Method2()
{
  Debug.WriteLine("Method2 を起動した。");
}

private void Method3()
{
  Debug.WriteLine("Method3 を起動した。");
}

並列処理

private void button1_Click(object sender, RoutedEventArgs e)
{
  Parallel.Invoke(Method1, Method2, Method3);
}

private void Method1()
{
  Debug.WriteLine("Method1 を起動した。");
}

private void Method2()
{
  Debug.WriteLine("Method2 を起動した。");
}

private void Method3()
{
  Debug.WriteLine("Method3 を起動した。");
}

【注意】
Method1、Method2、Method3 の順番でメソッドが呼び出されるとは限りません。また、メソッドに引数を渡すこともできないし、メソッドから戻り値を受け取ることもできません。


Parallel ループから抜け出る

ループの途中で特定の条件に達したとき、ループを抜け出したい場合があります。このような場合は、ParallelLoopState オブジェクトを指定する構文を利用します。以下は部分的なコードですが、こんな感じです。

int N = 10000;

Parallel.For(0, N, (n, loopState) =>
{
  ....

  if (条件)
  {
    loopState.Break();
    return;
  }
});

loopState の内容は .NET Framework が設定します。


Parallel ループを中止する

前項の「Parallel ループから抜け出る」とほぼ同じですが、Break メソッドのかわりに Stop メソッドを使います。

private void button1_Click(object sender, RoutedEventArgs e)
{
  var N = 1000;

  var loopResult = Parallel.For(0, N, (n, loopState) =>
  {
    if (n == 5)
    {
      loopState.Stop();
      return;
    }

    Debug.WriteLine(String.Format("n= {0} ThreadId= {1}", n, Thread.CurrentThread.ManagedThreadId));
  });

  if (!loopResult.IsCompleted && !loopResult.LowestBreakIteration.HasValue)
  {
    Debug.WriteLine("処理は Stop した");
  }
}

実行結果:

n= 0 ThreadId= 10
n= 1 ThreadId= 10
n= 2 ThreadId= 10
n= 3 ThreadId= 10
n= 250 ThreadId= 6
n= 500 ThreadId= 11
n= 501 ThreadId= 11
n= 502 ThreadId= 11
n= 750 ThreadId= 13
n= 751 ThreadId= 13
n= 4 ThreadId= 10
n= 7 ThreadId= 12
n= 251 ThreadId= 6
n= 752 ThreadId= 13
n= 503 ThreadId= 11
処理は Stop した

Parallel.ForEach メソッドにデータを渡す

Parallel クラスの ForEach メソッドには実行するデリゲートにデータを直接渡す手段はありません。しかし、手はあります。以下のコードでは、MultipleInteger クラスの public な Multiple メソッドを実行するデリゲートとして渡します。

private void butto1_Click(object sender, RoutedEventArgs e)
{
  var obj = new MultipleInteger(2);

  Parallel.For(0, 10, obj.Multiple);

  textBox.Text = obj.Result.ToString();
}

//**********************************************************************
private class MultipleInteger
{
  public int Multiplier { get; set; } // 乗数
  public int Result { get; set; }

  public MultipleInteger(int m)
  {
    Multiplier = m;
  }

  // For ループの値に乗数をかけ、その合計を Result に代入する
  public void Multiple(int n)
  {
    Debug.WriteLine(String.Format("n={0}, ThreadId={1}", n, Thread.CurrentThread.ManagedThreadId));
    this.Result += n * this.Multiplier;
  }
}

実行結果:

textBox には 0 〜 9 の各値に 2 を乗算した結果の合計の 90 が表示されます。また、出力ウインドウには以下が表示されます。各値に対して複数のスレッドを使っていることが分かります。

n=0, ThreadId=9
n=2, ThreadId=10
n=1, ThreadId=9
n=4, ThreadId=9
n=5, ThreadId=9
n=8, ThreadId=9
n=9, ThreadId=9
n=7, ThreadId=13
n=6, ThreadId=11
n=3, ThreadId=10

Parallel クラスによる並列処理のキャンセル

最近のパソコンは高速ですからたいていの場合、処理をキャンセルする間もなく終了してしまいます。しかし、時間がかかる処理の場合、それをキャンセルする手段を提供する必要があることにかわりありません。

Parallel クラスが提供する各メソッドを使って開始した処理をキャンセルする場合は引数として ParallelOptions オブジェクトを使用する構文を使用しなければなりません。ParallelOptions クラスの CancellationToken プロパティに CancellationToken 型を設定できるからです。逆に言うと、外部からキャンセルする手段はありません。

private void buttonl_Click(object sender, RoutedEventArgs e)
{
  var tokenSource = new CancellationTokenSource();

  ParallelOptions options = new ParallelOptions();
  options.CancellationToken = tokenSource.Token;

  try
  {
    ParallelLoopResult loopResult = Parallel.For(0, 100, options, (x, state) =>
    {
      if (x == 5)
        tokenSource.Cancel();
      else
        Debug.WriteLine(String.Format("{0}", x));

      Thread.Sleep(100);
    });
  }
  catch (OperationCanceledException oce)
  {
    Debug.WriteLine("OperationCanceledException");
  }
  catch (AggregateException ae)
  {
    Debug.WriteLine("AggregateException");
  }
}

実行結果:

0
25
50
....
4
3
27
77
OperationCanceledException

並列処理のキャンセルに関するより詳しいことは、「並列処理のキャンセル」を参照してください。


Parallel クラスの秘密

Parallel クラスの各メソッドをテストしているうちに、その秘密の一つが分かってきました。私のパソコンの CPU は 4 つのコアを持ちますが、For メソッドの反復する範囲を 4 分割し、4 つのコアに割り当てているようです。たとえば、インデックスの範囲が 1,000 の場合は、0 〜249、250 〜 499、500 〜 749、750 〜 999 といった具合です。つまり、Parallele クラスの内部で Partitioner オブジェクトを作成し、処理を分割しているものと想像します。

−以上−