# HG changeset patch # User kazz # Date 1326556585 -32400 # Node ID e3f1b21718b00d5df5f254a72a00f2e61113b215 # Parent 30f97d776a3e001ea978ad5af5475198d4877307 implements RemoteDataSegment diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/daemon/AcceptThread.java --- a/src/alice/daemon/AcceptThread.java Fri Jan 13 19:04:59 2012 +0900 +++ b/src/alice/daemon/AcceptThread.java Sun Jan 15 00:56:25 2012 +0900 @@ -4,6 +4,8 @@ import java.net.ServerSocket; import java.net.Socket; +import alice.datasegment.DataSegment; + public class AcceptThread extends Thread { private ServerSocket ss; @@ -19,7 +21,7 @@ try { Socket socket = ss.accept(); Connection connection = new Connection(socket); - new IncomingTcpConnection(connection).start(); + new IncomingTcpConnection(connection, DataSegment.get("local")).start(); new OutboundTcpConnection(connection).start(); } catch (IOException e) { e.printStackTrace(); diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/daemon/AliceDaemon.java --- a/src/alice/daemon/AliceDaemon.java Fri Jan 13 19:04:59 2012 +0900 +++ b/src/alice/daemon/AliceDaemon.java Sun Jan 15 00:56:25 2012 +0900 @@ -11,8 +11,8 @@ private Config conf; private AcceptThread acceptThread; - public AliceDaemon(String[] args) { - this.conf = new Config(args); + public AliceDaemon(Config conf) { + this.conf = conf; } public void listen() { diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/daemon/Config.java --- a/src/alice/daemon/Config.java Fri Jan 13 19:04:59 2012 +0900 +++ b/src/alice/daemon/Config.java Sun Jan 15 00:56:25 2012 +0900 @@ -3,11 +3,20 @@ public class Config { public int port = 10000; + public String hostname; + public int connectPort = 10000; + public String key; public Config(String[] args) { for (int i = 0; i< args.length; i++) { if ("-p".equals(args[i])) { port = Integer.parseInt(args[++i]); + } else if ("-h".equals(args[i])) { + hostname = args[++i]; + } else if ("-cp".equals(args[i])) { + connectPort = Integer.parseInt(args[++i]); + } else if ("-key".equals(args[i])) { + } } } diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/daemon/Connection.java --- a/src/alice/daemon/Connection.java Fri Jan 13 19:04:59 2012 +0900 +++ b/src/alice/daemon/Connection.java Sun Jan 15 00:56:25 2012 +0900 @@ -13,5 +13,13 @@ public Connection(Socket socket) { this.socket = socket; } + + public void sendCommand(Command cmd) { + try { + sendQueue.put(cmd); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } } diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/daemon/IncomingTcpConnection.java --- a/src/alice/daemon/IncomingTcpConnection.java Fri Jan 13 19:04:59 2012 +0900 +++ b/src/alice/daemon/IncomingTcpConnection.java Sun Jan 15 00:56:25 2012 +0900 @@ -10,15 +10,16 @@ import alice.datasegment.CommandType; import alice.datasegment.DataSegment; import alice.datasegment.DataSegmentKey; +import alice.datasegment.DataSegmentManager; import alice.datasegment.LocalDataSegmentManager; -import alice.datasegment.SocketDataSegmentManager; public class IncomingTcpConnection extends Thread { public Connection connection; - public SocketDataSegmentManager manager; + public DataSegmentManager manager; - public IncomingTcpConnection(Connection connection) { + public IncomingTcpConnection(Connection connection, DataSegmentManager manager) { + this.manager = manager; this.connection = connection; } @@ -45,28 +46,28 @@ msgBuf.rewind(); CommandMessage msg = msgpack.read(msgBuf, CommandMessage.class); CommandType type = CommandType.getCommandTypeFromId(msg.type); - LocalDataSegmentManager manager = (LocalDataSegmentManager)DataSegment.get("local"); - DataSegmentKey dsKey = manager.getDataSegmentKey(msg.key); + LocalDataSegmentManager lmanager = (LocalDataSegmentManager)DataSegment.get("local"); + DataSegmentKey dsKey = lmanager.getDataSegmentKey(msg.key); switch (type) { case UPDATE: - dsKey.addCommand(new Command(type, null, msg.val, 0, 0, null, null)); + dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null)); break; case PUT: - dsKey.addCommand(new Command(type, null, msg.val, 0, 0, null, null)); + dsKey.addCommand(new Command(type, null, null, msg.val, 0, 0, null, null)); break; case PEEK: //Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue replyQueue, CodeSegment cs) { - dsKey.addCommand(new Command(type, null, null, msg.index, msg.seq, connection.sendQueue, null)); + dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null)); break; case TAKE: - dsKey.addCommand(new Command(type, null, null, msg.index, msg.seq, connection.sendQueue, null)); + dsKey.addCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null)); break; case REMOVE: - dsKey.addCommand(new Command(type, null, null, 0, 0, null, null)); + dsKey.addCommand(new Command(type, null, null, null, 0, 0, null, null)); break; case REPLY: try { - manager.replyQueue.put(new Command(type, null, msg.val, msg.index, msg.seq, null, null)); + manager.replyQueue.put(new Command(type, null, null, msg.val, msg.index, msg.seq, null, null)); } catch (InterruptedException e) { e.printStackTrace(); } diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/daemon/OutboundTcpConnection.java --- a/src/alice/daemon/OutboundTcpConnection.java Fri Jan 13 19:04:59 2012 +0900 +++ b/src/alice/daemon/OutboundTcpConnection.java Sun Jan 15 00:56:25 2012 +0900 @@ -16,7 +16,7 @@ } public CommandMessage convert(Command cmd) { - return new CommandMessage(cmd.type.id, cmd.index, cmd.seq, null, cmd.val); + return new CommandMessage(cmd.type.id, cmd.index, cmd.seq, cmd.key, cmd.val); } public void run() { diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/datasegment/Command.java --- a/src/alice/datasegment/Command.java Fri Jan 13 19:04:59 2012 +0900 +++ b/src/alice/datasegment/Command.java Sun Jan 15 00:56:25 2012 +0900 @@ -8,6 +8,7 @@ public class Command { public CommandType type; + public String key; public String argKey; public Value val; public int index; @@ -15,7 +16,7 @@ public BlockingQueue replyQueue; public CodeSegment cs; - public Command(CommandType cmdType, String argKey, Value val, int index, int seq, BlockingQueue replyQueue, CodeSegment cs) { + public Command(CommandType cmdType, String argKey, String key, Value val, int index, int seq, BlockingQueue replyQueue, CodeSegment cs) { this.type = cmdType; this.argKey = argKey; this.val = val; diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/datasegment/DataSegmentKey.java --- a/src/alice/datasegment/DataSegmentKey.java Fri Jan 13 19:04:59 2012 +0900 +++ b/src/alice/datasegment/DataSegmentKey.java Sun Jan 15 00:56:25 2012 +0900 @@ -43,7 +43,7 @@ LinkedList removeList = new LinkedList(); for (Command waitCmd : waitList) { if (waitCmd.index < index) { - waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, cmd.val, index, waitCmd.seq, null, null)); + waitCmd.replyQueue.put(new Command(CommandType.REPLY, null, null, cmd.val, index, waitCmd.seq, null, null)); removeList.add(waitCmd); if (waitCmd.type == CommandType.TAKE) { // delete data, if it run take cmd. dataList.remove(dsv); @@ -62,7 +62,7 @@ } for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { - cmd.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null)); + cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null)); break; } } @@ -75,7 +75,7 @@ } for (DataSegmentValue data : dataList) { if (data.index > cmd.index) { - cmd.replyQueue.put(new Command(CommandType.REPLY, null, data.val, data.index, cmd.seq, null, null)); + cmd.replyQueue.put(new Command(CommandType.REPLY, null, null, data.val, data.index, cmd.seq, null, null)); dataList.remove(data); break; } diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/datasegment/DataSegmentManager.java --- a/src/alice/datasegment/DataSegmentManager.java Fri Jan 13 19:04:59 2012 +0900 +++ b/src/alice/datasegment/DataSegmentManager.java Sun Jan 15 00:56:25 2012 +0900 @@ -2,15 +2,34 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import org.msgpack.type.Value; import alice.codesegment.CodeSegment; public abstract class DataSegmentManager { + protected ConcurrentHashMap dataSegments = new ConcurrentHashMap(); protected ConcurrentHashMap seqHash = new ConcurrentHashMap(); - public LinkedBlockingQueue replyQueue = new LinkedBlockingQueue(); + public LinkedBlockingQueue replyQueue = new LinkedBlockingQueue(); + protected AtomicInteger seq = new AtomicInteger(1); + protected Runnable replyThread = new Runnable() { + + @Override + public void run() { + while (true) { + try { + Command reply = replyQueue.take(); + Command cmd = seqHash.get(reply.seq); + cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + }; public abstract void put(String key, Value val); public abstract void update(String key, Value val); diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/datasegment/LocalDataSegmentManager.java --- a/src/alice/datasegment/LocalDataSegmentManager.java Fri Jan 13 19:04:59 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Sun Jan 15 00:56:25 2012 +0900 @@ -1,7 +1,5 @@ package alice.datasegment; -import java.util.concurrent.atomic.AtomicInteger; - import org.msgpack.type.Value; import alice.codesegment.CodeSegment; @@ -9,24 +7,6 @@ public class LocalDataSegmentManager extends DataSegmentManager { - private AtomicInteger seq = new AtomicInteger(1); - private Runnable replyThread = new Runnable() { - - @Override - public void run() { - while (true) { - try { - Command reply = replyQueue.take(); - Command cmd = seqHash.get(reply.seq); - cmd.cs.ids.reply(cmd.argKey, new DataSegmentValue(reply.index, reply.val)); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - }; - public LocalDataSegmentManager() { new Thread(replyThread).start(); } @@ -44,20 +24,20 @@ @Override public void put(String key, Value val) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.PUT, null, val, 0, 0, replyQueue, null)); + dataSegmentKey.addCommand(new Command(CommandType.PUT, null, null, val, 0, 0, replyQueue, null)); } @Override public void update(String key, Value val) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, val, 0, 0, replyQueue, null)); + dataSegmentKey.addCommand(new Command(CommandType.UPDATE, null, null, val, 0, 0, replyQueue, null)); } @Override public void take(String argKey, String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.TAKE, argKey, null, index, seq, replyQueue, cs); + Command cmd = new Command(CommandType.TAKE, argKey, null, null, index, seq, replyQueue, cs); seqHash.put(seq, cmd); dataSegmentKey.addCommand(cmd); } @@ -66,7 +46,7 @@ public void peek(String argKey, String key, int index, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); int seq = this.seq.getAndIncrement(); - Command cmd = new Command(CommandType.PEEK, argKey, null, index, seq, replyQueue, cs); + Command cmd = new Command(CommandType.PEEK, argKey, null, null, index, seq, replyQueue, cs); seqHash.put(seq, cmd); dataSegmentKey.addCommand(cmd); } @@ -74,7 +54,7 @@ @Override public void remove(String key) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); - dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, 0, 0, replyQueue, null)); + dataSegmentKey.addCommand(new Command(CommandType.REMOVE, null, null, null, 0, 0, replyQueue, null)); } } diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/datasegment/RemoteDataSegment.java --- a/src/alice/datasegment/RemoteDataSegment.java Fri Jan 13 19:04:59 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,39 +0,0 @@ -package alice.datasegment; - -import org.msgpack.type.Value; - -import alice.codesegment.CodeSegment; - -public class RemoteDataSegment extends DataSegmentManager { - - @Override - public void put(String key, Value val) { - // TODO Auto-generated method stub - - } - - @Override - public void update(String key, Value val) { - // TODO Auto-generated method stub - - } - - @Override - public void take(String argKey, String key, int index, CodeSegment cs) { - // TODO Auto-generated method stub - - } - - @Override - public void peek(String argKey, String key, int index, CodeSegment cs) { - // TODO Auto-generated method stub - - } - - @Override - public void remove(String key) { - // TODO Auto-generated method stub - - } - -} diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/datasegment/RemoteDataSegmentManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/datasegment/RemoteDataSegmentManager.java Sun Jan 15 00:56:25 2012 +0900 @@ -0,0 +1,52 @@ +package alice.datasegment; + +import org.msgpack.type.Value; + +import alice.codesegment.CodeSegment; +import alice.daemon.Connection; +import alice.daemon.IncomingTcpConnection; +import alice.daemon.OutboundTcpConnection; + +public class RemoteDataSegmentManager extends DataSegmentManager { + + Connection connection; + + public RemoteDataSegmentManager(Connection connection) { + this.connection = connection; + new IncomingTcpConnection(connection, this).start(); + new OutboundTcpConnection(connection).start(); + new Thread(replyThread).start(); + } + + @Override + public void put(String key, Value val) { + connection.sendCommand(new Command(CommandType.PUT, null, key, val, 0, 0, null, null)); + } + + @Override + public void update(String key, Value val) { + connection.sendCommand(new Command(CommandType.UPDATE, null, key, val, 0, 0, null, null)); + } + + @Override + public void take(String argKey, String key, int index, CodeSegment cs) { + int seq = this.seq.getAndIncrement(); + Command cmd = new Command(CommandType.TAKE, argKey, key, null, index, seq, replyQueue, cs); + seqHash.put(seq, cmd); + connection.sendCommand(cmd); + } + + @Override + public void peek(String argKey, String key, int index, CodeSegment cs) { + int seq = this.seq.getAndIncrement(); + Command cmd = new Command(CommandType.PEEK, argKey, key, null, index, seq, replyQueue, cs); + seqHash.put(seq, cmd); + connection.sendCommand(cmd); + } + + @Override + public void remove(String key) { + connection.sendCommand(new Command(CommandType.REMOVE, null, key, null, 0, 0, null, null)); + } + +} diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/test/codesegment/RemoteIncrement.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/RemoteIncrement.java Sun Jan 15 00:56:25 2012 +0900 @@ -0,0 +1,24 @@ +package alice.test.codesegment; + +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; +import alice.datasegment.DataSegmentValue; + +public class RemoteIncrement extends CodeSegment { + + @Override + public void run() { + DataSegmentValue data = ids.get("num"); + int num = data.val.asIntegerValue().getInt(); + System.out.println(num++); + if (num == 10) System.exit(0); + + CodeSegment cs = new RemoteIncrement(); + cs.ids.take("num", "remote", "num"); + cs.ids.execute(); + + ods.put("local", "num", ValueFactory.createIntegerValue(num)); + } + +} diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/test/codesegment/RemoteStartCodeSegment.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/RemoteStartCodeSegment.java Sun Jan 15 00:56:25 2012 +0900 @@ -0,0 +1,20 @@ +package alice.test.codesegment; + +import org.msgpack.type.Value; +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; + +public class RemoteStartCodeSegment extends CodeSegment { + + @Override + public void run() { + CodeSegment cs = new RemoteIncrement(); + cs.ids.take("num", "remote", "num"); + cs.ids.execute(); + + Value num = ValueFactory.createIntegerValue(0); + ods.put("local", "num", num); + } + +} diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/test/codesegment/TestAlice.java --- a/src/alice/test/codesegment/TestAlice.java Fri Jan 13 19:04:59 2012 +0900 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,11 +0,0 @@ -package alice.test.codesegment; - -import alice.codesegment.CodeSegment; - -public class TestAlice { - public static void main(String args[]) { - CodeSegment cs = new StartCodeSegment(); - cs.ids.execute(); - } - -} diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/test/codesegment/TestLocalAlice.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/TestLocalAlice.java Sun Jan 15 00:56:25 2012 +0900 @@ -0,0 +1,11 @@ +package alice.test.codesegment; + +import alice.codesegment.CodeSegment; + +public class TestLocalAlice { + public static void main(String args[]) { + CodeSegment cs = new StartCodeSegment(); + cs.ids.execute(); + } + +} diff -r 30f97d776a3e -r e3f1b21718b0 src/alice/test/codesegment/TestRemoteAlice.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/TestRemoteAlice.java Sun Jan 15 00:56:25 2012 +0900 @@ -0,0 +1,42 @@ +package alice.test.codesegment; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; + +import alice.codesegment.CodeSegment; +import alice.daemon.AliceDaemon; +import alice.daemon.Config; +import alice.daemon.Connection; +import alice.datasegment.DataSegment; +import alice.datasegment.RemoteDataSegmentManager; + +public class TestRemoteAlice { + + public static void main(String[] args) { + Config conf = new Config(args); + AliceDaemon daemon = new AliceDaemon(conf); + daemon.listen(); + boolean connect = true; + do { + try { + SocketChannel sc = SocketChannel.open(new InetSocketAddress(conf.hostname, conf.connectPort)); + Connection connection = new Connection(sc.socket()); + RemoteDataSegmentManager manager = new RemoteDataSegmentManager(connection); + DataSegment.regist(conf.key, manager); + connect = false; + System.out.println("connected"); + } catch (IOException e) { + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } while (connect); + + CodeSegment cs = new RemoteStartCodeSegment(); + cs.ids.execute(); + } + +}