changeset 19:0243987383b7

Meta Protocol Engine and sample implementation of event logger. ComDebug_Client needs fixes.
author kono
date Tue, 19 Aug 2008 05:33:32 +0900
parents 2cbd98257d61
children a0fd653d1121
files src/fdl/AcceptHandler.java src/fdl/ComDebug_Client.java src/fdl/CommDebugHook.java src/fdl/FDLindaServ.java src/fdl/FederatedLinda.java src/fdl/IOHandler.java src/fdl/IOHandlerHook.java src/fdl/MetaEngine.java src/fdl/MetaLinda.java src/fdl/MetaReply.java src/fdl/NullIOHandlerHook.java src/fdl/PSX.java src/fdl/PSXLinda.java src/fdl/PSXLindaInterface.java src/fdl/TestPSXLinda.java src/fdl/TupleSpace.java
diffstat 16 files changed, 587 insertions(+), 429 deletions(-) [+]
line wrap: on
line diff
--- a/src/fdl/AcceptHandler.java	Mon Aug 18 09:36:13 2008 +0900
+++ b/src/fdl/AcceptHandler.java	Tue Aug 19 05:33:32 2008 +0900
@@ -1,73 +1,37 @@
 
 package fdl;
 import java.io.IOException;
-//import java.net.InetAddress;
-//import java.net.InetSocketAddress;
-//import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 
 public class AcceptHandler implements TupleHandler {
-	public Tuple[] tuple_space;
+	
+	public TupleSpace tupleSpace;
 	
-	public static int user = 0;
-	public byte userchar[] = new byte[2];
-	public final int MAX_TUPLE = TupleHandler.MAX_TUPLE;
-	public Tuple tmpTuple;
-	
-	public AcceptHandler(Tuple[] _tuple_space) {
+	public AcceptHandler(TupleSpace tupleSpace) {
     	// 読みこんだデータを格納するためのリストの初期化
-        tuple_space = _tuple_space;        
+        this.tupleSpace = tupleSpace;        
     }
 	
 	public void handle(SelectionKey key) 
                 throws ClosedChannelException, IOException {
         ServerSocketChannel serverChannel
             = (ServerSocketChannel)key.channel();
-        
+
         // アクセプト処理
         SocketChannel channel = serverChannel.accept();
         channel.configureBlocking(false);
         System.out.println("Server: accepted "+channel.socket());
 
-        //初期生成        
-        if((tmpTuple = tuple_space[MAX_TUPLE-1]) == null) {
-        	tmpTuple = tuple_space[MAX_TUPLE-1] = new Tuple();
-        	tmpTuple.next = null;
-        } else {
-        	while(tmpTuple.next != null) tmpTuple = tmpTuple.next;
-        	tmpTuple.next = new Tuple();
-        	tmpTuple = tmpTuple.next;
-        	tmpTuple.next = null;
-        }
-        
-        user++;
-        
-        ByteBuffer data = ByteBuffer.allocate(2);
-        data.clear();
-        userchar[0] = (byte) (user/10 + '0');
-        userchar[1] = (byte) (user%10 + '0');
-
-        data.put(userchar[0]);
-        data.put(userchar[1]);
-        
-        data.rewind();
-        tmpTuple.setData(data);
-        //Tuple
-        int id = MAX_TUPLE-1;
-        tmpTuple.setTuple('o', id, 0, data.limit(), data);
-
-
-        System.out.println("Server: assign id "+user);
+        tupleSpace.newUser();
 
         // 入出力用のハンドラを生成し,アタッチする
         // 監視する操作は読み込みのみ        
-        TupleSpace handler = new IOHandler(tuple_space);
         channel.register(key.selector(),
                          SelectionKey.OP_READ,
-                         handler);
+                         new IOHandler(tupleSpace,key));
 	}
+
 }
--- a/src/fdl/ComDebug_Client.java	Mon Aug 18 09:36:13 2008 +0900
+++ b/src/fdl/ComDebug_Client.java	Tue Aug 19 05:33:32 2008 +0900
@@ -2,11 +2,11 @@
 
 
 import java.io.IOException;
-//import java.nio.ByteBuffer;
-//import java.nio.CharBuffer;
-import java.nio.CharBuffer;
 
-
+/*
+ * それぞれのLinda Serverのmeta protocolに接続して、順々にモニタデータを
+ * 受け取って表示する。
+ */
 public class ComDebug_Client {
 
 	static int id;
@@ -14,7 +14,7 @@
 
 	public static void main(String[] args) {
 		FederatedLinda fdl;
-		PSXLinda psx;
+		PSXLindaInterface psx;
 		String host  = "localhost";
 		int port = 10000;
 		int connect_num = 1;
@@ -59,7 +59,7 @@
 		    psx.in(PSX.PRIVILEGED_ID_START+connect_num);
 		    connect_num++;
 		    while(true) {
-		    	fdl.sync_com(1000);
+		    	fdl.sync(1000);
 		    }
 		} catch (IOException nfex) {
 			nfex.printStackTrace();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/fdl/CommDebugHook.java	Tue Aug 19 05:33:32 2008 +0900
@@ -0,0 +1,72 @@
+package fdl;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.LinkedList;
+
+public class CommDebugHook implements IOHandlerHook {
+
+	public LinkedList<String> logs = new LinkedList<String>();
+
+	public void closeHook(SelectionKey key) {
+		if (key!=null) logs.add(closeLog(key));
+	}
+
+
+	@Override
+	public void checkHook(SelectionKey key, int id, int seq, char mode) {
+		if (key!=null) logs.add(log(key, id, seq, mode, null));
+	}
+
+
+	@Override
+	public void inHook(SelectionKey key, int id, int seq, char mode) {
+		if (key!=null) logs.add(log(key, id, seq, mode, null));
+	}
+
+
+	@Override
+	public void outHook(SelectionKey key, int id, int seq, char mode, ByteBuffer data) {
+		if (key==null) return;
+		String sendtext = PSX.getdataString(data);
+		logs.add(log(key, id, seq, mode, sendtext));
+	}
+
+
+	@Override
+	public void waitReadHook(SelectionKey key, int id, int seq, char mode) {
+		if (key!=null) logs.add(log(key, id, seq, mode, null));
+	}
+
+	public String log(SelectionKey key,int id, int seq,char mode, String sendtext) {
+		//通信ログ Hostname:port 'mode' =number  形式でインクリメント
+		int cnt = 0;
+		if (sendtext==null) sendtext="none";
+
+		IOHandler io = (IOHandler) key.attachment();
+		String ComKey = io.localString + "--" + io.remoteString + " " + mode;
+		
+		io.cnt++;
+		long time = System.currentTimeMillis();
+		return (time+" "+ComKey+"="+cnt+" id="+id+" seq="+seq+" data="+sendtext);
+	}
+
+	private String closeLog(SelectionKey key) {
+		String close_Channel = key.channel().toString();
+		String[] split = close_Channel.split("/");
+		String local[] = split[1].split(" ");
+		String remote[] = split[2].split("]");
+
+		String localAddress = local[0];
+		String remoteAddress = remote[0];
+
+		return  "CloseInfo >"+localAddress+"--"+remoteAddress;
+	}
+	
+	public ByteBuffer getLog() {
+		String log = logs.poll();
+		if (log==null) return ByteBuffer.allocate(0);
+		return PSX.string2ByteBuffer(log);
+	}
+
+}
--- a/src/fdl/FDLindaServ.java	Mon Aug 18 09:36:13 2008 +0900
+++ b/src/fdl/FDLindaServ.java	Tue Aug 19 05:33:32 2008 +0900
@@ -4,10 +4,8 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-//import java.nio.ByteOrder;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.spi.AbstractSelector;
 import java.nio.channels.spi.SelectorProvider;
@@ -15,11 +13,8 @@
 public class FDLindaServ  {
 	static final int MAX_REQ = 1;
 	static final int FAIL = (-1);
-	static final int MAX_UAER = 4;
-	static final int MAX_TUPLE = 65536;
 	static final int DEF_PORT = 10000;
 	//public static final int TIMEOUT = 5*1000;
-	public Tuple[] tuple_space;
 	public int port = DEF_PORT;
 	private AbstractSelector selector;
 	private ServerSocketChannel ssChannel;
@@ -52,32 +47,29 @@
 	
 	private void mainLoop() {
 		while(true) {
-			checkTuple(selector);
+			checkTuple();
 		}
 	}
 
 	public FDLindaServ(int port) throws IOException {
 		this.port = port;
-		tuple_space = new Tuple[MAX_TUPLE];
-        	//セレクタを生成
-    		selector = SelectorProvider.provider().openSelector();		
-        	//ソケット・チャネルを生成・設定
-        	ssChannel = SelectorProvider.provider().openServerSocketChannel();
-        	InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), port);
-        	//ssChannel.socket().bind(new InetSocketAddress(port));
-        	ssChannel.socket().bind(address);
-        	ssChannel.configureBlocking(false);
-        	//ssChannel.socket().setReuseAddress(true);
-        	System.out.println("Server: litening at "+ssChannel);
-        	//セレクタにチャンネルを登録
-        	//ssChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler(tuple_space));
-            //ssChannel.register(selector, ssChannel.validOps(), new AcceptHandler(tuple_space));
-            ssChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler(tuple_space));
- 
+		//セレクタを生成
+		selector = SelectorProvider.provider().openSelector();		
+		//ソケット・チャネルを生成・設定
+		ssChannel = SelectorProvider.provider().openServerSocketChannel();
+		InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), port);
+		ssChannel.socket().bind(address);
+		ssChannel.configureBlocking(false);
+		//ssChannel.socket().setReuseAddress(true);
+		System.out.println("Server: litening at "+ssChannel);
+		//セレクタにチャンネルを登録
+        TupleSpace tupleSpace = new TupleSpace();
+		ssChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler(tupleSpace));
+
 
 	}
 
-	public void checkTuple(Selector selector) {
+	public void checkTuple() {
 		// セレクタによる監視    
 		try {
 			if (selector.select()>0) {
@@ -91,4 +83,19 @@
 		} catch (IOException e) {
 		}
 	}
+	
+	public void checkTuple(long timeout) {
+		// セレクタによる監視    
+		try {
+			if (selector.select(timeout)>0) {
+				for(SelectionKey s:selector.selectedKeys()) {
+					TupleHandler handler = (TupleHandler)s.attachment();
+					handler.handle(s);
+				}
+			}
+		} catch (ClosedChannelException e) {
+			// we have to do something...
+		} catch (IOException e) {
+		}
+	}
 }
--- a/src/fdl/FederatedLinda.java	Mon Aug 18 09:36:13 2008 +0900
+++ b/src/fdl/FederatedLinda.java	Tue Aug 19 05:33:32 2008 +0900
@@ -15,7 +15,6 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.nio.CharBuffer;
 import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
@@ -46,7 +45,7 @@
 	public int tid;
 	public int seq;
 	public int qsize;
-	public PSXLinda linda;
+	public PSXLindaInterface linda;
 
 	public Selector selector;
 
@@ -68,12 +67,12 @@
 		seqHash =  new Hashtable<Integer, PSXReply>();
 	}
 
-	public PSXLinda open(String _host,int _port) 
+	public PSXLindaInterface open(String _host,int _port) 
 	throws IOException {
 		tid++;
 		// System.out.print("Tid = ");
 		// System.out.println(tid);
-		PSXLinda newlinda = new PSXLinda(this,tid,_host,_port);
+		PSXLindaInterface newlinda = new PSXLinda(this,tid,_host,_port);
 		linda = newlinda.add(linda);
 		return linda;
 	}
@@ -173,118 +172,6 @@
 		return key_num;
 	}
 
-	public int sync_com(long mtimeout) 
-	throws IOException {
-		int key_num = 0;
-		Set<SelectionKey> keys;
-
-		while (q_top != null){
-			PSXQueue c = q_top;
-			c.Send();
-			q_top = c.next;
-			// psx_free(c);
-			// q_top = c = t;
-			qsize--;
-		}
-
-		try {
-			key_num = selector.select(mtimeout);
-			keys = selector.selectedKeys();
-			for (SelectionKey key : keys) {
-				SocketChannel sock = (SocketChannel)key.channel();
-				chkCom(sock);
-			}
-		} catch (IOException e) {
-			e.printStackTrace();
-		} catch (ClosedSelectorException e) {
-			e.printStackTrace();
-		}
-
-		return key_num;
-	}
-
-//	should be in PSXLinda, but sock->linda is unknown here
-
-	private void chkCom(SocketChannel sock)  throws IOException {
-
-		int length;
-		ByteBuffer command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
-		command.order(ByteOrder.BIG_ENDIAN);
-		debug = false;
-
-		sock.read(command);
-		command.rewind();
-		length =  command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
-		if (length>0) {
-			ByteBuffer data = ByteBuffer.allocate(length);
-			int read = length;
-			if (debug) {
-				System.out.print("reading:");
-				System.out.println(length);
-			}
-
-			data.order(ByteOrder.BIG_ENDIAN);
-			while(read>0) {
-				read -= sock.read(data);
-			}
-			data.rewind();
-
-			if (debug) {
-				PSX.printCommand(command, data);
-			}
-			//if (debug_com) {
-			String comdata ="";
-			CharBuffer chardata = data.asCharBuffer();
-			comdata = chardata.toString();
-
-			//System.out.println("Com_data =>");
-			System.out.println(comdata);
-			//}
-			/***if (debug) {
-    	    	System.out.print("header:");
-    	    	for(int i=0;i<LINDA_HEADER_SIZE;i++) {
-    	    		System.out.println(command.get(i));
-    	    	}
-    	    	System.out.print("data:");
-    	    	for(int i=0;i<length;i++) {
-    	    		System.out.println(data.get(i));
-    	    	}
-    	    	data.rewind();
-    	    }***/
-
-			int rseq = command.getInt(PSX.LINDA_SEQ_OFFSET);
-			int mode = command.get(PSX.LINDA_MODE_OFFSET);
-			Integer a;
-			/***
-    	    if (debug) {
-    	    	System.out.print("mode = ");
-    	    	System.out.println(mode);
-    	    	System.out.print("seq = ");
-    	    	System.out.println(rseq);
-    	    }***/
-			try {
-				PSXReply r = seqHash.get((a = new Integer(rseq)));
-				seqHash.put(a, null);
-				if (debug) {
-					System.out.print("hash value = ");
-					System.out.println(a.hashCode());
-				}
-
-				r.setAnswer(mode,command,data);
-
-				if (r.callback != null ) {
-					r.callback.callback(data);
-				}
-			} catch (NullPointerException e ) {
-				if (debug) {
-					System.out.println("hashed reply not found");
-				}
-				// can't happen
-				return ;
-			}
-		}
-	}
-
 	private void chkServe(SocketChannel sock) 
 	throws IOException {
 		int length;
@@ -335,7 +222,7 @@
 	    }***/
 			try {
 				PSXReply r = seqHash.get((a = new Integer(rseq)));
-				seqHash.put(a, null); // should be clear or delete
+				seqHash.remove(a);
 				if (debug) {
 					System.out.print("hash value = ");
 					System.out.println(a.hashCode());
--- a/src/fdl/IOHandler.java	Mon Aug 18 09:36:13 2008 +0900
+++ b/src/fdl/IOHandler.java	Tue Aug 19 05:33:32 2008 +0900
@@ -1,33 +1,45 @@
 
 package fdl;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
-import java.util.Hashtable;
-import java.util.LinkedList;
 
-
-public class IOHandler extends TupleSpace {
+public class IOHandler {
     static final boolean debug = false;
-	public Tuple[] tuple_space;     
-	public ComDebug com_debug;
+    public TupleSpace tupleSpace;
 
-    public IOHandler(Tuple[] _tuple_space) {
-		super(_tuple_space);
-	}
+	String remoteString;
+	String localString;
+	public int cnt = 0;
     
-    public void handle(SelectionKey key)
-                    throws ClosedChannelException, IOException {        
+    public IOHandler(TupleSpace tupleSpace,SelectionKey key) {
+    	this.tupleSpace = tupleSpace;
+
+		SocketChannel ch = (SocketChannel) key.channel();
+		remoteString = getRemoteHostAndPort(ch);
+		localString =  getLocalHostAndPort(ch);
+    }
+
+    public void handle(SelectionKey key) {
         // 書き込み可であれば,読み込みを行う
         if (key.isReadable()) {
-            read(key);
+            try {
+				read(key);
+			} catch (ClosedChannelException e) {
+				tupleSpace.hook.closeHook(key);
+			} catch (IOException e) {
+				tupleSpace.hook.closeHook(key);
+			}
         }
     }
 
-    private void read(SelectionKey key)
+     void read(SelectionKey key)
                     throws ClosedChannelException, IOException {
         SocketChannel channel = (SocketChannel)key.channel();
 
@@ -45,18 +57,12 @@
         		System.out.print("reading command...");
         	}
         	count = channel.read(command);
-
-        	if(count < 0) {        		
-        		LinkedList<SocketChannel> reportCh_list = ComDebug.Report_Channellist;
-        		ClosewishComDebug(key, command, reportCh_list);
-            	readsize = -1;
-            	return;
-            }        	
+        	if(count < 0) throw new IOException();        		
         	readsize -= count;
     	}
         command.rewind();
                 
-        int len = command.getInt(PSX.LINDA_PACKET_LENGTH_OFFSET);
+        command.getInt(PSX.LINDA_PACKET_LENGTH_OFFSET);
         int datalen = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
         
         ByteBuffer data = ByteBuffer.allocate(datalen);
@@ -79,82 +85,81 @@
 
         if (debug) {
         	PSX.printData(command);
-        }
-    	manager_run(key, command, data, len);    	    	
-        
-        key.interestOps(key.interestOps() | SelectionKey.OP_READ );
+        }	
+        // I believe we don't need this
+        //key.interestOps(key.interestOps() | SelectionKey.OP_READ );
+        assert((key.interestOps()& SelectionKey.OP_READ) !=0);
     }
 
 	public void manager_run(SelectionKey key, ByteBuffer command, ByteBuffer data, int len) throws IOException {
     	command.order(ByteOrder.BIG_ENDIAN);
 	    int mode = command.get(PSX.LINDA_MODE_OFFSET);
-	    char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET);
-	    int id = (int)idc;
-		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
 	    command.rewind();
-	    String sendtext = "none";
-	    
-	    
-	    com_debug = new ComDebug();
-	    Hashtable<String, Integer> com_Loggingtable = ComDebug.Com_Hashtable;
-	    LinkedList<SocketChannel> reportCh_list = ComDebug.Report_Channellist;
-		
+
     	if (debug) {
         	System.out.println("data from : "+key.channel());
     	}
 		if((mode == '!') || (len == 0)) {
-    		ClosewishComDebug(key, command, reportCh_list);
-		}
-		else if(mode == PSX.PSX_CHECK) {
-            sendtext = Check(key, command);
+			tupleSpace.hook.closeHook(key); 
+		} else if(mode == PSX.PSX_CHECK) {
+            tupleSpace.Check(key, command);
 	    }
     	else if(mode == PSX.PSX_IN || mode == PSX.PSX_RD){
-    		sendtext = In_Rd(key, command, mode);
+    		tupleSpace.In_Rd(key, command, mode); 
     	} else if (mode == PSX.PSX_WAIT_RD) {	
-			Wait_Rd(key, command, mode);						
+			tupleSpace.Wait_Rd(key, command, mode); 			
     	} else if(mode == PSX.PSX_OUT) {
-	    	sendtext = Out(command, data);	    	
+	    	tupleSpace.Out(key, command, data);		
 	    } else {
-    		System.out.println("Uncorrect buffer");
+	    	tupleSpace.hook.closeHook(key);
+    		System.out.println("Incorrect tuple operation");
     		System.exit(1);
 	    }
-		
-		//COM_DEBUG
-	   	if(id > PSX.PRIVILEGED_ID_START && id < PSX.PRIVILEGED_ID_END){
-			ComDebug.addChannel(key, reportCh_list);
-		}
-		//DEBUG用カウンタ
-        String debug_rep = ComDebug.Com_inc(key, com_Loggingtable, mode, id, seq, sendtext);
- 		//DEBUG用レポート
-		ComDebug.Report(reportCh_list, command, debug_rep);
-        
-		if (key.interestOps()
-	            != (SelectionKey.OP_READ)) {
-	            // 読み込み操作に対する監視を行う
-	            key.interestOps(SelectionKey.OP_READ );
-		}
     	
     }
 
-	private void ClosewishComDebug(SelectionKey key, ByteBuffer command,
-			LinkedList<SocketChannel> reportCh_list) throws IOException {
-		String close_Channel = key.channel().toString();
-		String[] split = close_Channel.split("/");
-		String local[] = split[1].split(" ");
-		String remote[] = split[2].split("]");
-		
-		String localAddress = local[0];
-		String remoteAddress = remote[0];
-		
-		Connection_Close(key);
-		com_debug.reportCh_remove(key, reportCh_list);
-		ComDebug.Report(reportCh_list, command, "CloseInfo >"+localAddress+"--"+remoteAddress);
-	}
-
-	private void Connection_Close(SelectionKey key) throws IOException {
+	void Connection_Close(SelectionKey key) throws IOException {
 		System.out.println("Connection closed by "+key.channel());
 		SocketChannel channel = (SocketChannel)key.channel();
 		key.cancel();
 		channel.close();
 	}
+
+	private static String getRemoteHostAndPort(SocketChannel channel) {
+		String socketString = channel.socket().getRemoteSocketAddress().toString();
+		String[] split = socketString.split("/");
+		int length = split.length;
+		String hostAndPort = split[length-1];
+		split = hostAndPort.split(":");
+		String host = split[0];
+		String port = split[1];
+		int portnum = Integer.parseInt(port);
+		try {
+			InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), portnum);
+			host = address.getHostName().toString();
+            return (host +":"+port);
+        }
+        catch( UnknownHostException e ){
+        	return hostAndPort;
+        }		
+	}
+	
+	private static String getLocalHostAndPort(SocketChannel channel) {
+		String socketString = channel.socket().getLocalSocketAddress().toString();
+		String[] split = socketString.split("/");
+		int length = split.length;
+		String hostAndPort = split[length-1];
+		split = hostAndPort.split(":");
+		String host = split[0];
+		String port = split[1];
+		try {
+            InetAddress localhost = InetAddress.getLocalHost();
+            host = localhost.getHostName();            
+            return (host +":"+port);
+        }
+        catch( UnknownHostException e ){
+        	return (host +":"+port);
+        }
+	}
+	
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/fdl/IOHandlerHook.java	Tue Aug 19 05:33:32 2008 +0900
@@ -0,0 +1,16 @@
+package fdl;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+
+public interface IOHandlerHook {
+	void closeHook(SelectionKey key);
+
+	void inHook(SelectionKey key, int id, int seq, char mode);
+
+	void outHook(SelectionKey key, int id, int seq, char mode, ByteBuffer data);
+
+	void waitReadHook(SelectionKey key, int id, int seq, char mode);
+
+	void checkHook(SelectionKey key, int id, int seq, char mode);
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/fdl/MetaEngine.java	Tue Aug 19 05:33:32 2008 +0900
@@ -0,0 +1,35 @@
+package fdl;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author kono
+ *    Meta Protocol Engine for each Linda Server
+ */
+
+public class MetaEngine {
+	MetaLinda meta;
+	boolean running = true;
+	CommDebugHook commDebug;
+
+	PSXCallback monitor_callback_start =
+			new PSXCallback() {public void callback(ByteBuffer reply) { 
+				meta.ts.hook = commDebug = new CommDebugHook();
+				ByteBuffer data = ByteBuffer.allocate(0) ;
+				meta.out(PSX.META_MONITOR_DATA, data, 0);
+				meta.in(PSX.META_MONITOR,monitor_callback);
+	}};
+	PSXCallback monitor_callback =
+		new PSXCallback() {public void callback(ByteBuffer reply) { 
+			ByteBuffer data = commDebug.getLog();
+			meta.out(PSX.META_MONITOR_DATA, data, data.limit());
+			meta.in(PSX.META_MONITOR,monitor_callback);
+		}};
+	
+	void mainLoop() {
+		meta.in(PSX.META_MONITOR,monitor_callback_start);
+		meta.in(PSX.META_STOP, new PSXCallback() {public void callback(ByteBuffer reply) { running = false;}});
+		while(running) 
+			meta.sync();
+	}
+}
--- a/src/fdl/MetaLinda.java	Mon Aug 18 09:36:13 2008 +0900
+++ b/src/fdl/MetaLinda.java	Tue Aug 19 05:33:32 2008 +0900
@@ -12,10 +12,12 @@
 
 package fdl;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
 
 /**
- PSXLinda 
+ MetaLinda 
  *
  * @author Shinji Kono
  *
@@ -26,11 +28,15 @@
 
 public class MetaLinda implements PSXLindaInterface {
 
-
-	public PSXLinda next;
+	public TupleSpace ts;
+	public FDLindaServ fds;
+	public FederatedLinda fdl;
+	public PSXLindaInterface next;
+	public LinkedList<MetaReply> replies;
 
-	public MetaLinda(FederatedLinda _fdl,int _mytsid,String _host,int _port) {
-
+	public MetaLinda(TupleSpace ts,FDLindaServ fdl) {
+		this.ts = ts;
+		this.fds = fdl;
 	}
 
 	public PSXReply in(int id) {
@@ -38,47 +44,69 @@
 	}
 
 	public void in(int id, PSXCallback callback) {
+		replies.add(new MetaReply(PSX.PSX_IN,id,ts, callback));
 	}
 
 	public PSXReply ck(int id) {
-		PSXReply r = null;
+		MetaReply r = new MetaReply(PSX.PSX_CHECK,id,ts);
 		return r;
 	}
 
 	public void ck(int id, PSXCallback callback) {
+		replies.add(new MetaReply(PSX.PSX_CHECK,id,ts,callback));
 	}
 
 	public PSXReply out(int id, ByteBuffer data,int size) {
-		return null;
+		MetaReply r = new MetaReply(PSX.PSX_OUT,id,ts,data,null);
+		replies.add(r);
+		return r;
 	}
 
 	public PSXReply update(int id, ByteBuffer data,int size) {
-		return null;
+		MetaReply r = new MetaReply(PSX.PSX_UPDATE,id,ts,data,null);
+		return r;
 	}
 
 	public void update(int id, ByteBuffer data,int size,PSXCallback callback) {
+		MetaReply r = new MetaReply(PSX.PSX_UPDATE,id,ts,data,callback);
+		replies.add(r);
 	}
 
 	public PSXReply rd(int id) {
-		return null;
+		MetaReply r = new MetaReply(PSX.PSX_RD,id,ts);
+		return r;
 	}
 
 	public void rd(int id, PSXCallback callback) {
+		replies.add(new MetaReply(PSX.PSX_RD,id,ts,callback));
 	}
 
-	public PSXLinda add(PSXLinda linda) {
+	public PSXLindaInterface add(PSXLindaInterface linda) {
 		next = linda;
-		return null;
+		return this;
 	}
 
 	public void send(ByteBuffer command,ByteBuffer data) {
 	}
 
 	public int sync() {
-		return 0;
+		return sync(0);
 	}
 
-	public int sync(long mtime) { 
+	public int sync(long timeout) {
+		fds.checkTuple(timeout);
+		for(MetaReply r: replies) {
+			if (r.ready()) {
+				replies.remove();
+			}
+		}
+		if (fdl!=null) {
+			try {
+				fdl.sync(timeout);
+			} catch (IOException e) {
+				e.printStackTrace();
+			}
+		}
 		return 0;
 	}
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/fdl/MetaReply.java	Tue Aug 19 05:33:32 2008 +0900
@@ -0,0 +1,59 @@
+package fdl;
+
+import java.nio.ByteBuffer;
+
+public class MetaReply extends PSXReply {
+
+	public int id;
+	public TupleSpace ts;
+
+	public MetaReply(int mode, int id,TupleSpace ts) {
+		this.mode = mode;
+		this.id = id;
+		this.ts = ts;
+		this.command = ByteBuffer.allocate(PSX.LINDA_HEADER_SIZE);
+	}
+
+	public MetaReply(int mode, int id, TupleSpace ts,PSXCallback callback) {
+		this(mode,id,ts);
+		this.callback = callback;
+	}
+
+	public MetaReply(int mode, int id, TupleSpace ts,ByteBuffer data,
+			PSXCallback callback) {
+		this(mode,id,ts);
+		this.data = data;
+		this.callback = callback;
+	}
+
+	void checkTuple() {
+		ByteBuffer data = ts.IN(id, mode, command);
+		if (data!=null) {
+			this.data = data;
+			mode = PSX.PSX_ANSWER;
+		}
+	}
+	
+	public boolean ready() {
+		switch(mode) {
+		case PSX.PSX_IN:
+		case PSX.PSX_RD:
+			checkTuple();
+			break;
+		case PSX.PSX_CHECK:
+			data = ts.check1(null, command);
+			if (data!=null) {
+				mode = PSX.PSX_ANSWER;
+			}
+			break;
+		case PSX.PSX_OUT:
+			ts.Out(null, command, data);
+			return true;
+		case PSX.PSX_UPDATE:
+			// not implemented
+			break;
+		}
+		return mode==PSX.PSX_ANSWER;
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/fdl/NullIOHandlerHook.java	Tue Aug 19 05:33:32 2008 +0900
@@ -0,0 +1,30 @@
+package fdl;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+
+public class NullIOHandlerHook implements IOHandlerHook {
+
+	@Override
+	public void checkHook(SelectionKey key, int id, int seq, char mode) {
+	}
+
+	@Override
+	public void closeHook(SelectionKey key) {
+	}
+
+	@Override
+	public void inHook(SelectionKey key, int id, int seq, char mode) {
+	}
+
+	@Override
+	public void outHook(SelectionKey key, int id, int seq, char mode,
+			ByteBuffer data) {
+	}
+
+	@Override
+	public void waitReadHook(SelectionKey key, int id, int seq, char mode) {
+	}
+
+
+}
--- a/src/fdl/PSX.java	Mon Aug 18 09:36:13 2008 +0900
+++ b/src/fdl/PSX.java	Tue Aug 19 05:33:32 2008 +0900
@@ -14,6 +14,10 @@
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
 
 
 
@@ -47,6 +51,9 @@
     
     static final int PRIVILEGED_ID_START   = 32768;
     static final int PRIVILEGED_ID_END   = 36864;
+	static final int META_STOP = PRIVILEGED_ID_START;
+	static final int META_MONITOR = PRIVILEGED_ID_START+1;
+	static final int META_MONITOR_DATA = PRIVILEGED_ID_START+2;
     
     // this method should be removed
 	static void setReportCommand(ByteBuffer command, String report_txt) {
@@ -100,6 +107,33 @@
 		command.putInt(LINDA_SEQ_OFFSET, seq);
 		command.rewind();
 	}
+
+	public static ByteBuffer string2ByteBuffer(String log) {
+		ByteBuffer blog = ByteBuffer.allocate(log.length()*2); // this is incorrect...
+		for(int i=0;i<log.length();i++) {
+			blog.putChar(log.charAt(i));
+		}
+		blog.flip();
+		return blog;
+	}
+
+	public static String getdataString(ByteBuffer data) {
+		String sendtext;
+		data.rewind();
+		//Decode UTF-8 to System Encoding(UTF-16) 
+		Charset charset = Charset.forName("UTF-8");
+		CharsetDecoder decoder = charset.newDecoder();
+		CharBuffer cb = null;
+		try {
+			cb = decoder.decode(data);
+		} catch (CharacterCodingException e) {
+			e.printStackTrace();
+		}
+		cb.rewind();
+	
+		sendtext = cb.toString();
+		return sendtext;
+	}
     
 
 }
--- a/src/fdl/PSXLinda.java	Mon Aug 18 09:36:13 2008 +0900
+++ b/src/fdl/PSXLinda.java	Tue Aug 19 05:33:32 2008 +0900
@@ -40,7 +40,7 @@
 	public String host;
 	public int port;
 	public int mytsid;
-	public PSXLinda next;
+	public PSXLindaInterface next;
 	static final boolean debug = false;	
 
 	public PSXLinda(FederatedLinda _fdl,int _mytsid,String _host,int _port) 
@@ -136,7 +136,7 @@
 		fdl.psx_queue(this, id, null, 0, PSX.PSX_RD, callback);
 	}
 
-	public PSXLinda add(PSXLinda linda) {
+	public PSXLindaInterface add(PSXLindaInterface linda) {
 		next = linda;
 		return this;
 	}
--- a/src/fdl/PSXLindaInterface.java	Mon Aug 18 09:36:13 2008 +0900
+++ b/src/fdl/PSXLindaInterface.java	Tue Aug 19 05:33:32 2008 +0900
@@ -24,7 +24,7 @@
 
 	public void rd(int id, PSXCallback callback) ;
 
-	public PSXLinda add(PSXLinda linda) ;
+	public PSXLindaInterface add(PSXLindaInterface linda) ;
 
 	public void send(ByteBuffer command,ByteBuffer data) throws IOException ;
 
--- a/src/fdl/TestPSXLinda.java	Mon Aug 18 09:36:13 2008 +0900
+++ b/src/fdl/TestPSXLinda.java	Tue Aug 19 05:33:32 2008 +0900
@@ -31,7 +31,7 @@
     public static void main (String args[]) {
 
 	FederatedLinda fdl;
-	PSXLinda psx;
+	PSXLindaInterface psx;
 	String host  = "localhost";
 	int port = 10000;
 	PSXReply r;
--- a/src/fdl/TupleSpace.java	Mon Aug 18 09:36:13 2008 +0900
+++ b/src/fdl/TupleSpace.java	Tue Aug 19 05:33:32 2008 +0900
@@ -2,56 +2,75 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
 
 public class TupleSpace {
     static final boolean debug = true;
     static final int CAPSIZE = 4194304;
+	public static int user = 0;
+	public byte userchar[] = new byte[2];
     public Tuple[] tuple_space;
+    public IOHandlerHook hook = new NullIOHandlerHook();
     
-	public TupleSpace(Tuple[] _tuple_space) {
-		super();
+	public TupleSpace() {
 		// 読みこんだデータを格納するためのリストの初期化
-        tuple_space = _tuple_space;      
+        tuple_space = new Tuple[TupleHandler.MAX_TUPLE];      
 	}
     
-	public TupleSpace() {
-		// TODO Auto-generated constructor stub
+
+	public void newUser() {
+		Tuple tmpTuple;
+		//初期生成        
+        if((tmpTuple = tuple_space[TupleHandler.MAX_TUPLE-1]) == null) {
+        	tmpTuple = tuple_space[TupleHandler.MAX_TUPLE-1] = new Tuple();
+        	tmpTuple.next = null;
+        } else {
+        	while(tmpTuple.next != null) tmpTuple = tmpTuple.next;
+        	tmpTuple.next = new Tuple();
+        	tmpTuple = tmpTuple.next;
+        	tmpTuple.next = null;
+        }
+        
+        user++;
+        
+        ByteBuffer data = ByteBuffer.allocate(2);
+        data.clear();
+        userchar[0] = (byte) (user/10 + '0');
+        userchar[1] = (byte) (user%10 + '0');
+
+        data.put(userchar[0]);
+        data.put(userchar[1]);
+        
+        data.rewind();
+        tmpTuple.setData(data);
+        //Tuple
+        int id = TupleHandler.MAX_TUPLE-1;
+        tmpTuple.setTuple('o', id, 0, data.limit(), data);
+        System.out.println("Server: assign id "+user);
 	}
-
-	protected String Out(ByteBuffer command, ByteBuffer data) throws IOException {
+	
+	protected void Out(SelectionKey key,ByteBuffer command, ByteBuffer data) {
 		Tuple tuple;
 		int id;
 		int datasize;
 		char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET);
 		command.rewind();
 		id = (int)idc;
-		String sendtext = "none";
-		
+
 		datasize = command.getInt(PSX.LINDA_DATA_LENGTH_OFFSET);
 		command.rewind();
 		
 		System.out.println("*** out command : id = "+id +" ***");
+		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
+		command.rewind();
+		hook.outHook(key,id,seq,'o',data);    
 		
 		while((tuple_space[id] != null) &&
 				((tuple_space[id].mode == PSX.PSX_WAIT_RD)||(tuple_space[id].mode == PSX.PSX_RD))) {
 			PSX.setAnserCommand(command, tuple_space[id].getSeq());
-			//if(debug){
-				//int sendsize = tmpTuple.getdataLength()+PSX.LINDA_HEADER_SIZE;
-				//System.out.println("send size "+sendsize+" : mode = "+(char)mode);
-			//}
-			//ByteBuffer sendcommand = tmpTuple.getCommand();
-			//ByteBuffer senddata = tmpTuple.getData();
 			send(tuple_space[id].ch, command, data);
 
-			sendtext = getdataString(data);
-
-			
 			removeTuple(id);
 			tuple = null;
 		}
@@ -62,12 +81,7 @@
 				int sendsize = datasize+PSX.LINDA_HEADER_SIZE;
 				System.out.println("send size "+sendsize+" : mode = "+(char)'a');
 			}
-			//ByteBuffer sendcommand = tmpTuple.getCommand();
-			//ByteBuffer senddata = tmpTuple.getData();
 			send(tuple_space[id].ch, command, data);
-
-			sendtext = getdataString(data);
-			
 			removeTuple(id);
 			tuple = null;
 		} else if ((tuple_space[id] == null)|| (tuple_space[id].getMode() == PSX.PSX_OUT)) {
@@ -83,8 +97,6 @@
 			}
 	
 			tuple.setMode('o');
-			int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
-			command.rewind();
 			tuple.setSeq(seq);
 			tuple.setData(data);
 			tuple.setDataLength(datasize);
@@ -98,7 +110,6 @@
 			data.clear();
 			System.exit(1);
 		}
-		return sendtext;
 	}
 
 	private void removeTuple(int id) {
@@ -124,6 +135,9 @@
 		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
 		command.rewind();
 		tuple.setSeq(seq);
+		
+		hook.waitReadHook(key,id,seq,(char)mode);
+		
 		tuple.ch = (SocketChannel) key.channel();
 		tuple.setDataLength(0);
 		ByteBuffer buff = ByteBuffer.allocate(0);
@@ -135,7 +149,7 @@
 		}
 	}
 
-	protected String In_Rd(SelectionKey key, ByteBuffer command, int mode)
+	protected void In_Rd(SelectionKey key, ByteBuffer command, int mode)
 			throws IOException {
 		Tuple tuple = read_in_1(key, command, mode);
 
@@ -145,8 +159,6 @@
 			ByteBuffer senddata = tuple.getData();
 			send(key,sendcommand, senddata);
 		}
-		String sendtext = getdataString(tuple.getData());
-		return sendtext;
 	}
 
 	private Tuple read_in_1(SelectionKey key, ByteBuffer command, int mode) {
@@ -159,10 +171,12 @@
 		char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET);
 		command.rewind();
 		id = (int)idc;
-		
+		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
+		command.rewind();
 		
 		System.out.println("*** "+(char)mode+" command : id = "+ id +" ***\n");    		
-		
+		hook.inHook(key,id,seq,(char)mode);
+
 		tuple = tuple_space[id];
 			
 		//wを無視
@@ -172,79 +186,106 @@
 		}
 		
 		if (tuple != null && (tuple.mode == 'o')){
-			//tmpTuple = new Tuple((SocketChannel)key.channel());
-			int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
-			command.rewind();
-			tuple.setCommand('a', seq);
-			
-			if(debug){
-				int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE;
-				System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode());
-			}
-				
-
-			
-			//INの場合はremoveする
-			if(mode == PSX.PSX_IN) {
-				if(tuple.data != null){
-					//ByteBuffer buff = ByteBuffer.allocate(0);
-					//tmpTuple.setData(buff);
-					tuple.data = null;
-				}
-				if(temp != null){
-					temp.next = tuple.next;
-				}
-				else {
-					tuple_space[id] = tuple.next;
-				}
-			}
+			tupleIsAvailable(command, mode, tuple, id, temp);
 		} else {
-			if(tuple == null) {
-				//ServerSocketChannel sc = (ServerSocketChannel)key.channel();
-				tuple = tuple_space[id] = new Tuple((SocketChannel)key.channel());
-				tuple.next = null;
-			}else {
-				while(tuple.next !=null) tuple =tuple.next;
-				tuple.next= new Tuple((SocketChannel)key.channel());
-				tuple = tuple.next;
-				tuple.next = null;
-			}
-			
-			tuple.setMode(mode);
-			int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET);
-			command.rewind();
-			tuple.setSeq(seq2);
-			tuple.ch = (SocketChannel) key.channel();
-			tuple.setDataLength(0);
-			ByteBuffer buff = ByteBuffer.allocate(0);
-			buff.rewind();
-			tuple.setData(buff);
-			tuple = null;
-				
-			if(debug){
-				System.out.println("data inserted insert seq = "+seq2 +", id = "+id);
-			}
+			tuple = setupWait(key, command, mode, tuple, id);
 		}
 		return tuple;
 	}
 
-	protected String Check(SelectionKey key, ByteBuffer command) throws IOException {
-		String sendtext;
-		ByteBuffer data = check1(command);
-		send(key, command, data);
+	public ByteBuffer IN(int id,int mode, ByteBuffer command) {
+		Tuple tuple,temp=null;
+		tuple = tuple_space[id];
+
+		//wを無視
+		while(tuple != null && tuple.next != null && (tuple.mode == 'w')){
+			temp = tuple;
+			tuple = tuple.next;
+		}
+
+		if (tuple != null && (tuple.mode == 'o')){
+			ByteBuffer data = tuple.data;
+			tupleIsAvailable(command, mode, tuple, id, temp);
+			return data;
+		} 
+		return null;
+	}
+	
+	private void tupleIsAvailable(ByteBuffer command, int mode, Tuple tuple,
+			int id, Tuple temp) {
+		//tmpTuple = new Tuple((SocketChannel)key.channel());
+		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
+		command.rewind();
+		tuple.setCommand('a', seq);
+
+		if(debug){
+			int sendsize = tuple.getdataLength()+PSX.LINDA_HEADER_SIZE;
+			System.out.println("send size "+sendsize+" : mode = "+(char)tuple.getMode());
+		}
+			
+
 		
-		sendtext = getdataString(data);
-	    
-	    return sendtext;
+		//INの場合はremoveする
+		if(mode == PSX.PSX_IN) {
+			if(tuple.data != null){
+				//ByteBuffer buff = ByteBuffer.allocate(0);
+				//tmpTuple.setData(buff);
+				tuple.data = null;
+			}
+			if(temp != null){
+				temp.next = tuple.next;
+			}
+			else {
+				tuple_space[id] = tuple.next;
+			}
+		}
 	}
 
-	private ByteBuffer check1(ByteBuffer command) {
+	private Tuple setupWait(SelectionKey key, ByteBuffer command, int mode,
+			Tuple tuple, int id) {
+		if(tuple == null) {
+			//ServerSocketChannel sc = (ServerSocketChannel)key.channel();
+			tuple = tuple_space[id] = new Tuple((SocketChannel)key.channel());
+			tuple.next = null;
+		}else {
+			while(tuple.next !=null) tuple =tuple.next;
+			tuple.next= new Tuple((SocketChannel)key.channel());
+			tuple = tuple.next;
+			tuple.next = null;
+		}
+		
+		tuple.setMode(mode);
+		int seq2 = command.getInt(PSX.LINDA_SEQ_OFFSET);
+		command.rewind();
+		tuple.setSeq(seq2);
+		tuple.ch = (SocketChannel) key.channel();
+		tuple.setDataLength(0);
+		ByteBuffer buff = ByteBuffer.allocate(0);
+		buff.rewind();
+		tuple.setData(buff);
+		tuple = null;
+			
+		if(debug){
+			System.out.println("data inserted insert seq = "+seq2 +", id = "+id);
+		}
+		return tuple;
+	}
+
+	protected void Check(SelectionKey key, ByteBuffer command) throws IOException {
+		ByteBuffer data = check1(key,command);
+		send(key, command, data);
+	}
+
+	public ByteBuffer check1(SelectionKey key,ByteBuffer command) {
 		ByteBuffer data;
 		Tuple tmpTuple;
 		int id;
 		char idc = (char)command.getShort(PSX.LINDA_ID_OFFSET);
 		command.rewind();
 		id = (int)idc;
+		int seq = command.getInt(PSX.LINDA_SEQ_OFFSET);
+		command.rewind();
+		hook.checkHook(key,id,seq,'c');
 		
 		tmpTuple = tuple_space[id];
 		while(tmpTuple != null && tmpTuple.next != null && (tmpTuple.mode == 'w')){
@@ -263,64 +304,44 @@
 		return data;
 	}
 
-	public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data)
-			throws IOException {
-				if (debug) {
-			        if (command == null) {
-			            System.out.println("Manager_run: command is null");
-			        }
-			        if (data == null) {
-			            System.out.println("Manager_run: data is null");
-			        }
-				}
-				int send_size = PSX.LINDA_HEADER_SIZE;
-				int count = 0;
-			
-				//command Send
-				command.rewind();
-				while(send_size > 0){
-					count = ch.write(command);
-					if(count < 0){
-						System.out.println("Write Falied! close ch:"+ch);
-						ch.close();
-						return;
-					}
-					send_size -= count;
-				}
-				
-				//data Send
-				data.rewind();
-				if(data != null) {
-					data.rewind();
-					ch.write(data);
-				}
+	public void send(SocketChannel ch, ByteBuffer command, ByteBuffer data) {
+		if (debug) {
+			if (command == null) {
+				System.out.println("Manager_run: command is null");
+			}
+			if (data == null) {
+				System.out.println("Manager_run: data is null");
+			}
+		}
+		int send_size = PSX.LINDA_HEADER_SIZE;
+		int count = 0;
+
+		try {
+			//command Send
+			command.rewind();
+			while(send_size > 0){
+				count = ch.write(command);
+				if(count < 0) throw new IOException();
+				send_size -= count;
 			}
 
-	public void send(SelectionKey key, ByteBuffer command, ByteBuffer data)
-			throws IOException {
-				SocketChannel ch = (SocketChannel)key.channel();
-				send(ch,command,data);
+			if (data==null) return;
+			//data Send
+			data.rewind();
+			while(data.remaining() > 0){
+				count = ch.write(data);
+				if(count < 0) throw new IOException();
 			}
-
-	private String getdataString(ByteBuffer data) {
-		String sendtext;
-		data.rewind();
-		//set sendtext
-		//CharBuffer chardata = data.asCharBuffer();
-		
-		//Decode UTF-8 to System Encoding(UTF-16) 
-		Charset charset = Charset.forName("UTF-8");
-		CharsetDecoder decoder = charset.newDecoder();
-		CharBuffer cb = null;
-		try {
-			cb = decoder.decode(data);
-		} catch (CharacterCodingException e) {
-			e.printStackTrace();
+		} catch (IOException e) {
+			System.out.println("Write Falied on:"+ch);
+			return;
 		}
-		cb.rewind();
-		
-		sendtext = cb.toString();
-		return sendtext;
 	}
 
+	public void send(SelectionKey key, ByteBuffer command, ByteBuffer data) {
+		SocketChannel ch = (SocketChannel)key.channel();
+		send(ch,command,data);
+	}
+
+
 }
\ No newline at end of file