# HG changeset patch # User one # Date 1326351748 -32400 # Node ID 78b415d019de005a624ab03534aa15c7ea1b528a # Parent 352eb19d837d6bdc65b8c2cd7556aa916f2aea53 Local DS and CS work! maybe... diff -r 352eb19d837d -r 78b415d019de src/alice/codesegment/CodeSegment.java --- a/src/alice/codesegment/CodeSegment.java Thu Jan 12 13:48:34 2012 +0900 +++ b/src/alice/codesegment/CodeSegment.java Thu Jan 12 16:02:28 2012 +0900 @@ -6,7 +6,6 @@ public InputDataSegment ids = new InputDataSegment(this); - public abstract void prepare(); public abstract void run(); } diff -r 352eb19d837d -r 78b415d019de src/alice/codesegment/CodeSegmentManager.java --- a/src/alice/codesegment/CodeSegmentManager.java Thu Jan 12 13:48:34 2012 +0900 +++ b/src/alice/codesegment/CodeSegmentManager.java Thu Jan 12 16:02:28 2012 +0900 @@ -1,31 +1,28 @@ package alice.codesegment; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; -import alice.datasegment.Reply; - public class CodeSegmentManager { private static CodeSegmentManager instance = new CodeSegmentManager(); - private LinkedBlockingQueue replyQueue = new LinkedBlockingQueue(); - private ConcurrentHashMap seqHash = new ConcurrentHashMap(); + public LinkedBlockingQueue readyQueue = new LinkedBlockingQueue(); private CodeSegmentManager() { - Runnable replyThread = new Runnable() { + Runnable prepareThread = new Runnable() { @Override public void run() { while (true) { try { - Reply reply = replyQueue.take(); - + CodeSegment cs = readyQueue.take(); + cs.run(); } catch (InterruptedException e) { e.printStackTrace(); } } - } + }; + new Thread(prepareThread).start(); } public void create(CodeSegment cs) { diff -r 352eb19d837d -r 78b415d019de src/alice/codesegment/InputDataSegment.java --- a/src/alice/codesegment/InputDataSegment.java Thu Jan 12 13:48:34 2012 +0900 +++ b/src/alice/codesegment/InputDataSegment.java Thu Jan 12 16:02:28 2012 +0900 @@ -36,5 +36,16 @@ public void reply(String key, DataSegmentValue val) { inputDataSegments.put(key, val); + if (count.decrementAndGet() == 0) { + try { + CodeSegmentManager.get().readyQueue.put(cs); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public DataSegmentValue get(String argKey) { + return inputDataSegments.get(argKey); } } diff -r 352eb19d837d -r 78b415d019de src/alice/datasegment/LocalDataSegmentManager.java --- a/src/alice/datasegment/LocalDataSegmentManager.java Thu Jan 12 13:48:34 2012 +0900 +++ b/src/alice/datasegment/LocalDataSegmentManager.java Thu Jan 12 16:02:28 2012 +0900 @@ -33,8 +33,9 @@ private DataSegmentKey getDataSegmentKey(String key) { DataSegmentKey newDataSegmentKey = new DataSegmentKey(); DataSegmentKey dataSegmentKey = dataSegments.putIfAbsent(key, newDataSegmentKey); - if (dataSegmentKey == newDataSegmentKey) { + if (dataSegmentKey == null) { newDataSegmentKey.runKeyThread(); + dataSegmentKey = newDataSegmentKey; } return dataSegmentKey; } diff -r 352eb19d837d -r 78b415d019de src/alice/test/codesegment/StartCodeSegment.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/StartCodeSegment.java Thu Jan 12 16:02:28 2012 +0900 @@ -0,0 +1,23 @@ +package alice.test.codesegment; + +import org.msgpack.type.Value; +import org.msgpack.type.ValueFactory; + +import alice.codesegment.CodeSegment; +import alice.datasegment.DataSegment; + +public class StartCodeSegment extends CodeSegment { + + @Override + public void run() { + System.out.println("run StartCodeSegment"); + + TestCodeSegment tcs = new TestCodeSegment(); + tcs.ids.peek("arg1", "local", "key1"); + System.out.println("create TestCodeSegment"); + + Value val = ValueFactory.createRawValue("String data"); + DataSegment.get("local").put("key1", val); + } + +} diff -r 352eb19d837d -r 78b415d019de src/alice/test/codesegment/TestAlice.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/alice/test/codesegment/TestAlice.java Thu Jan 12 16:02:28 2012 +0900 @@ -0,0 +1,17 @@ +package alice.test.codesegment; + +import alice.codesegment.CodeSegment; +import alice.codesegment.CodeSegmentManager; + +public class TestAlice { + public static void main(String args[]) { + CodeSegment scs = new StartCodeSegment(); + try { + CodeSegmentManager.get().readyQueue.put(scs); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + +} diff -r 352eb19d837d -r 78b415d019de src/alice/test/codesegment/TestCodeSegment.java --- a/src/alice/test/codesegment/TestCodeSegment.java Thu Jan 12 13:48:34 2012 +0900 +++ b/src/alice/test/codesegment/TestCodeSegment.java Thu Jan 12 16:02:28 2012 +0900 @@ -1,22 +1,15 @@ package alice.test.codesegment; import alice.codesegment.CodeSegment; +import alice.datasegment.DataSegmentValue; public class TestCodeSegment extends CodeSegment { - public TestCodeSegment() { - - } - - @Override - public void prepare() { - ids.peek("arg1", "local", "key1"); - } - @Override public void run() { - - + DataSegmentValue data = ids.get("arg1"); + System.out.println("index = " + data.index); + System.out.println("data = " + data.val.toString()); } }