Mercurial > hg > Members > nobuyasu > tightVNCProxy
changeset 134:c3ae65fea76a
merge 129
author | Nobuyasu Oshiro <dimolto@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Tue, 30 Aug 2011 06:50:58 +0900 |
parents | acd88e63854b (diff) 0571d955da35 (current diff) |
children | d2decb2f0eb8 |
files | src/myVncProxy/ProxyVncCanvas.java src/myVncProxy/VncViewer.java src/myVncProxy/ZlibInStream.java |
diffstat | 12 files changed, 542 insertions(+), 379 deletions(-) [+] |
line wrap: on
line diff
--- a/src/myVncProxy/AcceptClient.java Thu Aug 04 15:11:30 2011 +0900 +++ b/src/myVncProxy/AcceptClient.java Tue Aug 30 06:50:58 2011 +0900 @@ -6,215 +6,235 @@ import java.util.*; public class AcceptClient extends Thread { - int counter = 0 , parentnum = 0/*落ちたときの親の番号をカウントするためのもの*/; + int counter = 0, parentnum = 0/* 落ちたときの親の番号をカウントするためのもの */; LinkedList<String> ls = new LinkedList<String>(); - boolean runflag =false; - private String name; - boolean addrRegistor=true; + boolean runflag = false; + private String name; + boolean addrRegistor = true; public AcceptClient(String _name) { name = _name; } + public AcceptClient() { new CreateThread(this); } - - public synchronized void gethost(BufferedReader is,PrintStream os) { - String line,port; - int intv_time = 100; - String request; - int treebranch = 2;//treeの子ノードの数 - String leaderflag="0",sendleaderflag="0";//Socket send standardization String - + + // public synchronized void transferParentAddrerss(BufferedReader + // is,PrintStream os) { + public synchronized void transferParentAddrerss(BufferedReader is,PrintStream os) { + String line, port; + int intv_time = 100; + String request; + int treebranch = 2;// treeの子ノードの数 + String leaderflag = "0", sendleaderflag = "0"; // クライアントからのメッセージを待ち、受け取ったメッセージをそのまま返す - try { - while (true) { - line = is.readLine(); - port = is.readLine(); - - System.out.println("データーを受信しましたlin="+line+" port="+port); - //自分の IPADRESSを取得する - InetAddress addr = InetAddress.getLocalHost(); - String add = new String(addr.getHostAddress()); + try { + while (true) { + line = is.readLine(); + port = is.readLine(); + + System.out.println("データーを受信しましたlin=" + line + + " port=" + port); + // 自分の IPADRESSを取得する + InetAddress addr = InetAddress.getLocalHost(); + String add = new String(addr.getHostAddress()); + + if ("1".equals(line)) { + System.out.println("親が落ちましたmessage" + port); + String checkRepetition = is.readLine(); + os.println(ls.getLast()); + parentnum = (Integer.parseInt(port) - 1) + / treebranch; + String newparent = ls.get(parentnum); + + counter--; + runflag = true; + + sendleaderflag = decisionLeader( + Integer.parseInt(port), treebranch); + + Child report = new Child(); - if("1".equals(line)) { - System.out.println("親が落ちましたmessage" + port); - os.println(ls.getLast()); - parentnum = (Integer.parseInt(port) -1) / treebranch; - String hidenchild=ls.getLast(); - String newparent=ls.get(parentnum); - listupdate(port); - counter--; - runflag = true; - - if((counter-1)%treebranch==1) { // children in most young treenum have leaderflag 1 other 0 - sendleaderflag = "1"; - } else { - sendleaderflag = "0"; - } + report.reportLastNode(ls.getLast(), newparent, + port, String.valueOf(parentnum), + sendleaderflag, counter); + + listupdate(port); + + int g = 0; + for (String bs : ls) { + System.out.println(g + "番目" + bs); + g++; + } + + os.println(port); + // os.println(leaderflag); + + leaderflag = decisionLeader( + Integer.parseInt(checkRepetition), + treebranch); + + if (Integer.parseInt(checkRepetition) == counter + 1) { + checkRepetition = "stop"; + } else { + checkRepetition = "go"; + } + os.println(checkRepetition); + + // os.println(leaderflag); + Thread.sleep(intv_time); + is.close(); + os.close(); - Child report = new Child(); - - report.reportLastNode(hidenchild,newparent,port,String.valueOf(parentnum), - sendleaderflag,counter); - - os.println(port); - Thread.sleep(intv_time); - is.close(); - os.close(); + } else if ("2".equals(line)) { + parentnum = (Integer.parseInt(port) - 1) + / treebranch; + String newparent = ls.get(parentnum); + + outputStream(os, newparent, + String.valueOf(parentnum), + String.valueOf(counter), leaderflag); + + os.close(); + is.close(); + } else if ("3".equals(line)) { + String checkRepetition = is.readLine(); + System.out.println("落ちたのを確認しました"); + + os.println(ls.get(Integer.parseInt(port))); + os.println(port); + + if (checkRepetition.equals(ls.getLast())) { + checkRepetition = "stop"; + } else { + checkRepetition = "go"; + } + os.println(checkRepetition); - } else if("2".equals(line)) { - parentnum = (Integer.parseInt(port) -1) / treebranch; - String newparent=ls.get(parentnum); - - outputStream(os,newparent,String.valueOf(parentnum), - String.valueOf(counter),leaderflag); - - os.close(); - is.close(); - } else if("3".equals(line)) { - System.out.println("落ちたのを確認しました"); + System.out.println("num4=" + + ls.get(Integer.parseInt(port))); + line = null; + runflag = false; + is.close(); + os.close(); + } else { + if (addrRegistor == true) { + ls.add(add); + addrRegistor = false; + } + + if (line != null) { + addClientAdress(line, ls); + counter++; + } else { + break; + } - os.println(ls.get(Integer.parseInt(port))); - os.println(port); - int g = 0; - for(String bs: ls){ - System.out.println(g+"番目"+bs); - g++; + if (counter >= treebranch + 1) { + + leaderflag = decisionLeader(counter, treebranch); + parentnum = (counter - 1) / treebranch; + + request = ls.get(parentnum); + System.out.println(parentnum); + + outputStream(os, request, + String.valueOf(parentnum), + String.valueOf(counter), leaderflag); + + checkParameter(parentnum, counter, leaderflag); + } else { + // treeの親ノードに接続する人に接続する人を教える + outputStream(os, add, "0", + String.valueOf(counter), leaderflag); + } + Thread.sleep(intv_time); } - System.out.println("num4=" + ls.get(Integer.parseInt(port))); - line=null; - runflag = false; - is.close(); - os.close(); - } else { - if(addrRegistor==true){ - ls.add(add); - addrRegistor = false; - } - System.out.println(parentnum); - if(line != null) { - arg(line,ls); - counter++; - } else { - break; - } + } + + } catch (IOException e) { + System.out.println(e); + } - if(counter>=treebranch+1) { - if((counter-1)%treebranch==0) { - leaderflag = "1"; - } else { - leaderflag = "0"; - } - - parentnum = (counter - 1) / treebranch; - // request = [p-1]; - request = ls.get(parentnum); - - outputStream(os,request,String.valueOf(parentnum), - String.valueOf(counter),leaderflag); - - checkParameter(parentnum,counter,leaderflag); - } else { - //treeの親ノードに接続する人に接続する人を教える - outputStream(os,name,"0",String.valueOf(counter), - leaderflag); - } - Thread.sleep(intv_time); - } + catch (InterruptedException e) { + e.printStackTrace(); } - - } catch (IOException e) { - System.out.println(e); - } - - catch(InterruptedException e) { - e.printStackTrace(); - } - /* - try{ - echoServer.close(); - } - catch (IOException e){ - System.out.println(e); - } - */ - //} comment out while + + } - + /** * @param port - * parent value + * parent value */ void listupdate(String port) { ls.remove(Integer.parseInt(port)); - ls.add(Integer.parseInt(port),ls.getLast()); + ls.add(Integer.parseInt(port), ls.getLast()); ls.removeLast(); } - void outputStream(PrintStream os,String request,String parentnum,String treenum,String leaderflag) { + void outputStream(PrintStream os, String request, String parentnum, + String treenum, String leaderflag) { os.println(request); os.println(parentnum); os.println(treenum); os.println(leaderflag); } - - void checkParameter(int parent,int counter,String leaderflag) { - System.out.println("pの値="+parentnum); - System.out.println("iの値="+counter); - System.out.println("leaderflag="+leaderflag + "\n"); + + void checkParameter(int parent, int counter, String leaderflag) { + System.out.println("pの値=" + parentnum); + System.out.println("iの値=" + counter); + System.out.println("leaderflag=" + leaderflag + "\n"); } - - void arg(String line,LinkedList<String> ls) { - if(line != null){ + + void addClientAdress(String line, LinkedList<String> ls) { + int g = 0; + if (line != null) { ls.add(line); } - int g=0; - - for(String bs: ls){ - System.out.println(g+"番目"+bs); + for (String bs : ls) { + System.out.println(g + "番目" + bs); g++; } } - + + String decisionLeader(int counter, int treebranch) { + if ((counter - 1) % treebranch == 1) { // children in most young treenum + // have leaderflag 1 other 0 + return "0"; + } else { + return "1"; + } + } + } -/* -class sock { - void arg(String line,LinkedList<String> ls) { - if(line != null){ - ls.add(line); - } - int g=0; +class Child { - for(String bs: ls){ - System.out.println(g+"番目"+bs); - g++; - } - } -} -*/ -class Child{ - - void reportLastNode(String hiddenchild,String newparent,String newtreenum,String newpnum,String newleaderflag,int i) throws IOException{ + void reportLastNode(String hiddenchild, String newparent, + String newtreenum, String newpnum, String newleaderflag, int i) + throws IOException { try { Socket echoSocket; System.out.println(hiddenchild + "に接続します"); - echoSocket = new Socket(hiddenchild, 10001 + (i + 1));//i+1は実験中に同じマシーンを使っていたのでportを変えて対応、本番時には取り除く予定。 + echoSocket = new Socket(hiddenchild, 10001 + (i + 1));// i+1は実験中に同じマシーンを使っていたのでportを変えて対応、本番時には取り除く予定。 - DataOutputStream os = new DataOutputStream(echoSocket.getOutputStream()); + DataOutputStream os = new DataOutputStream( + echoSocket.getOutputStream()); - os.writeBytes(newparent+"\n"); - os.writeBytes(newpnum+"\n"); - os.writeBytes(newtreenum+"\n"); - os.writeBytes(newleaderflag+"\n"); - + os.writeBytes(newparent + "\n"); + os.writeBytes(newpnum + "\n"); + os.writeBytes(newtreenum + "\n"); + os.writeBytes(newleaderflag + "\n"); + os.close(); } catch (UnknownHostException e) { System.err.println("Don't know about host: localhost"); } catch (IOException e) { - System.err.println("Couldn't get I/O for the connection to: localhost"); + System.err + .println("Couldn't get I/O for the connection to: localhost"); } }
--- a/src/myVncProxy/AcceptThread.java Thu Aug 04 15:11:30 2011 +0900 +++ b/src/myVncProxy/AcceptThread.java Tue Aug 30 06:50:58 2011 +0900 @@ -1,4 +1,4 @@ -package myVncProxy; + package myVncProxy; import java.net.Socket; import java.io.IOException; import java.io.InputStream;
--- a/src/myVncProxy/CreateThread.java Thu Aug 04 15:11:30 2011 +0900 +++ b/src/myVncProxy/CreateThread.java Tue Aug 30 06:50:58 2011 +0900 @@ -4,47 +4,79 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; +import java.net.BindException; import java.net.ServerSocket; import java.net.Socket; - - public class CreateThread implements Runnable { ServerSocket echoServer; AcceptClient acceptClient; + int port; + public CreateThread(AcceptClient _acc) { + acceptClient = _acc; + port = 9999; + } + + public CreateThread(AcceptClient _acc , int port) { + acceptClient = _acc; + this.port = port; + } - public CreateThread(AcceptClient _acc){ - acceptClient = _acc; + void newEchoClient(final BufferedReader is,final PrintStream os) { + Runnable echoSender = new Runnable() { + public void run() { + acceptClient.transferParentAddrerss(is,os); + } + }; + new Thread(echoSender).start(); + } + + void selectPort(int p) { + int port = p; + while (true) { + try { + initServSock(port); + break; + } catch (BindException e) { + port++; + continue; + } catch (IOException e) { + + } + } + System.out.println("accept Echo port = " + port); + } + + void initServSock(int port) throws IOException { + echoServer = new ServerSocket(port); + this.port = port; } public void run() { + selectPort(port); while (true) { - try { - echoServer = new ServerSocket(9999); - } - catch (IOException e) { +// echoServer = new ServerSocket(9999); + Socket clientSocket = echoServer.accept(); + BufferedReader is = new BufferedReader(new InputStreamReader( + clientSocket.getInputStream())); + PrintStream os = new PrintStream(clientSocket.getOutputStream()); + newEchoClient(is,os); +// acceptClient.transferParentAddrerss(is, os); + } catch (IOException e) { System.out.println(e); } +/* try { - Socket clientSocket = echoServer.accept(); - BufferedReader is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); - PrintStream os = new PrintStream(clientSocket.getOutputStream()); - acceptClient.gethost(is,os); - } catch (IOException e){ - e.printStackTrace(); + echoServer.close(); + } catch (IOException e) { System.out.println(e); } - try { - echoServer.close(); - } - catch (IOException e){ - System.out.println(e); - } - +*/ + } }
--- a/src/myVncProxy/MulticastQueue.java Thu Aug 04 15:11:30 2011 +0900 +++ b/src/myVncProxy/MulticastQueue.java Tue Aug 30 06:50:58 2011 +0900 @@ -33,15 +33,15 @@ node = tail; } - public T poll() + synchronized public T poll() { Node<T> next = null; - T item; + T item = null; do { try { next = node.next(); }catch(InterruptedException _e){ - _e.printStackTrace(); + continue; } item = node.getItem(); node = next;
--- a/src/myVncProxy/MyRfbProto.java Thu Aug 04 15:11:30 2011 +0900 +++ b/src/myVncProxy/MyRfbProto.java Tue Aug 30 06:50:58 2011 +0900 @@ -26,14 +26,15 @@ import myVncProxy.MulticastQueue.Client; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.DataFormatException; import java.util.zip.Deflater; import java.util.zip.Inflater; import java.io.OutputStream; public -class MyRfbProto<ByteBuffersIterator> extends RfbProto { +class MyRfbProto extends RfbProto { final static String versionMsg_3_998 = "RFB 003.998\n"; /** * CheckMillis is one of new msgType for RFB 3.998. @@ -51,13 +52,13 @@ private int rectH; private int encoding; private int zLen; + private boolean clicomp = false; private ServerSocket servSock; private int acceptPort; private byte initData[]; private LinkedList<Socket> cliListTmp; private LinkedList<Socket> cliList; - private LinkedList<Thread> sendThreads; boolean createBimgFlag; ExecutorService executor; @@ -68,29 +69,24 @@ private MulticastQueue<LinkedList<ByteBuffer>> multicastqueue = new MulticastQueue<LinkedList<ByteBuffer>>(); private int clients = 0; private Inflater inflater = new Inflater(); - + private Deflater deflater = new Deflater(); + private CreateThread geth; + public MyRfbProto() throws IOException { } MyRfbProto(String h, int p, VncViewer v) throws IOException { super(h, p, v); - cliList = new LinkedList<Socket>(); - cliListTmp = new LinkedList<Socket>(); - createBimgFlag = false; - // sendThreads = new LinkedList<Thread>(); - // executor = Executors.newCachedThreadPool(); - // executor = Executors.newSingleThreadExecutor(); } + MyRfbProto(String h, int p, CreateThread geth) throws IOException { + super(h, p); + this.geth = geth; + } + MyRfbProto(String h, int p) throws IOException { super(h, p); - cliList = new LinkedList<Socket>(); - cliListTmp = new LinkedList<Socket>(); - createBimgFlag = false; - // sendThreads = new LinkedList<Thread>(); - // executor = Executors.newCachedThreadPool(); - // executor = Executors.newSingleThreadExecutor(); } // over write @@ -121,7 +117,7 @@ acceptPort = port; } - // 5550を開けるが、開いてないなら+1のポートを開ける。 + // 5999を開けるが、開いてないなら+1のポートを開ける。 void selectPort(int p) { int port = p; while (true) { @@ -213,7 +209,7 @@ os.write(versionMsg_3_998.getBytes()); } - void readVersionMsg(InputStream is) throws IOException { + void readVersionMsg(InputStream is, OutputStream os) throws IOException { byte[] b = new byte[12]; @@ -236,6 +232,16 @@ "RFB server does not support protocol version 3"); } + if (serverMinor == 998) { + sendPortNumber(os); + } + + } + + void sendPortNumber(OutputStream os) throws IOException { + byte[] b = new byte[4]; + b = castIntByte(geth.port); + os.write(b); } void sendSecurityType(OutputStream os) throws IOException { @@ -309,40 +315,13 @@ rectW = readU16(); // 8 rectH = readU16(); // 10 encoding = readU32(); // 12 - System.out.println("encoding = "+encoding); - if (encoding == EncodingZRLE) +// System.out.println("encoding = "+encoding); + if (encoding == EncodingZRLE|| encoding==EncodingZRLEE||encoding==EncodingZlib) zLen = readU32(); else zLen = 0; is.reset(); -/* - int dataLen; - switch (encoding) { - case RfbProto.EncodingRaw: - dataLen = rectW * rectH * 4 + 16; - mark(dataLen); - break; - case RfbProto.EncodingCopyRect: - dataLen = 16 + 4; - mark(dataLen); - break; - case RfbProto.EncodingRRE: - case RfbProto.EncodingCoRRE: - case RfbProto.EncodingHextile: - - case RfbProto.EncodingZlib: - case RfbProto.EncodingTight: - case RfbProto.EncodingZRLE: - dataLen = zLen + 20; - mark(dataLen); - break; - default: - dataLen = 1000000; - mark(dataLen); - } - -*/ - + } int checkAndMark() throws IOException { @@ -350,23 +329,24 @@ switch (encoding) { case RfbProto.EncodingRaw: dataLen = rectW * rectH * 4 + 16; - is.mark(dataLen); +// is.mark(dataLen); break; case RfbProto.EncodingCopyRect: dataLen = 16 + 4; - is.mark(dataLen); +// is.mark(dataLen); break; case RfbProto.EncodingRRE: case RfbProto.EncodingCoRRE: case RfbProto.EncodingHextile: case RfbProto.EncodingTight: dataLen = zLen + 20; - is.mark(dataLen); +// is.mark(dataLen); break; case RfbProto.EncodingZlib: case RfbProto.EncodingZRLE: + case RfbProto.EncodingZRLEE: dataLen = zLen + 20; - is.mark(dataLen); +// is.mark(dataLen); break; case RfbProto.EncodingXCursor: case RfbProto.EncodingRichCursor: @@ -374,11 +354,11 @@ int u8Array = (int)Math.floor((rectW + 7)/8) * rectH; dataLen = pixArray + u8Array; printFramebufferUpdate(); - is.mark(dataLen); +// is.mark(dataLen); break; default: dataLen = 1000000; - is.mark(dataLen); +// is.mark(dataLen); } return dataLen; } @@ -386,6 +366,7 @@ void sendDataToClient() throws Exception { regiFramebufferUpdate(); + printFramebufferUpdate(); int dataLen = checkAndMark(); readSendData(dataLen); } @@ -439,12 +420,7 @@ BufferedImage bimg = ImageIO.read(new ByteArrayInputStream(pngBytes)); return bimg; } -/* - void readPngData() throws IOException { - pngBytes = new byte[is.available()]; - readFully(pngBytes); - } -*/ + void printFramebufferUpdate() { System.out.println("messageType=" + messageType); @@ -460,6 +436,10 @@ default: } } + int returnMsgtype() { + return messageType; + } + void readSpeedCheck() throws IOException { byte[] b = new byte[1]; @@ -483,9 +463,6 @@ System.out.println("checkMillis: " + time); } - void printStatus() { - System.out.println(); - } synchronized void changeStatusFlag() { printStatusFlag = true; @@ -535,24 +512,35 @@ * @throws IOException */ public int zip(Deflater deflater,LinkedList<ByteBuffer> inputs, int inputIndex, LinkedList<ByteBuffer> outputs) throws IOException { - int len1=0,len = 0; - deflater.reset(); - while(inputIndex < inputs.size()) { + int len = 0; + ByteBuffer c1= ByteBuffer.allocate(INFLATE_BUFSIZE); + while(inputIndex < inputs.size() ) { ByteBuffer b1 = inputs.get(inputIndex++); - deflater.setInput(b1.array(),b1.position(),b1.limit()); - if (inputs.size()==0) { + deflater.setInput(b1.array(),b1.position(),b1.remaining()); + /** + * If we finish() stream and reset() it, Deflater start new gzip stream, this makes continuous zlib reader unhappy. + * if we remove finish(), Deflater.deflate() never flushes its output. The original zlib deflate has flush flag. I'm pretty + * sure this a kind of bug of Java library. + */ + if (inputIndex==inputs.size()) deflater.finish(); - } + int len1 = 0; do { - ByteBuffer c1 = ByteBuffer.allocate(INFLATE_BUFSIZE); - len1 = deflater.deflate(c1.array(),c1.position(),c1.capacity()); - c1.limit(len1); + len1 = deflater.deflate(c1.array(),c1.position(),c1.remaining()); if (len1>0) { - outputs.addLast(c1); len += len1; + c1.position(c1.position()+len1); + if (c1.remaining()==0) { + c1.flip(); outputs.addLast(c1); + c1 = ByteBuffer.allocate(INFLATE_BUFSIZE); + } } - } while (len1==INFLATE_BUFSIZE); - } + } while (len1 >0 || !deflater.needsInput()); // &&!deflater.finished()); + } + if (c1.position()!=0) { + c1.flip(); outputs.addLast(c1); + } + deflater.reset(); return len; } @@ -566,27 +554,49 @@ *@return number of total bytes * @throws IOException */ - public int unzip(Inflater inflater, LinkedList<ByteBuffer> inputs, LinkedList<ByteBuffer> outputs) + public int unzip(Inflater inflater, LinkedList<ByteBuffer> inputs, int inputIndex, LinkedList<ByteBuffer> outputs,int bufSize) throws DataFormatException { - int len=0,len0; - // inflater.reset(); - int inputIndex = 0; - do { + int len=0; + ByteBuffer buf = ByteBuffer.allocate(bufSize); + while (inputIndex < inputs.size()) { ByteBuffer input = inputs.get(inputIndex++); - inflater.setInput(input.array(),0,input.limit()); + inflater.setInput(input.array(),input.position(),input.limit()); +// if (inputIndex==inputs.size()) if inflater/deflater has symmetry, we need this +// inflater.end(); but this won't work do { - ByteBuffer buf = ByteBuffer.allocate(INFLATE_BUFSIZE); - len0 = inflater.inflate(buf.array(),0,buf.capacity()); - buf.limit(len0); - len += len0; + int len0 = inflater.inflate(buf.array(),buf.position(),buf.remaining()); if (len0>0) { - outputs.addLast(buf); + buf.position(buf.position()+len0); + len += len0; + if (buf.remaining()==0) { + buf.flip(); + outputs.addLast(buf); + buf = ByteBuffer.allocate(bufSize); + } } - } while (len0>0); - } while (inputIndex < inputs.size()) ; + } while (!inflater.needsInput()); + } + if (buf.position()!=0) { + buf.flip(); + outputs.addLast(buf); + } return len; } + /** + * send data to clients + * @param dataLen + * @throws IOException + * @throws DataFormatException + * + * Zlibed packet is compressed in context dependent way, that is, it have to send from the beginning. But this is + * impossible. So we have to compress it again for each clients. Separate deflater for each clients is necessary. + * + * Java's deflater does not support flush. This means to get the result, we have to finish the compression. Reseting + * start new compression, but it is not accepted well in zlib continuous reading. So we need new Encoding ZRLEE + * which reset decoder for each packet. ZRLEE can be invisible from user, but it have to be implemented in the clients. + * ZRLEE compression is not context dependent, so no recompression is necessary. + */ void readSendData(int dataLen) throws IOException, DataFormatException { LinkedList<ByteBuffer>bufs = new LinkedList<ByteBuffer>(); ByteBuffer header = ByteBuffer.allocate(16); @@ -594,20 +604,30 @@ header.limit(16); if (header.get(0)==RfbProto.FramebufferUpdate) { int encoding = header.getInt(12); - if (encoding==RfbProto.EncodingZlib||encoding==RfbProto.EncodingZRLE) { + if (encoding==RfbProto.EncodingZRLE||encoding==RfbProto.EncodingZlib) { // ZRLEE is already recompressed ByteBuffer len = ByteBuffer.allocate(4); readFully(len.array(),0,4); len.limit(4); ByteBuffer inputData = ByteBuffer.allocate(dataLen-20); readFully(inputData.array(),0,inputData.capacity()); inputData.limit(dataLen-20); LinkedList<ByteBuffer>inputs = new LinkedList<ByteBuffer>(); inputs.add(inputData); - unzip(inflater, inputs, bufs); + + header.putInt(12, RfbProto.EncodingZRLEE); // means recompress every time + // using new Deflecter every time is incompatible with the protocol, clients have to be modified. + Deflater nDeflater = deflater; // new Deflater(); + LinkedList<ByteBuffer> out = new LinkedList<ByteBuffer>(); + unzip(inflater, inputs, 0 , out, INFLATE_BUFSIZE); + // dump32(inputs); + int len2 = zip(nDeflater, out, 0, bufs); + ByteBuffer blen = ByteBuffer.allocate(4); blen.putInt(len2); blen.flip(); + bufs.addFirst(blen); + bufs.addFirst(header); multicastqueue.put(bufs); - is.reset(); + +// is.reset(); return ; } - } bufs.add(header); if (dataLen>16) { ByteBuffer b = ByteBuffer.allocate(dataLen-16); @@ -616,6 +636,8 @@ } multicastqueue.put(bufs); is.reset(); + } + is.reset(); // It may be compressed. We can inflate here to avoid repeating clients decompressing here, // but it may generate too many large data. It is better to do it in each client. @@ -627,81 +649,117 @@ // createBimgFlag = true; // rfb.addSockTmp(newCli); // addSock(newCli); + final int myId = clients; final Client <LinkedList<ByteBuffer>> c = multicastqueue.newClient(); + final AtomicInteger writerRunning = new AtomicInteger(); + writerRunning.set(1); + /** + * Timeout thread. If a client is suspended, it has top of queue indefinitely, which caused memory + * overflow. After the timeout, we poll the queue and discard it. Start long wait if writer is running. + */ + final Runnable timer = new Runnable() { + public void run() { + int count = 0; + for(;;) { + long timeout = 30000/8; + try { + synchronized(this) { + int state,flag; + writerRunning.set(0); + wait(timeout); + flag = 0; + while((state=writerRunning.get())==0) { + c.poll(); // discard, should be timeout + count++; + if (flag==0) { + System.out.println("Discarding "+myId + " count="+ count); flag = 1; + } + wait(10); // if this is too short, writer cannot take the poll, if this is too long, memory will overflow... + } + if (flag==1) System.out.println("Resuming "+myId + " count="+count); + if (state!=1) { + System.out.println("Client died "+myId); + break; + } + } + } catch (InterruptedException e) { + } + } + } + }; + new Thread(timer).start(); + /** + * discard all incoming from clients + */ + final Runnable reader = new Runnable() { + public void run() { + byte b[] = new byte[4096]; + for(;;) { + try { + int c = is.read(b); + if (c<=0) throw new IOException(); + // System.out.println("client read "+c); + } catch (IOException e) { + try { + writerRunning.set(2); + os.close(); + is.close(); + } catch (IOException e1) { + } + return; + } + } + } + }; + /** + * send packets to a client + */ Runnable sender = new Runnable() { public void run() { - - Deflater deflater = new Deflater(); + writerRunning.set(1); try { /** * initial connection of RFB protocol */ sendRfbVersion(os); - readVersionMsg(is); +// readVersionMsg(is); + readVersionMsg(is,os); sendSecurityType(os); readSecType(is); sendSecResult(os); readClientInit(is); sendInitData(os); - - Runnable reader = new Runnable() { - public void run() { - byte b[] = new byte[4096]; - for(;;) { - try { - int c = is.read(b); - if (c<=0) throw new IOException(); - System.out.println("client read "+c); - } catch (IOException e) { - try { - os.close(); - is.close(); - } catch (IOException e1) { - } - return; - } - } - } - }; - new Thread(reader).start(); - + new Thread(reader).start(); // discard incoming packet here after. for (;;) { LinkedList<ByteBuffer> bufs = c.poll(); int inputIndex = 0; - ByteBuffer header = bufs.get(inputIndex++); + ByteBuffer header = bufs.get(inputIndex); if (header==null) continue; if (header.get(0)==RfbProto.FramebufferUpdate) { - System.out.println("client "+ clients); - int encoding = header.getInt(12); - if (encoding==RfbProto.EncodingZlib||encoding==RfbProto.EncodingZRLE) { - LinkedList<ByteBuffer> outs = new LinkedList<ByteBuffer>(); - int len2 = zip(deflater, bufs, inputIndex, outs); - ByteBuffer blen = ByteBuffer.allocate(4); blen.putInt(len2); blen.flip(); - outs.addFirst(blen); - outs.addFirst(header); - while(!outs.isEmpty()) { - ByteBuffer out= outs.poll(); - os.write(out.array(),out.position(),out.limit()); - } - } - os.flush(); - continue; + // System.out.println("client "+ myId); } - os.write(header.array(),header.position(),header.limit()); - while(inputIndex < bufs.size()) { - ByteBuffer b = bufs.get(inputIndex++); - os.write(b.array(), b.position(), b.limit()); - } - os.flush(); + writeToClient(os, bufs, inputIndex); + writerRunning.set(1); // yes my client is awaking. } } catch (IOException e) { try { + writerRunning.set(2); os.close(); } catch (IOException e1) { } /* if socket closed cliList.remove(newCli); */ } } + + public void writeToClient(final OutputStream os, + LinkedList<ByteBuffer> bufs, int inputIndex) + throws IOException { + while(inputIndex < bufs.size()) { + ByteBuffer b = bufs.get(inputIndex++); + os.write(b.array(), b.position(), b.limit()); + } + os.flush(); + } }; clients++; new Thread(sender).start(); @@ -709,6 +767,22 @@ } + public void dump32(LinkedList<ByteBuffer>bufs) { + int len =0; + for(ByteBuffer b: bufs) len += b.remaining(); + ByteBuffer top = bufs.getFirst(); + ByteBuffer end = bufs.getLast(); + System.err.println("length: "+len); + System.err.print("head 0: "); + for(int i = 0; i<16 && i < top.remaining(); i++) { + System.err.print(" "+ top.get(i)); + } + System.err.print("tail 0: "); + for(int i = 0; i<16 && i < end.remaining(); i++) { + System.err.print(" "+end.get(i)); + } + System.err.println(); + } @Test public void test1() { @@ -716,26 +790,41 @@ LinkedList<ByteBuffer> in = new LinkedList<ByteBuffer>(); LinkedList<ByteBuffer> out = new LinkedList<ByteBuffer>(); LinkedList<ByteBuffer> out2 = new LinkedList<ByteBuffer>(); - for(int i=0;i<10;i++) { - in.add(ByteBuffer.wrap("test1".getBytes())); - in.add(ByteBuffer.wrap("test2".getBytes())); - in.add(ByteBuffer.wrap("test3".getBytes())); - in.add(ByteBuffer.wrap("test4".getBytes())); +// if (false) { +// for(int i=0;i<10;i++) { +// in.add(ByteBuffer.wrap("test1".getBytes())); +// in.add(ByteBuffer.wrap("test2".getBytes())); +// in.add(ByteBuffer.wrap("test3".getBytes())); +// in.add(ByteBuffer.wrap("test44".getBytes())); +// } +// } else + { + String t = ""; + for(int i=0;i<10;i++) { + t += "test1"; + t += "test2"; + t += "test3"; + t += "test44"; + } + in.add(ByteBuffer.wrap(t.getBytes())); } + LinkedList<ByteBuffer> in1 = clone(in); Deflater deflater = new Deflater(); zip(deflater,in,0,out); // LinkedList<ByteBuffer> out3 = clone(out); zipped result is depend on deflator's state - unzip(inflater, out, out2); + unzip(inflater, out, 0,out2, INFLATE_BUFSIZE); + // inflater.reset(); equalByteBuffers(in1, out2); LinkedList<ByteBuffer> out4 = new LinkedList<ByteBuffer>(); + deflater = new Deflater(); zip(deflater,out2,0,out4); LinkedList<ByteBuffer> out5 = new LinkedList<ByteBuffer>(); - unzip(inflater,out4,out5); - equalByteBuffers(in1,out5); + unzip(inflater,out4,0, out5, INFLATE_BUFSIZE); + int len = equalByteBuffers(in1,out5); - System.out.println("Test Ok."); + System.out.println("Test Ok. "+len); } catch (Exception e) { assertEquals(0,1); } @@ -752,20 +841,24 @@ - public void equalByteBuffers(LinkedList<ByteBuffer> in, + public int equalByteBuffers(LinkedList<ByteBuffer> in, LinkedList<ByteBuffer> out2) { + int len = 0; Iterable<Byte> i = byteBufferIterator(in); Iterator<Byte> o = byteBufferIterator(out2).iterator(); for(int b: i) { + len ++; if (o.hasNext()) { int c = o.next(); assertEquals(b,c); } else assertEquals(0,1); } - if (o.hasNext()) assertEquals(0,1); + if (o.hasNext()) + assertEquals(0,1); // System.out.println(); + return len; } private Iterable<Byte> byteBufferIterator(final LinkedList<ByteBuffer> in) { @@ -775,12 +868,16 @@ int bytes = 0; int buffers = 0; public boolean hasNext() { - if (buffers>=in.size()) return false; - ByteBuffer b = in.getFirst(); - return bytes<b.remaining(); + for(;;) { + if (buffers>=in.size()) return false; + ByteBuffer b = in.get(buffers); + if (! (bytes<b.remaining())) { + buffers ++; bytes=0; + } else return true; + } } public Byte next() { - ByteBuffer bf =in.get(buffers); + ByteBuffer bf =in.get(buffers); byte b = bf.get(bytes++); if (bf.remaining()<=bytes) { buffers++;
--- a/src/myVncProxy/OptionsNoFrame.java Thu Aug 04 15:11:30 2011 +0900 +++ b/src/myVncProxy/OptionsNoFrame.java Tue Aug 30 06:50:58 2011 +0900 @@ -74,10 +74,16 @@ // Constructor. Set up the labels and choices from the names and values // arrays. // - +/* OptionsNoFrame(VncProxyService v) { - viewer = v; - + viewer = v; + preferredEncoding = -1; + } +*/ +// OptionsNoFrame(VncProxyService v, VncCanvas vc) { + OptionsNoFrame(VncProxyService v) { + viewer = v; + for (int i = 0; i < names.length; i++) { labels[i] = new Label(names[i]);
--- a/src/myVncProxy/ProxyVncCanvas.java Thu Aug 04 15:11:30 2011 +0900 +++ b/src/myVncProxy/ProxyVncCanvas.java Tue Aug 30 06:50:58 2011 +0900 @@ -3,12 +3,8 @@ import java.awt.event.*; import java.awt.image.*; import java.io.*; -import java.lang.*; -import java.nio.ByteBuffer; import java.util.zip.*; -import java.net.Socket; - import javax.imageio.ImageIO; // @@ -105,7 +101,7 @@ for (int i = 0; i < 256; i++) colors[i] = new Color(cm8.getRGB(i)); -// setPixelFormat(); + setPixelFormat(); inputEnabled = false; // Keyboard listener is enabled even in view-only mode, to catch @@ -215,7 +211,7 @@ } public void setPixelFormat() throws IOException { -/* + if (viewer.options.eightBitColors) { rfb.writeSetPixelFormat(8, 8, false, true, 7, 7, 3, 0, 3, 6); bytesPixel = 1; @@ -224,7 +220,7 @@ 0); bytesPixel = 4; } -*/ + updateFramebufferSize(); } @@ -363,7 +359,8 @@ long count = 0; while (true) { - System.out.println("\ncount=" + count); + // System.out.println("\ncount=" + count); + count++; @@ -371,6 +368,14 @@ * read Data from parents and send Data to Client. */ rfb.sendDataToClient(); + + if(rfb.returnMsgtype() == RfbProto.FramebufferUpdate ) { + boolean fullUpdateNeeded = false; + int w = rfb.framebufferWidth; + int h = rfb.framebufferHeight; + rfb.writeFramebufferUpdateRequest(0, 0, w, h, !fullUpdateNeeded); + continue; + } long numBytesRead = rfb.getNumBytesRead(); @@ -381,10 +386,13 @@ switch (msgType) { case MyRfbProto.SpeedCheckMillis: rfb.readSpeedCheck(); - break; case RfbProto.FramebufferUpdate: - + if(msgType == RfbProto.FramebufferUpdate){ + rfb.is.reset(); + break; + } + if (statNumUpdates == viewer.debugStatsExcludeUpdates && !statsRestarted) { resetStats(); @@ -452,6 +460,7 @@ handleHextileRect(rx, ry, rw, rh); break; case RfbProto.EncodingZRLE: + case RfbProto.EncodingZRLEE: statNumRectsZRLE++; handleZRLERect(rx, ry, rw, rh); break; @@ -469,13 +478,6 @@ rfb.stopTiming(); - /** - * connection speed. - */ - long kbitsPerSecond = rfb.kbitsPerSecond(); - System.out.println("kbitsPerSecond = " + kbitsPerSecond); - - statNumPixelRects++; statNumBytesDecoded += rw * rh * bytesPixel; statNumBytesEncoded += (int) (rfb.getNumBytesRead() - numBytesReadBefore); @@ -504,9 +506,6 @@ viewer.autoSelectEncodings(); - - - // Before requesting framebuffer update, check if the pixel // format should be changed. /* @@ -538,9 +537,11 @@ default: throw new Exception("Unknown RFB message type " + msgType); } + + int bufSize = (int)(rfb.getNumBytesRead() - numBytesRead); -// System.out.println("bufSize="+bufSize); + System.out.println("bufSize="+bufSize); // rfb.bufResetSend(bufSize);
--- a/src/myVncProxy/RfbProto.java Thu Aug 04 15:11:30 2011 +0900 +++ b/src/myVncProxy/RfbProto.java Tue Aug 30 06:50:58 2011 +0900 @@ -26,12 +26,9 @@ // import java.io.*; -import java.awt.*; import java.awt.event.*; import java.net.Socket; -import java.net.ServerSocket; import java.util.zip.*; -import java.nio.*; class RfbProto { @@ -79,7 +76,7 @@ // Supported encodings and pseudo-encodings final static int EncodingRaw = 0, EncodingCopyRect = 1, EncodingRRE = 2, EncodingCoRRE = 4, EncodingHextile = 5, EncodingZlib = 6, - EncodingTight = 7, EncodingZRLE = 16, + EncodingTight = 7, EncodingZRLEE = 15, EncodingZRLE = 16, EncodingCompressLevel0 = 0xFFFFFF00, EncodingQualityLevel0 = 0xFFFFFFE0, EncodingXCursor = 0xFFFFFF10, EncodingRichCursor = 0xFFFFFF11, EncodingPointerPos = 0xFFFFFF18, @@ -88,6 +85,7 @@ SigEncodingCopyRect = "COPYRECT", SigEncodingRRE = "RRE_____", SigEncodingCoRRE = "CORRE___", SigEncodingHextile = "HEXTILE_", SigEncodingZlib = "ZLIB____", SigEncodingTight = "TIGHT___", + SigEncodingZRLEE = "ZRLEE___", SigEncodingZRLE = "ZRLE____", SigEncodingCompressLevel0 = "COMPRLVL", SigEncodingQualityLevel0 = "JPEGQLVL", @@ -295,7 +293,7 @@ protocolTightVNC = false; initCapabilities(); } - + // // Negotiate the authentication scheme. // @@ -471,7 +469,9 @@ encodingCaps.add(EncodingHextile, StandardVendor, SigEncodingHextile, "Standard Hextile encoding"); encodingCaps.add(EncodingZRLE, StandardVendor, SigEncodingZRLE, - "Standard ZRLE encoding"); + "Standard ZRLE encoding"); + encodingCaps.add(EncodingZRLEE, StandardVendor, SigEncodingZRLEE, + "Standard ZRLE(E) encoding"); encodingCaps.add(EncodingZlib, TridiaVncVendor, SigEncodingZlib, "Zlib encoding"); encodingCaps.add(EncodingTight, TightVncVendor, SigEncodingTight, @@ -726,6 +726,7 @@ if (updateRectEncoding == EncodingZlib || updateRectEncoding == EncodingZRLE + || updateRectEncoding == EncodingZRLEE || updateRectEncoding == EncodingTight) wereZlibUpdates = true; @@ -958,6 +959,8 @@ final static int META_MASK = InputEvent.META_MASK; final static int ALT_MASK = InputEvent.ALT_MASK; + + // // Write a pointer event message. We may need to send modifier key events // around it to set the correct modifier state.
--- a/src/myVncProxy/VncCanvas.java Thu Aug 04 15:11:30 2011 +0900 +++ b/src/myVncProxy/VncCanvas.java Tue Aug 30 06:50:58 2011 +0900 @@ -26,11 +26,8 @@ import java.awt.event.*; import java.awt.image.*; import java.io.*; -import java.lang.*; -import java.nio.ByteBuffer; import java.util.zip.*; -import java.net.Socket; import javax.imageio.ImageIO; @@ -478,6 +475,7 @@ handleHextileRect(rx, ry, rw, rh); break; case RfbProto.EncodingZRLE: + case RfbProto.EncodingZRLEE: statNumRectsZRLE++; handleZRLERect(rx, ry, rw, rh); break; @@ -889,7 +887,7 @@ void handleZRLERect(int x, int y, int w, int h) throws Exception { if (noZRLEdecode) return; - if (zrleInStream == null) + if (zrleInStream == null || rfb.updateRectEncoding==RfbProto.EncodingZRLEE) zrleInStream = new ZlibInStream(); // System.out.println("zrleInStream.end="+zrleInStream.inflater.off);
--- a/src/myVncProxy/VncProxyService.java Thu Aug 04 15:11:30 2011 +0900 +++ b/src/myVncProxy/VncProxyService.java Tue Aug 30 06:50:58 2011 +0900 @@ -64,6 +64,9 @@ int debugStatsExcludeUpdates; int debugStatsMeasureUpdates; + int echoPort = 9999; + + void checkArgs(String[] argv){ if(argv.length > 3){ @@ -199,7 +202,7 @@ void connectAndAuthenticate() throws Exception { acc = new AcceptClient(mainArgs[0]); - geth = new CreateThread(acc); + geth = new CreateThread(acc , echoPort); Thread thread = new Thread(geth); thread.start(); @@ -209,7 +212,9 @@ showConnectionStatus("Connecting to " + host + ", port " + port + "..."); // rfb = new RfbProto(host, port, this); - rfb = new MyRfbProto(host, port); +// rfb = new MyRfbProto(host, port); + rfb = new MyRfbProto(host, port, geth); + showConnectionStatus("Connected to server"); rfb.readVersionMsg(); @@ -355,12 +360,14 @@ encodings[nEncodings++] = preferredEncoding; + if (options.useCopyRect) { encodings[nEncodings++] = RfbProto.EncodingCopyRect; } if (preferredEncoding != RfbProto.EncodingTight) { encodings[nEncodings++] = RfbProto.EncodingTight; } + if (preferredEncoding != RfbProto.EncodingZRLE) { encodings[nEncodings++] = RfbProto.EncodingZRLE; } @@ -370,14 +377,13 @@ if (preferredEncoding != RfbProto.EncodingZlib) { encodings[nEncodings++] = RfbProto.EncodingZlib; } - /* if (preferredEncoding != RfbProto.EncodingCoRRE) { encodings[nEncodings++] = RfbProto.EncodingCoRRE; } if (preferredEncoding != RfbProto.EncodingRRE) { encodings[nEncodings++] = RfbProto.EncodingRRE; } - +/* if (options.compressLevel >= 0 && options.compressLevel <= 9) { encodings[nEncodings++] = RfbProto.EncodingCompressLevel0 + options.compressLevel;
--- a/src/myVncProxy/VncViewer.java Thu Aug 04 15:11:30 2011 +0900 +++ b/src/myVncProxy/VncViewer.java Tue Aug 30 06:50:58 2011 +0900 @@ -136,7 +136,7 @@ rfbThread = new Thread(this); rfbThread.start(); - accThread = new Thread(new AcceptThread(rfb, 5999)); + accThread = new Thread(new AcceptThread(rfb)); accThread.start(); }