changeset 190:2d0d41b648fa

add PromiseProposal and refactor acceptor
author akahori
date Tue, 12 Feb 2019 16:10:15 +0900
parents 9b0a7f8dde81
children 3a4f77778d90
files src/main/java/christie/test/topology/paxos/acceptor/AcceptCodeGear.java src/main/java/christie/test/topology/paxos/acceptor/AcceptorCodeGear.java src/main/java/christie/test/topology/paxos/acceptor/PromiseCodeGear.java src/main/java/christie/test/topology/paxos/acceptor/PromiseProposal.java
diffstat 4 files changed, 90 insertions(+), 16 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/christie/test/topology/paxos/acceptor/AcceptCodeGear.java	Tue Feb 12 11:12:48 2019 +0900
+++ b/src/main/java/christie/test/topology/paxos/acceptor/AcceptCodeGear.java	Tue Feb 12 16:10:15 2019 +0900
@@ -10,8 +10,6 @@
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 public class AcceptCodeGear extends CodeGear {
 
@@ -21,16 +19,26 @@
     @Take
     Proposal acceptProposal;
 
-    @Take
-    Proposal promisedProposal;
+
+    //PromiseProposal promiseProposal;
+
+    //@Take
+    //Proposal promisedProposal;
 
     Logger logger = LogManager.getLogger(AcceptCodeGear.class);
 
+
     @Override
     protected void run(CodeGearManager cgm) {
         List<String> _CLIST = topoDG.getConnectionList();
 
+        // promise codegearでも使うので, lock
+        PromiseProposal.getInstance().lock();
+        PromiseProposal promiseProposal = PromiseProposal.getInstance();
+        Proposal promisedProposal = promiseProposal.getPromiseProposal();
+
         if(acceptProposal.getNumber() >= promisedProposal.getNumber()) {
+            logger.debug(acceptProposal + " >= " + promisedProposal);
             acceptProposal.setAccepted(true);
             acceptProposal.setAcceptorName(topoDG.getNodeName());
 
@@ -38,16 +46,20 @@
 
             for (String connectionNodeName : _CLIST){
                 if(connectionNodeName.matches("learner" + ".*")) {
-                    logger.debug("send " + connectionNodeName + " " + promisedProposal);
+                    //logger.debug("send " + topoDG.getNodeName() + " to "
+                    // + connectionNodeName + " " + promisedProposal);
                     getDGM(connectionNodeName).put("acceptedProposal", promisedProposal);
                 }
             }
         }
-
+        promiseProposal.setPromiseProposal(promisedProposal);
+        promiseProposal.unLock();
+        logger.debug("accept " + topoDG.getNodeName() + " " + promisedProposal);
         put(acceptProposal.getProposerName(), "acceptedProposal", promisedProposal);
         put("promisedProposal", promisedProposal);
 
         cgm.setup(new AcceptCodeGear());
+    }
 
-    }
+
 }
--- a/src/main/java/christie/test/topology/paxos/acceptor/AcceptorCodeGear.java	Tue Feb 12 11:12:48 2019 +0900
+++ b/src/main/java/christie/test/topology/paxos/acceptor/AcceptorCodeGear.java	Tue Feb 12 16:10:15 2019 +0900
@@ -10,8 +10,13 @@
 
     @Override
     protected void run(CodeGearManager cgm) {
+        PromiseProposal promiseProposal = PromiseProposal.getInstance();
+        promiseProposal.setPromiseProposal(new Proposal());
         cgm.setup(new PromiseCodeGear());
         cgm.setup(new AcceptCodeGear());
-        put("promisedProposal", new Proposal());
+
+        /*cgm.setup(new PromiseCodeGear());
+        cgm.setup(new AcceptCodeGear());
+        put("promisedProposal", new Proposal());*/
     }
 }
--- a/src/main/java/christie/test/topology/paxos/acceptor/PromiseCodeGear.java	Tue Feb 12 11:12:48 2019 +0900
+++ b/src/main/java/christie/test/topology/paxos/acceptor/PromiseCodeGear.java	Tue Feb 12 16:10:15 2019 +0900
@@ -1,16 +1,22 @@
 package christie.test.topology.paxos.acceptor;
 
+import christie.annotation.Peek;
 import christie.annotation.Take;
 import christie.codegear.CodeGear;
 import christie.codegear.CodeGearManager;
 import christie.test.topology.paxos.Proposal;
+import christie.test.topology.paxos.proposer.ProposerCodeGear;
+import christie.topology.TopologyDataGear;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 public class PromiseCodeGear extends CodeGear {
 
-    @Take
-    Proposal promisedProposal;
+    @Peek
+    TopologyDataGear topoDG;
+
+    //@Peek
+    //Proposal promisedProposal;
 
     @Take
     Proposal prepareProposal;
@@ -18,20 +24,25 @@
     Logger logger = LogManager.getLogger(PromiseCodeGear.class);
 
 
-    public PromiseCodeGear(){ }
-
     @Override
     protected void run(CodeGearManager cgm) {
+        // accept codegearでも使うので, lock
+        PromiseProposal.getInstance().lock();
+        PromiseProposal promiseProposal = PromiseProposal.getInstance();
+        Proposal promisedProposal = promiseProposal.getPromiseProposal();
+
         if(promisedProposal.getNumber() < prepareProposal.getNumber()) {
-            if(promisedProposal.isAccepted()){
-                promisedProposal.setNumber(prepareProposal.getNumber());
-            }else{
+            if(!promisedProposal.isAccepted()){
                 promisedProposal = prepareProposal;
             }
         }
+        promiseProposal.setPromiseProposal(promisedProposal);
+        promiseProposal.unLock();
         put("promisedProposal", promisedProposal);
-        logger.debug("promise " + promisedProposal);
+        logger.debug("promise " + topoDG.getNodeName() + " " + promisedProposal + ", " + promisedProposal.isAccepted());
         put(prepareProposal.getProposerName(),"receivePromise", promisedProposal);
+
+        //cgm.setup(new PromiseCodeGear(promiseProposal));
         cgm.setup(new PromiseCodeGear());
     }
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/christie/test/topology/paxos/acceptor/PromiseProposal.java	Tue Feb 12 16:10:15 2019 +0900
@@ -0,0 +1,46 @@
+package christie.test.topology.paxos.acceptor;
+
+import christie.test.topology.paxos.Proposal;
+
+// singleton
+public class PromiseProposal {
+    Proposal promiseProposal = null;
+
+    private static PromiseProposal instance = new PromiseProposal();
+
+
+    private Object _lock = new Object();
+
+    private boolean sync = false;
+
+    public void setPromiseProposal(Proposal promiseProposal){
+        this.promiseProposal = promiseProposal;
+    }
+
+    public Proposal getPromiseProposal() {
+        return promiseProposal;
+    }
+
+    public synchronized static PromiseProposal getInstance(){
+        return instance;
+    }
+
+    public void lock(){
+        synchronized (_lock){
+            while(sync){
+                try {
+                    _lock.wait();
+                } catch (InterruptedException e) {
+                }
+            }
+        }
+        sync = true;
+    }
+
+    public void unLock(){
+        synchronized (_lock){
+            _lock.notify();
+            sync = false;
+        }
+    }
+}