PDF

2013-5-(1): 情報処理学会プログラミング研究会 発表資料 2014 年 3 月 17 日
分散プログラミング言語 X10 上の
耐故障性を備えたアプリケーション
河内谷 清久仁1,a)
概要:X10 は言語仕様として分散処理を記述できるプログラミング言語である.X10 アプリケーションは
複数の「プレース」(計算ノード)を用いて実行可能で,「at」文によりプレースの移動を行うことで分
散処理を行うことができる.しかし従来の X10 では,プレースを構成する計算ノードが一つでも故障す
ると,X10 アプリケーションの処理全体が終了してしまっていた.この問題を回避するため我々は,計算
ノードが故障した場合にアプリケーションにそれを例外(DeadPlaceException)として通知し,残りの
ノードで実行を継続することが可能な「Resilient X10」という言語拡張を行っている.本発表ではこの
Resilient X10 の機能を用いて耐故障性のある分散アプリケーションをどのように構成するかについて述べ
る.故障が起きた場合のアプリケーションの対応として,
(a)無視して正常ノードの結果だけを使う,
(b)
残りのノードに作業を再分配する,(c)定期スナップショットから実行を再開する,の 3 手法を紹介す
る.また,ライブラリとして,X10 がもともと備えている分散配列(DistArray)への耐故障性とスナッ
プショット機構の追加についても述べる.耐故障性を追加したアプリケーションやライブラリの,ベース
となるコードからの変更は非常にわずかである.またどれも,従来の X10 上でも(ノードが故障しない限
りは)実行可能である.耐故障性追加の実行性能への影響についても評価を示す.
キーワード:Resilient X10, 耐故障性, 分散プログラミング
Fault-Tolerant Applications on a
Distributed Programming Language X10
Kiyokuni Kawachiya1,a)
Abstract: X10 is a programming language which supports distributed computing in itself. X10 applications
can run over multiple “places” (computation nodes), and perform distributed computing by changing the
execution place using “at” statements. However, in a conventional X10 environment, when a node which
consists of the place(s) fails, the entire processing of X10 application is aborted. To solve this problem, we
have been extending X10 as a “Resilient X10” where the node failure is reported as a DeadPlaceException
and the execution can continue using remaining nodes. In this presentation, we show how to construct faulttolerant distributed applications over the Resilient X10 functions. Three typical methods are introduced to
handle the node failure — (a) just ignore it and use results from remaining nodes, (b) re-assign the work
to remaining nodes, or (c) restart the computation from a periodic snapshot. We also show a fault-tolerant
extension of an existing distributed X10 library DistArray. These modifications to add fault tolerance were
very small, and the modified codes can still run on normal X10 as far as node failure does not happen.
Impacts to execution performance by the modifications are also shown.
Keywords: Resilient X10, fault tolerance, distributed programming
1
a)
日本アイ・ビー・エム(株) 東京基礎研究所
IBM Research - Tokyo
[email protected]
1
2013-5-(1): 情報処理学会プログラミング研究会 発表資料 2014 年 3 月 17 日
1. はじめに
スレッド 通信 データ 参照
アドレス空間 (プロセス)
(オブジェクト)
リモート参照
プレース0 プレース1 プレース2
X10 [21] は言語単体で並列・分散処理を記述可能なプロ
グラミング言語で,複数の計算ノードを用いて分散処理を
行うことができる.しかし従来の X10 では,計算ノード
が一つでも故障すると X10 アプリケーションの処理全体
が終了してしまっていた.この問題を回避するため我々
は,計算ノードの故障をアプリケーションに例外として通
知し,残りのノードで実行を継続することが可能な「Re-
Message Passing
アドレス空間
アクティビティ
(≒スレッド)
生成
移動
silient X10」という言語拡張を行っている [3].本稿ではこ
オブジェクト
の Resilient X10 の機能を用いて,耐故障性のある分散ア
ローカル参照
プリケーションをどのように構成するかについて述べる.
PGAS
(MPIなど)
リモート参照
分散環境上でのアプリケーション実行モデルとして X10
プレース0
asyn
c
Shared Memory
(UPC, CAF, X10など)
プレース1
at
async
(OpenMPなど)
・・・
nc
asy
プレースMAX_PLACES-1
at
at
分散配列
分散配列
不変(immutable)データ,クラス,構造体, 関数
図 1 X10 の実行モデル
Fig. 1 Execution model of X10.
は,
「PGAS(Partitioned Global Address Space)[13]」を
採用している.PGAS 実行モデルでは複数の計算ノードに
またがるグローバルなアドレス空間が利用可能である.た
である.本稿の貢献点としては,以下があげられる.
だし,その名のとおりアドレス空間はノードごとに分割さ
• 分散アプリケーションに耐故障性を追加するための複
れており,データがどの計算ノードに存在するかを意識
数のアプローチを示し,いずれもが非常に少ない変更
したプログラミングが必要になる.X10 ではこの分割され
た各アドレス空間を「プレース」と呼ぶ.データはどのプ
レースからでも「参照」可能であるが,どれかのプレース
に属しており,その中身にアクセスするには「at」文でそ
のプレースに移動する必要がある.
で Resilient X10 上に実装できることを示した.
• 耐故障性を追加したアプリケーションを実際に動か
し,実マシン上でさまざまな評価を行った.
2. X10 の実行モデルと,耐故障性の追加
計算ノードの故障は,X10 レベルでは特定プレースの消
並列・分散環境におけるプログラミングでは一般に,そ
失として現れる.つまり,そのプレース上のデータにアク
の並列性や分散メモリ構成をプログラマにどのように見せ
セスできなくなるということである.しかし,PGAS 実行
るかの選択が重要になる.図 1 の上の 3 つの図は,そのバ
モデルはアドレス空間が明示的に分割されているため,故
リエーションを示している.並列性を隠蔽し処理系が暗黙
障したノード(プレース)を「切り離し」て全体の処理を
的に並列化を行う OpenMP [12] のようなモデル(右)で
継続することが比較的容易である.たとえば at 文で移動
は,プログラミングが容易になる反面,ハードウェアを生
中のプレースが消失した場合,呼び出し側のプレースが生
かしきった高性能な処理が難しくなる危険性がある.一
きていれば,そこに何らかの失敗通知を行うことで,故障
方,メモリ共有の概念がなくデータの送受信まで明示的に
ノードを切り離して処理を継続できる.Resilient X10 は,
記述する MPI [9] のようなモデル(左)は,高性能だがハー
このような状況に対し「DeadPlaceException」という例
ドウェア構成を意識したプログラミングが必要になってし
外を通知する.
まう.
しかしもちろん,プレース消失を通知するだけではアプ
X10 の採用する PGAS 実行モデルはこの 2 つの中間形
リケーションを正常に続行することはできない.耐故障性
態で,グローバルなアドレス空間が複数の「プレース」に
を実現するためには,通知を元にアプリケーション(もし
分割されている.プレースはメモリの局所性を抽象化した
くはライブラリ)内でなんらかの対応を行う必要がある.
もので,典型的には一つの計算ノードに対応すると考えれ
本稿では,故障が起きた場合のアプリケーションの対応と
ばよい.データはどれかのプレースに所属し,それをまた
して,
(a)無視して正常ノードの結果だけを使う,
(b)残
いで移動することはない.データは他のプレースからも参
りのノードに作業を再分配する,
(c)定期スナップショッ
照することができるが,中身にアクセスできるのは同じプ
トから実行を再開する,の 3 手法を紹介し,Resilient X10
レースからのみである.
上での記述例を示す.また,ライブラリとして,X10 がも
X10 は PGAS を採用し,並列・分散環境を隠蔽するので
ともと備えている分散配列(DistArray)への耐故障性と
はなく「抽象化して見せる」ことで,生産性を保ちつつ高
スナップショット機構の追加についても述べる.耐故障性
性能を達成することを目指している.なお,PGAS 自体は
を追加したアプリケーションやライブラリの,ベースとな
X10 特有の概念ではなく,UPC(Unified Parallel C)[17]
るコードからの変更は非常にわずかである.またどれも,
や CAF(Co-Array Fortran)[19] などの言語でも採用さ
従来の X10 上でも(ノードが故障しない限りは)実行可能
れている.ただし,X10 では非同期実行のサポートなど
2
2013-5-(1): 情報処理学会プログラミング研究会 発表資料 2014 年 3 月 17 日
を強化しているため,区別して APGAS(Asynchronous
PGAS)モデルと呼ぶこともある.
1
2
3
4
2.1 APGAS 実行モデル
X10 の実行モデルを詳細に示したものが図 1 下である.
グローバルなアドレス空間を分割した各プレースの中に
は,それぞれ複数の「アクティビティ」と「オブジェクト」
が存在できる.
5
6
7
8
9
10
11
12
アクティビティは,プレース内で逐次動作する非同期
な実行主体で,軽量なスレッドだと考えればよい.async
13
14
15
class HelloWorld {
public static def main(args:Rail[String]) {
finish for (pl in Place.places()) {
at (pl) async { // 各プレースで並列・分散に処理
Console.OUT.println("Hello from " + here);
}
} // finish 文の終わり,全プレースの処理が終わるまで待つ
}
}
$ x10c++ HelloWorld.x10 -o HelloWorld # コンパイル
$ X10_NPLACES=4 runx10 HelloWorld
# 実行
Hello from Place(3)
Hello from Place(0)
Hello from Place(2)
Hello from Place(1)
文により同じプレース内に動的に生成したり,at 文によ
り他のプレースに移動することができる.finish 文によ
図 2 X10 による並列・分散 Hello World
り,非同期に実行されるアクティビティの終了を待ち合
Fig. 2 Parallel distributed Hello World in X10.
わせることができる.これは,そのブロック内部で生成さ
れたアクティビティ(孫も含む)がすべて終了するまで待
終了してしまっていた.しかし,PGAS 実行モデルではア
つ構文で,内部で起きた例外を伝播させることができる
ドレス空間がプログラムからも見える形で明示的に分割さ
(複数の例外が起きている可能性があるので,その際には
れているため,プレース消失の影響を局所化することが比
MultipleExceptions に包んで通知される).このように,
較的容易である.たとえば図 1 の下の図でプレース 1 が消
非同期に実行される処理の例外をも受け取れるのは X10 の
失した場合,そこに存在する 3 つのアクティビティの処理
「Rooted Exception Model」の特長である.
オブジェクトは特定のプレースに所属する変更可能な
と 2 つのオブジェクトの情報,2 つの分散配列の一部分が
失われるが,残りの部分には影響がない(グローバル参照
データ構造で,中身にアクセスできるのは同じプレースの
はアクセス不能になるが参照元が失われるわけではない)
.
アクティビティに限定される.ただし,GlobalRef という
このことを利用し我々は,ノードが故障した場合でも,残
仕組みにより,他のプレースからオブジェクトを「グローバ
りのノード(プレース)で処理を継続できるような耐故
ル参照(リモート参照)
」することができる [6][14].グロー
障性拡張を行ってきている.これが「Resilient X10」であ
バル参照には,参照しているオブジェクトがどのプレース
る [3].
に存在するかの情報が入っており,アクティビティがその
Resilient X10 では,プレースの消失に対して DeadPlace-
プレースに移動することでオブジェクトの中身にアクセ
Exception(以下,状況によっては DPE と省略)という専
ス可能となる.たとえば,
「data = at (gref) gref();」
用の例外が通知される.具体的には,消失したプレースに
という文は,グローバル参照 gref が存在するプレースに
アクティビティが移動中だった(または移動しようとし
移動しその内容を読み出すという処理を行っている.X10
た)場合,対応する at 文が DPE を通知する.async 文で
ではさらに,複数のプレースにまたがった特殊なデータ構
非同期にアクティビティが生成されていた場合は,それを
造として「分散配列(DistArray)」を作ることができる.
支配する finish 文が DPE を受け取り,その他の例外とま
分散配列の各要素はどれかのプレースに所属しており,そ
とめて MultipleExceptions に包んで通知する.消失し
のプレース内のアクティビティによって操作される.
たプレースの番号は DPE の place フィールドで確認する
図 2 は,X10 で書いた Hello World プログラムと,4 プ
ことができる.ほかに,特定のプレースが生きているかど
レースを使った実行例である.4 行目の at 文と async 文
うか調べる Place.isDead,消失したプレースの数を返す
により,各プレースに新しいアクティビティが生成され,
Place.numDead などのメソッドも用意されている.
「Hello from Place(n)」を出力する処理が並列・分散に
これらの追加機能を利用することで,ノード故障(プレー
行われる.これらのアクティビティがすべて終了すると
ス消失)時にも残りのノードで処理を継続する「耐故障性
finish 文から抜け,プログラムが終了する.
を備えたアプリケーション」を記述することが可能になる.
最も単純な例が図 3 である.このプログラムは,各プレー
2.2 Resilient X10
複数の計算ノードを用いた分散処理を X10 で行うには,
プレースを各ノード(上のプロセス)で立ち上げることに
スで並列・分散に処理 do something を行うが,プレース
が消失した場合の DPE 通知に対して「Place(n) died」と
単純に表示している(6∼7 行目).
なる.計算ノードが故障した場合,そのノードが担当して
さて,ノード故障によりアクティビティが消失した場合
いたプレースが消失し,その上のアクティビティとデータ
でも finish 文が(それを永遠に待つことなく)適切に完了
が失われてしまう.そのため,従来の X10 では処理全体が
するには,各 finish 文が支配するアクティビティの実行
3
2013-5-(1): 情報処理学会プログラミング研究会 発表資料 2014 年 3 月 17 日
1
2
3
4
5
6
7
8
9
10
11
class ResilientExample {
public static def main(Rail[String]) {
finish for (pl in Place.places()) async {
try {
at (pl) do_something(); // 各プレースで並列・分散に処理
} catch (e:DeadPlaceException) {
Console.OUT.println(e.place + " died"); // 故障を表示
}
} // finish 文の終わり,全プレースの処理が終わるまで待つ
}
}
1
2
3
4
5
6
7
8
9
10
11
12
図3
故障ノードを表示するだけの対処例
Fig. 3 A simple program which just reports node failure.
13
14
15
16
状況(どのプレースでいくつ動いているか)と,finish 文
17
どうしの親子関係を示す情報がノード消失によって失われ
19
ないようにしなければならない.Resilient X10 ではこれら
のクリティカル情報を信頼性のあるストレージ(Resilient
Storage)に保持するようにしている.Resilient Storage の
実装はいくつかのバリエーションを開発中であるが,現在
18
20
21
22
23
24
25
26
最も安定して動作しているのは main の実行が開始される
「プレース 0」をそれに使うものである
*1 .
Resilient X10 の機能は 2013 年 12 月にリリースされた
27
28
29
30
import x10.util.*;
class ResilientMontePi {
static val ITERS = 1000000000/Place.MAX_PLACES; // 計 10 億回
public static def main (args:Rail[String]) {
val result = GlobalRef(new Cell(Pair[Long,Long](0,0)));
// (円の内側だった数, 試行回数) のペアを保持する領域
finish for (p in Place.places()) async {
try {
at (p) {
val rnd = new x10.util.Random(System.nanoTime());
var c:Long = 0;
for (iter in 1..ITERS) { // 各プレースで ITERS 回試行
val x = rnd.nextDouble(), y = rnd.nextDouble();
if (x*x + y*y <= 1.0) c++; // 円の内側だった場合
}
val count = c;
at (result) atomic { // 試行結果を足し込む
val r = result();
r() = Pair(r().first+count, r().second+ITERS);
}
}
} catch (e:DeadPlaceException) {
// 消失したプレースの結果は,単純に無視する
}
} // finish 文の終わり,全プレースの処理が終わるまで待つ
val pair = result()();
val pi = 4.0 * pair.first / pair.second;
Console.OUT.println("pi="+pi + " (try="+pair.second+")");
}
}
X10 2.4.1 から Technology Preview として含まれており,
図4
環境変数「X10 RESILIENT MODE=1」を指定することで有
効にすることができる.X10 の実装には,C++にコンパ
イルされネイティブ実行される「Native X10」と,JavaTM
にコンパイルされ複数の Java 仮想マシン上で実行される
ションの対応として,
(a)無視して正常ノードの結果だけを使う
「Managed X10 [6][16]」があるが,Resilient X10 はそのど
ちらでも使用可能である.ただし,使用できる通信レイヤ
はソケットに限られ,MPI [9] や PAMI [7] は未サポートで
モンテカルロ法による π の計算
Fig. 4 Computation of π with the Monte Carlo method.
(b)残りのノードに作業を再分配する
の 2 つを紹介する(第 3 の手法は 4 節で示す).
ある.
なお,Resilient X10 の実装の詳細については [3],プレー
ス消失時の動作セマンティクスは [2] を参照されたい.
3. 耐故障性のある X10 アプリケーション
Resilient X10 が言語処理系レベルで提供する耐故障性
のためのインタフェースは現時点では非常に限られたも
ので,プレース消失に対する DeadPlaceException の通知
と,Place.isDead や Place.numDead のようなサポートメ
ソッドのみである.しかし,これらの仕組みを活用するこ
とで,既存の分散 X10 アプリケーションに耐故障性を追加
することができる.
ただしそのためには,アプリケーションがどのように分
散処理を行っているかやノード故障時にどのように処理
の継続が可能かについての理解が必要となる.またそれ
によって,どのように耐故障性をサポートするかの手法
も違ってくる.本節では,故障が起きた場合のアプリケー
*1
この実装では,プレース 0 が消失した場合だけは X10 自体の実
行が終了してしまうという問題が残っている.なお,本稿で示す
耐故障性アプリケーションはどれも,プレース 0 は消失しないと
いう前提で作成されている.
3.1 MontePi
図 4 は,モンテカルロ法を用いて π(の近似値)を求め
る分散アプリケーションに耐故障性を追加した例である.
計算の中心部分は 12∼15 行目の for ループで,乱数生成
した座標(x, y)が半径 1.0 の円内かどうかを繰り返し判
定している.ITERS 回の試行が終了すると,プレース 0 の
result というデータペアに試行結果が足し込まれる(17∼
20 行目).この一連の処理が各プレースで並列・分散に行
われ,すべて終了するとプレース 0 で result の中身を取
り出して集計が行われる(26∼27 行目).
計算中にノードが故障した場合は,そのノードで実行さ
れているプレースが消失し,9 行目の at 文が DeadPlace-
Exception で終了する.この例外は 22 行目の catch 文で
捕捉され,単純に無視される.消失したプレースでは 17
行目の at 文が実行されず,試行結果が result に反映され
ない *2 .そのため,正常ノードの試行結果だけを集計する
ことができる.モンテカルロ法の試行回数が減り精度が下
*2
17 行目の at 文に到達後にプレースが消失した場合は集計に加え
られる.この at 文の中身はプレース 0 で実行されるので,元の
プレースが消失していても足し込む処理は正常に行われる.
4
2013-5-(1): 情報処理学会プログラミング研究会 発表資料 2014 年 3 月 17 日
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class ResilientKMeans {
static val POINTS = 10000000; // 点の数
:
public static def main(args:Rail[String]) {
/* 点の集合を用意し,各プレースに配る */
for (iter in 1..ITERATIONS) { // 結果が収束するまで繰り返す
/* 現在のクラスタ情報を各プレースに配る */
// 点を生きている各プレースで分担して計算
val numAvail = Place.MAX_PLACES - Place.numDead();
val div = POINTS / numAvail; // 各プレースが担当する点の数
val rem = POINTS % numAvail; // プレース 0 の追加担当分
var start:Long = 0; // 次に処理すべき点の番号
try {
finish for (pl in Place.places()) {
if (pl.isDead()) continue; // 消失プレースはスキップ
var end:Long = start+div; if (pl==place0) end+=rem;
at (pl) async { // 生きている各プレースで並列に処理
/* [start,end) の番号の点について処理を行い,
集計に必要な結果だけをプレース 0 に戻す */
}
start = end;
} // finish 文の終わり,全プレースの処理が終わるまで待つ
} catch (es:MultipleExceptions) {
for (e in es.exceptions()) { // プレース消失の通知は無視
if (!(e instanceof DeadPlaceException)) throw e; }
}
/* 新しいクラスタ値を計算し,収束した場合はループを抜ける */
} // for (iter) ここまで
/* 結果を出力 */
} // main ここまで
}
は省略した部分の処理概要を示している.また,アプリ
ケーションの全容は付録の図 A·1 および A·2 に掲載して
いる).作業の再分配は,9∼17 行目のコードにより行わ
れる.9 行目で生きているプレースの数を計算し,10∼11
行目で各プレースが分担すべき点の数を求める.そして,
14∼17 行目で,消失したプレースをスキップして(15 行
目)生きているプレースだけで担当部分の点の処理を並列
に行っている.
この処理の途中でプレースが消失した場合は,14 行目の
finish 文に対して DeadPlaceException が(MultipleExceptions に包まれて)通知され,23 行目の catch 文で
捕捉されるが,このプログラム例では単に無視しており,
他のプレースからの結果の破棄やリトライなどは行ってい
ない.これは,KMeans 計算では成功した部分の処理結果
だけでも利用した方が収束を早くできるためである.ただ
し,結果を正確にするため,収束したかどうかの判定はす
べての点が処理された場合のみ行っている.
なお,この例では,耐故障性サポートのために追加した
コードは図 5 では省略されている部分を含めても 10 行程
度であった.
4. 耐故障性のある X10 ライブラリ
図5
耐故障性のある KMeans 計算(スケルトン)
Fig. 5 Fault tolerant KMeans (skeleton).
前節であげた MontePi と KMeans はいずれも X10 の基
本機能だけを使って書かれたアプリケーションであったが,
がる可能性はあるが,計算結果が間違ったものになるわけ
大規模アプリケーションを効率よく開発するにはライブラ
ではない.このプログラムでは,参考情報として試行回数
リのサポートが不可欠である.分散処理のためのライブラ
も出力するようにしている.
リとして,X10 では分散配列(DistArray)が提供されてい
この例では,耐故障性サポートのために追加したコード
る.これは,要素が各プレースに分散した配列で,CAF の
は 8 行目の try 文と 22∼24 行目の catch 文だけである.
Co-Array などと似たデータ構造である.本節では,これ
をノード故障時にも動作するように拡張しスナップショッ
3.2 KMeans
KMeans はクラスタリング分析の代表的なアルゴリズム
ト機構を追加した「Resilient DistArray」と,それを使っ
たアプリケーションの例について紹介する.これにより,
で,d 次元空間上の n 個の点を K 個のクラスタに分類す
(c)定期スナップショットから実行を再開する
る *3 .その基本処理は,
(1)クラスタの中心となる K 個の
手法での耐故障性サポートが容易に実現可能になる.
座標に対して n 個の点を分類し,(2)各クラスタ内の点
の平均座標を新たなクラスタ中心座標とする,という作業
4.1 Resilient DistArray
の繰り返しである.クラスタ中心座標の変動が閾値以下と
分散アプリケーションの多くは,各計算ノードが大きな
なったら収束したと判断して終了する.分散環境で処理を
データの一部分について同じ処理を行うという SPMD 型
行う場合,n 個の点の一部を各ノードが受け持つことで(1)
の構造をしている.DistArray はそのような処理に適し
の計算を並列に行うことができる.そして,
(2)の処理の
たデータ構造で,要素が各プレースに散らばった配列であ
ために必要な情報だけを集計し,収束するまで処理を繰り
る.各要素はそれが存在するプレース上でのみアクセスで
返す.n 個の点の座標は最初に与えられて以降変化しない
き,そのプレース(上のアクティビティ)が処理を担当す
ので,あらかじめ配っておくことができる.
る.要素をプレースにどのように配置するかは Dist とい
このようなプログラムでは,ノード故障時に残りのノー
ドに作業を「再分配」することで耐故障性を実現することが
できる.図 5 は,その手法で耐故障性を追加した KMeans
アプリケーションのスケルトンである(図中,「/* ∼ */」
*3
本稿のプログラム例では,d = 4,n = 10, 000, 000,K = 4.
うデータで指定することができるが,詳細は X10 のドキュ
メント [14] や解説記事 [22] などを参考にしてほしい.
ノードが故障すると DistArray のそのプレース上の要
素はアクセス不能になってしまうため,SPMD 型の処理を
継続することができなくなる.処理を継続するためには,
5
2013-5-(1): 情報処理学会プログラミング研究会 発表資料 2014 年 3 月 17 日
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package x10.regionarray;
public class ResilientDistArray[T] ... {
public static def make[T](dist:Dist, init:(Point)=>T)
: ResilientDistArray[T];
public static def make[T](dist:Dist){T haszero}
: ResilientDistArray[T];
public final operator this(pt:Point) : T; // 要素アクセス
public final operator this(pt:Point)=(v:T) : T; // 値の設定
public final def map[S,U](dst:ResilientDistArray[S],
src:ResilientDistArray[U], filter:Region, op:(T,U)=>S)
: ResilientDistArray[S];
public final def reduce(op:(T,T)=>T, unit:T) : T;
:
// スナップショットを作成
public def snapshot() { snapshot_try();snapshot_commit(); }
public def snapshot_try() : void; // DPE が起きる可能性あり
public def snapshot_commit() : void;
// 新しい Dist で DistArray を作り直す
public def restore(newDist:Dist) : void;
public def remake(newDist:Dist, init:(Point)=>T) : void;
public def remake(newDist:Dist){T haszero} : void;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
図 6 スナップショット機能を持つ DistArray のインタフェース
Fig. 6 Interface of the DistArray with snapshot mechanisms.
24
25
26
27
28
生きているノード上に分散配列を再構成し,データを復元
しなければならない.ResilientDistArray はそのために
必要な機能を追加した,耐故障性のある分散配列である
*4 .
現在のバージョンは DistArray を内部に持つ別クラスと
して実装されており,そのサイズは 200 行程度である.将
来的には標準の分散配列に統合することも考えている.
29
30
31
class ResilientHeatTransfer {
static val N = 20; // 格子のサイズ
static val livePlaces = new ArrayList[Place]();
static val restore_needed = new Cell[Boolean](false);
:
public static def main(args:Rail[String]) {
for (pl in Place.places()) livePlaces.add(pl);
// Region と Dist の初期化
val BigR = Region.make(0..(N+1), 0..(N+1)); // 周囲を含む
var BigD:Dist(2) = Dist.makeBlock(BigR, 0,
new SparsePlaceGroup(livePlaces.toRail()));
// 分散配列の作成と初期化(各要素が温度を保持する)
val A = ResilientDistArray.make[Double](BigD, ...);
A.snapshot(); // 初期化時点のスナップショットを作成
for (iter in 1..ITERATIONS) { // 結果が収束するまで繰り返す
try {
if (restore_needed()) { // 消失したプレースがあった場合
BigD = Dist.makeBlock(BigR, 0, // Dist を作り直し,
new SparsePlaceGroup(livePlaces.toRail()));
A.restore(BigD); // スナップショットからデータを復元
restore_needed() = false;
}
finish ateach (z in D_Base) { // 分散配列の処理
/* A の自プレースにある点について,新たな温度を求める */
}
/* 収束した場合は for ループを抜ける */
if (iter % 10 == 0) A.snapshot(); // スナップショット
} catch (e:Exception) { processException(e); }
} // for (iter) ここまで
/* 結果を出力 */
} // main ここまで
32
private static def processException(e:Exception) {//例外処理
if (e instanceof DeadPlaceException) {
val deadPlace = (e as DeadPlaceException).place;
livePlaces.remove(deadPlace); restore_needed() = true;
} else ... /* MultiPlaceExceptions は再帰的に処理する */
}
33
34
35
36
37
38
図 6 に,試作した ResilientDistArray のインタフェー
39
}
スを示す([T] は型パラメータ,(T)=>U は関数の型,コ
ロンの右側は引数や返り値の型を表す).その基本機能は
DistArray と同じで,make メソッドにより生成でき,要
素をプレースにどう配置するかは第 1 引数の Dist により
図7
耐故障性のある HeatTransfer 計算(スケルトン)
Fig. 7 Fault tolerant HeatTransfer (skeleton).
4.2 HeatTransfer
指定される.この時,初期値設定用の関数を渡すこともで
前節の ResilientDistArray を使用するアプリケーショ
きる.要素へのアクセスは,それが存在するプレース上
ンの例として,HeatTransfer プログラムを取りあげる.こ
で「A(pt)」の形で行う.map や reduce などの集計処理メ
れは温度の伝播をシミュレートするアプリケーションで,
ソッドも使用可能である.
二次元配列の各要素で格子点の温度をあらわし,周囲 4 点
耐故障性サポートのための拡張は 14 行目以降の部分
の値を平均して新たな温度を求めるというステンシル計
で,snapshot メソッドにより分散配列の内容を Resilient
算処理が,収束するまで繰り返される.DistArray を用い
Storage に書き出すことができる.このスナップショット
ることで,複数の計算ノードを用いる分散処理として実装
は restore メソッドにより読み込むことができ,その際に
することが可能で,X10 の配布版にもいくつかのバリエー
Dist 情報を指定して要素を再配置することが可能である.
ションがサンプルとして含まれている.
スナップショットを読み込まず要素配置の変更と初期化を
図 7 は,ResilientDistArray を用いて耐故障性を追加
したい場合は,remake メソッドが使用できる.これらの機
した HeatTransfer アプリケーションのスケルトンである
能を用いて,必要なデータのスナップショットを定期的に
(アプリケーションの全容は付録の図 A·3 に掲載している)
.
とっておき,ノード故障時に,残りの「生きている」プレー
15∼29 行目の for ループが収束するまで繰り返されるが,
スの上に分散配列を再構成することで,スナップショット
その中では分散配列の各点について新しい温度を求める処
の時点から処理を再開することが可能になる.
理が並列・分散実行されている(23∼25 行目).耐故障性
*4
X10 の分散配列には現在,複雑な記述が可能な x10.regionarray
パッケージと,単純だが高速な x10.array パッケージの 2 種類
の実装が存在する.耐故障性拡張は両者について試作済みである
が,本稿では主に前者を用いて説明を行う.
サポートのために,10 回のイテレーションごとに分散配列
A のスナップショットが作成されている(27 行目).
計算中にノードが故障した場合は,(MultipleExcep-
6
2013-5-(1): 情報処理学会プログラミング研究会 発表資料 2014 年 3 月 17 日
tions に 包 ま れ た )DeadPlaceException が 28 行 目 の
catch 文で捕捉され,processException メソッドが呼び出
される.このメソッドでは死亡したプレースを livePlaces
リストから取り除き,restore needed というフラグを立
てる(36 行目).このフラグは毎回のイテレーションの先
頭でチェックされ(17 行目),立っていた場合は,生きて
いるプレース(livePlaces)だけを使用するように分散配
列 A を再構成し,スナップショットからデータを復元する
(18∼20 行目)
.これにより実行は少し巻き戻るが,故障し
たノードを除いて途中結果から処理を継続することが可能
になる.
1.6
1.4
1.2
1.0
0.8
0.6
0.4
0.2
0.0
Base on X10
FT on X10
Base on ResX10
MontePi
FT on ResX10
KMeans
図8
6.1 6.5
HeatTransfer
実行時間の比較
Fig. 8 Relative execution times.
この例では,耐故障性サポートのための変更は,Dist-
Array を ResilientDistArray に置き換えるほかには,図
れを,従来の X10 と Resilient X10 の上で実行し,実行時間
7 で省略されている部分を含めても 25 行程度であった.こ
R
を比較した.実験は,2.7 GHz の Intel⃝
Xeon⃝R プロセッ
のアプリケーションは単純なものなので相対的には多く見
サ E5-2680 を 2 個(合計 16 コア)搭載した x86 64 マシン
えるが,大規模な分散配列アプリケーションでも基本的に
R
である IBM⃝
BladeCenter⃝R HS23(7875-C5J)を 4 台用い
は同じ仕組みで対応可能であり,その場合の変更比率は下
て行った.マシン同士は 40 Gbps の InfiniBand で接続さ
がると期待できる.また,変更量の半分弱は例外処理コー
R
れており,それぞれ Red Hat Enterprise Linux⃝
Server 6.3
ド processException なので,この部分をライブラリとし
が動いている.X10 処理系は Native X10 2.4.2 で,通信は
て提供することも考えられる.
ソケット通信レイヤ,Resilient Storage にはプレース 0 を
5. 評価
次 に ,前 節 ま で で 述 べ た 3 つ の ア プ リ ケ ー シ ョ ン ,
使用している.各マシンで 2 つずつのプレースを動かし,
合計 8 プレースの環境で実行している.各アプリケーショ
ンは,
「x10c++ -O -NO CHECKS」でコンパイルし,それぞ
MontePi,KMeans,HeatTransfer への耐故障性の追加に
れ最外の for 文の実行にかかった時間を測定した.内部の
ついて,いくつかの観点から評価を行う.
print 文はすべてオフにしてある.結果として,10 回測定
したうちのベストスコアを使用している.
5.1 コード変更量
図 8 が実験結果で,それぞれのアプリケーションについ
各節の説明でも示したとおり,耐故障性を追加したアプ
て,左の 2 本がベースコードと耐故障版の従来の X10 上で
リケーションやライブラリの,ベースとなるコードからの
の実行時間,右の 2 本が Resilient X10 上での実行時間であ
変更は非常にわずかである.MontePi は 4 行, KMeans
る.アプリケーションごとに,ベースコードを従来の X10
は 10 行程度, HeatTransfer は 25 行程度 のコード追加で,
で動かした場合の実行時間を 1 として正規化している.
耐故障性サポートを行えた.また,分散配列へのスナップ
まず,従来の X10 上で動かした場合の性能を比較する
ショット機能追加は 200 行程度で可能であった.なお,変
と,MontePi と KMeans については,耐故障性を追加した
更したプログラムはどれも,従来の X10 上でも(ノードが
ことによるオーバーヘッドはほとんど見られない.一方,
故障しない限りは)実行可能である.
HeatTransfer では耐故障性バージョンは 9%程度時間が余
このように,Resilient X10 では少ない変更量で既存の
分散アプリケーションに耐故障性を追加することができ
計にかかっている.これは,定期的にスナップショット作
成が行われるためである.
ている.ただし,どのようなアプローチで変更を行うかに
次に,各アプリケーションのベースコードを従来の X10
ついては各アプリケーションの処理構造に関する理解が
と Resilient X10 で動かした場合の性能を比較する.Re-
必要となる.本稿では,
(a)無視して正常ノードの結果だ
silient X10 はアクティビティの状態などを Resilient Storage
けを使う MontePi,
(b)残りのノードに作業を再分配する
に保持する必要があるため,at や async の処理コストが
KMeans,(c)定期スナップショットから実行を再開する
従来の X10 より高い.そのため,耐故障性のないベース
HeatTransfer,の 3 つのアプローチを示した.
コードの実行も従来の X10 より若干遅くなってしまう.
MontePi は at 処理が各プレースごとに 2 回だけであるた
5.2 実行性能
め,実行時間の増加は 2%程度であるが,KMeans では収束
次に,耐故障性の追加がどの程度性能に影響するのかに
するまで複数回の at 処理が行われるので,8%程度時間が余
ついての評価を行った.3 つのアプリケーションについて,
計にかかっている.さらに,HeatTransfer は Resilient X10
ベースとなるコードと耐故障性を追加したコードのそれぞ
上では 6 倍も時間がかかってしまう.これは,内部のス
7
2013-5-(1): 情報処理学会プログラミング研究会 発表資料 2014 年 3 月 17 日
テンシル計算の際に at 処理が大量に行われているためで
うにライブラリレベルで行われることが多い.その場合,
ある.
計算ノードの故障はデータの送受信やリモート呼び出しが
これら 2 つのオーバーヘッドを総合すると,ベースとな
失敗するという低いレベルのエラーとして現れる.そのた
るコードを従来の X10 で動かした場合と比べ,耐故障性を
め,故障への対応は各リモート呼び出しについてのエラー
備えたアプリケーションを Resilient X10 上で動かした場
処理という形で各アプリケーションが逐一行わなければな
合のオーバーヘッドは,MontePi で 2.2%,KMeans では
らない.これに対し Resilient X10 では PGAS の特徴を利
9.0%程度であった.HeatTransfer は,今のところ 6.5 倍の
用し,ノード故障を計算モデルの一部として扱えるように
時間がかかっているが,これはスナップショットを取る頻
している [2][3].アプリケーションの変更は依然として必
度によっても左右される.また必要以上に at 処理を行わ
要であるが,本稿で述べたいくつかのパターンを活用する
ないようにプログラムを改良することで改善できる可能性
ことで定型的に対応できる可能性が高い点や,言語の基本
があると考えている.
機能とすることでアプリケーションに最初から組み込んで
おける点などが有用であると考えている.
5.3 耐故障性
計算ノードの故障に対する対策は,特に HPC アプリケー
前節の性能比較はいずれも,ノード故障が起きなかった
ションで重要である.最も一般的なものはアプリケーショ
場合の測定結果である.最後に,ノードが故障した場合の
ンによる「チェックポインティング」で,定期的に処理中
挙動についての評価を示す.
のデータのスナップショットをとり,ノード故障が発生し
まず,プレース 2 を担当するマシンの X10 プロセスを
た場合は正常ノードでアプリケーションを再起動しスナッ
外部から kill コマンドで強制終了した場合の挙動を調
プショットを読み込んで処理を再開するというものであ
べた.従来の X10 では,動いているアプリケーションの
る [4].我々が第 3 のアプローチとして示した HeatTransfer
耐故障性の有無にかかわらず,その時点で全体の処理が
の耐故障性対応は,同様の考えに基づいたものである.た
終了してしまった.Resilient X10 上であっても,耐故障
だし,スナップショット機構の大部分を分散処理ライブラ
性を持たないベースアプリケーションを実行していた場
リ DistArray 内に閉じ込め再利用しやすくしている点や,
合は,全体が DeadPlaceException(もしくはそれを含む
再開をアプリケーションの再起動でなくアプリケーション
MultipleExceptions)で終了してしまった.一方,耐故
内で行っている点などに特徴がある.Hadoop [20] では,
障性を追加したアプリケーションを Resilient X10 上で動
各段階の処理結果を外部ストレージ(HDFS)に書き出す
かした場合,プレースが消失しても処理が継続され,結果
ことで,各ステップ単位での耐故障性が実現されている.
が出力されることを確認できた.これは,耐故障性のある
これは,チェックポインティングを標準的に行っていると
アプリケーションと Resilient X10 の組み合わせによって
考えることができる.
ノード故障に耐えられる処理環境が達成できていることを
最近では,大規模な処理をクラウドなどの仮想化された
示している.参考として,HeatTransfer のプレース消失を
環境の上で行うことも一般的である.その場合,仮想マシ
含む実行例を付録の図 A·4 に掲載した.
ンのスナップショット [18] やマイグレーション [1][8] の
次に,プレース消失の影響について調査した.MontePi
機能をうまく利用すれば,耐故障性を実現できる可能性が
では,プレース消失は精度の低下につながる.8 プレース
ある [11].しかし,仮想マシン全体を保存・移動するには
のうちプレース 4∼7 を途中で消失させた場合,アプリケー
オーバーヘッドがあり,分散アプリケーションで利用する
ション自体は正しく終了し実行時間も変わらないが,総試
には工夫が必要であろう.Resilient X10 やその上のアプリ
行回数が減るため求められた π の値の誤差が 0.0008%から
ケーションがこれらの機能をどう活用できるかについては
0.002%に拡大した.KMeans や HeatTransfer では,プレー
今後研究していきたい.
スが消失すると処理の一部が失われたりスナップショット
まで処理が巻き戻るため,収束までのイテレーション数が
7. おわりに
増える.また,その後の 1 プレースあたりの処理量も増え
本稿では,分散プログラミング言語 X10 の拡張機能で
るので,時間が余計にかかるようになる.17 回目のイテ
ある「Resilient X10」を用いて耐故障性のあるアプリケー
レーションの途中でプレース 2 を消失させた場合,実行は
ションをどのように構成するかについて述べた.故障が起
正常に終了するが,KMeans では 11%,HeatTransfer では
きた場合のアプリケーションの対応として,
(a)無視して
14%実行時間が増加した.
正常ノードの結果だけを使う,
(b)残りのノードに作業を
6. 関連研究
複数の計算ノードを使う分散処理のためのサポートは,
MPI [9] や RMI [5],DB や Web サーバへのアクセスのよ
再分配する,(c)定期スナップショットから実行を再開す
る,の 3 手法を紹介したが,いずれの場合もアプリケーショ
ンの変更はごくわずかで済んだ.また,スナップショット
機能を備えた分散配列ライブラリについても紹介した.
8
2013-5-(1): 情報処理学会プログラミング研究会 発表資料 2014 年 3 月 17 日
従来の X10 で耐故障性のないアプリケーションを動か
した場合と比べ,Resilient X10 上で耐故障性のあるアプリ
ケーションを動かした場合のオーバーヘッドは,プレース
[8]
移動がそれほど頻繁でない場合は 2.2∼9.0%程度であった.
一方,プレース移動が頻繁に行われるプログラムでは実行
時間が 6.5 倍になるケースがあることも判明した.主な要
因は,定期スナップショットのコストと,at 処理のコスト
[9]
[10]
増加である.しかしこれらの追加コストを支払うことで,
耐故障性を備えたアプリケーションは,プレースが消失し
ても処理を継続できることを確認できた.8 プレースのう
ち 1 つが消失した場合の実行時間増加は,そのタイミング
[11]
にもよるが 11∼14%程度であった.
今後の作業として,Resilient X10 およびアプリケーショ
ン自身の改良によるオーバーヘッドの削減,耐故障性を備
えたライブラリの整備などを考えている.また,[10] や [15]
で紹介している,より大規模で実用的な分散 X10 アプリ
ケーションへの耐故障性の追加も検討中である.
謝辞
普段より有益な議論とコメントをいただいている,IBM
[12]
[13]
[14]
[15]
T. J. ワトソン研究所と東京基礎研究所の X10 プロジェク
トのメンバーに感謝します.なお,Resilient X10 の研究
の一部は,U. S. Air Force Office of Scientific Research
[16]
(Contract No. FA8750-13-C-0052)によって援助されてい
ます.
参考文献
[1]
[2]
[3]
[4]
[5]
[6]
[7]
Clark, C., Fraser, K., Hand, S., Hansen, J. G., Jul, E.,
Limpach, C., Pratt, I. and Warfield, A.: Live Migration
of Virtual Machines, Proceedings of the 2nd Conference on
Symposium on Networked Systems Design and Implementation (NSDI ’05), pp. 273–286 (2005).
Crafa, S., Cunningham, D., Saraswat, V., Shinnar, A. and
Tardieu, O.: Semantics of (Resilient) X10, Proceedings of
the 28th European Conference on Object-Oriented Programming (ECOOP ’14) (2014, to appear).
Cunningham, D., Grove, D., Herta, B., Iyengar, A.,
Kawachiya, K., Murata, H., Saraswat, V., Takeuchi, M.
and Tardieu, O.: Resilient X10: Efficient Failure-Aware
Programming, Proceedings of the 19th ACM SIGPLAN
Symposium on Principles and Practice of Parallel Programming (PPoPP ’14), pp. 67–80 (2014).
Elnozahy, E. N., Alvisi, L., Wang, Y.-M. and Johnson, D. B.: A Survey of Rollback-Recovery Protocols
in Message-Passing Systems, ACM Computing Survey,
Vol. 34, No. 3, pp. 375–408 (2002).
Grosso, W.: Java RMI, O’Reilly (2001).
Kawachiya, K., Takeuchi, M., Zakirov, S. and Onodera,
T.: Distributed Garbage Collection for Managed X10,
Proceedings of the 2012 ACM SIGPLAN X10 Workshop
(X10 ’12) (2012).
Kumar, S., Mamidala, A. R., Faraj, D. A., Smith, B.,
Blocksome, M., Cernohous, B., Miller, D., Parker, J., Ratterman, J., Heidelberger, P., Chen, D. and SteinmacherBurow, B.: PAMI: A Parallel Active Message Interface
[17]
[18]
[19]
[20]
[21]
[22]
for the Blue Gene/Q Supercomputer, Proceedings of the
2012 IEEE 26th International Parallel and Distributed Processing Symposium (IPDPS ’12) (2012).
Medina, V. and Garcia, J. M.: A Survey of Migration
Mechanisms of Virtual Machines, ACM Computing Survey, Vol. 46, No. 3 (2014).
MPI Forum: Message Passing Interface Forum, http:
//www.mpi-forum.org/.
Murata, H., Horie, M., Shirahata, K., Doi, J., Tai, H.,
Takeuchi, M. and kawachiya, K.: Writing HPC Applications in X10 Parallel Distributed Programming Language, Manuscript for presentation at 98th IPSJ SIG Programming Workshop (2014).
Nagarajan, A. B., Mueller, F., Engelmann, C. and Scott,
S. L.: Proactive Fault Tolerance for HPC with Xen Virtualization, Proceedings of the 21st Annual International
Conference on Supercomputing (ICS ’07), pp. 23–32 (2012).
OpenMP.org: The OpenMP: API Specification for Parallel Programming, http://www.openmp.org/.
PGAS: Partitioned Global Address Space, http://www.
pgas.org/.
Saraswat, V., Bloom, B., Peshansky, I., Tardieu,
O. and Grove, D.: X10 Language Specification,
http://x10.sourceforge.net/documentation/
languagespec/x10-latest.pdf.
Shinnar, A., Cunningham, D., Herta, B. and Saraswat,
V.: M3R: Increased Performance for In-Memory Hadoop
Jobs, Proceedings of the VLDB Endowment, Vol. 5, No. 12,
pp. 1736–1747 (2012).
Takeuchi, M., Makino, Y., Kawachiya, K., Horii, H.,
Suzumura, T., Suganuma, T. and Onodera, T.: Compiling X10 to Java, Proceedings of the 2011 ACM SIGPLAN
X10 Workshop (X10 ’11) (2011).
The High Performance Computing Laboratory: Unified Parallel C at George Washington University, http:
//upc.gwu.edu/.
VMware Knowledge Base: Understanding Virtual
Machine Snapshots in VMware ESXi and ESX,
http://kb.vmware.com/selfservice/microsites/
search.do?cmd=displayKC&externalId=1015180.
Wallcraft, A.: Co-Array Fortran Homepage, http://
www.co-array.org/.
White, T.: Hadoop: The Definitive Guide, 3rd Edition,
O’Reilly Media / Yahoo Press (2012).
X10 Project: X10: Performance and Productivity at
Scale, http://x10-lang.org/.
河内谷清久仁:マルチコア時代のプログラミング言語
「X10」,情報処理, Vol. 52, No. 3, pp. 342–356 (2011).
IBM および BladeCenter は,世界の多くの国で登録された International Business Machines Corporation の 商 標 で す .他 の 製 品
名およびサービス名等は,それぞれ IBM または各社の商標であ
る 場 合 が あ り ま す .現 時 点 で の IBM の 商 標 リ ス ト に つ い て は ,
www.ibm.com/legal/copytrade.shtml をご覧ください.Java およ
びすべての Java 関連の商標およびロゴは Oracle やその関連会社の
米国およびその他の国における商標または登録商標です.Intel およ
び Xeon は Intel Corporation または子会社の米国およびその他の国
における登録商標です.Linux は Linus Torvalds の米国およびその
他の国における登録商標です.
9
2013-5-(1): 情報処理学会プログラミング研究会 発表資料 2014 年 3 月 17 日
1
2
3
4
5
6
import x10.regionarray.*;
class ResilientKMeans {
static val DIM = 4n;
static val POINTS = 10000000;
static val CLUSTERS = 4;
static val ITERATIONS = 1000;
//
//
//
//
4 次元の点を扱う
点の数
いくつのクラスタに分けるか
最大イテレーション数
7
8
9
10
11
12
13
14
15
public static def main(args:Rail[String]) {
val place0 = here;
// クラスタリングする点の集合を用意(i 番目の点の座標は,[pt(i,0),pt(i,1),pt(i,2),pt(i,3)]) ※点の集合は変化しない
val points_region = Region.make(0..(POINTS-1), 0..(DIM-1)), rnd = new x10.util.Random(0);
val points_master = new Array[Float](points_region, (p:Point)=>rnd.nextFloat());
val points_local = PlaceLocalHandle.make[Array[Float]](PlaceGroup.WORLD, ()=>points_master); // 点の集合を各プレースに送信
// クラスタ情報を保持する配列(k 番目のクラスタの中心座標は,[cl(k*4),cl(k*4+1),cl(k*4+2),cl(k*4+3)])
val central_clusters = new Rail[Float](CLUSTERS*DIM, (i:Long)=>points_master(i/DIM, i%DIM)); // 初期値は i 番目の点の座標
16
// 計算処理用のデータ構造
val old_central_clusters = new Rail[Float](CLUSTERS*DIM); // 一つ前のクラスタ情報を保持する配列
val central_cluster_counts = new Rail[Long](CLUSTERS); // 各クラスタに属する点の数
val processed_points = new Cell[Long](0); // 処理された点の数
// 他プレースからアクセスするためのグローバル参照を用意
val central_clusters_gr = GlobalRef(central_clusters);
val central_cluster_counts_gr = GlobalRef(central_cluster_counts);
val processed_points_gr = GlobalRef(processed_points);
// 各プレースにローカル処理用の配列を 3 つずつ用意
val local_curr_clusters = PlaceLocalHandle.make[Rail[Float]](PlaceGroup.WORLD, ()=>new Rail[Float](CLUSTERS*DIM));
val local_new_clusters = PlaceLocalHandle.make[Rail[Float]](PlaceGroup.WORLD, ()=>new Rail[Float](CLUSTERS*DIM));
val local_cluster_counts = PlaceLocalHandle.make[Rail[Long]](PlaceGroup.WORLD, ()=>new Rail[Long](CLUSTERS));
17
18
19
20
21
22
23
24
25
26
27
28
29
図 A·1
耐故障性のある KMeans 計算(データ構造の定義)
Fig. A·1 Fault tolerant KMeans (data structures).
付
録
A.1 耐故障性のある KMeans と HeatTransfer
の全容
本文で述べた耐故障性を備えた KMeans と HeatTransfer
アプリケーションの全体コードを,図 A·1 および A·2 と
図 A·3 に示す.これらのコードは現時点での最新リリース
である X10 2.4.2 上でコンパイル・実行可能である.また,
MontePi や Resilient DistArray の複数の実装も合わせ,同
等のコードが samples/resiliency/ディレクトリの下に
収録されている.詳しい実行方法などについては,同ディ
レクトリの README.txt ファイルを参照してほしい.
A.2 耐故障性のあるアプリケーションの実行例
図 A·4 は,サンプルとして含まれている耐故障性のある
HeatTranafer プログラムを Native X10 でコンパイルし,
10x10 のサイズについて 8 プレース上で実行した場合の挙
動である.10 イテレーションごとにスナップショットが作
成されている(42 行目など).
このプログラムでは,分散配列の生成および再構成時に,
各点がどのプレースに割り当てられているかを表示する
ようにしている.実行中に外部からの kill コマンドでプ
レース 2 と 7 を消失させているが,その際には DeadPlace-
Exception が適切に通知され,次のイテレーションで点の
配置の調整とスナップショットからの復元が行われている
ことが見てとれる(74 行目と 101 行目)
.
10
2013-5-(1): 情報処理学会プログラミング研究会 発表資料 2014 年 3 月 17 日
for (iter in 1..ITERATIONS) { Console.OUT.println("Iteration " + iter); // 結果が収束するまで以下の処理を繰り返す
// 1. 現在のクラスタ情報を各プレースに配る
try {
finish for (pl in Place.places()) { if (pl.isDead()) continue; // 消失したプレースはスキップ
at (pl) async { // 生きている各プレースで並列に処理
for (var j:Long = 0; j < CLUSTERS*DIM; ++j) {
local_curr_clusters()(j) = central_clusters(j); local_new_clusters()(j) = 0f; }
for (var j:Long = 0; j < CLUSTERS; ++j) local_cluster_counts()(j) = 0;
} }
} catch (es:MultipleExceptions) {
for (e in es.exceptions()) { if (!(e instanceof DeadPlaceException)) throw e; } // プレース消失の通知は単に無視する
}
// 2. 現在のクラスタ情報を退避し,ゼロクリア
for (var j:Long = 0; j < CLUSTERS*DIM; ++j) { old_central_clusters(j) = central_clusters(j); central_clusters(j) = 0f; }
for (var j:Long = 0; j < CLUSTERS; ++j) central_cluster_counts(j) = 0; processed_points() = 0;
// 3. 生きている各プレースで,割り当てられた点がどのクラスタに属するかを並列に計算する
val numAvail = Place.MAX_PLACES - Place.numDead(); // 生きているプレースの数
val div = POINTS / numAvail, rem = POINTS % numAvail; // 各プレースが担当する点の数と,プレース 0 の追加担当分
var start:Long = 0; // 次に処理すべき点の番号
try {
finish for (pl in Place.places()) { if (pl.isDead()) continue; // 消失したプレースはスキップ
var end:Long = start + div; if (pl==place0) end += rem; // [start,end) の番号の点がこのプレースで処理される
val s = start, e = end;
at (pl) async { // 生きている各プレースで並列に処理
for (var j:Long = s; j < e; ++j) { val p = j; // p 番目の点についての処理
val points = points_local(); var closest:Long = -1, closest_dist:Float = Float.MAX_VALUE;
for (var k:Long = 0; k < CLUSTERS; ++k) { // 現在のクラスタのうちで最も近いものを探す
var dist:Float = 0f;
for (var d:Long = 0; d < DIM; ++d) { // k 番目のクラスタとの距離を計算
val tmp = points(p,d) - local_curr_clusters()(k*DIM+d); dist += tmp * tmp; }
if (dist < closest_dist) { closest_dist = dist; closest = k; }
}
local_cluster_counts()(closest)++; // 最も近いクラスタに点の座標値を加算
for (var d:Long = 0; d < DIM; ++d) local_new_clusters()(closest*DIM+d) += points(p,d);
} // 担当する点の処理,ここまで
val tmp_new_clusters = local_new_clusters(), tmp_cluster_counts = local_cluster_counts(), tmp_processed_points = e-s;
at (place0) atomic { // 結果をマスターに戻す
for (var j:Long = 0; j < CLUSTERS*DIM; ++j) central_clusters_gr()(j) += tmp_new_clusters(j);
for (var j:Long = 0; j < CLUSTERS; ++j) central_cluster_counts_gr()(j) += tmp_cluster_counts(j);
processed_points_gr()() += tmp_processed_points;
} }
start = end;
} // finish 文の終わり,全プレースの処理が終わるまで待つ
} catch (es:MultipleExceptions) {
for (e in es.exceptions()) { if (!(e instanceof DeadPlaceException)) throw e; } // プレース消失の通知は単に無視する
}
// 4. 新しいクラスタ値を計算し,収束したかどうかをチェックする
for (var k:Long = 0; k < CLUSTERS; ++k)
for (var d:Long = 0; d < DIM; ++d) central_clusters(k*DIM+d) /= central_cluster_counts(k);
if (processed_points() == POINTS) { // すべての点が処理された時だけ,収束チェックを行う
var b:Boolean = true;
for (var j:Long = 0; j < CLUSTERS*DIM; ++j)
if (Math.abs(old_central_clusters(j) - central_clusters(j)) > 0.0001) { b = false; break; }
if (b) break; // 収束した場合,処理を終了
}
} // for (iter) ここまで
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// central_clusters に入っている結果を出力
for (var d:Long = 0; d < DIM; ++d) {
for (var k:Long = 0; k < CLUSTERS; ++k) Console.OUT.printf("%10.8f ", central_clusters(k*DIM+d));
Console.OUT.println("<--- dim" + d);
}
} // main ここまで
87
88
89
90
91
92
93
}
図 A·2
耐故障性のある KMeans 計算
Fig. A·2 Fault tolerant KMeans.
11
2013-5-(1): 情報処理学会プログラミング研究会 発表資料 2014 年 3 月 17 日
1
2
3
4
5
6
import x10.regionarray.*;
class ResilientHeatTransfer {
static val N = 20;
// 格子のサイズ
static val ITERATIONS = 1000; // 最大イテレーション数
static val livePlaces = new x10.util.ArrayList[Place](); // 生きているプレースの集合
static val restore_needed = new Cell[Boolean](false);
// リストアが必要かどうか
7
public static def main(args:Rail[String]) {
for (pl in Place.places()) livePlaces.add(pl);
val BigR = Region.make(0..(N+1), 0..(N+1)); // 周囲を含んだ二次元領域
val SmallR = Region.make(1..N, 1..N);
// 周囲を含まない NxN の二次元領域
val LastRow = Region.make(0..0, 1..N);
// 一番上の熱源部分の領域
// プレース消失時に再定義されるデータの初期化
var BigD:Dist(2) = Dist.makeBlock(BigR, 0, new SparsePlaceGroup(livePlaces.toRail()));
var SmallD:Dist(2) = BigD|SmallR;
var D_Base:Dist = Dist.makeUnique(SmallD.places());
// 分散配列の作成と初期化(各要素が温度を保持する,LastRow 部分は常に温度 1.0)
val A = ResilientDistArray.make[Double](BigD, (p:Point)=>{ LastRow.contains(p) ? 1.0 : 0.0 });
val Temp = ResilientDistArray.make[Double](BigD); // 新しい結果を一時的に保持する分散配列
val Scratch = ResilientDistArray.make[Double](BigD);
A.snapshot(); // 初期化時点のスナップショットを作成
for (iter in 1..ITERATIONS) { Console.OUT.println("Iteration " + iter); // 結果が収束するまで以下の処理を繰り返す
try {
// 1. 必要ならスナップショットからのリストアを行う
if (restore_needed()) {
// 使用可能なノードから Dist を再作成
BigD = Dist.makeBlock(BigR, 0, new SparsePlaceGroup(livePlaces.toRail()));
SmallD = BigD|SmallR; D_Base = Dist.makeUnique(SmallD.places());
A.restore(BigD); // 新しい分散で配列を作り直し,スナップショットからデータを復元
Temp.remake(BigD); Scratch.remake(BigD);
restore_needed() = false;
}
// 2. 計算の中心部分,温度の伝播処理
val D = SmallD;
finish ateach (z in D_Base) { // 各プレースで並列に以下の処理を行う
for (p:Point(2) in D|here) { // 自プレースにある点について処理を行う
val [x,y] = p; // ステンシル計算,周囲の温度の平均値を自分の温度にする
Temp(p) = ( (at (A.dist(x-1,y)) A(x-1,y)) + (at (A.dist(x+1,y)) A(x+1,y))
+ (at (A.dist(x,y-1)) A(x,y-1)) + (at (A.dist(x,y+1)) A(x,y+1)) ) / 4;
} }
// 3. 新しい結果と前回の差の最大値を求め,収束していたら処理を終了
val delta = A.map(Scratch, Temp, D.region, (a:Double,b:Double)=>Math.abs(a-b))
.reduce((a:Double,b:Double)=>Math.max(a,b), 0.0);
Temp.map(A, Temp, D.region, (a:Double,b:Double)=>a); // 新しい結果を Temp から A に分散コピー
if (delta <= 0.0001) break; // 収束した場合,処理を終了
// 4. 10 イテレーションごとにスナップショット作成
if (iter % 10 == 0) A.snapshot();
} catch (e:Exception) { processException(e); } // 例外の処理
} // for (iter) ここまで
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// 分散配列 A に入っている結果を出力
for ([x] in A.region.projection(0)) {
for ([y] in A.region.projection(1)) Console.OUT.printf("%5.3f ", at (A.dist(x,y)) A(x,y));
Console.OUT.println();
}
} // main ここまで
51
52
53
54
55
56
57
// 例外の処理,プレース消失の通知に対して livePlaces を更新し restore_needed フラグをセット
private static def processException(e:Exception) {
if (e instanceof DeadPlaceException) {
val deadPlace = (e as DeadPlaceException).place;
livePlaces.remove(deadPlace); restore_needed() = true;
} else if (e instanceof MultipleExceptions) {
val exceptions = (e as MultipleExceptions).exceptions();
for (ec in exceptions) processException(ec);
} else throw e; // DeadPlaceException 以外の例外はそのまま投げる
}
58
59
60
61
62
63
64
65
66
67
68
}
図 A·3
耐故障性のある HeatTransfer 計算
Fig. A·3 Fault tolerant HeatTransfer.
12
2013-5-(1): 情報処理学会プログラミング研究会 発表資料 2014 年 3 月 17 日
1
2
3
$ cd X10/242/samples/resiliency
$ x10c++ -O -NO_CHECKS ResilientHeatTransfer.x10 \
-o ResilientHeatTransfer
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
$ X10_NPLACES=8 X10_RESILIENT_MODE=1 \
runx10 ResilientHeatTransfer 10
HeatTransfer for 10x10, epsilon=1.0E-5
0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0
1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1
2 2 2 2 2 2 2 2 2 2 2 2
2 2 2 2 2 2 2 2 2 2 2 2
3 3 3 3 3 3 3 3 3 3 3 3
3 3 3 3 3 3 3 3 3 3 3 3
4 4 4 4 4 4 4 4 4 4 4 4
5 5 5 5 5 5 5 5 5 5 5 5
6 6 6 6 6 6 6 6 6 6 6 6
7 7 7 7 7 7 7 7 7 7 7 7
X10_RESILIENT_STORE_MODE=0
X10_RESILIENT_STORE_VERBOSE=0
---- Iteration: 1
delta=0.25
---- Iteration: 2
delta=0.125
---- Iteration: 3
delta=0.078125
---- Iteration: 4
delta=0.0546875
---- Iteration: 5
delta=0.046875
---- Iteration: 6
delta=0.040283203125
---- Iteration: 7
delta=0.0345458984375
---- Iteration: 8
delta=0.030227661132812
---- Iteration: 9
delta=0.026214599609375
---- Iteration: 10
delta=0.023305892944336
Create a snapshot at iteration 10
---- Iteration: 11
delta=0.020451545715332
---- Iteration: 12
delta=0.01840215921402
---- Iteration: 13
delta=0.017098233103752
---- Iteration: 14
delta=0.015642106533051
---- Iteration: 15
delta=0.014587569981813
---- Iteration: 16
delta=0.013376099523157
---- Iteration: 17
delta=0.012489892076701
---- Iteration: 18
delta=0.01148970273789
---- Iteration: 19
delta=0.010746632506198
---- Iteration: 20
delta=0.009920609742949
Create a snapshot at iteration 20
---- Iteration: 21
delta=0.009296156255459
---- Iteration: 22
delta=0.008611284095025
---- Iteration: 23
delta=0.008084273527686
:
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
図 A·4
---- Iteration: 38
delta=0.003633990233121
---- Iteration: 39
<-------- ここで Place 2
Place 2 exited unexpectedly with signal: Terminated
MultipleExceptions size=2
DeadPlaceException thrown from Place(2)
DeadPlaceException thrown from Place(2)
---- Iteration: 40
Create new Dist over available 7 places
0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0
1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1
3 3 3 3 3 3 3 3 3 3 3 3
3 3 3 3 3 3 3 3 3 3 3 3
4 4 4 4 4 4 4 4 4 4 4 4
4 4 4 4 4 4 4 4 4 4 4 4
5 5 5 5 5 5 5 5 5 5 5 5
5 5 5 5 5 5 5 5 5 5 5 5
6 6 6 6 6 6 6 6 6 6 6 6
7 7 7 7 7 7 7 7 7 7 7 7
Restore from a snapshot at iteration 30
delta=0.005177850168275
Create a snapshot at iteration 40
---- Iteration: 41
delta=0.004930303185298
:
---- Iteration: 84
delta=7.5E-4
---- Iteration: 85
<-------- ここで Place 7
Place 7 exited unexpectedly with signal: Terminated
MultipleExceptions size=1
DeadPlaceException thrown from Place(7)
---- Iteration: 86
Create new Dist over available 6 places
0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0
1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1
3 3 3 3 3 3 3 3 3 3 3 3
3 3 3 3 3 3 3 3 3 3 3 3
4 4 4 4 4 4 4 4 4 4 4 4
4 4 4 4 4 4 4 4 4 4 4 4
5 5 5 5 5 5 5 5 5 5 5 5
5 5 5 5 5 5 5 5 5 5 5 5
6 6 6 6 6 6 6 6 6 6 6 6
6 6 6 6 6 6 6 6 6 6 6 6
Restore from a snapshot at iteration 80
delta=8.5E-4
---- Iteration: 87
delta=8.1E-4
:
---- Iteration: 193
delta=1.0E-5
---- Iteration: 194
delta=9.7E-6
Result converged
---- Result
0.000 1.000 1.000 1.000 1.000 1.000 1.000 1.000 1.000 1.000 1.000
0.000 0.491 0.679 0.761 0.799 0.815 0.815 0.799 0.761 0.679 0.491
0.000 0.285 0.463 0.566 0.622 0.646 0.646 0.622 0.566 0.463 0.285
0.000 0.184 0.324 0.419 0.475 0.501 0.501 0.475 0.419 0.324 0.184
0.000 0.126 0.232 0.309 0.358 0.382 0.382 0.358 0.309 0.232 0.126
0.000 0.089 0.167 0.227 0.267 0.287 0.287 0.267 0.227 0.167 0.089
0.000 0.064 0.121 0.166 0.197 0.212 0.212 0.197 0.166 0.121 0.064
0.000 0.045 0.086 0.118 0.141 0.153 0.153 0.141 0.118 0.086 0.045
0.000 0.031 0.058 0.081 0.097 0.105 0.105 0.097 0.081 0.058 0.031
0.000 0.019 0.036 0.051 0.061 0.066 0.066 0.061 0.051 0.036 0.019
0.000 0.009 0.017 0.024 0.029 0.032 0.032 0.029 0.024 0.017 0.009
0.000 0.000 0.000 0.000 0.000 0.000 0.000 0.000 0.000 0.000 0.000
が消失
が消失
0.000
0.000
0.000
0.000
0.000
0.000
0.000
0.000
0.000
0.000
0.000
0.000
ノード故障が起きた場合の HeatTransfer の挙動
Fig. A·4 Behavior of HeatTransfer with node failures.
13