# HG changeset patch # User Nozomi Teruya # Date 1428572186 -32400 # Node ID 145c425db88d2a6b1aebbfdf71f7fe67db7a55ff # Parent b8fe79f0c764fbd0105ee788635b5a21eb8268f2 add CompressedLDSM diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/codesegment/CodeSegment.java --- a/src/main/java/alice/codesegment/CodeSegment.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/codesegment/CodeSegment.java Thu Apr 09 18:36:26 2015 +0900 @@ -10,7 +10,7 @@ public InputDataSegment ids = new InputDataSegment(this); public OutputDataSegment ods = new OutputDataSegment(); private ArrayList list = new ArrayList(); - private int priority = Thread.NORM_PRIORITY; + private int priority = Thread.NORM_PRIORITY;//? public void execute() { ids.receive(); @@ -18,9 +18,9 @@ public void register(Receiver receiver) { list.add(receiver); - } + }//Receiverを作成? - public void recycle(){ + public void recycle(){//idsのリセット ids.init(); ids.setCounter(list.size()); for (Receiver receiver : list) { diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/codesegment/InputDataSegment.java --- a/src/main/java/alice/codesegment/InputDataSegment.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/codesegment/InputDataSegment.java Thu Apr 09 18:36:26 2015 +0900 @@ -9,6 +9,7 @@ import alice.datasegment.SendOption; /** + * RemoteかLocalかで分けて処理する。ここに圧縮DSMへ投げる処理を追加。 * InputDataSegment Manager * keep tracking unbound/bound count * @author kazz @@ -19,6 +20,7 @@ public CodeSegment cs; private AtomicInteger count = new AtomicInteger(1); // 1 for no input data segments private AtomicInteger keyCount = new AtomicInteger(0); // number of DataSegments + public InputDataSegment(CodeSegment cs) { this.cs = cs; } @@ -28,39 +30,60 @@ keyCount = new AtomicInteger(0); } - public void quickPeek(Receiver receiver) { + public void quickPeek(Receiver receiver) {//SEDAで実行 cs.register(receiver); - if (receiver.managerKey==null){ - DataSegment.getLocal().peek(receiver, cs, null); + + if (receiver.compressedFlag){ + if (receiver.managerKey == null){//localの場合 + DataSegment.getCompressedLocal().peek(receiver, cs, true); + } else { + if (DataSegment.contains(receiver.managerKey)) {//remoteの場合 + DataSegment.get(receiver.managerKey).peek(receiver, cs, true);//remoteはあとで考える。DataSegment.connect + } + } } else { - if (DataSegment.contains(receiver.managerKey)) { - SendOption option = new SendOption(true, false); - DataSegment.get(receiver.managerKey).peek(receiver, cs, option); + if (receiver.managerKey == null){ + DataSegment.getLocal().peek(receiver, cs, true); + } else { + if (DataSegment.contains(receiver.managerKey)) { + DataSegment.get(receiver.managerKey).peek(receiver, cs, true); + } } } + } public void peek(Receiver receiver) { cs.register(receiver); - if (receiver.managerKey==null){ - DataSegment.getLocal().peek(receiver, cs, null); + + if (receiver.compressedFlag){ + if (receiver.managerKey==null){ + DataSegment.getCompressedLocal().peek(receiver, cs, false); + } else { + if (DataSegment.contains(receiver.managerKey)) { + DataSegment.get(receiver.managerKey).peek(receiver, cs, false); + } + } } else { - if (DataSegment.contains(receiver.managerKey)) { - SendOption option = new SendOption(false, false); - DataSegment.get(receiver.managerKey).peek(receiver, cs, option); + if (receiver.managerKey==null){ + DataSegment.getLocal().peek(receiver, cs, false); + } else { + if (DataSegment.contains(receiver.managerKey)) { + DataSegment.get(receiver.managerKey).peek(receiver, cs, false); + } } } + } public void quickTake(Receiver receiver) { cs.register(receiver); if (receiver.managerKey==null){ - DataSegment.getLocal().take(receiver, cs, null); + DataSegment.getLocal().take(receiver, cs, true); } else { if (DataSegment.contains(receiver.managerKey)) { - SendOption option = new SendOption(true, false); - DataSegment.get(receiver.managerKey).take(receiver, cs, option); + DataSegment.get(receiver.managerKey).take(receiver, cs, true); } } } @@ -68,11 +91,10 @@ public void take(Receiver receiver) { cs.register(receiver); if (receiver.managerKey==null){ - DataSegment.getLocal().take(receiver, cs, null); + DataSegment.getLocal().take(receiver, cs, false); } else { if (DataSegment.contains(receiver.managerKey)) { - SendOption option = new SendOption(false, false); - DataSegment.get(receiver.managerKey).take(receiver, cs, option); + DataSegment.get(receiver.managerKey).take(receiver, cs, false); } } } @@ -108,7 +130,10 @@ */ public Receiver create(CommandType type) { return new Receiver(this, type); - } + }//Receiverを作成 + public Receiver create(CommandType type, boolean compressFlag) {//追加 + return new Receiver(this, type, compressFlag); + }//Receiverを作成 public void recommand(Receiver receiver) { // TODO why only local? diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/codesegment/OutputDataSegment.java --- a/src/main/java/alice/codesegment/OutputDataSegment.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/codesegment/OutputDataSegment.java Thu Apr 09 18:36:26 2015 +0900 @@ -7,23 +7,26 @@ import alice.datasegment.SendOption; public class OutputDataSegment { - private boolean compressFlag = false; + private boolean compressFlag = false;//圧縮するかどうか /** * for local */ + /** + * input→ds変更→outputのときコピーを防ぐ + */ public void flip(Receiver receiver) { - DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), null); + DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false); } public void flip(Receiver receiver, CommandType type) { switch (type) { case PUT: - DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), null); + DataSegment.getLocal().put(receiver.key, receiver.getReceiveData(), false);//localなら全部false。 break; case UPDATE: - DataSegment.getLocal().update(receiver.key, receiver.getReceiveData(), null); + DataSegment.getLocal().update(receiver.key, receiver.getReceiveData(), false); break; default: break; @@ -31,21 +34,17 @@ } public void put(String key, ReceiveData rData) { - DataSegment.getLocal().put(key, rData, null); + DataSegment.getLocal().put(key, rData, false); } public void put(String key, Object val) { ReceiveData rData = new ReceiveData(val, false, false); - put(key, rData); - } - - public void update(String key, ReceiveData rData) { - DataSegment.getLocal().update(key, rData, null); + DataSegment.getLocal().put(key, rData, false); } public void update(String key, Object val) { ReceiveData rData = new ReceiveData(val, false, false); - update(key, rData); + DataSegment.getLocal().update(key, rData, false); } /** @@ -53,58 +52,70 @@ */ public void put(String managerKey, String key, ReceiveData rData) { if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ - SendOption option = new SendOption(false, compressFlag()); - DataSegment.get(managerKey).put(key, rData, option); + DataSegment.get(managerKey).put(key, rData, false); } else { put(key, rData); } } public void put(String managerKey, String key, Object val) { - ReceiveData rData = new ReceiveData(val, false, false); - put(managerKey, key, rData); + if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ + ReceiveData rData = new ReceiveData(val, true, false);///false false + DataSegment.get(managerKey).put(key, rData, false); + } else { + put(key, val); + } } public void quickPut(String managerKey, String key, ReceiveData rData) { if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ - SendOption option = new SendOption(true, compressFlag()); - DataSegment.get(managerKey).put(key, rData, option); + DataSegment.get(managerKey).put(key, rData, true); } else { put(key, rData); } } public void quickPut(String managerKey, String key, Object val) { - ReceiveData rData = new ReceiveData(val, false, false); - quickPut(managerKey, key, rData); + if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ + ReceiveData rData = new ReceiveData(val, false, false); + DataSegment.get(managerKey).put(key, rData, true); + } else { + put(key, val); + } } public void update(String managerKey, String key, ReceiveData rData) { if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ - SendOption option = new SendOption(false, compressFlag()); - DataSegment.get(managerKey).update(key, rData, option); + DataSegment.get(managerKey).update(key, rData, false); } else { update(key, rData); } } public void update(String managerKey, String key, Object val) { - ReceiveData rData = new ReceiveData(val, false, false); - update(managerKey, key, rData); + if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ + ReceiveData rData = new ReceiveData(val, true, false);///false, false + DataSegment.get(managerKey).update(key, rData, false); + } else { + update(key, val); + } } public void quickUpdate(String managerKey, String key, ReceiveData rData) { if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ - SendOption option = new SendOption(true, compressFlag()); - DataSegment.get(managerKey).update(key, rData, option); + DataSegment.get(managerKey).update(key, rData, true); } else { update(key, rData); } } public void quickUpdate(String managerKey, String key, Object val) { - ReceiveData rData = new ReceiveData(val, false, false); - quickUpdate(managerKey, key, rData); + if (!managerKey.equals("local") && DataSegment.contains(managerKey)){ + ReceiveData rData = new ReceiveData(val, false, false); + DataSegment.get(managerKey).update(key, rData, true); + } else { + update(key, val); + } } /** @@ -134,7 +145,7 @@ * "Ping Response" return in this "key" * * @param managerKey - * @param key + * @param returnKey */ public void ping(String managerKey, String returnKey) { if (DataSegment.contains(managerKey)) diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/daemon/CommandMessage.java --- a/src/main/java/alice/daemon/CommandMessage.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/daemon/CommandMessage.java Thu Apr 09 18:36:26 2015 +0900 @@ -2,24 +2,27 @@ import org.msgpack.annotation.Message; +/** + * DSのヘッダー + */ @Message public class CommandMessage { - public int type; - public int index; - public int seq; - public String key; - public boolean quickFlag = false; - public boolean serialized = false; - public boolean compressed = false; + public int type;//PUT, PEEKなどのコマンドタイプ + public int index;//キューの中でのDSの位置? + public int seq;//DSの待ち合わせを行っているCSを表すunique number + public String key;//DS key + public boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか + public boolean serialized = false;//シリアライズされているかどうか + public boolean compressed = false;//圧縮されているかどうか - public boolean setTime = false; - public long time; - public int depth; + public boolean setTime = false;//? + public long time;//? + public int depth;//? public CommandMessage() {} public CommandMessage(int type, int index, int seq, String key - , boolean qFlag, boolean sFlag, boolean cFlag) { + , boolean qFlag, boolean sFlag, boolean cFlag) {//コンストラクタ. setter. this.type = type; this.index = index; this.seq = seq; diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/daemon/Connection.java --- a/src/main/java/alice/daemon/Connection.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/daemon/Connection.java Thu Apr 09 18:36:26 2015 +0900 @@ -62,7 +62,6 @@ if (sendManager) { SendOption option = new SendOption(false, false); DataSegment.get("manager").put("_DISCONNECTNODE", rData, option); - sendManager = false; } } diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/daemon/IncomingUdpConnection.java --- a/src/main/java/alice/daemon/IncomingUdpConnection.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/daemon/IncomingUdpConnection.java Thu Apr 09 18:36:26 2015 +0900 @@ -72,7 +72,7 @@ break; case RESPONSE: rData = new ReceiveData(new RespondData(reverseKey, System.currentTimeMillis()), false, false); - DataSegment.getLocal().put(msg.key, rData, null); + DataSegment.getLocal().put(msg.key, rData, false); break; default: break; diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/datasegment/Command.java --- a/src/main/java/alice/datasegment/Command.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/datasegment/Command.java Thu Apr 09 18:36:26 2015 +0900 @@ -7,6 +7,7 @@ import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; +import org.apache.log4j.Logger; import org.msgpack.MessagePack; import alice.codesegment.CodeSegment; @@ -14,20 +15,28 @@ import alice.daemon.CommandMessage; import alice.daemon.Connection; +/** + * DSMで使われる各コマンドのセット(ReceiveDataからのDSの読み込み) + */ public class Command { - public CommandType type; + public CommandType type;//PEEK, PUTなどのコマンドタイプ public String key; public Receiver receiver; public ReceiveData rData; - public int index; - public int seq; + public int index;//使ってない。アクセス用のindex。負の遺産。 + public int seq;//DSの待ち合わせを行っているCSを表すunique number。リモート用。対応コマンドを表す。 public Connection connection; // for remote - public BlockingQueue replyQueue; + public BlockingQueue replyQueue;//PEEK/TAKE必要な返り値? public CodeSegment cs; - public String reverseKey; - private boolean quickFlag = false; - private boolean compressFlag = false; + public String reverseKey;//どこからput/updateされたか + private boolean quickFlag = false;//SEDAを挟まずに処理を行うかどうか。trueだとリモート + private boolean compressFlag = false;//trueだったら圧縮する必要がある + private Logger logger = Logger.getLogger("MessagePack"); + + /** + * for PEEK/TAKE + */ public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, BlockingQueue replyQueue, CodeSegment cs, String reverseKey) { this.type = cmdType; this.receiver = receiver; @@ -40,6 +49,9 @@ this.reverseKey = reverseKey; } + /** + * for PUT/UPDATE/REPLY/PING/RESPONSE + */ public Command(CommandType cmdType, Receiver receiver, String key, ReceiveData rData, int index, int seq, CodeSegment cs, String reverseKey, Connection connection) { this.type = cmdType; this.receiver = receiver; @@ -52,6 +64,10 @@ this.reverseKey = reverseKey; } + /** + * String型でコマンドを取得するメソッド。たぶんログ表示用。 + * @return + */ public String getCommandString() { String csName = "null"; if (cs != null) { @@ -63,15 +79,15 @@ /** * @return serialized ByteBuffer */ - public ByteBuffer convert() { + public ByteBuffer convert() {//メッセージパックでbyteArrayに変換 ByteBuffer buf = null; MessagePack msg = SingletonMessage.getInstance(); try { - byte[] header = null; - byte[] data = null; - byte[] dataSize = null; - boolean serialized = false; - boolean compressed = false; + byte[] header = null;//DSのメタデータ用byteArray + byte[] data = null;//DS本体用byteArray + byte[] dataSize = null;//DSのサイズ + boolean serialized = false;//DSがシリアライズ状態かのフラグ + boolean compressed = false;//DSが圧縮されているかのフラグ switch (type) { /* * UPDATE, PUT, REPLY need send DataSegment to RemoteDataSegment @@ -83,50 +99,56 @@ */ case UPDATE: case PUT: - case REPLY: - if (rData.compressed()) { + case REPLY://ReceiveDataからREPLYするDSを取得 + if (rData.compressed()) {//圧縮されている場合:各フラグの状態とDS本体を取得 // have already converted data = (byte[]) rData.getObj(); - compressed = rData.compressed(); // true - serialized = rData.serialized(); - } else { + compressed = rData.compressed(); + serialized = rData.serialized();//シリアライズされているか=Remoteから送られてきたか + } else {//圧縮されていない場合:状態にあわせてDS本体を取得し各フラグを立てる if (!rData.serialized() && !rData.isByteArray()) { - data = msg.write(rData.getObj()); + data = msg.write(rData.getObj());//シリアライズ serialized = true; } else { // rData is RAW ByteArray or already serialized data = (byte[]) rData.getObj(); serialized = rData.serialized(); } - if (compressFlag) { - data = zip(data); + if (compressFlag) {//圧縮する ここはあとで消す + data = rData.zip(data); compressed = true; } } CommandMessage cm = new CommandMessage(type.id, index, seq, key, false, serialized, compressed); - if (rData.setTime) { + if (rData.setTime) {//AliceVNCの計測用(消してもいい) cm.setTime = true; cm.time = rData.time; cm.depth = rData.depth + 1; } + //MessagePackでDSを作成(ヘッダー・データ本体のサイズ・データ本体) header = msg.write(cm); dataSize = msg.write(data.length); buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); buf.put(header); buf.put(dataSize); buf.put(data); + + //System.out.print("messagePack = " + Integer.toHexString(buf.getInt() & 0xff)); + logger.debug("messagePack = " + Integer.toHexString(buf.getInt() & 0xff)); + break; - default: + default://PEEK, TAKE, RemoveならCommandMessageとそのサイズのみセット header = msg.write(new CommandMessage(type.id, index, seq, key, quickFlag, false, compressFlag)); buf = ByteBuffer.allocate(header.length); buf.put(header); + //System.out.print("messagePack = " + Integer.toHexString(buf.getInt() & 0xff)); break; } buf.flip(); } catch (IOException e) { e.printStackTrace(); - } + }//ここに圧縮機能を入れる return buf; } @@ -137,11 +159,11 @@ * @param flag */ - public void setQuickFlag(boolean flag){ + public void setQuickFlag(boolean flag){//SEDA処理の有無フラグのsetter quickFlag = flag; } - public boolean getQuickFlag(){ + public boolean getQuickFlag(){//SEDA処理の有無フラグのgetter return quickFlag; } @@ -152,20 +174,12 @@ * @param flag */ - public void setCompressFlag(boolean flag){ + public void setCompressFlag(boolean flag){//圧縮フラグのsetter compressFlag = flag; } - public boolean getCompressFlag(){ + public boolean getCompressFlag(){//圧縮フラグのgetter return compressFlag; } - public byte[] zip(byte[] input) throws IOException{ - Deflater deflater = new Deflater(); - ByteArrayOutputStream os = new ByteArrayOutputStream(); - DeflaterOutputStream dos = new DeflaterOutputStream(os, deflater); - dos.write(input); - dos.finish(); - return os.toByteArray(); - } } diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/datasegment/CommandType.java --- a/src/main/java/alice/datasegment/CommandType.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/datasegment/CommandType.java Thu Apr 09 18:36:26 2015 +0900 @@ -7,16 +7,16 @@ UPDATE, // remove a DataSegment value and put PEEK, TAKE, - REMOVE, - REPLY, + REMOVE,//keyごと消すかもしれない + REPLY,//PEEK/TAKEに対応 CLOSE, FINISH, - PING, - RESPONSE; + PING,//heart beat 用 + RESPONSE;//heart beat 用 - public int id; - public static HashMap hash = new HashMap(); - private static int lastId = 0; + public int id;//コマンドのid + public static HashMap hash = new HashMap();//コマンド対応表 + private static int lastId = 0;//コマンドの総数 private CommandType(int id) { this.id = id; diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/datasegment/CompressedLocalDataSegmentManager.java Thu Apr 09 18:36:26 2015 +0900 @@ -0,0 +1,27 @@ +package alice.datasegment; + +import alice.codesegment.CodeSegment; +import org.apache.log4j.Logger; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * Created by e125769 on 4/8/15. + */ +public class CompressedLocalDataSegmentManager extends LocalDataSegmentManager{ + private String reverseKey = "compressedLocal"; + //private ConcurrentHashMap dataSegments = new ConcurrentHashMap(); + private Logger logger = Logger.getLogger("compressedLocal"); + + + @Override + public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) {//とりあえずコピペ + DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); + int seq = this.seq.getAndIncrement(); + Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); + dataSegmentKey.runCommand(cmd); + if (logger.isDebugEnabled()) + logger.debug(cmd.getCommandString()); + } + +} diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/datasegment/DataSegment.java --- a/src/main/java/alice/datasegment/DataSegment.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegment.java Thu Apr 09 18:36:26 2015 +0900 @@ -9,6 +9,7 @@ private static DataSegment dataSegment = new DataSegment(); private LocalDataSegmentManager local = new LocalDataSegmentManager(); + private CompressedLocalDataSegmentManager compressedLocal = new CompressedLocalDataSegmentManager();//追加 private ConcurrentHashMap dataSegmentManagers = new ConcurrentHashMap(); //TODO Over Head private ConcurrentHashMap acceptHash = new ConcurrentHashMap(); @@ -24,6 +25,10 @@ return dataSegment.local; } + public static CompressedLocalDataSegmentManager getCompressedLocal() {//追加 + return dataSegment.compressedLocal; + } + public static void register(String key, DataSegmentManager manager) { dataSegment.dataSegmentManagers.put(key, manager); } diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/datasegment/DataSegmentKey.java --- a/src/main/java/alice/datasegment/DataSegmentKey.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentKey.java Thu Apr 09 18:36:26 2015 +0900 @@ -6,6 +6,8 @@ import alice.datasegment.Command; /** + * ここがコマンドの中身部分 + * * Synchronized DataSegment for each DataSegment key * @author kazz * diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/datasegment/DataSegmentManager.java --- a/src/main/java/alice/datasegment/DataSegmentManager.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/datasegment/DataSegmentManager.java Thu Apr 09 18:36:26 2015 +0900 @@ -18,7 +18,7 @@ protected Runnable replyThread = new Runnable() { Logger logger = Logger.getLogger("reply"); @Override - public void run() { + public void run() {//SEDAのREPLYスレッドのなごり。消してもいい。 while (true) { try { Command reply = replyQueue.take(); @@ -50,10 +50,11 @@ } } - public abstract void put(String key, ReceiveData rData, SendOption option); - public abstract void update(String key, ReceiveData rData, SendOption option); - public abstract void peek(Receiver receiver, CodeSegment cs, SendOption option); - public abstract void take(Receiver receiver, CodeSegment cs, SendOption option); + //各コマンドの抽象クラス + public abstract void put(String key, ReceiveData rData, boolean quickFlag); + public abstract void update(String key, ReceiveData rData, boolean quickFlag); + public abstract void peek(Receiver receiver, CodeSegment cs, boolean quickFlag); + public abstract void take(Receiver receiver, CodeSegment cs, boolean quickFlag); public abstract void remove(String key); public abstract void shutdown(); diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/datasegment/LocalDataSegmentManager.java --- a/src/main/java/alice/datasegment/LocalDataSegmentManager.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/datasegment/LocalDataSegmentManager.java Thu Apr 09 18:36:26 2015 +0900 @@ -9,6 +9,10 @@ import alice.codesegment.CodeSegment; +/** + * localのDSに対する処理。DS自体は持っていない。→ReceivedData + * DataSegmentKey.runCommandに渡してコマンドを実行する。 + */ public class LocalDataSegmentManager extends DataSegmentManager { private String reverseKey = "local"; @@ -21,6 +25,7 @@ TimeUnit.SECONDS, new LinkedBlockingQueue()); + //コンストラクタ。スレッドが走る。 public LocalDataSegmentManager() { new Thread(replyThread, "LocalDataSegmentManager-replyCommand").start(); } @@ -70,7 +75,7 @@ } @Override - public void put(String key, ReceiveData rData, SendOption option) { + public void put(String key, ReceiveData rData, boolean quickFlag) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, reverseKey); dataSegmentKey.runCommand(cmd); @@ -78,12 +83,16 @@ logger.debug(cmd.getCommandString()); } + public void put(String key, ReceiveData rData) { + this.put(key, rData, false); + } + /** * Enqueue update command to the queue of each DataSegment key */ @Override - public void update(String key, ReceiveData rData, SendOption option) { + public void update(String key, ReceiveData rData, boolean quickFlag) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, reverseKey); dataSegmentKey.runCommand(cmd); @@ -92,7 +101,7 @@ } @Override - public void take(Receiver receiver, CodeSegment cs, SendOption option) { + public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); @@ -102,7 +111,7 @@ } @Override - public void peek(Receiver receiver, CodeSegment cs, SendOption option) { + public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); @@ -111,6 +120,7 @@ logger.debug(cmd.getCommandString()); } + //このコマンドは? @Override public void remove(String key) { DataSegmentKey dataSegmentKey = getDataSegmentKey(key); @@ -129,6 +139,7 @@ } + //? public void recommand(Receiver receiver, CodeSegment cs) { DataSegmentKey dataSegmentKey = getDataSegmentKey(receiver.key); int seq = this.seq.getAndIncrement(); diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/datasegment/ReceiveData.java --- a/src/main/java/alice/datasegment/ReceiveData.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/datasegment/ReceiveData.java Thu Apr 09 18:36:26 2015 +0900 @@ -2,85 +2,102 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.zip.DataFormatException; -import java.util.zip.Inflater; -import java.util.zip.InflaterOutputStream; +import java.nio.ByteBuffer; +import java.util.zip.*; +import org.apache.log4j.Logger; import org.msgpack.type.Value; - import alice.codesegment.SingletonMessage; +/** + * 送られてきたDSを一時的に取っておくクラス。inputでも使用。 + */ public class ReceiveData { - private Object val; + private Object val;//Object型のDS + private byte[] messagePack;//byteArray(serialized)のDS + private byte[] zMessagePack;//byteArray(compressed)のDS + private Class clazz; + private Logger logger = Logger.getLogger("MessagePack"); - // both flag have to be true or false except DataSegment is byteArray; - private boolean compressed = false; - private boolean serialized = false; - private boolean byteArray = false; - - public long time; + public long time;//測定用 public boolean setTime = false; public int depth = 1; - public ReceiveData(Object obj, boolean cFlag, boolean sFlag){ - val = obj; - compressed = cFlag; - serialized = sFlag; - } - - public ReceiveData(byte[] obj, boolean cFlag, boolean sFlag){ + /** + * コンストラクタ。Object型のDSと圧縮のメタ情報を受け取る。 + * put/update用? + * @param obj DS本体(Object) + */ + public ReceiveData(Object obj, boolean cFlag, boolean sFlag) { val = obj; - byteArray = true; - compressed = cFlag; - serialized = sFlag; - } - - public boolean isByteArray(){ - return byteArray; - } + if (cFlag){ + messagePack = (byte[])val; + ByteBuffer buf = null; - public boolean compressed(){ - return compressed; - } - - public boolean serialized(){ - return serialized; - } - - public Object getObj(){ - return val; - } - - public String asString(){ - if (serialized){ - return asClass(String.class); - } else { - return (String) val; + try { + zMessagePack = zip(messagePack); + buf = ByteBuffer.allocate(zMessagePack.length + 1); + buf.put((byte) 0xc1); + buf.put(zMessagePack); + zMessagePack = buf.array(); + } catch (IOException e) { + e.printStackTrace(); + } } } - public int asInteger(){ - if (serialized){ - return asClass(Integer.class); + /** + * コンストラクタ。byteArray型のDSと圧縮のメタ情報を受け取り、byteArrayフラグを立てる。 + * + * @param messagePack DS本体(byteArray) + */ + public ReceiveData(byte[] messagePack) { + + if (messagePack[0] == 0xc1){ + this.zMessagePack = messagePack; } else { - return (Integer) val; + this.messagePack = messagePack; } + + logger.debug(this.messagePack); + logger.debug(this.zMessagePack); + } + + + public boolean isByteArray(){//byteArrayフラグの状態を取得するメソッド + return messagePack != null; } - public Float asFloat(){ - if (serialized){ - return asClass(Float.class); - } else { - return (Float) val; - } + public boolean compressed(){//compressedフラグの状態を取得するメソッド + return zMessagePack != null; + } + + public boolean serialized(){//serializedフラグの状態を取得するメソッド + return val != null; + } + + public Object getObj(){//Object型のDS本体を取得するメソッド。 + return asClass(Object.class); } - public Value getVal(){ - if (serialized){ + public String asString(){//String型としてDSを取得するメソッド。DSがシリアライズされていればStringクラスを返す。 + return asClass(String.class); + } + + public int asInteger(){//Int型としてDSを取得するメソッド。DSがシリアライズされていればIntクラスを返す。 + return asClass(Integer.class); + } + + public Float asFloat(){//Float型としてDSを取得するメソッド。DSがシリアライズされていればFloatクラスを返す。 + return asClass(Float.class); + } + + public Value getVal(){//Value型としてDSを取得するメソッド + if (val == null){//もとはval != null return asClass(Value.class); } else { try { - return SingletonMessage.getInstance().unconvert(val); + return SingletonMessage.getInstance().unconvert(val);//MassagePackでvalue型に変換。できなければnullを返す。 } catch (IOException e) { e.printStackTrace(); } @@ -88,30 +105,50 @@ } } - @SuppressWarnings("unchecked") - public T asClass(Class clazz) { + /** + * DSを任意の型で取得するメソッド。 + * DSがbyteArrayでなければ指定された型に変換して返す。 + * DSがbyteArrayなら解凍状態にして指定された型に変換して返す。 + * + * @param clazz + * @param + * @return + */ + public T asClass(Class clazz) {//javasist try { - if (!byteArray) { + if (val != null) { return (T) val; } - byte[] b = null; - if (compressed) { - b = unzip((byte[]) val); - } else { - b = (byte[]) val; + byte[] b = messagePack; + + if (zMessagePack != null && messagePack == null) { + logger.debug("zMessagePack = " + zMessagePack); + messagePack = unzip(zMessagePack); + b = messagePack; } - if (serialized) { + logger.debug("MessagePack = " + messagePack); + + if (val == null) { + this.clazz = clazz; return SingletonMessage.getInstance().read(b, clazz); } else { - return (T) b; + return (T) val; } - } catch (IOException | DataFormatException e) { + } catch (Exception e) { e.printStackTrace(); return null; } } + /** + * java.util.zip.Inflater(zlib)を使ってbyteArray型のDSを解凍する。 + * + * @param input 圧縮されたbyteArray型のDS + * @return 解凍したbyteArray型DS + * @throws IOException + * @throws DataFormatException + */ public byte[] unzip(byte[] input) throws IOException, DataFormatException{ Inflater inflater = new Inflater(); ByteArrayOutputStream os = new ByteArrayOutputStream(); @@ -121,4 +158,20 @@ return os.toByteArray(); } + /** + * java.util.zip.Deflater(zlib)を使ってbyteArray型のDSを圧縮する。 + * + * @param input 非圧縮状態のbyteArray型のDS + * @return 圧縮したbyteArray型DS + * @throws IOException + */ + public byte[] zip(byte[] input) throws IOException{ + Deflater deflater = new Deflater(); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DeflaterOutputStream dos = new DeflaterOutputStream(os, deflater); + dos.write(input); + dos.finish(); + return os.toByteArray(); + } + } diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/datasegment/Receiver.java --- a/src/main/java/alice/datasegment/Receiver.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/datasegment/Receiver.java Thu Apr 09 18:36:26 2015 +0900 @@ -4,6 +4,7 @@ import alice.codesegment.InputDataSegment; /** + * idsコマンド(create, setKey)の処理をする。コマンドの定義はids/LDSM内にある。 * MessagePack implementation and DataSegment Receiver * @author kazz * @@ -14,19 +15,26 @@ public int index; public String from; public CommandType type; - public String managerKey; + public String managerKey;//localかremoteか? public String key; + public boolean compressedFlag = false; public Receiver(InputDataSegment ids, CommandType type) { this.ids = ids; this.type = type; ids.register(); + } + public Receiver(InputDataSegment ids, CommandType type, boolean compressedFlag) { + this.ids = ids; + this.type = type; + ids.register(); + this.compressedFlag = compressedFlag; } public void setQuickKey(String managerKey, String key){ setQuickKey(managerKey, key, 0); - } + }//? public void setQuickKey(String managerKey, String key, int index) { this.managerKey = managerKey; @@ -85,6 +93,7 @@ ids.setKey(); } + //以下各型でDS本体を受け取る public void setData(ReceiveData r) { data = r; } diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/datasegment/RemoteDataSegmentManager.java --- a/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/datasegment/RemoteDataSegmentManager.java Thu Apr 09 18:36:26 2015 +0900 @@ -18,7 +18,7 @@ public RemoteDataSegmentManager(){} - public RemoteDataSegmentManager(String connectionKey, final String reverseKey, final String hostName, final int port) { + public RemoteDataSegmentManager(final String connectionKey, final String reverseKey, final String hostName, final int port) { logger = Logger.getLogger(connectionKey); connection = new Connection(); connection.name = connectionKey; @@ -58,10 +58,10 @@ * send put command to target DataSegment */ @Override - public void put(String key, ReceiveData rData, SendOption option) { - Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, ""); - cmd.setCompressFlag(option.isCompress()); - if (option.isQuick()){ + public void put(String key, ReceiveData rData, boolean quickFlag) { + Command cmd = new Command(CommandType.PUT, null, key, rData, 0, 0, null, null, "");///set compressed flag + //cmd.setCompressFlag(option.isCompress()); + if (quickFlag){ connection.write(cmd); // put command is executed right now } else { connection.sendCommand(cmd); // put command on the transmission thread @@ -71,10 +71,10 @@ } @Override - public void update(String key, ReceiveData rData, SendOption option) { + public void update(String key, ReceiveData rData, boolean quickFlag) { Command cmd = new Command(CommandType.UPDATE, null, key, rData, 0, 0, null, null, ""); - cmd.setCompressFlag(option.isCompress()); - if (option.isQuick()){ + //cmd.setCompressFlag(option.isCompress()); + if (quickFlag){ connection.write(cmd); } else { connection.sendCommand(cmd); @@ -84,12 +84,12 @@ } @Override - public void take(Receiver receiver, CodeSegment cs, SendOption option) { + public void take(Receiver receiver, CodeSegment cs, boolean quickFlag) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.TAKE, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setQuickFlag(option.isQuick()); + cmd.setQuickFlag(quickFlag); seqHash.put(seq, cmd); - if (option.isQuick()){ + if (quickFlag){ connection.write(cmd); } else { connection.sendCommand(cmd); @@ -99,12 +99,12 @@ } @Override - public void peek(Receiver receiver, CodeSegment cs, SendOption option) { + public void peek(Receiver receiver, CodeSegment cs, boolean quickFlag) { int seq = this.seq.getAndIncrement(); Command cmd = new Command(CommandType.PEEK, receiver, receiver.key, null, receiver.index, seq, replyQueue, cs, null); - cmd.setQuickFlag(option.isQuick()); + cmd.setQuickFlag(quickFlag); seqHash.put(seq, cmd); - if (option.isQuick()){ + if (quickFlag){ connection.write(cmd); } else { connection.sendCommand(cmd); diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/datasegment/SendOption.java --- a/src/main/java/alice/datasegment/SendOption.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/datasegment/SendOption.java Thu Apr 09 18:36:26 2015 +0900 @@ -1,5 +1,8 @@ package alice.datasegment; +/** + * フラグを一時的に格納するだけ。たぶんリモート用。 + */ public class SendOption { private boolean quick = false; private boolean compress = false; diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/test/codesegment/local/TestCodeSegment.java --- a/src/main/java/alice/test/codesegment/local/TestCodeSegment.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/test/codesegment/local/TestCodeSegment.java Thu Apr 09 18:36:26 2015 +0900 @@ -8,14 +8,14 @@ public class TestCodeSegment extends CodeSegment { // create input datasegment arg1 - Receiver arg1 = ids.create(CommandType.PEEK); + Receiver arg1 = ids.create(CommandType.PEEK);//peek用Receiverを生成 @Override public void run() { System.out.println("type = " + arg1.type); System.out.println("index = " + arg1.index); System.out.println("data = " + arg1.getVal()); - System.out.println(((Value)arg1.getVal()).getType()); + System.out.println(((Value)arg1.getVal()).getType());//←伝統。なくてもいい。 if (arg1.index == 10) { System.exit(0); @@ -23,7 +23,7 @@ } TestCodeSegment cs = new TestCodeSegment(); - cs.arg1.setKey("key1", arg1.index); + cs.arg1.setKey("key1", arg1.index);//Receiverに値をpeekしてくる // DataSegment.get("local").update ods.update("local", "key1", "String data"); diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/test/codesegment/local/factorial/FactorialCodeSegment.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/test/codesegment/local/factorial/FactorialCodeSegment.java Thu Apr 09 18:36:26 2015 +0900 @@ -0,0 +1,46 @@ +package alice.test.codesegment.local.factorial; + +import alice.codesegment.CodeSegment; +import alice.datasegment.CommandType; +import alice.datasegment.Receiver; + +/** + * Created by e125769 on 2/24/15. + */ +public class FactorialCodeSegment extends CodeSegment { + + Receiver input[]; + + public FactorialCodeSegment(int n, String key){//5個Receiverを作る + input = new Receiver[n]; + for (int i = 0; i < n; i++){ + input[i] = ids.create(CommandType.TAKE);//take用Receiverを生成 + } + for (int i = 0; i < n; i++){//createと同じループではだめ。修論参照。 + input[i].setKey(key); + } + } + + @Override + public void run() {//main. + System.out.println("run FactorialCodeSegment"); + + long result = 1; + + for (int i = 0; i < input.length; i++){//わける + result *= input[i].asClass(Long.class);//(int)r.getObj(); , asInteger Remoteは別。byteArrayだから + } + + System.out.println(input[input.length - 1].asClass(Long.class) + "まで: " + result); + + ods.put("key1", result); + + if (input[0].key.equals("key1")){ + new FactorialCodeSegment(4, "key1");//setKeyよりあとに操作しない + } + + System.exit(0); + + } + +} diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/test/codesegment/local/factorial/StartCodeSegment.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/alice/test/codesegment/local/factorial/StartCodeSegment.java Thu Apr 09 18:36:26 2015 +0900 @@ -0,0 +1,38 @@ +package alice.test.codesegment.local.factorial; + +import alice.codesegment.CodeSegment; +import alice.test.codesegment.local.TestCodeSegment; + +/** + * Created by e125769 on 2/24/15. + */ +public class StartCodeSegment extends CodeSegment{ + + @Override + public void run() {//データをセット&表示をするCS + System.out.println("run StartCodeSegment"); + + FactorialCodeSegment cs[] = new FactorialCodeSegment[4];//計算するCSを用意 + + System.out.println("create FactorialCodeSegment"); + + for (int i = 0; i < cs.length; i++){//4回CSを走らせている + cs[i] = new FactorialCodeSegment(5, "key" + i);//配列にCSインスタンスを代入 newした時点でコンストラクタのsetKeyでデータを取りに行く + } + + for (long i = 1; i <= 20; i++) {//1~100までを4つのキューに格納 + if (i <= 5){//一応パイプライン状に走る + ods.put("local", "key0", i); + } else if (5 < i && i <= 10){ + ods.put("local", "key1", i); + } else if (10 < i && i <= 15){ + ods.put("local", "key2", i); + } else{ + ods.put("local", "key3", i); + } + } + + + } + +} diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/test/codesegment/remote/TestRemoteAlice.java --- a/src/main/java/alice/test/codesegment/remote/TestRemoteAlice.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/test/codesegment/remote/TestRemoteAlice.java Thu Apr 09 18:36:26 2015 +0900 @@ -6,9 +6,9 @@ public class TestRemoteAlice { public static void main(String[] args) { - TestRemoteConfig conf = new TestRemoteConfig(args); + TestRemoteConfig conf = new TestRemoteConfig(args);//トポロジー設定をコマンドライン引数からとって設定? - new AliceDaemon(conf).listen(); + new AliceDaemon(conf).listen();//構成開始?TopMはない DataSegment.connect(conf.key, "", conf.hostname, conf.connectPort); new RemoteStartCodeSegment().execute(); } diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/topology/fix/FixTopology.java --- a/src/main/java/alice/topology/fix/FixTopology.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/topology/fix/FixTopology.java Thu Apr 09 18:36:26 2015 +0900 @@ -6,7 +6,6 @@ import alice.codesegment.CodeSegment; import alice.daemon.ConnectionInfo; import alice.datasegment.CommandType; -import alice.datasegment.DataSegment; import alice.datasegment.Receiver; import alice.topology.HostMessage; import alice.topology.manager.Parent; @@ -47,7 +46,6 @@ } if (lostNode!=null && lostNode.isAlive()) { - DataSegment.remove(lostNodeName); // change state not Alive lostNode.alive = false; // get lastJoinedNode diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/topology/manager/CheckComingHost.java --- a/src/main/java/alice/topology/manager/CheckComingHost.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/topology/manager/CheckComingHost.java Thu Apr 09 18:36:26 2015 +0900 @@ -1,7 +1,6 @@ package alice.topology.manager; import java.util.HashMap; - import alice.codesegment.CodeSegment; import alice.datasegment.CommandType; import alice.datasegment.Receiver; @@ -11,12 +10,10 @@ // checkIncomingHost private Receiver host = ids.create(CommandType.TAKE); private Receiver absCookieTable = ids.create(CommandType.PEEK); // cookie, AbsName HashMap - private Receiver config = ids.create(CommandType.PEEK); public CheckComingHost(){ this.host.setKey("host"); this.absCookieTable.setKey("absCookieTable"); - this.config.setKey("TMConfig"); } @Override @@ -24,7 +21,6 @@ HostMessage host = this.host.asClass(HostMessage.class); @SuppressWarnings("unchecked") HashMap absCookieTable = this.absCookieTable.asClass(HashMap.class); - TopologyManagerConfig conf = this.config.asClass(TopologyManagerConfig.class); boolean match = false; // check cookie if (host.cookie != null) { @@ -38,16 +34,8 @@ if (match){ // coming host has ever joined this App ods.put("reconnectHost", host); - if (conf.dynamic) { //dynamic topology - if (conf.type == TopologyType.Tree) { - ods.put("orderHash", "order"); - ods.put("newHost", host); - } - } else { // static topology - new SearchHostName(); - } + new SearchHostName(); } else { - host.cookie = null; ods.put("orderHash", "order"); ods.put("newHost", host); } diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/topology/manager/CreateTreeTopology.java --- a/src/main/java/alice/topology/manager/CreateTreeTopology.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/topology/manager/CreateTreeTopology.java Thu Apr 09 18:36:26 2015 +0900 @@ -31,6 +31,7 @@ @Override public void run() { String cookie = info3.asString(); + System.out.println(cookie); HostMessage host = info.asClass(HostMessage.class); int comingHostCount = info1.asInteger(); ParentManager manager = info6.asClass(ParentManager.class); @@ -38,29 +39,15 @@ HashMap nameTable = info2.asClass(HashMap.class); HashMap absCookieTable = info4.asClass(HashMap.class); - String nodeName = null; - int count = comingHostCount; - boolean reconnect = false; - if (host.absName!=null) { - System.out.println(host.cookie); - cookie = host.cookie; - nodeName = host.absName; - System.out.println(nodeName); - reconnect = true; - } else { - System.out.println(cookie); - nodeName = "node"+comingHostCount; - absCookieTable.put(cookie, nodeName); - count++; - } + String nodeName = "node"+comingHostCount; // Manager connect to Node DataSegment.connect(nodeName, nodeName, host.name, host.port); ods.put(nodeName, "host", nodeName); ods.put(nodeName, "cookie", cookie); - ods.put(nodeName, "reconnect", reconnect); + absCookieTable.put(cookie, nodeName); ods.put(info4.key, absCookieTable); - ods.put(info1.key, count); + ods.put(info1.key, comingHostCount+1); host.alive = true; nameTable.put(nodeName, host); manager.register(nodeName); @@ -75,8 +62,7 @@ newHost.absName = parentAbsName; newHost.remoteAbsName = nodeName; // address ods.put(newHost.remoteAbsName, newHost); - if (reconnect) - ods.put(newHost.remoteAbsName, newHost); + ods.put("nodeInfo", newHost); new RecodeTopology(); diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/topology/manager/StartTopologyManager.java --- a/src/main/java/alice/topology/manager/StartTopologyManager.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/topology/manager/StartTopologyManager.java Thu Apr 09 18:36:26 2015 +0900 @@ -32,7 +32,6 @@ public void run() { new CheckComingHost(); ods.put("absCookieTable", new HashMap()); - ods.put("TMConfig", conf); if (!conf.dynamic) { LinkedList nodeNames = new LinkedList(); @@ -96,7 +95,7 @@ ods.put("running", true); HashMap nameTable = new HashMap(); - // if you want to add Topology, add code here + if (conf.type == TopologyType.Tree) { int cominghostCount = 0; ParentManager manager = new ParentManager(conf.hasChild); diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/topology/node/ClosedEventManager.java --- a/src/main/java/alice/topology/node/ClosedEventManager.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/topology/node/ClosedEventManager.java Thu Apr 09 18:36:26 2015 +0900 @@ -6,7 +6,6 @@ import alice.datasegment.CommandType; import alice.datasegment.Receiver; -@SuppressWarnings("rawtypes") public class ClosedEventManager extends CodeSegment{ private Receiver info = ids.create(CommandType.PEEK); @@ -25,12 +24,14 @@ info.setKey("_DISCONNECT"); } + @SuppressWarnings("rawtypes") @Override public void run() { new ExecuteEvent(); setKey(); } + @SuppressWarnings("rawtypes") public synchronized void register(Class clazz) { ods.put("_REGISTERDEVENT", clazz); new RegisterEvent(); diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/topology/node/ConfigurationFinish.java --- a/src/main/java/alice/topology/node/ConfigurationFinish.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/topology/node/ConfigurationFinish.java Thu Apr 09 18:36:26 2015 +0900 @@ -25,8 +25,6 @@ ods.put("manager", "done", ValueFactory.createNilValue()); Start cs = new Start(startCS); cs.done.setKey("manager", "start"); - cs.config.setKey("TNConfig"); - cs.reconnect.setKey("reconnect"); new StartKeepAlive().execute(); new ReceiveCloseMessage(CommandType.PEEK); diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/topology/node/IncomingConnectionInfo.java --- a/src/main/java/alice/topology/node/IncomingConnectionInfo.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/topology/node/IncomingConnectionInfo.java Thu Apr 09 18:36:26 2015 +0900 @@ -31,9 +31,11 @@ DataSegment.connect(hostInfo.connectionName, hostInfo.reverseName, hostInfo.name, hostInfo.port); ods.put(hostInfo.connectionName, "reverseKey", hostInfo.reverseName); count++; - ods.put("connection"+hostInfo.connectionName, hostInfo.connectionName); - new PrepareToAddList(hostInfo.connectionName); + + ods.put("cMember", hostInfo.connectionName); + new CreateConnectionList(); } + } IncomingConnectionInfo cs = new IncomingConnectionInfo(absName, count); diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/topology/node/IncomingReverseKey.java --- a/src/main/java/alice/topology/node/IncomingReverseKey.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/topology/node/IncomingReverseKey.java Thu Apr 09 18:36:26 2015 +0900 @@ -19,7 +19,7 @@ int reverseCount = this.reverseCount.asInteger(); reverseCount++; ods.update("local", "reverseCount", reverseCount); - ods.put("connection"+reverseKey, reverseKey); + IncomingReverseKey cs = new IncomingReverseKey(); cs.reverseKey.setKey("local", "reverseKey"); cs.reverseCount.setKey("local", "reverseCount"); diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/topology/node/Start.java --- a/src/main/java/alice/topology/node/Start.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/topology/node/Start.java Thu Apr 09 18:36:26 2015 +0900 @@ -9,8 +9,7 @@ public class Start extends CodeSegment { public Receiver done = ids.create(CommandType.PEEK); - public Receiver reconnect = ids.create(CommandType.PEEK); - public Receiver config = ids.create(CommandType.PEEK); + private Logger logger = Logger.getLogger(Start.class); private CodeSegment startCS; @@ -22,11 +21,6 @@ @Override public void run() { logger.info("Configuration finished."); - boolean reconncet = this.reconnect.asClass(boolean.class); - if (reconncet) { - TopologyNodeConfig tnconfig = this.config.asClass(TopologyNodeConfig.class); - tnconfig.executeEvent(); - } if (startCS == null) return; diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/topology/node/StartTopologyNode.java --- a/src/main/java/alice/topology/node/StartTopologyNode.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/topology/node/StartTopologyNode.java Thu Apr 09 18:36:26 2015 +0900 @@ -20,7 +20,6 @@ @Override public void run() { - ods.put("TNConfig", conf); DataSegment.connect("manager", "manager", conf.getManagerHostName(), conf.getManagerPort()); String localHostName = null; try { diff -r b8fe79f0c764 -r 145c425db88d src/main/java/alice/topology/node/TopologyNodeConfig.java --- a/src/main/java/alice/topology/node/TopologyNodeConfig.java Fri Jan 23 23:53:36 2015 +0900 +++ b/src/main/java/alice/topology/node/TopologyNodeConfig.java Thu Apr 09 18:36:26 2015 +0900 @@ -1,16 +1,12 @@ package alice.topology.node; -import java.util.ArrayList; - import alice.daemon.Config; -@SuppressWarnings("rawtypes") public class TopologyNodeConfig extends Config { private String managerHostName; private int managerPort = 10000; public String cookie; - private ArrayList eventList = new ArrayList(); public TopologyNodeConfig(String[] args) { super(args); @@ -41,17 +37,4 @@ this.managerPort = managerPort; } - public void register(Class clazz) { - if ("CodeSegment".equals(clazz.getSuperclass().getSimpleName())) - eventList.add(clazz); - } - - public void executeEvent() { - for (Class clazz : eventList) - try { - clazz.newInstance(); - } catch (InstantiationException | IllegalAccessException e) { - e.printStackTrace(); - } - } }