view paper/chapter3.tex @ 15:af43166c9c70

add put api
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 07 Feb 2012 13:53:30 +0900
parents a4b219c84f8a
children 5682c4b7bd8d
line wrap: on
line source

\chapter{分散フレームワーク Alice の実装}

前章では、分散ネットワークアプリケーションフレームワーク Alice の設計について示した。
本章では、それらの設計を踏まえ、 Java でどのように実装を行うのがよいかということから始め、具体的にどのような実装を行ったかについて、詳細に記すことにする。

\section{Java}

Federated Linda は Java を用いて実装が行われていたが、シングルスレッドを用いた設計であったがために、並列処理が含まれていなかった。今回の実装を行う前に並列処理の観点から、 Java について改めて見直し、フレームワークの実装方法について考えることにした。

\subsection{Java の選定}
Alice の実装に Java を選んだ理由は、充実したライブラリ群の存在である。
java.io、 java.nio 等の入出力ライブラリを始め、 java.net 等のネットワークライブラリも揃っている。また、並列処理ライブラリである java.util.concurrent が非常に強力である。ここでは java.util.concurrent の非常に便利な部品を挙げる。

\subsubsection{java.util.concurrent.BlockingQueue}
BlockingQueue は、並列で使用できるキュー構造である。複数のスレッドがデータの追加、取り出しを行なっても、問題なく動作する。
また、キュー内にデータがない場合、取り出しを行うときにブロッキングが入る。(ソースコード \ref{src:blockingQueue})そのため、データが来るまで待つという処理を記述することができる。

\begin{lstlisting}[label=src:blockingQueue, caption=キューにデータがない場合、処理がブロックされる]
// 別スレッドで blockingQueue.put(value); されるまでブロック
blockingQueue.take();
\end{lstlisting}

そういった機能を持つ故に BlockingQueue は異なるスレッド間で通信を行うのにしばしば用いられる。例えば、 SEDA アーキテクチャで作られた Cassandra の実装においても、ステージスレッド間でパイプライン処理をする際、データの受け渡しに使用されている。

\subsubsection{java.util.concurrent.ConcurrentMap}
ConcurrentMap は並列で使用できるハッシュ構造である。複数のスレッドがデータを追加、読み込み、削除を行なっても問題なく動作する。

キーを指定してデータの存在を確認し、データが無ければ追加するといった場合を考える。(ソースコード \ref{src:concurrentMap1})他のスレッドが同時に同じキーで異なるデータを書きこみを行おうとした場合、通常は排他処理ができていないため、不整合が発生する。しかし、 ConcurrentMap の putIfAbsent メソッドを用いれば、原子的に確認とデータ操作を行うため、その問題を解消することができる。(ソースコード \ref{src:concurrentMap2})

\begin{lstlisting}[label=src:concurrentMap1, caption=キーの確認とデータ操作が排他処理されていない]
if (!map.containsKey(key))
	return map.put(key, value);
else
	return map.get(key);
\end{lstlisting}

\begin{lstlisting}[label=src:concurrentMap2, caption=キーの確認とデータ操作が内部で原子的に実行される]
return map.putIfAbsent(key, value);
\end{lstlisting}

\subsubsection{java.util.concurrent.Executor}
Executor は受け取った Runnable タスクを実行するオブジェクトである。(ソースコード \ref{src:executor})
Executor で様々な種類のスレッドプールを作成することができる。

\begin{lstlisting}[label=src:executor, caption=スレッドプールに Runnable を投入]
executor.execute(runnable);
\end{lstlisting}

\subsubsection{java.util.concurrent.atomic.AtomicInteger}
AtomicInteger は、原子的なデータ更新が可能な int 値である。

例えば、データを取得してその後にインクリメントを行うという処理を考える。(ソースコード \ref{src:increment1})
synchronized ブロックで囲み、排他制御を行わないと、データの不整合を起こす。
そこで、 AtomicInteger の getAndIncrement()を用いることで、取得と変更の処理を原子的に行うことが可能になる。(ソースコード \ref{src:increment2})
 
\begin{lstlisting}[label=src:increment1, caption=synchronized を用いて排他制御を行う必要がある]
synchronized (i) {
    num = i++;
}
\end{lstlisting}

\begin{lstlisting}[label=src:increment2, caption=getAndIncrement() を用いると原子的に実行される。]
num = i.getAndIncrement();
\end{lstlisting}

この機能は、複数のスレッドで共有された ID を重複なく連番発行する処理などで用いられる。

このような並列処理において利便性が高く性能のよいライブラリが揃っているため、 Java を実装に用いるプログラミング言語に選定した。

\section{Data Segment の詳細な設計と実装}
まず、 Alice の実装を行うにあたって、データベース機能はもちろんのこと、タスク間通信やネットワーク間通信の要となる Data Segment の設計が重要である。本節では、 Data Segment の詳細な設計と実装を記す。
 
\subsection{Data Segment Manager}
大量の Data Segment を管理するのが Data Segment Manager である。
Data Segment Manager は、文字列のキーで Data Segment を整理する。
キーごとにデータを出し入れすることになる。
各キーごとに、キュー構造を持っている。(図 \ref{fig:datasegmentKey})
それらを Data Segment API を用いて操作する。

\begin{figure}[htbp]
  \begin{center}
    \includegraphics[width=60mm]{./images/datasegment_key.pdf}
  \end{center}
  \caption{キーごとにデータがキュー構造で管理される}
  \label{fig:datasegmentKey}
\end{figure}

また、データの読み出し("peek" または "take")時に、希望のデータがなかった場合、ブロッキングを行う機能を持つ。
しかし、ブロッキングといってもそこで同期するわけではない。
非同期でデータを通信する。
そのため、 "peek" と "take" は他の API とは違い、レスポンスが発生する。(図 \ref{fig:datasegmentReceiver})

\begin{figure}[htbp]
  \begin{center}
    \includegraphics[width=80mm]{./images/datasegment_receiver.pdf}
  \end{center}
  \caption{"peek" や "take" に対して、レスポンスが発生する}
  \label{fig:datasegmentReceiver}
\end{figure}

\subsection{Data Segment API}
Data Segment Manager をユーザーが操作できる API、それが Data Segment API である。
まずは以下のとおり Data Segment API を詳細に定義した。

\begin{itemize}
\item {\ttfamily void put(String key, Value val)}
\item {\ttfamily void update(String key, Value val)}
\item {\ttfamily void peek(Receiver receiver, String key, int id)}
\item {\ttfamily void take(Receiver receiver, String key, int id)}
\end{itemize}

\subsubsection{"put"}

\begin{figure}[htbp]
  \begin{center}
    \includegraphics[width=100mm]{./images/put.pdf}
  \end{center}
  \caption{"put" は重複しない id を振りながらデータを追加する}
  \label{fig:put}
\end{figure}


\subsubsection{"update"}

\subsubsection{"take"}

\subsubsection{"peek"}




\subsection{コマンドを処理する流れ}
これらの API から発行されたコマンドを Data Segment Manager は複数のスレッドから受け取る。
その後、 ConcurrentHashMap で文字列である key から Data Segment を解決する。