# HG changeset patch # User kazz # Date 1326597450 -32400 # Node ID 433e601a8e28484c7a146d6986ec1b65cbf4b4ee # Parent 45e98e74db96f194a15f66c205f9a0f176f8bdd9 network bug fix diff -r 45e98e74db96 -r 433e601a8e28 src/alice/daemon/AcceptThread.java --- a/src/alice/daemon/AcceptThread.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/daemon/AcceptThread.java Sun Jan 15 12:17:30 2012 +0900 @@ -4,11 +4,16 @@ import java.net.ServerSocket; import java.net.Socket; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + import alice.datasegment.DataSegment; public class AcceptThread extends Thread { private ServerSocket ss; + private Logger log = Logger.getLogger(AcceptThread.class); + public AcceptThread(ServerSocket ss, String name) { super(name); @@ -19,9 +24,8 @@ public void run() { while (true) { try { - System.out.println("wait accept..."); Socket socket = ss.accept(); - System.out.println("accepted!"); + log.info("Accept " + socket.getInetAddress().getHostName() + ":" + socket.getPort()); Connection connection = new Connection(socket); new IncomingTcpConnection(connection, DataSegment.get("local")).start(); new OutboundTcpConnection(connection).start(); diff -r 45e98e74db96 -r 433e601a8e28 src/alice/daemon/CommandMessage.java --- a/src/alice/daemon/CommandMessage.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/daemon/CommandMessage.java Sun Jan 15 12:17:30 2012 +0900 @@ -5,11 +5,13 @@ @Message public class CommandMessage { - int type; - int index; - int seq; - String key; - Value val; + public int type = 0; + public int index = 0; + public int seq = 0; + public String key = null; + public Value val = null; + + public CommandMessage() {} public CommandMessage(int type, int index, int seq, String key, Value val) { this.type = type; diff -r 45e98e74db96 -r 433e601a8e28 src/alice/daemon/IncomingTcpConnection.java --- a/src/alice/daemon/IncomingTcpConnection.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/daemon/IncomingTcpConnection.java Sun Jan 15 12:17:30 2012 +0900 @@ -28,11 +28,11 @@ while (true) { SocketChannel ch = connection.socket.getChannel(); ByteBuffer buf = ByteBuffer.allocateDirect(4); // for int - try { int allReadLen = 0; do { int readLen = ch.read(buf); + if (readLen < 0) return; allReadLen += readLen; } while (allReadLen < 4); buf.rewind(); @@ -41,12 +41,15 @@ ByteBuffer msgBuf = ByteBuffer.allocateDirect(msgLen); do { int readLen = ch.read(msgBuf); + if (readLen < 0) return; allReadLen += readLen; } while (allReadLen < msgLen); - msgBuf.rewind(); + msgBuf.flip(); CommandMessage msg = msgpack.read(msgBuf, CommandMessage.class); + msgBuf.flip(); + System.out.println(msgpack.read(msgBuf)); CommandType type = CommandType.getCommandTypeFromId(msg.type); - LocalDataSegmentManager lmanager = (LocalDataSegmentManager)DataSegment.get("local"); + LocalDataSegmentManager lmanager = (LocalDataSegmentManager)DataSegment.get("local"); DataSegmentKey dsKey = lmanager.getDataSegmentKey(msg.key); switch (type) { case UPDATE: diff -r 45e98e74db96 -r 433e601a8e28 src/alice/daemon/OutboundTcpConnection.java --- a/src/alice/daemon/OutboundTcpConnection.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/daemon/OutboundTcpConnection.java Sun Jan 15 12:17:30 2012 +0900 @@ -28,6 +28,7 @@ ByteBuffer buffer = ByteBuffer.allocateDirect(4 + buf.length); buffer.putInt(buf.length); buffer.put(buf); + buffer.flip(); connection.socket.getChannel().write(buffer); } catch (InterruptedException e) { e.printStackTrace(); diff -r 45e98e74db96 -r 433e601a8e28 src/alice/datasegment/Command.java --- a/src/alice/datasegment/Command.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/datasegment/Command.java Sun Jan 15 12:17:30 2012 +0900 @@ -19,6 +19,7 @@ public Command(CommandType cmdType, String argKey, String key, Value val, int index, int seq, BlockingQueue replyQueue, CodeSegment cs) { this.type = cmdType; this.argKey = argKey; + this.key = key; this.val = val; this.index = index; this.seq = seq; diff -r 45e98e74db96 -r 433e601a8e28 src/alice/datasegment/LocalDataSegmentManager.java --- a/src/alice/datasegment/LocalDataSegmentManager.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Sun Jan 15 12:17:30 2012 +0900 @@ -12,6 +12,9 @@ } public DataSegmentKey getDataSegmentKey(String key) { + if (key == null) { + return null; + } DataSegmentKey newDataSegmentKey = new DataSegmentKey(); DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); if (dataSegmentKey == null) { diff -r 45e98e74db96 -r 433e601a8e28 src/alice/test/codesegment/RemoteIncrement.java --- a/src/alice/test/codesegment/RemoteIncrement.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/test/codesegment/RemoteIncrement.java Sun Jan 15 12:17:30 2012 +0900 @@ -11,7 +11,7 @@ public void run() { DataSegmentValue data = ids.get("num"); int num = data.val.asIntegerValue().getInt(); - System.out.println(num++); + System.out.println("[CodeSegment] " + num++); if (num == 10) System.exit(0); CodeSegment cs = new RemoteIncrement(); diff -r 45e98e74db96 -r 433e601a8e28 src/alice/test/codesegment/TestCodeSegment.java --- a/src/alice/test/codesegment/TestCodeSegment.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/test/codesegment/TestCodeSegment.java Sun Jan 15 12:17:30 2012 +0900 @@ -8,6 +8,8 @@ public class TestCodeSegment extends CodeSegment { + DataSegmentValue arg1; + @Override public void run() { DataSegmentValue data = ids.get("arg1"); diff -r 45e98e74db96 -r 433e601a8e28 src/alice/test/codesegment/TestRemoteAlice.java --- a/src/alice/test/codesegment/TestRemoteAlice.java Sun Jan 15 01:19:54 2012 +0900 +++ b/src/alice/test/codesegment/TestRemoteAlice.java Sun Jan 15 12:17:30 2012 +0900 @@ -2,6 +2,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import alice.codesegment.CodeSegment; @@ -39,10 +40,8 @@ RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connection); DataSegment.regist(conf.key, manager); connect = false; - System.out.println("connected"); } catch (IOException e) { try { - System.out.println("wait"); Thread.sleep(500); } catch (InterruptedException e1) { e1.printStackTrace(); diff -r 45e98e74db96 -r 433e601a8e28 src/log4j.xml --- a/src/log4j.xml Sun Jan 15 01:19:54 2012 +0900 +++ b/src/log4j.xml Sun Jan 15 12:17:30 2012 +0900 @@ -1,13 +1,13 @@ - + - +