view paper/chapter3.tex @ 22:b0c6d082b264

about process of CodeSegment dependency
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Wed, 08 Feb 2012 23:26:08 +0900
parents ef806c34566e
children 93d86a33d684
line wrap: on
line source

\chapter{分散フレームワーク Alice の実装}

前章では、分散ネットワークアプリケーションフレームワーク Alice の設計について示した。
本章では、それらの設計を踏まえ、 Java でどのように実装を行うのがよいかということから始め、具体的にどのような実装を行ったかについて、詳細に記すことにする。

\section{Java}

Federated Linda は Java を用いて実装が行われていたが、シングルスレッドを用いた設計であったがために、並列処理が含まれていなかった。今回の実装を行う前に並列処理の観点から、 Java について改めて見直し、フレームワークの実装方法について考えることにした。

\subsection{Java の選定}
Alice の実装に Java を選んだ理由は、充実したライブラリ群の存在である。
java.io、 java.nio 等の入出力ライブラリを始め、 java.net 等のネットワークライブラリも揃っている。また、並列処理ライブラリである java.util.concurrent が非常に強力である。ここでは java.util.concurrent の非常に便利な部品を挙げる。

\subsubsection{java.util.concurrent.BlockingQueue}
BlockingQueue は、並列で使用できるキュー構造である。複数のスレッドがデータの追加、取り出しを行なっても、問題なく動作する。
また、キュー内にデータがない場合、取り出しを行うときにブロッキングが入る。(ソースコード \ref{src:blockingQueue})そのため、データが来るまで待つという処理を記述することができる。

\begin{lstlisting}[label=src:blockingQueue, caption=キューにデータがない場合、処理がブロックされる]
// 別スレッドで blockingQueue.put(value); されるまでブロック
blockingQueue.take();
\end{lstlisting}

そういった機能を持つ故に BlockingQueue は異なるスレッド間で通信を行うのにしばしば用いられる。例えば、 SEDA アーキテクチャで作られた Cassandra の実装においても、ステージスレッド間でパイプライン処理をする際、データの受け渡しに使用されている。

\subsubsection{java.util.concurrent.ConcurrentMap}
ConcurrentMap は並列で使用できるハッシュ構造である。複数のスレッドがデータを追加、読み込み、削除を行なっても問題なく動作する。

キーを指定してデータの存在を確認し、データが無ければ追加するといった場合を考える。(ソースコード \ref{src:concurrentMap1})他のスレッドが同時に同じキーで異なるデータを書きこみを行おうとした場合、通常は排他処理ができていないため、不整合が発生する。しかし、 ConcurrentMap の putIfAbsent メソッドを用いれば、原子的に確認とデータ操作を行うため、その問題を解消することができる。(ソースコード \ref{src:concurrentMap2})

\begin{lstlisting}[label=src:concurrentMap1, caption=キーの確認とデータ操作が排他処理されていない]
if (!map.containsKey(key))
	return map.put(key, value);
else
	return map.get(key);
\end{lstlisting}

\begin{lstlisting}[label=src:concurrentMap2, caption=キーの確認とデータ操作が内部で原子的に実行される]
return map.putIfAbsent(key, value);
\end{lstlisting}

\subsubsection{java.util.concurrent.Executor}
Executor は受け取った Runnable タスクを実行するオブジェクトである。(ソースコード \ref{src:executor})
Executor で様々な種類のスレッドプールを作成することができる。

\begin{lstlisting}[label=src:executor, caption=スレッドプールに Runnable を投入]
executor.execute(runnable);
\end{lstlisting}

\subsubsection{java.util.concurrent.atomic.AtomicInteger}
AtomicInteger は、原子的なデータ更新が可能な int 値である。

例えば、データを取得してその後にインクリメントを行うという処理を考える。(ソースコード \ref{src:increment1})
synchronized ブロックで囲み、排他制御を行わないと、データの不整合を起こす。
そこで、 AtomicInteger の getAndIncrement()を用いることで、取得と変更の処理を原子的に行うことが可能になる。(ソースコード \ref{src:increment2})
 
\begin{lstlisting}[label=src:increment1, caption=synchronized を用いて排他制御を行う必要がある]
synchronized (i) {
    num = i++;
}
\end{lstlisting}

\begin{lstlisting}[label=src:increment2, caption=getAndIncrement() を用いると原子的に実行される。]
num = i.getAndIncrement();
\end{lstlisting}

この機能は、複数のスレッドで共有された ID を重複なく連番発行する処理等で用いられる。

このような並列処理において利便性が高く性能のよいライブラリが揃っているため、 Java を実装に用いるプログラミング言語に選定した。

\section{Data Segment の詳細な設計と実装}
まず、 Alice の実装を行うにあたって、データベース機能はもちろんのこと、タスク間通信やネットワーク間通信の要となる Data Segment の設計が重要である。本節では、 Data Segment の詳細な設計と実装を記す。
 
\subsection{Data Segment Manager}
大量の Data Segment を管理するのが Data Segment Manager である。
Data Segment Manager は、文字列のキーで Data Segment を整理する。
キーごとにデータを出し入れすることになる。
各キーごとに、キュー構造を持っている。(図 \ref{fig:datasegmentKey})
それらを Data Segment API を用いて操作する。

\begin{figure}[htbp]
  \begin{center}
    \includegraphics[width=60mm]{./images/datasegment_key.pdf}
  \end{center}
  \caption{キーごとにデータがキュー構造で管理される}
  \label{fig:datasegmentKey}
\end{figure}

また、データの読み出し("peek" または "take")時に、希望のデータがなかった場合、ブロッキングを行う機能を持つ。
しかし、ブロッキングといってもそこで同期するわけではない。
非同期でデータを通信する。
そのため、 "peek" と "take" は他の API とは違い、レスポンスが発生する。(図 \ref{fig:datasegmentReceiver})

\begin{figure}[htbp]
  \begin{center}
    \includegraphics[width=80mm]{./images/datasegment_receiver.pdf}
  \end{center}
  \caption{"peek" や "take" に対して、レスポンスが発生する}
  \label{fig:datasegmentReceiver}
\end{figure}

\subsection{Data Segment API}
Data Segment Manager をユーザーが操作できるインタフェース、それが Data Segment API である。
まずは以下のとおり Data Segment API を詳細に定義した。

\begin{itemize}
\item {\ttfamily void put(String key, Value val)}
\item {\ttfamily void update(String key, Value val)}
\item {\ttfamily void peek(Receiver receiver, String key, int id)}
\item {\ttfamily void take(Receiver receiver, String key, int id)}
\end{itemize}

\subsubsection{"put"}
"put" はデータを追加するための API である。

"put" は受け取ったデータ val を Data Segment 内のキューに対してエンキューする。
この時、キーごとに重複しない連番の ID を受け取った順に振る。(図 \ref{fig:put})

\begin{figure}[htbp]
  \begin{center}
    \includegraphics[width=100mm]{./images/put.pdf}
  \end{center}
  \caption{"put" は重複しない ID を振りながらデータを追加する}
  \label{fig:put}
\end{figure}

\subsubsection{"update"}
"update" はデータを置き換えるための API である。

"update" はキューの先頭にあるデータをひとつだけ削除する。
その後は "put" と同じく、 受け取ったデータ val を Data Segment 内のキューに対してエンキューする。
この時、キーごとに重複しない連番の ID を受け取った順に振る。(図 \ref{fig:update})

\begin{figure}[htbp]
  \begin{center}
    \includegraphics[width=100mm]{./images/update.pdf}
  \end{center}
  \caption{"update" は先頭データを取り除き、重複しない ID を振りながらデータを追加する}
  \label{fig:update}
\end{figure}

\subsubsection{"peek"}
"peek" はデータを読み込むための API である。

"peek" は前回読み込んだデータの id を引数で指定する。省略した場合は、 0 が id として渡される。
id よりも値の大きい id のデータがキューに含まれていれば、そのデータを receiver に返す。
もし id 以下のデータしか無いならば、データの更新が前回の "peek" 発行時から更新が無いものと考え、リストに格納されて保留される。(図 \ref{fig:peek})

"take" や "update" によりデータの更新があれば、 "peek" が直ちに実行される。

\begin{figure}[htbp]
  \begin{center}
    \includegraphics[width=90mm]{./images/peek.pdf}
  \end{center}
  \caption{"peek" はデータを receiver に読み込む。希望のデータがない場合は保留する}
  \label{fig:peek}
\end{figure}

\subsubsection{"take"}
"take" もデータを読み込むための API である。
基本的な id に関する部分は "peek" と同じである。

"peek" との決定的な違いは、読み込まれたデータは Data Segment 内のキューから取り除かれるということである。(図 \ref{fig:take})

\begin{figure}[htbp]
  \begin{center}
    \includegraphics[width=70mm]{./images/take.pdf}
  \end{center}
  \caption{"take" はデータを receiver に読み込む。その際、読み込んだデータは削除される}
  \label{fig:take}
\end{figure}

\subsection{コマンドを処理する流れ}
これらの API から発行されたコマンドを Data Segment Manager は複数のスレッドから受け取る。
その後、 ConcurrentHashMap で文字列であるキーから Data Segment を解決する。

キーが異なればデータセグメント間に依存関係は全く無いので、別スレッドでこれらの API を処理する事ができる。(図 \ref{fig:datasegmentKeyThread})

\begin{figure}[htbp]
  \begin{center}
    \includegraphics[width=60mm]{./images/datasegment_key_thread.pdf}
  \end{center}
  \caption{キーごとに Data Segment を処理する Thread を持つ}
  \label{fig:datasegmentKeyThread}
\end{figure}

Data Segment Manager から各キーの Thread へのコマンドの受け渡しには、\\
java.util.concurrent.LinkedBlockingQueue
が使われる。これをコマンドキューと呼ぶことにする。
各キーの Thread では、コマンドキューが空になるまでコマンドを繰り返し取り出す。
その取り出したコマンドに従って処理が行われる。
キューが空になったときは、次のコマンドが挿入されるまでブロックされる。

Data Segment Manager はユーザーが API を使うと、コマンドを作成し、キーから Data Segment を探し、そのコマンドキューに挿入する。(図 \ref{fig:datasegmentCommandQueue})

\begin{figure}[htbp]
  \begin{center}
    \includegraphics[width=100mm]{./images/datasegment_command_queue.pdf}
  \end{center}
  \caption{Data Segment Manager はコマンドを作成し、コマンドキューに渡す}
  \label{fig:datasegmentCommandQueue}
\end{figure}

\subsection{Data Segment のデータ表現}
Data Segment のデータ表現には MessagePack を利用する。

Java 版の MessagePack の実装 MessagePack for Java を利用して実装を行う。

MessagePack に関して Java におけるデータ表現は以下の3段階ある。
これらのデータ表現は、型の種類等の制限を伴うが互いに変換可能である。

\begin{enumerate}
\item {一般的な Java のクラスオブジェクト}
\item {MessagePack for Java の {\ttfamily Value} オブジェクト}
\item {{\ttfamily byte[]} で表現されたバイナリ}
\end{enumerate}

Data Segment API では、この MessagePack for Java の Value オブジェクトを用いてデータを表現する。
MessagePack は Java のように静的に型付けされたオブジェクトではなく、自己記述的なデータ形式である。
MessagePack for Java の Value オブジェクトは MessagePack のバイナリにシリアライズできる型のみで構成された Java のオブジェクトである。
そのため、 Value も自己記述式のデータ形式となっている。
例えば、 ArrayValue を用いれば、ユーザーがデータを後付けで繋げたりすることも可能になる。

この Value オブジェクトは通信に関わる時は、シリアライズ・デシリアライズを高速に行うことができ、ユーザーは便利なメソッドを用いてオブジェクト内部のデータを閲覧、編集できる。例えば、 Value の toString メソッドは、 JSON 形式で出力してくれる。(ソースコード \ref{src:msgpack2}) このように MessagePack Value は Java の静的な型付けを脱却しようとした形式である。

また、ユーザーは一般的なクラスを IDL (Interface Definition Language) のように用いてデータを表現してもよい。
そのように使う場合には、クラス宣言時に {\ttfamily @Message} というアノテーションを付けるだけでよい。(ソースコード \ref{src:msgpack1})もちろん、 MessagePack で扱うことができるデータのみをフィールドに入れなくてはならない。

\begin{lstlisting}[label=src:msgpack1, caption=一般的なクラスを IDL のように使用]
import org.msgpack.annotation.Message

@Message
public class MessagePackTest {
    public String key;
    public int val;
}
\end{lstlisting}

\begin{lstlisting}[label=src:msgpack2, caption=一般クラスオブジェクトから Value に逆変換して JSON 形式で出力]
msgpackTest = new MessagePackTest();
msgpackTest.key = "Test";
msgpackTest.val = 1000;

MessagePack msgpack = new MessagePack();
Value value = msgpack.unconvert(msgpackTest);

System.out.println(value); // ["Test",1000]
\end{lstlisting}

MessagePack for Java は内部で {\ttfamily @Message} アノテーションが付けられたクラスを変換する時に、 Javassist を用いて動的にテンプレートを生成してコンパイルしている。そのため高速に Value オブジェクトと一般クラスオブジェクトの変換、逆変換を行うことができる。

また、 MessagePack は Packer と Unpacker を用いることにより、次から次へとストリームからシーケンシャルにバイナリをシリアライズ・デシリアライズすることもできる。
そのため、通信を行うときの入出力部分のコード記述もシンプルになる。(ソースコード \ref{src:msgpack3})
従来のプロトコルでは、通信を行う際、パケットサイズ等を含んだ固定長のヘッダーを作成しなくてはいけなかったが、 MessagePack は自己記述的なデータ形式なので、先頭にデータの長さが含まれているため、プログラマーが管理する固定長ヘッダーは必要無くなる。

\begin{lstlisting}[label=src:msgpack3, caption=Unpacker を用いると通信入力部分の記述がシンプルになる]
MessagePack msgpack = new MessagePack();
Unpacker unpacker = msgpack.createUnpacker(socket.getInputStream());

while (true) {
    CommandMessage msg = unpacker.read(CommandMessage.class); // block
    // メッセージを受け取った後の処理を記述するだけでよい。
}
\end{lstlisting}

\subsection{Remote Data Segment Manager}
これまで説明してきた Data Segment Manager はローカルで動作する Data Segment Manager である。これからリモート接続版の Data Segment Manager へと拡張するにあたって区別のためにローカルで動作する Data Segment Manager を Local Data Segment Manager とする。これに対し、リモート接続したホスト上の Local Data Segment Manager の操作を行う機構を Remote Data Segment Manager とする。(図 \ref{fig:remoteDatasegment})

\begin{figure}[htbp]
  \begin{center}
    \includegraphics[width=80mm]{./images/remote_datasegment.pdf}
  \end{center}
  \caption{Remote Data Segment Manager は、他のマシン上の Local Data Segment Manager を操作できる}
  \label{fig:remoteDatasegment}
\end{figure}

ローカルでもリモートでも、 Data Segment Manager の API は共通なので、継承して実装する。(図 \ref{fig:datasegmentInheritance})そのことにより、 Remote Data Segment Manager にアクセスするときもローカルと同じようにアクセスすることができる。

\begin{figure}[htbp]
  \begin{center}
    \includegraphics[width=70mm]{./images/datasegment_inheritance.pdf}
  \end{center}
  \caption{Data Segment Manager を継承して、 Local DSM と Remote DSM を作成する}
  \label{fig:datasegmentInheritance}
\end{figure}

\section{Code Segment}
前節では、 Data Segment の詳細な設計と実装を示した。
本節では、 それら Data Segment API を用いて記述する Code Segment の使い方やそれらの実装方法について示す。 

Code Segment は、タスクのことである。 Code Segment をユーザーが記述するときに、記述している Code Segment 内で使用する Data Segment を記述する。すなわち、 Code Segment と Data Segment の依存関係を記述する。

具体的に使用する Data Segment は、 Code Segment の入出力に相当する。それらを、それぞれ Input Data Segment、 Output Data Segment とする。 Input Data Segment、 Output Data Segment はそれぞれ複数記述できる。

\subsection{基本的な Code Segment の実行方法}
Code Segment の記述方法であるが、 Start Code Segment と呼ばれる、開始 Code Segment を記述するところから始まる。 Start Code Segment は、最初の Code Segment なので、どの Data Segment にも依存しない。すなわち、 Input Data Segment はない。

この Start Code Segment を main メソッド内で new することで、 Code Segment の実行を開始させることができる。 Input Data Segment がない Code Segment を実行するためには、 execute メソッドを実行する。(ソースコード \ref{src:codesegment1})

\begin{lstlisting}[label=src:codesegment1, caption=最初の Code Segment を実行させる方法]
public class TestLocalAlice {

	public static void main(String args[]) {
		new StartCodeSegment().execute();
	}
	
}
\end{lstlisting}

Start Code Segment は CodeSegment を継承して作成する。 StartCodeSegment の例(ソースコード \ref{src:codesegment2})では、次の Code Segment である、 TestCodeSegment (ソースコード \ref{src:codesegment3})を生成している。この TestCodeSegment は input1 という Input Data Segment を "peek" している。

\begin{lstlisting}[label=src:codesegment2, caption=Start Code Segment の例]
public class StartCodeSegment extends CodeSegment {

	@Override
	public void run() {
		System.out.println("run StartCodeSegment");
		
		TestCodeSegment cs = new TestCodeSegment();
		cs.input1.setKey("local", "key1");

		System.out.println("create TestCodeSegment");
		
		ods.update("local", "key1", "String data");
 	}

}
\end{lstlisting}

\begin{lstlisting}[label=src:codesegment3, caption=Code Segment の例]
public class TestCodeSegment extends CodeSegment {
	
	// create input datasegment arg1
	Receiver input1 = ids.create(CommandType.PEEK);
	
	@Override
	public void run() {
		System.out.println("index = " + input1.index);
		System.out.println("data = " + input1.val);
		
		if (input1.index == 10)
			System.exit(0);
		
		TestCodeSegment cs = new TestCodeSegment();
		cs.input1.setKey("local", "key1", input1.index);
		
		ods.update("local", "key1", "String data");
	}

}
\end{lstlisting}

この input1 に対して、具体的な Data Segment のキーを割り当てているのが、 setKey メソッドである。この例では、 TestCodeSegment の input1 が local.key1 に依存するように設定している。

TestCodeSegment の内部では、情報を標準出力に出力した後、再度、自信と同じである TestCodeSegment を生成し、先ほどと同じようにキーを割り当てている。 input1.index の値は、 Data Segment 内部で "put" や "update" が実行されたときに、インクリメントしているシーケンシャルでユニークな ID 値である。

このように、今回の例題では、10回ほど TestCodeSegment をループしていることがわかる。(図 \ref{fig:startcodesegment})

\begin{figure}[htbp]
  \begin{center}
    \includegraphics[width=70mm]{./images/startcodesegment.pdf}
  \end{center}
  \caption{StartCodeSegment 以降の実行過程}
  \label{fig:startcodesegment}
\end{figure}

ここでは、 Code Segment の大まかな利用方法を示した。次は、ここで利用している API の詳細である。

\subsection{Code Segment の API}
新たな Code Segment をユーザーが記述するときは、 CodeSegment を継承して記述する。 
その CodeSegment は、 InputDataSegmentManager と OutputDataSegmentManager を利用する事ができる。

\subsubsection{InputDataSegmentManager}
InputDataSegmentManager は、 ids という、 CodeSegment のフィールドを使うことでアクセスすることができる。

\begin{itemize}
\item {\ttfamily Receiver create(CommandType type)}
\end{itemize}

InputDataSegmentManager の基本機能は、 create である。
create で新しい Data Segment の Receiver を生成することができる。
create の引数は、 CommandType である。ここで指定できる CommandType は PEEK か TAKE である。
ここで定義しているのは、 "peek" や "take" を実行した結果を格納する受け皿であるので、実際に "peek" や "take" を実行しているわけではない。実際にコマンドが実行されるのは、 Receiver 内にある setKey メソッドである。

\begin{itemize}
\item {\ttfamily void setKey(String managerKey, String key, int id)}
\end{itemize}

setKey メソッドにより、どこの Data Segment のなんというキーを実行するかということを指定する。それにより、 "peek" や "take" のすべての引数が揃い、実行される。
そして、そのコマンドの結果がレスポンスとして Receiver に届き次第、 Code Segment の実行が行われる。(図 \ref{fig:datasegmentreceiver})

\begin{figure}[htbp]
  \begin{center}
    \includegraphics[width=140mm]{./images/datasegmentreceiver.pdf}
  \end{center}
  \caption{Data Segment の依存関係の設定と解決と Code Segment が実行されるまで}
  \label{fig:datasegmentreceiver}
\end{figure}