changeset 84:5ac3df98f780

merger simulator using BlockingQ
author kent
date Mon, 12 Nov 2007 17:57:38 +0900
parents 3db21fae825a
children 18ae3b9fe57a
files src/pathfinder/BlockingQ/ChannelSimulator.java src/pathfinder/BlockingQ/EditorSimulator.java src/pathfinder/BlockingQ/NetworkSimulator.java src/pathfinder/BlockingQ/SeMaSimulator.java src/pathfinder/BlockingQ/TestMerger.java
diffstat 5 files changed, 426 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pathfinder/BlockingQ/ChannelSimulator.java	Mon Nov 12 17:57:38 2007 +0900
@@ -0,0 +1,39 @@
+package pathfinder.BlockingQ;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class ChannelSimulator<P> {
+	private BlockingQueue<P> q0;
+	private BlockingQueue<P> q1;
+
+	private NetworkSimulator<P> ns;
+
+	public ChannelSimulator(NetworkSimulator<P> _ns){
+		this(new LinkedBlockingQueue<P>(), new LinkedBlockingQueue<P>(), _ns);
+	}
+	public ChannelSimulator(BlockingQueue<P> _a, BlockingQueue<P> _b, NetworkSimulator<P> _ns){
+		q0 = _a;
+		q1 = _b;
+		ns = _ns;
+	}
+
+	public P read() throws InterruptedException {
+		return q0.take();
+	}
+
+	public void write(P p) throws InterruptedException{
+		synchronized (ns){
+			q1.put(p);
+			ns.notifyAll();
+		}
+	}
+
+	public ChannelSimulator<P> getServerChannel() {
+		return new ChannelSimulator<P>(q1, q0,ns);
+	}
+	
+	public P poll() {
+		return q0.poll();
+	}
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pathfinder/BlockingQ/EditorSimulator.java	Mon Nov 12 17:57:38 2007 +0900
@@ -0,0 +1,105 @@
+package pathfinder.BlockingQ;
+
+import java.util.Queue;
+
+import remoteeditor.command.REPCommand;
+import remoteeditor.network.REP;
+import sample.merge.Translater;
+
+public class EditorSimulator extends Thread{
+	private int eid;
+	private int seq;
+	private boolean isOwner;
+	private NetworkSimulator<REPCommand> ns;
+	private ChannelSimulator<REPCommand> cs;
+	private Queue<REPCommand> CmdList;
+	private Translater translater;
+	private pathfinder.Text text;
+	private boolean running=true;
+
+	public EditorSimulator(int _eid, NetworkSimulator<REPCommand> _ns, Queue<REPCommand> q, String _name) {
+		super(_name);
+		eid = _eid;
+		ns = _ns;
+		CmdList = q;
+		translater = new Translater(_eid);
+		text = new pathfinder.Text();
+		cs = ns.connect();
+	}
+
+	public void setOwner(boolean f){
+		isOwner = f;
+	}
+	synchronized public void finish(){
+		running = false;
+	}
+
+	public void run(){
+		System.out.println("Editor"+eid+" start.");
+
+		// Send All Command that is included CmdList.
+		try {
+			sendAllCommand();
+		} catch (InterruptedException e1) {
+			e1.printStackTrace();
+			running=false;
+		}
+
+		// MainLoop, 
+		while(running){
+			REPCommand cmd;
+			try {
+				cmd = cs.read();
+			} catch (InterruptedException e) {
+				e.printStackTrace();
+				continue;
+			}
+
+			//終了条件
+			if (checkQuit(cmd)){
+				System.out.println("\tEditor"+eid+" catch QUIT command emited by itself.");
+				translater.transReceiveCmd(cmd);
+				running=false; break;
+			}
+			System.out.println("\tEditor"+eid+" catch command:>> "+cmd.toString());
+			cmd = translater.transReceiveCmd(cmd);
+			if (isOwner&&cmd!=null) cmd.setThroughMaster(true);
+			if (cmd==null) continue;
+
+			text.edit(cmd);
+			try {
+				cs.write(cmd);
+			} catch (InterruptedException e) {
+				e.printStackTrace();
+				continue;
+			}
+		}
+
+		System.out.println("Editor"+eid+" finish.");
+	}
+
+	private void sendAllCommand() throws InterruptedException {
+		for (REPCommand cmd: CmdList){
+			cmd.seq = seq;
+			cmd.eid = eid;
+			cmd.setString("this is inserted or replaced by Editor"+eid+":"+seq);
+			cmd = translater.transSendCmd(cmd);
+			if (isOwner) cmd.setThroughMaster(true);
+			text.edit(cmd);
+			cs.write(cmd);
+			seq++;
+		}
+
+		// Send Quit Command
+		cs.write( translater.transSendCmd( new REPCommand(REP.SMCMD_QUIT, 0, eid, seq++, 0, 0, "QUIT by Editor"+eid)));
+	}
+
+	private boolean checkQuit(REPCommand cmd) {
+		// 最初に全部のコマンドを送信するから、自分のQUITが来るのは最後
+		return (cmd.eid==eid && cmd.cmd==REP.SMCMD_QUIT);
+	}
+
+	public pathfinder.Text getText(){
+		return text;
+	}
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pathfinder/BlockingQ/NetworkSimulator.java	Mon Nov 12 17:57:38 2007 +0900
@@ -0,0 +1,40 @@
+package pathfinder.BlockingQ;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+public class NetworkSimulator<P> {
+
+	/** Waiting connectionRequest to be accepted by SessionManager. */
+	private Queue<ChannelSimulator<P>> acceptList;
+	/** Established connection */
+	private Queue<ChannelSimulator<P>> connectedList;
+
+	public NetworkSimulator(){
+		acceptList = new LinkedList<ChannelSimulator<P>>();
+		connectedList = new LinkedList<ChannelSimulator<P>>();
+	}
+
+	/**
+	 * Establish connection. It's called by SeMa.
+	 * @return
+	 */
+	public ChannelSimulator<P> accept(){
+		ChannelSimulator<P> cs = acceptList.poll();
+		if (cs==null) return null;
+
+		connectedList.offer(cs);
+		return cs.getServerChannel();
+	}
+
+	/**
+	 * Request to connect.
+	 * Client editor use this method to connect SeMa. 
+	 * @param cs
+	 */
+	public ChannelSimulator<P> connect(){
+		ChannelSimulator<P> cs = new ChannelSimulator<P>(this);
+		acceptList.offer(cs);
+		return cs;
+	}
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pathfinder/BlockingQ/SeMaSimulator.java	Mon Nov 12 17:57:38 2007 +0900
@@ -0,0 +1,93 @@
+package pathfinder.BlockingQ;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SeMaSimulator<P> extends Thread {
+	private int MAX_PACKET;
+	private int MAX_CLIENT;
+	private boolean running=true;
+	private NetworkSimulator<P> ns;
+	private List<ChannelSimulator<P>> csList;
+
+	public SeMaSimulator(NetworkSimulator<P> _ns, int max_client, int max_packet){
+		ns = _ns;
+		MAX_CLIENT = max_client;
+		MAX_PACKET = max_packet;
+		csList = new ArrayList<ChannelSimulator<P>>();
+	}
+	public SeMaSimulator(NetworkSimulator<P> _ns){
+		this(_ns, 2, 0);
+	}
+
+	synchronized public void finish(){
+		synchronized(ns){
+			running = false;
+			ns.notify();
+		}
+	}
+
+	/**
+	 * Check whether the NetworkSimulator hold waiting connections.
+	 */
+	private void checkAccept(){
+		ChannelSimulator<P> cs;
+		while((cs=ns.accept())!=null){
+			csList.add(cs);
+		}
+	}
+
+	public void run(){
+		int i=0;
+		int count=0;
+		P packet;
+
+		while(csList.size()<MAX_CLIENT){ checkAccept(); Thread.yield(); }
+		System.out.println("SessionManager start.");
+
+		/* Main Loop */
+		ChannelSimulator<P> cs = csList.get(i);
+		while(running
+				&& (MAX_PACKET==0 || count<MAX_PACKET)){
+			synchronized(ns){
+				int prev_i=i;
+				while((packet=cs.poll())==null && running){
+					i = (i+1)%csList.size();   // i++
+					cs = csList.get(i);        // 次のChennelをゲット
+					if(i==prev_i) try { ns.wait(); } catch (InterruptedException e) { e.printStackTrace(); }
+				}
+			}
+			if(!running) break;
+
+			System.out.println("SeMa pass packet to "+i+":>> "+packet.toString());
+			i = (i+1)%csList.size();   // i++
+			cs = csList.get(i);        // 次のChennelをゲット
+
+			// readできていたならそれを書き込む
+			try {
+				cs.write(packet);
+			} catch (InterruptedException e) {
+				System.out.println("SeMa cannot write!!");
+				e.printStackTrace();
+			}
+			count++;
+		}
+/*
+		ChannelSimulator<P> cs = csList.get(i);
+		while(running
+				&& MAX_PACKET==0 || count<MAX_PACKET){
+			packet=cs.poll();          // [i]からread
+			//if(packet!=null) System.out.println("SeMa catch packet to "+i+":>> "+packet.toString());
+			i = (i+1)%csList.size();   // i++
+			cs = csList.get(i);        // 次のChennelをゲット
+			if (packet!=null) {
+				System.out.println("SeMa pass packet to "+i+":>> "+packet.toString());
+				cs.write(packet);      // readできていたならそれを書き込む
+				count++;
+			}
+			//if (i==0) checkAccept();   //全部回ったらaccept待ちをチェック
+		}
+*/
+		System.out.println("SessionManager finish.");
+	}
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/pathfinder/BlockingQ/TestMerger.java	Mon Nov 12 17:57:38 2007 +0900
@@ -0,0 +1,149 @@
+package pathfinder.BlockingQ;
+
+import java.util.LinkedList;
+import remoteeditor.command.REPCommand;
+import remoteeditor.network.REP;
+
+public class TestMerger {
+	static public int cmdNO[] = { REP.REPCMD_INSERT, REP.REPCMD_REPLACE, REP.REPCMD_DELETE };
+	private int N_editor;
+
+	private NetworkSimulator<REPCommand> ns;
+	private SeMaSimulator<REPCommand> sm;
+	private LinkedList<EditorSimulator> editors;
+	private int N_packet;
+
+	public TestMerger(int editor, int packet){
+		N_editor = editor;
+		N_packet = packet;
+		ns = new NetworkSimulator<REPCommand>();
+		sm = new SeMaSimulator<REPCommand>(ns, N_editor, 0);
+		editors = new LinkedList<EditorSimulator>();
+	}
+
+	public static void main(String[] args){
+		TestMerger tm;
+		tm = new TestMerger(2, 3);
+		//tm.init();
+		//tm.test2cmd();
+		tm.test1cmd();
+		//tm.test0cmd();
+		tm.startTest();
+
+		//tm.printAllTexts();
+		//if (!tm.checkCS())
+		//	System.out.println("Error!! :some ChannelSimulator still have packet!");
+		if (!tm.checkEquality())
+			System.out.println("Error!! :all Editor's text is NOT mutch!");
+
+	}
+
+	/*
+	private void init(){
+		for (int i=0; i<N_editor; i++){
+			int j;
+			LinkedList<REPCommand> cmds = new LinkedList<REPCommand>();
+			// 各エディタが送信するコマンド列を生成
+			for(j=0; j<N_packet; j++) {
+				REPCommand cmd = new REPCommand(cmdNO[Verify.random(2)],
+				                                0, i, j,
+				                                10, //Verify.random(text.size()-1), //size-1?
+				                                0, null);
+				cmds.add( cmd);
+			}
+			EditorEmulator2 ee = new EditorEmulator2(i, ns, cmds, "Editor"+i); 
+			editors.add(ee);
+		}
+	}
+	*/
+	private void test2cmd(){
+		for (int i=0; i<N_editor; i++){
+			int j=0;
+			LinkedList<REPCommand> cmds = new LinkedList<REPCommand>();
+			// 各エディタが送信するコマンド列を生成
+
+			String str = "created by Editor"+i+":"+j;
+			REPCommand cmd = new REPCommand(REP.REPCMD_INSERT,
+					0, i, j++,
+					10, //Verify.random(text.size()-1), //size-1?
+					str.length(), str);
+			cmds.add( cmd);
+			str = "created by Editor"+i+":"+j;
+			cmd = new REPCommand(REP.REPCMD_INSERT,
+					0, i, j++,
+					10, //Verify.random(text.size()-1), //size-1?
+					str.length(), str);
+			cmds.add( cmd);
+
+			EditorSimulator ee = new EditorSimulator(i, ns, cmds, "Editor"+i);
+			if(i==0) ee.setOwner(true);
+			editors.add(ee);
+		}
+	}
+
+
+	private void test1cmd(){
+		for (int i=0; i<N_editor; i++){
+			int j=0;
+			LinkedList<REPCommand> cmds = new LinkedList<REPCommand>();
+			//各エディタが送信するコマンド列を生成
+			String str = "Editor"+i+":"+j;
+			REPCommand cmd = new REPCommand(REP.REPCMD_INSERT,
+					0, i, j++,
+					10, //Verify.random(text.size()-1), //size-1?
+					str.length(), str);
+			cmds.add( cmd);
+			EditorSimulator ee = new EditorSimulator(i, ns, cmds, "Editor"+i); 
+			if(i==0) ee.setOwner(true);
+			editors.add(ee);
+		}
+	}
+
+	private void test0cmd(){
+		for (int i=0; i<N_editor; i++){
+			int j=0;
+			LinkedList<REPCommand> cmds = new LinkedList<REPCommand>();
+			//各エディタが送信するコマンド列を生成
+			EditorSimulator ee = new EditorSimulator(i, ns, cmds, "Editor"+i); 
+			if(i==0) ee.setOwner(true);
+			editors.add(ee);
+		}
+	}
+
+	private void startTest() {
+		for (EditorSimulator ee: editors){
+			ee.start();
+		}
+		sm.start();
+
+		try {
+			for (EditorSimulator ee: editors){
+				//ee.finish();
+				ee.join();
+			}
+			sm.finish();
+			sm.join();
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		}
+	}
+
+	private void printAllTexts(){
+		for(EditorSimulator ee: editors){
+			System.out.println("--"+ee.getName()+"------------------------");
+			ee.getText().printAllText();
+		}
+	}
+
+	private boolean checkEquality(){
+		/*
+		Text ee0 = editors.remove().getText();
+		return editors.remove().getText().equals(ee0);
+		*/
+		pathfinder.Text text0 = editors.element().getText();
+		for(EditorSimulator ee: editors){
+			if (!text0.equals(ee.getText())) return false;
+		}
+		return true;
+	}
+}