changeset 16:433e601a8e28

network bug fix
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Sun, 15 Jan 2012 12:17:30 +0900
parents 45e98e74db96
children bb075e103cd3
files src/alice/daemon/AcceptThread.java src/alice/daemon/CommandMessage.java src/alice/daemon/IncomingTcpConnection.java src/alice/daemon/OutboundTcpConnection.java src/alice/datasegment/Command.java src/alice/datasegment/LocalDataSegmentManager.java src/alice/test/codesegment/RemoteIncrement.java src/alice/test/codesegment/TestCodeSegment.java src/alice/test/codesegment/TestRemoteAlice.java src/log4j.xml
diffstat 10 files changed, 30 insertions(+), 15 deletions(-) [+]
line wrap: on
line diff
--- a/src/alice/daemon/AcceptThread.java	Sun Jan 15 01:19:54 2012 +0900
+++ b/src/alice/daemon/AcceptThread.java	Sun Jan 15 12:17:30 2012 +0900
@@ -4,11 +4,16 @@
 import java.net.ServerSocket;
 import java.net.Socket;
 
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
 import alice.datasegment.DataSegment;
 
 public class AcceptThread extends Thread {
 
 	private ServerSocket ss;
+	private Logger log = Logger.getLogger(AcceptThread.class);
+
 
 	public AcceptThread(ServerSocket ss, String name) {
 		super(name);
@@ -19,9 +24,8 @@
 	public void run() {
 		while (true) {
 			try {
-				System.out.println("wait accept...");
 				Socket socket = ss.accept();
-				System.out.println("accepted!");
+				log.info("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort());
 				Connection connection = new Connection(socket);
 				new IncomingTcpConnection(connection, DataSegment.get("local")).start();
 				new OutboundTcpConnection(connection).start();
--- a/src/alice/daemon/CommandMessage.java	Sun Jan 15 01:19:54 2012 +0900
+++ b/src/alice/daemon/CommandMessage.java	Sun Jan 15 12:17:30 2012 +0900
@@ -5,11 +5,13 @@
 
 @Message
 public class CommandMessage {
-	int type;
-	int index;
-	int seq;
-	String key;
-	Value val;
+	public int type = 0;
+	public int index = 0;
+	public int seq = 0;
+	public String key = null;
+	public Value val = null;
+	
+	public CommandMessage() {}
 	
 	public CommandMessage(int type, int index, int seq, String key, Value val) {
 		this.type = type;
--- a/src/alice/daemon/IncomingTcpConnection.java	Sun Jan 15 01:19:54 2012 +0900
+++ b/src/alice/daemon/IncomingTcpConnection.java	Sun Jan 15 12:17:30 2012 +0900
@@ -28,11 +28,11 @@
 		while (true) {
 			SocketChannel ch = connection.socket.getChannel();
 			ByteBuffer buf = ByteBuffer.allocateDirect(4); // for int
-			
 			try {
 				int allReadLen = 0;
 				do {
 					int readLen = ch.read(buf);
+					if (readLen < 0) return;
 					allReadLen += readLen;
 				} while (allReadLen < 4);
 				buf.rewind();
@@ -41,12 +41,15 @@
 				ByteBuffer msgBuf = ByteBuffer.allocateDirect(msgLen);
 				do {
 					int readLen = ch.read(msgBuf);
+					if (readLen < 0) return;
 					allReadLen += readLen;
 				} while (allReadLen < msgLen);
-				msgBuf.rewind();
+				msgBuf.flip();
 				CommandMessage msg = msgpack.read(msgBuf, CommandMessage.class);
+				msgBuf.flip();
+				System.out.println(msgpack.read(msgBuf));
 				CommandType type = CommandType.getCommandTypeFromId(msg.type);
-				LocalDataSegmentManager lmanager = (LocalDataSegmentManager)DataSegment.get("local"); 
+				LocalDataSegmentManager lmanager = (LocalDataSegmentManager)DataSegment.get("local");
 				DataSegmentKey dsKey = lmanager.getDataSegmentKey(msg.key);
 				switch (type) {
 				case UPDATE:
--- a/src/alice/daemon/OutboundTcpConnection.java	Sun Jan 15 01:19:54 2012 +0900
+++ b/src/alice/daemon/OutboundTcpConnection.java	Sun Jan 15 12:17:30 2012 +0900
@@ -28,6 +28,7 @@
 				ByteBuffer buffer = ByteBuffer.allocateDirect(4 + buf.length);
 				buffer.putInt(buf.length);
 				buffer.put(buf);
+				buffer.flip();
 				connection.socket.getChannel().write(buffer);
 			} catch (InterruptedException e) {
 				e.printStackTrace();
--- a/src/alice/datasegment/Command.java	Sun Jan 15 01:19:54 2012 +0900
+++ b/src/alice/datasegment/Command.java	Sun Jan 15 12:17:30 2012 +0900
@@ -19,6 +19,7 @@
 	public Command(CommandType cmdType, String argKey, String key, Value val, int index, int seq, BlockingQueue<Command> replyQueue, CodeSegment cs) {
 		this.type = cmdType;
 		this.argKey = argKey;
+		this.key = key;
 		this.val = val;
 		this.index = index;
 		this.seq = seq;
--- a/src/alice/datasegment/LocalDataSegmentManager.java	Sun Jan 15 01:19:54 2012 +0900
+++ b/src/alice/datasegment/LocalDataSegmentManager.java	Sun Jan 15 12:17:30 2012 +0900
@@ -12,6 +12,9 @@
 	}
 	
 	public DataSegmentKey getDataSegmentKey(String key) {
+		if (key == null) {
+			return null;
+		}
 		DataSegmentKey newDataSegmentKey = new DataSegmentKey();
 		DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey);
 		if (dataSegmentKey == null) {
--- a/src/alice/test/codesegment/RemoteIncrement.java	Sun Jan 15 01:19:54 2012 +0900
+++ b/src/alice/test/codesegment/RemoteIncrement.java	Sun Jan 15 12:17:30 2012 +0900
@@ -11,7 +11,7 @@
 	public void run() {
 		DataSegmentValue data = ids.get("num");
 		int num = data.val.asIntegerValue().getInt();
-		System.out.println(num++);
+		System.out.println("[CodeSegment] " + num++);
 		if (num == 10) System.exit(0);
 
 		CodeSegment cs = new RemoteIncrement();
--- a/src/alice/test/codesegment/TestCodeSegment.java	Sun Jan 15 01:19:54 2012 +0900
+++ b/src/alice/test/codesegment/TestCodeSegment.java	Sun Jan 15 12:17:30 2012 +0900
@@ -8,6 +8,8 @@
 
 public class TestCodeSegment extends CodeSegment {
 	
+	DataSegmentValue arg1;
+	
 	@Override
 	public void run() {
 		DataSegmentValue data = ids.get("arg1");
--- a/src/alice/test/codesegment/TestRemoteAlice.java	Sun Jan 15 01:19:54 2012 +0900
+++ b/src/alice/test/codesegment/TestRemoteAlice.java	Sun Jan 15 12:17:30 2012 +0900
@@ -2,6 +2,7 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 
 import alice.codesegment.CodeSegment;
@@ -39,10 +40,8 @@
 				RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connection);
 				DataSegment.regist(conf.key, manager);
 				connect = false;
-				System.out.println("connected");
 			} catch (IOException e) {
 				try {
-					System.out.println("wait");
 					Thread.sleep(500);
 				} catch (InterruptedException e1) {
 					e1.printStackTrace();
--- a/src/log4j.xml	Sun Jan 15 01:19:54 2012 +0900
+++ b/src/log4j.xml	Sun Jan 15 12:17:30 2012 +0900
@@ -1,13 +1,13 @@
 <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
    <appender name="Appender1" class="org.apache.log4j.FileAppender">
-      <param name="File" value="alice.log" />
+      <param name="File" value="alice.log" ></param>
       <layout class="org.apache.log4j.PatternLayout">
          <param name="ConversionPattern" value="%d %-5p %c - %m [%t] (%F:%L)%n"/>
       </layout>
    </appender>
    <root>
-      <level value="warn" />
+      <level value="debug" />
       <appender-ref ref="Appender1" />
    </root>
 </log4j:configuration>