# HG changeset patch # User Nobuyasu Oshiro # Date 1314654589 -32400 # Node ID acd88e63854b1b9189313acd5abadc0f5fe8e7fd # Parent 4297c2011b713878e14ee1ec865ef0d56ef08e67# Parent fa2122e5c8071ef7522a7c3c529742bbf6275d8d modify 128 diff -r fa2122e5c807 -r acd88e63854b .classpath --- a/.classpath Tue Aug 02 20:15:01 2011 +0900 +++ b/.classpath Tue Aug 30 06:49:49 2011 +0900 @@ -2,5 +2,6 @@ + diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/AuthPanel.class Binary file bin/myVncProxy/AuthPanel.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/ButtonPanel.class Binary file bin/myVncProxy/ButtonPanel.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/CapabilityInfo.class Binary file bin/myVncProxy/CapabilityInfo.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/CapsContainer.class Binary file bin/myVncProxy/CapsContainer.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/ClipboardFrame.class Binary file bin/myVncProxy/ClipboardFrame.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/CreateHtmlFile.class Binary file bin/myVncProxy/CreateHtmlFile.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/CreateThread.class Binary file bin/myVncProxy/CreateThread.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/DesCipher.class Binary file bin/myVncProxy/DesCipher.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/HTTPConnectSocket.class Binary file bin/myVncProxy/HTTPConnectSocket.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/HTTPConnectSocketFactory.class Binary file bin/myVncProxy/HTTPConnectSocketFactory.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/InStream.class Binary file bin/myVncProxy/InStream.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/MemInStream.class Binary file bin/myVncProxy/MemInStream.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/MulticastQueue$Client.class Binary file bin/myVncProxy/MulticastQueue$Client.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/MulticastQueue$Node.class Binary file bin/myVncProxy/MulticastQueue$Node.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/MulticastQueue.class Binary file bin/myVncProxy/MulticastQueue.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/MyRfbProto$1.class Binary file bin/myVncProxy/MyRfbProto$1.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/MyRfbProto$2.class Binary file bin/myVncProxy/MyRfbProto$2.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/MyRfbProto.class Binary file bin/myVncProxy/MyRfbProto.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/OptionsFrame.class Binary file bin/myVncProxy/OptionsFrame.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/OptionsNoFrame.class Binary file bin/myVncProxy/OptionsNoFrame.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/ProxyVncCanvas.class Binary file bin/myVncProxy/ProxyVncCanvas.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/RecordingFrame.class Binary file bin/myVncProxy/RecordingFrame.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/ReloginPanel.class Binary file bin/myVncProxy/ReloginPanel.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/RfbProto.class Binary file bin/myVncProxy/RfbProto.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/SendThread.class Binary file bin/myVncProxy/SendThread.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/SessionRecorder.class Binary file bin/myVncProxy/SessionRecorder.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/SocketFactory.class Binary file bin/myVncProxy/SocketFactory.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/VncCanvas.class Binary file bin/myVncProxy/VncCanvas.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/VncCanvas2.class Binary file bin/myVncProxy/VncCanvas2.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/VncProxyService.class Binary file bin/myVncProxy/VncProxyService.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/VncViewer.class Binary file bin/myVncProxy/VncViewer.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/myVncProxy/ZlibInStream.class Binary file bin/myVncProxy/ZlibInStream.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/test/MultiThreadTee$Client.class Binary file bin/test/MultiThreadTee$Client.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/test/MultiThreadTee$Parent.class Binary file bin/test/MultiThreadTee$Parent.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/test/MultiThreadTee.class Binary file bin/test/MultiThreadTee.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/test/MulticastQueue$1.class Binary file bin/test/MulticastQueue$1.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/test/MulticastQueue$2.class Binary file bin/test/MulticastQueue$2.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/test/MulticastQueue$Client.class Binary file bin/test/MulticastQueue$Client.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/test/MulticastQueue$Node.class Binary file bin/test/MulticastQueue$Node.class has changed diff -r fa2122e5c807 -r acd88e63854b bin/test/MulticastQueue.class Binary file bin/test/MulticastQueue.class has changed diff -r fa2122e5c807 -r acd88e63854b src/myVncProxy/AcceptClient.java --- a/src/myVncProxy/AcceptClient.java Tue Aug 02 20:15:01 2011 +0900 +++ b/src/myVncProxy/AcceptClient.java Tue Aug 30 06:49:49 2011 +0900 @@ -6,215 +6,235 @@ import java.util.*; public class AcceptClient extends Thread { - int counter = 0 , parentnum = 0/*落ちたときの親の番号をカウントするためのもの*/; + int counter = 0, parentnum = 0/* 落ちたときの親の番号をカウントするためのもの */; LinkedList ls = new LinkedList(); - boolean runflag =false; - private String name; - boolean addrRegistor=true; + boolean runflag = false; + private String name; + boolean addrRegistor = true; public AcceptClient(String _name) { name = _name; } + public AcceptClient() { new CreateThread(this); } - - public synchronized void gethost(BufferedReader is,PrintStream os) { - String line,port; - int intv_time = 100; - String request; - int treebranch = 2;//treeの子ノードの数 - String leaderflag="0",sendleaderflag="0";//Socket send standardization String - + + // public synchronized void transferParentAddrerss(BufferedReader + // is,PrintStream os) { + public synchronized void transferParentAddrerss(BufferedReader is,PrintStream os) { + String line, port; + int intv_time = 100; + String request; + int treebranch = 2;// treeの子ノードの数 + String leaderflag = "0", sendleaderflag = "0"; // クライアントからのメッセージを待ち、受け取ったメッセージをそのまま返す - try { - while (true) { - line = is.readLine(); - port = is.readLine(); - - System.out.println("データーを受信しましたlin="+line+" port="+port); - //自分の IPADRESSを取得する - InetAddress addr = InetAddress.getLocalHost(); - String add = new String(addr.getHostAddress()); + try { + while (true) { + line = is.readLine(); + port = is.readLine(); + + System.out.println("データーを受信しましたlin=" + line + + " port=" + port); + // 自分の IPADRESSを取得する + InetAddress addr = InetAddress.getLocalHost(); + String add = new String(addr.getHostAddress()); + + if ("1".equals(line)) { + System.out.println("親が落ちましたmessage" + port); + String checkRepetition = is.readLine(); + os.println(ls.getLast()); + parentnum = (Integer.parseInt(port) - 1) + / treebranch; + String newparent = ls.get(parentnum); + + counter--; + runflag = true; + + sendleaderflag = decisionLeader( + Integer.parseInt(port), treebranch); + + Child report = new Child(); - if("1".equals(line)) { - System.out.println("親が落ちましたmessage" + port); - os.println(ls.getLast()); - parentnum = (Integer.parseInt(port) -1) / treebranch; - String hidenchild=ls.getLast(); - String newparent=ls.get(parentnum); - listupdate(port); - counter--; - runflag = true; - - if((counter-1)%treebranch==1) { // children in most young treenum have leaderflag 1 other 0 - sendleaderflag = "1"; - } else { - sendleaderflag = "0"; - } + report.reportLastNode(ls.getLast(), newparent, + port, String.valueOf(parentnum), + sendleaderflag, counter); + + listupdate(port); + + int g = 0; + for (String bs : ls) { + System.out.println(g + "番目" + bs); + g++; + } + + os.println(port); + // os.println(leaderflag); + + leaderflag = decisionLeader( + Integer.parseInt(checkRepetition), + treebranch); + + if (Integer.parseInt(checkRepetition) == counter + 1) { + checkRepetition = "stop"; + } else { + checkRepetition = "go"; + } + os.println(checkRepetition); + + // os.println(leaderflag); + Thread.sleep(intv_time); + is.close(); + os.close(); - Child report = new Child(); - - report.reportLastNode(hidenchild,newparent,port,String.valueOf(parentnum), - sendleaderflag,counter); - - os.println(port); - Thread.sleep(intv_time); - is.close(); - os.close(); + } else if ("2".equals(line)) { + parentnum = (Integer.parseInt(port) - 1) + / treebranch; + String newparent = ls.get(parentnum); + + outputStream(os, newparent, + String.valueOf(parentnum), + String.valueOf(counter), leaderflag); + + os.close(); + is.close(); + } else if ("3".equals(line)) { + String checkRepetition = is.readLine(); + System.out.println("落ちたのを確認しました"); + + os.println(ls.get(Integer.parseInt(port))); + os.println(port); + + if (checkRepetition.equals(ls.getLast())) { + checkRepetition = "stop"; + } else { + checkRepetition = "go"; + } + os.println(checkRepetition); - } else if("2".equals(line)) { - parentnum = (Integer.parseInt(port) -1) / treebranch; - String newparent=ls.get(parentnum); - - outputStream(os,newparent,String.valueOf(parentnum), - String.valueOf(counter),leaderflag); - - os.close(); - is.close(); - } else if("3".equals(line)) { - System.out.println("落ちたのを確認しました"); + System.out.println("num4=" + + ls.get(Integer.parseInt(port))); + line = null; + runflag = false; + is.close(); + os.close(); + } else { + if (addrRegistor == true) { + ls.add(add); + addrRegistor = false; + } + + if (line != null) { + addClientAdress(line, ls); + counter++; + } else { + break; + } - os.println(ls.get(Integer.parseInt(port))); - os.println(port); - int g = 0; - for(String bs: ls){ - System.out.println(g+"番目"+bs); - g++; + if (counter >= treebranch + 1) { + + leaderflag = decisionLeader(counter, treebranch); + parentnum = (counter - 1) / treebranch; + + request = ls.get(parentnum); + System.out.println(parentnum); + + outputStream(os, request, + String.valueOf(parentnum), + String.valueOf(counter), leaderflag); + + checkParameter(parentnum, counter, leaderflag); + } else { + // treeの親ノードに接続する人に接続する人を教える + outputStream(os, add, "0", + String.valueOf(counter), leaderflag); + } + Thread.sleep(intv_time); } - System.out.println("num4=" + ls.get(Integer.parseInt(port))); - line=null; - runflag = false; - is.close(); - os.close(); - } else { - if(addrRegistor==true){ - ls.add(add); - addrRegistor = false; - } - System.out.println(parentnum); - if(line != null) { - arg(line,ls); - counter++; - } else { - break; - } + } + + } catch (IOException e) { + System.out.println(e); + } - if(counter>=treebranch+1) { - if((counter-1)%treebranch==0) { - leaderflag = "1"; - } else { - leaderflag = "0"; - } - - parentnum = (counter - 1) / treebranch; - // request = [p-1]; - request = ls.get(parentnum); - - outputStream(os,request,String.valueOf(parentnum), - String.valueOf(counter),leaderflag); - - checkParameter(parentnum,counter,leaderflag); - } else { - //treeの親ノードに接続する人に接続する人を教える - outputStream(os,name,"0",String.valueOf(counter), - leaderflag); - } - Thread.sleep(intv_time); - } + catch (InterruptedException e) { + e.printStackTrace(); } - - } catch (IOException e) { - System.out.println(e); - } - - catch(InterruptedException e) { - e.printStackTrace(); - } - /* - try{ - echoServer.close(); - } - catch (IOException e){ - System.out.println(e); - } - */ - //} comment out while + + } - + /** * @param port - * parent value + * parent value */ void listupdate(String port) { ls.remove(Integer.parseInt(port)); - ls.add(Integer.parseInt(port),ls.getLast()); + ls.add(Integer.parseInt(port), ls.getLast()); ls.removeLast(); } - void outputStream(PrintStream os,String request,String parentnum,String treenum,String leaderflag) { + void outputStream(PrintStream os, String request, String parentnum, + String treenum, String leaderflag) { os.println(request); os.println(parentnum); os.println(treenum); os.println(leaderflag); } - - void checkParameter(int parent,int counter,String leaderflag) { - System.out.println("pの値="+parentnum); - System.out.println("iの値="+counter); - System.out.println("leaderflag="+leaderflag + "\n"); + + void checkParameter(int parent, int counter, String leaderflag) { + System.out.println("pの値=" + parentnum); + System.out.println("iの値=" + counter); + System.out.println("leaderflag=" + leaderflag + "\n"); } - - void arg(String line,LinkedList ls) { - if(line != null){ + + void addClientAdress(String line, LinkedList ls) { + int g = 0; + if (line != null) { ls.add(line); } - int g=0; - - for(String bs: ls){ - System.out.println(g+"番目"+bs); + for (String bs : ls) { + System.out.println(g + "番目" + bs); g++; } } - + + String decisionLeader(int counter, int treebranch) { + if ((counter - 1) % treebranch == 1) { // children in most young treenum + // have leaderflag 1 other 0 + return "0"; + } else { + return "1"; + } + } + } -/* -class sock { - void arg(String line,LinkedList ls) { - if(line != null){ - ls.add(line); - } - int g=0; +class Child { - for(String bs: ls){ - System.out.println(g+"番目"+bs); - g++; - } - } -} -*/ -class Child{ - - void reportLastNode(String hiddenchild,String newparent,String newtreenum,String newpnum,String newleaderflag,int i) throws IOException{ + void reportLastNode(String hiddenchild, String newparent, + String newtreenum, String newpnum, String newleaderflag, int i) + throws IOException { try { Socket echoSocket; System.out.println(hiddenchild + "に接続します"); - echoSocket = new Socket(hiddenchild, 10001 + (i + 1));//i+1は実験中に同じマシーンを使っていたのでportを変えて対応、本番時には取り除く予定。 + echoSocket = new Socket(hiddenchild, 10001 + (i + 1));// i+1は実験中に同じマシーンを使っていたのでportを変えて対応、本番時には取り除く予定。 - DataOutputStream os = new DataOutputStream(echoSocket.getOutputStream()); + DataOutputStream os = new DataOutputStream( + echoSocket.getOutputStream()); - os.writeBytes(newparent+"\n"); - os.writeBytes(newpnum+"\n"); - os.writeBytes(newtreenum+"\n"); - os.writeBytes(newleaderflag+"\n"); - + os.writeBytes(newparent + "\n"); + os.writeBytes(newpnum + "\n"); + os.writeBytes(newtreenum + "\n"); + os.writeBytes(newleaderflag + "\n"); + os.close(); } catch (UnknownHostException e) { System.err.println("Don't know about host: localhost"); } catch (IOException e) { - System.err.println("Couldn't get I/O for the connection to: localhost"); + System.err + .println("Couldn't get I/O for the connection to: localhost"); } } diff -r fa2122e5c807 -r acd88e63854b src/myVncProxy/AcceptThread.java --- a/src/myVncProxy/AcceptThread.java Tue Aug 02 20:15:01 2011 +0900 +++ b/src/myVncProxy/AcceptThread.java Tue Aug 30 06:49:49 2011 +0900 @@ -1,4 +1,4 @@ -package myVncProxy; + package myVncProxy; import java.net.Socket; import java.io.IOException; import java.io.InputStream; @@ -7,13 +7,19 @@ public class AcceptThread implements Runnable { MyRfbProto rfb; byte[] imageBytes; - + int port; + AcceptThread(MyRfbProto _rfb) { rfb = _rfb; } + AcceptThread(MyRfbProto _rfb, int p) { + rfb = _rfb; + port = p; + } + public void run() { - rfb.selectPort(); + rfb.selectPort(port); while (true) { try { Socket newCli = rfb.accept(); diff -r fa2122e5c807 -r acd88e63854b src/myVncProxy/CreateThread.java --- a/src/myVncProxy/CreateThread.java Tue Aug 02 20:15:01 2011 +0900 +++ b/src/myVncProxy/CreateThread.java Tue Aug 30 06:49:49 2011 +0900 @@ -4,47 +4,79 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; +import java.net.BindException; import java.net.ServerSocket; import java.net.Socket; - - public class CreateThread implements Runnable { ServerSocket echoServer; AcceptClient acceptClient; + int port; + public CreateThread(AcceptClient _acc) { + acceptClient = _acc; + port = 9999; + } + + public CreateThread(AcceptClient _acc , int port) { + acceptClient = _acc; + this.port = port; + } - public CreateThread(AcceptClient _acc){ - acceptClient = _acc; + void newEchoClient(final BufferedReader is,final PrintStream os) { + Runnable echoSender = new Runnable() { + public void run() { + acceptClient.transferParentAddrerss(is,os); + } + }; + new Thread(echoSender).start(); + } + + void selectPort(int p) { + int port = p; + while (true) { + try { + initServSock(port); + break; + } catch (BindException e) { + port++; + continue; + } catch (IOException e) { + + } + } + System.out.println("accept Echo port = " + port); + } + + void initServSock(int port) throws IOException { + echoServer = new ServerSocket(port); + this.port = port; } public void run() { + selectPort(port); while (true) { - try { - echoServer = new ServerSocket(9999); - } - catch (IOException e) { +// echoServer = new ServerSocket(9999); + Socket clientSocket = echoServer.accept(); + BufferedReader is = new BufferedReader(new InputStreamReader( + clientSocket.getInputStream())); + PrintStream os = new PrintStream(clientSocket.getOutputStream()); + newEchoClient(is,os); +// acceptClient.transferParentAddrerss(is, os); + } catch (IOException e) { System.out.println(e); } +/* try { - Socket clientSocket = echoServer.accept(); - BufferedReader is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); - PrintStream os = new PrintStream(clientSocket.getOutputStream()); - acceptClient.gethost(is,os); - } catch (IOException e){ - e.printStackTrace(); + echoServer.close(); + } catch (IOException e) { System.out.println(e); } - try { - echoServer.close(); - } - catch (IOException e){ - System.out.println(e); - } - +*/ + } } diff -r fa2122e5c807 -r acd88e63854b src/myVncProxy/MostRecentMultiCast.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/myVncProxy/MostRecentMultiCast.java Tue Aug 30 06:49:49 2011 +0900 @@ -0,0 +1,27 @@ +package myVncProxy; + +import java.util.LinkedList; + + +public class MostRecentMultiCast extends MulticastQueue { + + LinkedList> alive; + int count = 0; + MostRecentMultiCast(int limit) { + count = limit; + this.alive = new LinkedList>(); + } + + @Override + public synchronized void put(T item) + { + Node next = new Node(item); + tail.set(next); + tail = next; + alive.addLast(next); + if (alive.size()>count) { + Node old = alive.getFirst(); + old.clear(); + } + } +} diff -r fa2122e5c807 -r acd88e63854b src/myVncProxy/MulticastQueue.java --- a/src/myVncProxy/MulticastQueue.java Tue Aug 02 20:15:01 2011 +0900 +++ b/src/myVncProxy/MulticastQueue.java Tue Aug 30 06:49:49 2011 +0900 @@ -33,21 +33,24 @@ node = tail; } - public T poll() + synchronized public T poll() { Node next = null; - - try { - next = node.next(); - }catch(InterruptedException _e){ - _e.printStackTrace(); - } - node = next; - return next.item; + T item = null; + do { + try { + next = node.next(); + }catch(InterruptedException _e){ + continue; + } + item = node.getItem(); + node = next; + } while ( item == null); + return item; } } - private static class Node + static class Node { private T item; private Node next; @@ -60,6 +63,10 @@ latch = new CountDownLatch(1); } + synchronized public T getItem() { + return item; + } + public void set(Node next) { this.next = next; @@ -71,5 +78,9 @@ latch.await(); return next; } + + synchronized public void clear() { + item = null; + } } } diff -r fa2122e5c807 -r acd88e63854b src/myVncProxy/MyRfbProto.java --- a/src/myVncProxy/MyRfbProto.java Tue Aug 02 20:15:01 2011 +0900 +++ b/src/myVncProxy/MyRfbProto.java Tue Aug 30 06:49:49 2011 +0900 @@ -1,36 +1,46 @@ package myVncProxy; +import static org.junit.Assert.*; + import java.awt.Graphics; import java.awt.Image; import java.awt.image.BufferedImage; -import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.BindException; import java.net.ServerSocket; import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.Iterator; import java.util.LinkedList; import javax.imageio.ImageIO; +import org.junit.Test; + import myVncProxy.MulticastQueue.Client; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.DataFormatException; +import java.util.zip.Deflater; +import java.util.zip.Inflater; import java.io.OutputStream; +public class MyRfbProto extends RfbProto { final static String versionMsg_3_998 = "RFB 003.998\n"; /** * CheckMillis is one of new msgType for RFB 3.998. */ - final static int SpeedCheckMillis = 4; + final static byte SpeedCheckMillis = 4; + private static final int INFLATE_BUFSIZE = 1024*100; boolean printStatusFlag = false; long startCheckTime; @@ -41,51 +51,44 @@ private int rectW; private int rectH; private int encoding; - private int bytesPixel; + private int zLen; + private boolean clicomp = false; private ServerSocket servSock; private int acceptPort; private byte initData[]; private LinkedList cliListTmp; private LinkedList cliList; - private LinkedList sendThreads; boolean createBimgFlag; - boolean sendFlag = true; + ExecutorService executor; - // override viewer to VncProxyService from VncViewer - VncProxyService viewer; - byte[] pngBytes; - private MulticastQueue multicastqueue = new MulticastQueue(); - + // private MulticastQueue> multicastqueue = new MostRecentMultiCast>(10); + private MulticastQueue> multicastqueue = new MulticastQueue>(); + private int clients = 0; + private Inflater inflater = new Inflater(); + private Deflater deflater = new Deflater(); + private CreateThread geth; + + public + MyRfbProto() throws IOException { + } + MyRfbProto(String h, int p, VncViewer v) throws IOException { super(h, p, v); - cliList = new LinkedList(); - cliListTmp = new LinkedList(); - createBimgFlag = false; - // sendThreads = new LinkedList(); - // executor = Executors.newCachedThreadPool(); - // executor = Executors.newSingleThreadExecutor(); } - MyRfbProto(String h, int p, VncProxyService v) throws IOException { + MyRfbProto(String h, int p, CreateThread geth) throws IOException { super(h, p); - viewer = v; + this.geth = geth; } - + MyRfbProto(String h, int p) throws IOException { super(h, p); - cliList = new LinkedList(); - cliListTmp = new LinkedList(); - createBimgFlag = false; - // sendThreads = new LinkedList(); - // executor = Executors.newCachedThreadPool(); - // executor = Executors.newSingleThreadExecutor(); } - - + // over write void writeVersionMsg() throws IOException { clientMajor = 3; @@ -114,21 +117,21 @@ acceptPort = port; } - // open port 5999 for to accept client. - void selectPort() { - int i = 5999;// i = 5550; + // 5999を開けるが、開いてないなら+1のポートを開ける。 + void selectPort(int p) { + int port = p; while (true) { try { - initServSock(i); + initServSock(port); break; } catch (BindException e) { - i++; + port++; continue; } catch (IOException e) { } } - System.out.println("accept port = " + i); + System.out.println("accept port = " + port); } int getAcceptPort() { @@ -206,7 +209,7 @@ os.write(versionMsg_3_998.getBytes()); } - void readVersionMsg(InputStream is) throws IOException { + void readVersionMsg(InputStream is, OutputStream os) throws IOException { byte[] b = new byte[12]; @@ -229,6 +232,16 @@ "RFB server does not support protocol version 3"); } + if (serverMinor == 998) { + sendPortNumber(os); + } + + } + + void sendPortNumber(OutputStream os) throws IOException { + byte[] b = new byte[4]; + b = castIntByte(geth.port); + os.write(b); } void sendSecurityType(OutputStream os) throws IOException { @@ -259,20 +272,6 @@ os.write(initData); } - void sendData(byte b[]) { - try { - multicastqueue.put(b); - - /* - * // for(Socket cli : cliList){ // try{ // - * cli.getOutputStream().write(b, 0, b.length); // - * }catch(IOException e){ // // if socket closed // - * cliList.remove(cli); // } // } - */ - // System.out.println("cliSize="+cliSize()); - } catch (Exception e) { - } - } void sendPngImage() { try { @@ -304,77 +303,50 @@ System.out.println("numBytesRead=" + numBytesRead); } - void bufResetSend(int size) throws IOException { - is.reset(); - byte buffer[] = new byte[size]; - readFully(buffer); - sendData(buffer); - } void regiFramebufferUpdate() throws IOException { - is.mark(30); - messageType = readU8(); - skipBytes(1); - rectangles = readU16(); - rectX = readU16(); - rectY = readU16(); - rectW = readU16(); - rectH = readU16(); - encoding = readU32(); + is.mark(20); + messageType = readU8(); // 0 + skipBytes(1); // 1 + rectangles = readU16(); // 2 + rectX = readU16(); // 4 + rectY = readU16(); // 6 + rectW = readU16(); // 8 + rectH = readU16(); // 10 + encoding = readU32(); // 12 +// System.out.println("encoding = "+encoding); + if (encoding == EncodingZRLE|| encoding==EncodingZRLEE||encoding==EncodingZlib) + zLen = readU32(); + else + zLen = 0; + is.reset(); + } + + int checkAndMark() throws IOException { int dataLen; switch (encoding) { case RfbProto.EncodingRaw: dataLen = rectW * rectH * 4 + 16; - break; - case RfbProto.EncodingTight: - dataLen = 4000000; +// is.mark(dataLen); break; - case RfbProto.EncodingZRLE: - int zLen = readU32(); - dataLen = zLen + 20; + case RfbProto.EncodingCopyRect: + dataLen = 16 + 4; +// is.mark(dataLen); break; case RfbProto.EncodingRRE: case RfbProto.EncodingCoRRE: case RfbProto.EncodingHextile: - dataLen = rectW * rectH * 4 + 16; + case RfbProto.EncodingTight: + dataLen = zLen + 20; +// is.mark(dataLen); break; case RfbProto.EncodingZlib: - default: - dataLen = rectW * rectH * 4 + 16; - break; - } - System.out.println("dataLen = "+dataLen); - is.reset(); - is.mark(dataLen); - - } - - - int checkAndMark() throws IOException { -/* - int dataLen; - switch (encoding) { - case RfbProto.EncodingRaw: - dataLen = rectW * rectH * 4 + 16; - is.mark(dataLen); - break; - case RfbProto.EncodingCopyRect: - dataLen = 16 + 4; - is.mark(dataLen); - break; - case RfbProto.EncodingRRE: - case RfbProto.EncodingCoRRE: - case RfbProto.EncodingHextile: - case RfbProto.EncodingZlib: - case RfbProto.EncodingTight: + case RfbProto.EncodingZRLE: + case RfbProto.EncodingZRLEE: dataLen = zLen + 20; - is.mark(dataLen); - break; - case RfbProto.EncodingZRLE: - dataLen = zLen + 20; - is.mark(dataLen); +// is.mark(dataLen); break; case RfbProto.EncodingXCursor: case RfbProto.EncodingRichCursor: @@ -382,50 +354,23 @@ int u8Array = (int)Math.floor((rectW + 7)/8) * rectH; dataLen = pixArray + u8Array; printFramebufferUpdate(); - is.mark(dataLen); +// is.mark(dataLen); break; default: dataLen = 1000000; - is.mark(dataLen); +// is.mark(dataLen); } return dataLen; -*/ - return 0; } - void readSendData(int dataLen) throws IOException { - byte buffer[] = new byte[dataLen]; - readFully(buffer); - multicastqueue.put(buffer); - is.reset(); -/* - for (Socket cli : cliList) { - try { - OutputStream out = cli.getOutputStream(); - executor.execute(new SendThread(out, buffer)); - } catch (IOException e) { - // if client socket closed - cliListTmp.remove(cli); - } catch (Exception e) { - - } - - } -*/ - } - void sendDataToClient() throws IOException { + void sendDataToClient() throws Exception { regiFramebufferUpdate(); + printFramebufferUpdate(); int dataLen = checkAndMark(); readSendData(dataLen); } - void sendDataToClient(int num) throws IOException { - bytesPixel = num; - regiFramebufferUpdate(); - int dataLen = checkAndMark(); - readSendData(dataLen); - } BufferedImage createBufferedImage(Image img) { BufferedImage bimg = new BufferedImage(img.getWidth(null), img.getHeight(null), BufferedImage.TYPE_INT_RGB); @@ -475,28 +420,26 @@ BufferedImage bimg = ImageIO.read(new ByteArrayInputStream(pngBytes)); return bimg; } -/* - void readPngData() throws IOException { - pngBytes = new byte[is.available()]; - readFully(pngBytes); - } -*/ + void printFramebufferUpdate() { System.out.println("messageType=" + messageType); System.out.println("rectangles=" + rectangles); System.out.println("encoding=" + encoding); - System.out.println("rectX = "+ rectX +": rectY = "+rectY); - System.out.println("rectW = "+ rectW +": rectH = "+rectH); - System.out.println("rectW * rectH = " + rectW * rectH); + System.out.println("rectX = "+rectX+": rectY = "+rectY); + System.out.println("rectW = "+rectW+": rectH = "+rectH); switch (encoding) { case RfbProto.EncodingRaw: - int dataLen = rectW * rectH * 4 + 16; - System.out.println("rectW * rectH * 4 + 16 = " + (rectW * rectH * 4 + 16)); + System.out.println("rectW * rectH * 4 + 16 =" + rectW * rectH * 4 + + 16); break; default: } } + int returnMsgtype() { + return messageType; + } + void readSpeedCheck() throws IOException { byte[] b = new byte[1]; @@ -504,12 +447,14 @@ } void startSpeedCheck() { - byte[] b = new byte[2]; - b[0] = (byte) SpeedCheckMillis; - b[1] = (byte) 0; + ByteBuffer b = ByteBuffer.allocate(10); + b.put((byte)SpeedCheckMillis); + b.flip(); startCheckTime = System.currentTimeMillis(); System.out.println("startChckTime = "+ startCheckTime); - multicastqueue.put(b); + LinkedListbufs = new LinkedList(); + bufs.add(b); + multicastqueue.put(bufs); } void endSpeedCheck() { @@ -518,9 +463,6 @@ System.out.println("checkMillis: " + time); } - void printStatus() { - System.out.println(); - } synchronized void changeStatusFlag() { printStatusFlag = true; @@ -534,93 +476,9 @@ changeStatusFlag(); } } - void require() throws IOException { - sendFlag = false; - System.out.println("setEncodingRaw()"); - setEncodingRaw(); - writeFramebufferUpdateRequest(0, 0, - 16, 16, false); - System.out.println("setEncodingZRLE()"); - setEncodingZRLE(); - System.out.println("writeFramebufferUpdateRequest"); - writeFramebufferUpdateRequest(0, 0, - framebufferWidth, framebufferHeight, false); - } - - void setEncodingRaw() throws IOException{ - byte[] b = new byte[4 + 4]; - - b[0] = (byte) SetEncodings; - b[2] = (byte) ((1 >> 8) & 0xff); - b[3] = (byte) (1 & 0xff); - - b[4] = (byte)0; - b[5] = (byte)0; - b[6] = (byte)0; - b[7] = (byte)0; - - os.write(b); - - } - - void setEncodingZRLE() throws IOException{ - byte[] b = new byte[4 + 4]; - - b[0] = (byte) SetEncodings; - b[2] = (byte) ((1 >> 8) & 0xff); - b[3] = (byte) (1 & 0xff); - - b[4] = (byte)0; - b[5] = (byte)0; - b[6] = (byte)0; - b[7] = (byte)16; - - os.write(b); - - } - void newClient(AcceptThread acceptThread, final Socket newCli, - final OutputStream os, final InputStream is) throws IOException { - // createBimgFlag = true; - // rfb.addSockTmp(newCli); - // addSock(newCli); - final Client c = multicastqueue.newClient(); - require(); - Runnable sender = new Runnable() { - public void run() { - try { - /** - * initial connection of RFB protocol - */ - sendRfbVersion(os); - readVersionMsg(is); - sendSecurityType(os); - readSecType(is); - sendSecResult(os); - readClientInit(is); - sendInitData(os); - - for (;;) { - byte[] b = c.poll(); - os.write(b, 0, b.length); - } - } catch (IOException e) { - /** - * if socket closed - */ - // cliList.remove(newCli); - } - - } - - }; - new Thread(sender).start(); - - } - void speedCheckMillis() { - - Runnable stdin = new Runnable() { + Runnable stdin = new Runnable() { public void run() { int c; try { @@ -629,10 +487,7 @@ case 's': break; default: -// startSpeedCheck(); - writeFramebufferUpdateRequest(0, 0, framebufferWidth, - framebufferHeight, false); - + startSpeedCheck(); break; } } @@ -645,28 +500,397 @@ new Thread(stdin).start(); } - void requireFramebuffer() { - Runnable stdin = new Runnable() { + /** + * gzip byte arrays + * @param deflater + * @param inputs + * byte data[] + * @param inputIndex + * @param outputs + * byte data[] + * @return byte length in last byte array + * @throws IOException + */ + public int zip(Deflater deflater,LinkedList inputs, int inputIndex, LinkedList outputs) throws IOException { + int len = 0; + ByteBuffer c1= ByteBuffer.allocate(INFLATE_BUFSIZE); + while(inputIndex < inputs.size() ) { + ByteBuffer b1 = inputs.get(inputIndex++); + deflater.setInput(b1.array(),b1.position(),b1.remaining()); + /** + * If we finish() stream and reset() it, Deflater start new gzip stream, this makes continuous zlib reader unhappy. + * if we remove finish(), Deflater.deflate() never flushes its output. The original zlib deflate has flush flag. I'm pretty + * sure this a kind of bug of Java library. + */ + if (inputIndex==inputs.size()) + deflater.finish(); + int len1 = 0; + do { + len1 = deflater.deflate(c1.array(),c1.position(),c1.remaining()); + if (len1>0) { + len += len1; + c1.position(c1.position()+len1); + if (c1.remaining()==0) { + c1.flip(); outputs.addLast(c1); + c1 = ByteBuffer.allocate(INFLATE_BUFSIZE); + } + } + } while (len1 >0 || !deflater.needsInput()); // &&!deflater.finished()); + } + if (c1.position()!=0) { + c1.flip(); outputs.addLast(c1); + } + deflater.reset(); + return len; + } + + /** + * gunzip byte arrays + * @param inflater + * @param inputs + * byte data[] + * @param outputs + * byte data[] + *@return number of total bytes + * @throws IOException + */ + public int unzip(Inflater inflater, LinkedList inputs, int inputIndex, LinkedList outputs,int bufSize) + throws DataFormatException { + int len=0; + ByteBuffer buf = ByteBuffer.allocate(bufSize); + while (inputIndex < inputs.size()) { + ByteBuffer input = inputs.get(inputIndex++); + inflater.setInput(input.array(),input.position(),input.limit()); +// if (inputIndex==inputs.size()) if inflater/deflater has symmetry, we need this +// inflater.end(); but this won't work + do { + int len0 = inflater.inflate(buf.array(),buf.position(),buf.remaining()); + if (len0>0) { + buf.position(buf.position()+len0); + len += len0; + if (buf.remaining()==0) { + buf.flip(); + outputs.addLast(buf); + buf = ByteBuffer.allocate(bufSize); + } + } + } while (!inflater.needsInput()); + } + if (buf.position()!=0) { + buf.flip(); + outputs.addLast(buf); + } + return len; + } + + /** + * send data to clients + * @param dataLen + * @throws IOException + * @throws DataFormatException + * + * Zlibed packet is compressed in context dependent way, that is, it have to send from the beginning. But this is + * impossible. So we have to compress it again for each clients. Separate deflater for each clients is necessary. + * + * Java's deflater does not support flush. This means to get the result, we have to finish the compression. Reseting + * start new compression, but it is not accepted well in zlib continuous reading. So we need new Encoding ZRLEE + * which reset decoder for each packet. ZRLEE can be invisible from user, but it have to be implemented in the clients. + * ZRLEE compression is not context dependent, so no recompression is necessary. + */ + void readSendData(int dataLen) throws IOException, DataFormatException { + LinkedListbufs = new LinkedList(); + ByteBuffer header = ByteBuffer.allocate(16); + readFully(header.array(),0,16); + header.limit(16); + if (header.get(0)==RfbProto.FramebufferUpdate) { + int encoding = header.getInt(12); + if (encoding==RfbProto.EncodingZRLE||encoding==RfbProto.EncodingZlib) { // ZRLEE is already recompressed + ByteBuffer len = ByteBuffer.allocate(4); + readFully(len.array(),0,4); len.limit(4); + ByteBuffer inputData = ByteBuffer.allocate(dataLen-20); + readFully(inputData.array(),0,inputData.capacity()); inputData.limit(dataLen-20); + LinkedListinputs = new LinkedList(); + inputs.add(inputData); + + header.putInt(12, RfbProto.EncodingZRLEE); // means recompress every time + // using new Deflecter every time is incompatible with the protocol, clients have to be modified. + Deflater nDeflater = deflater; // new Deflater(); + LinkedList out = new LinkedList(); + unzip(inflater, inputs, 0 , out, INFLATE_BUFSIZE); + // dump32(inputs); + int len2 = zip(nDeflater, out, 0, bufs); + ByteBuffer blen = ByteBuffer.allocate(4); blen.putInt(len2); blen.flip(); + bufs.addFirst(blen); + + bufs.addFirst(header); + multicastqueue.put(bufs); + +// is.reset(); + return ; + } + bufs.add(header); + if (dataLen>16) { + ByteBuffer b = ByteBuffer.allocate(dataLen-16); + readFully(b.array(),0,dataLen-16); b.limit(dataLen-16); + bufs.add(b); + } + multicastqueue.put(bufs); + is.reset(); + } + is.reset(); + + // It may be compressed. We can inflate here to avoid repeating clients decompressing here, + // but it may generate too many large data. It is better to do it in each client. + // But we have do inflation for all input data, so we have to do it here. + } + + void newClient(AcceptThread acceptThread, final Socket newCli, + final OutputStream os, final InputStream is) throws IOException { + // createBimgFlag = true; + // rfb.addSockTmp(newCli); + // addSock(newCli); + final int myId = clients; + final Client > c = multicastqueue.newClient(); + final AtomicInteger writerRunning = new AtomicInteger(); + writerRunning.set(1); + /** + * Timeout thread. If a client is suspended, it has top of queue indefinitely, which caused memory + * overflow. After the timeout, we poll the queue and discard it. Start long wait if writer is running. + */ + final Runnable timer = new Runnable() { public void run() { - int c; - try { - while ((c = System.in.read()) != -1) { - sendFlag = false; - switch (c) { - default: - System.out.println("writeFramebufferUpdateRequest()"); - writeFramebufferUpdateRequest(0, 0, - framebufferWidth, framebufferHeight, false); - break; + int count = 0; + for(;;) { + long timeout = 30000/8; + try { + synchronized(this) { + int state,flag; + writerRunning.set(0); + wait(timeout); + flag = 0; + while((state=writerRunning.get())==0) { + c.poll(); // discard, should be timeout + count++; + if (flag==0) { + System.out.println("Discarding "+myId + " count="+ count); flag = 1; + } + wait(10); // if this is too short, writer cannot take the poll, if this is too long, memory will overflow... + } + if (flag==1) System.out.println("Resuming "+myId + " count="+count); + if (state!=1) { + System.out.println("Client died "+myId); + break; + } } + } catch (InterruptedException e) { } - } catch (IOException e) { - e.printStackTrace(); } } }; + new Thread(timer).start(); + /** + * discard all incoming from clients + */ + final Runnable reader = new Runnable() { + public void run() { + byte b[] = new byte[4096]; + for(;;) { + try { + int c = is.read(b); + if (c<=0) throw new IOException(); + // System.out.println("client read "+c); + } catch (IOException e) { + try { + writerRunning.set(2); + os.close(); + is.close(); + } catch (IOException e1) { + } + return; + } + } + } + }; + /** + * send packets to a client + */ + Runnable sender = new Runnable() { + public void run() { + writerRunning.set(1); + try { + /** + * initial connection of RFB protocol + */ + sendRfbVersion(os); +// readVersionMsg(is); + readVersionMsg(is,os); + sendSecurityType(os); + readSecType(is); + sendSecResult(os); + readClientInit(is); + sendInitData(os); + new Thread(reader).start(); // discard incoming packet here after. + for (;;) { + LinkedList bufs = c.poll(); + int inputIndex = 0; + ByteBuffer header = bufs.get(inputIndex); + if (header==null) continue; + if (header.get(0)==RfbProto.FramebufferUpdate) { + // System.out.println("client "+ myId); + } + writeToClient(os, bufs, inputIndex); + writerRunning.set(1); // yes my client is awaking. + } + } catch (IOException e) { + try { + writerRunning.set(2); + os.close(); + } catch (IOException e1) { + } + /* if socket closed cliList.remove(newCli); */ + } + } - new Thread(stdin).start(); + public void writeToClient(final OutputStream os, + LinkedList bufs, int inputIndex) + throws IOException { + while(inputIndex < bufs.size()) { + ByteBuffer b = bufs.get(inputIndex++); + os.write(b.array(), b.position(), b.limit()); + } + os.flush(); + } + }; + clients++; + new Thread(sender).start(); + + } + + + public void dump32(LinkedListbufs) { + int len =0; + for(ByteBuffer b: bufs) len += b.remaining(); + ByteBuffer top = bufs.getFirst(); + ByteBuffer end = bufs.getLast(); + System.err.println("length: "+len); + System.err.print("head 0: "); + for(int i = 0; i<16 && i < top.remaining(); i++) { + System.err.print(" "+ top.get(i)); + } + System.err.print("tail 0: "); + for(int i = 0; i<16 && i < end.remaining(); i++) { + System.err.print(" "+end.get(i)); + } + System.err.println(); + } + + @Test + public void test1() { + try { + LinkedList in = new LinkedList(); + LinkedList out = new LinkedList(); + LinkedList out2 = new LinkedList(); +// if (false) { +// for(int i=0;i<10;i++) { +// in.add(ByteBuffer.wrap("test1".getBytes())); +// in.add(ByteBuffer.wrap("test2".getBytes())); +// in.add(ByteBuffer.wrap("test3".getBytes())); +// in.add(ByteBuffer.wrap("test44".getBytes())); +// } +// } else + { + String t = ""; + for(int i=0;i<10;i++) { + t += "test1"; + t += "test2"; + t += "test3"; + t += "test44"; + } + in.add(ByteBuffer.wrap(t.getBytes())); + } + + LinkedList in1 = clone(in); + + Deflater deflater = new Deflater(); + zip(deflater,in,0,out); + // LinkedList out3 = clone(out); zipped result is depend on deflator's state + unzip(inflater, out, 0,out2, INFLATE_BUFSIZE); + // inflater.reset(); + equalByteBuffers(in1, out2); + LinkedList out4 = new LinkedList(); + deflater = new Deflater(); + zip(deflater,out2,0,out4); + LinkedList out5 = new LinkedList(); + unzip(inflater,out4,0, out5, INFLATE_BUFSIZE); + int len = equalByteBuffers(in1,out5); + + System.out.println("Test Ok. "+len); + } catch (Exception e) { + assertEquals(0,1); + } + } + + private LinkedList clone(LinkedList in) { + LinkedList copy = new LinkedList(); + for(ByteBuffer b: in) { + ByteBuffer c = b.duplicate(); + copy.add(c); + } + return copy; + } + + + + public int equalByteBuffers(LinkedList in, + LinkedList out2) { + int len = 0; + Iterable i = byteBufferIterator(in); + Iterator o = byteBufferIterator(out2).iterator(); + + for(int b: i) { + len ++; + if (o.hasNext()) { + int c = o.next(); + assertEquals(b,c); + } else + assertEquals(0,1); + } + if (o.hasNext()) + assertEquals(0,1); + // System.out.println(); + return len; + } + + private Iterable byteBufferIterator(final LinkedList in) { + return new Iterable() { + public Iterator iterator() { + return new Iterator() { + int bytes = 0; + int buffers = 0; + public boolean hasNext() { + for(;;) { + if (buffers>=in.size()) return false; + ByteBuffer b = in.get(buffers); + if (! (bytes= 0 && options.compressLevel <= 9) { encodings[nEncodings++] = RfbProto.EncodingCompressLevel0