# HG changeset patch # User one # Date 1344309302 -32400 # Node ID b32668b8e83ca33d5f835926a9e7ce7a7fc364bb # Parent e7ce2b2ffed8e1666f5224a054eecac9c408607b create multicast function diff -r e7ce2b2ffed8 -r b32668b8e83c src/main/java/ac/ryukyu/treevnc/MulticastQueue.java --- a/src/main/java/ac/ryukyu/treevnc/MulticastQueue.java Tue Jul 24 15:46:36 2012 +0900 +++ b/src/main/java/ac/ryukyu/treevnc/MulticastQueue.java Tue Aug 07 12:15:02 2012 +0900 @@ -24,7 +24,7 @@ return new Client(tail); } - static class Client + public static class Client { Node node; diff -r e7ce2b2ffed8 -r b32668b8e83c src/main/java/ac/ryukyu/treevnc/client/MyRfbProtoClient.java --- a/src/main/java/ac/ryukyu/treevnc/client/MyRfbProtoClient.java Tue Jul 24 15:46:36 2012 +0900 +++ b/src/main/java/ac/ryukyu/treevnc/client/MyRfbProtoClient.java Tue Aug 07 12:15:02 2012 +0900 @@ -6,28 +6,50 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.BindException; import java.net.InetAddress; +import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.DataFormatException; +import java.util.zip.Deflater; +import java.util.zip.Inflater; - +import ac.ryukyu.*; +import ac.ryukyu.test.*; +import ac.ryukyu.treevnc.MulticastQueue; import com.glavsoft.exceptions.TransportException; +import com.glavsoft.rfb.encoding.EncodingType; import com.glavsoft.transport.Reader; import com.glavsoft.viewer.ContainerManager; import com.glavsoft.viewer.swing.ParametersHandler.ConnectionParams; public class MyRfbProtoClient { + final static int FramebufferUpdate = 0; + final static int CheckDelay = 11; + final static String versionMsg_3_855 = "RFB 003.855\n"; + private static final int INFLATE_BUFSIZE = 1024*100; private Reader reader; private EchoClient echoValue = new EchoClient(); private String host,treenum,parent,pHost,leaderflag; - private int echoPort,port; + private int echoPort,port,acceptPort; Socket clientSocket,sock; DataInputStream is; OutputStream os; private ContainerManager containerManager; - + private ServerSocket servSock; + private byte initData[]; + private int clients = 0; + private MulticastQueue> multicastqueue = new MulticastQueue>(); + boolean proxyFlag = false; + int serverMajor, serverMinor; + int clientMajor, clientMinor; + private Inflater inflater = new Inflater(); + private Deflater deflater = new Deflater(); public MyRfbProtoClient(Reader reader,String host,String port) { this.reader = reader; @@ -95,4 +117,396 @@ return sock; } + public Socket accept() throws IOException { + return servSock.accept(); + } + + void initServSock(int port) throws IOException { + servSock = new ServerSocket(port); + acceptPort = port; + } + + public void selectPort(int p) { + int port = p; + while (true) { + try { + initServSock(port); + break; + } catch (BindException e) { + port++; + continue; + } catch (IOException e) { + + } + } + System.out.println("accept port = " + port); + } + + public void newClient(AcceptThread acceptThread, final Socket newCli, + final OutputStream os, final InputStream is) throws IOException { + // createBimgFlag = true; + // rfb.addSockTmp(newCli); + // addSock(newCli); + final int myId = clients; + final MulticastQueue.Client > c = multicastqueue.newClient(); + final AtomicInteger writerRunning = new AtomicInteger(); + writerRunning.set(1); + /** + * Timeout thread. If a client is suspended, it has top of queue indefinitely, which caused memory + * overflow. After the timeout, we poll the queue and discard it. Start long wait if writer is running. + */ + final Runnable timer = new Runnable() { + public void run() { + int count = 0; + for(;;) { + long timeout = 40000/8; + try { + synchronized(this) { + int state,flag; + writerRunning.set(0); + wait(timeout); + flag = 0; + while((state=writerRunning.get())==0) { + c.poll(); // discard, should be timeout + count++; + if (flag==0) { + System.out.println("Discarding "+myId + " count="+ count); flag = 1; + } + wait(10); // if this is too short, writer cannot take the poll, if this is too long, memory will overflow... + } + if (flag==1) System.out.println("Resuming "+myId + " count="+count); + if (state!=1) { + System.out.println("Client died "+myId); + break; + } + } + } catch (InterruptedException e) { + } + } + } + }; + new Thread(timer).start(); + /** + * discard all incoming from clients + */ + final Runnable reader = new Runnable() { + public void run() { + byte b[] = new byte[4096]; + for(;;) { + try { + int c = is.read(b); + if (c<=0) throw new IOException(); + // System.out.println("client read "+c); + } catch (IOException e) { + try { + writerRunning.set(2); + os.close(); + is.close(); + } catch (IOException e1) { + } + return; + } + } + } + }; + /** + * send packets to a client + */ + Runnable sender = new Runnable() { + public void run() { + writerRunning.set(1); + try { + /** + * initial connection of RFB protocol + */ + sendRfbVersion(os); +// readVersionMsg(is); + int rfbMinor = readVersionMsg(is,os); + sendSecurityType(os); + readSecType(is); + sendSecResult(os); + readClientInit(is); + sendInitData(os); + new Thread(reader).start(); // discard incoming packet here after. + if(rfbMinor == 855){ + //checkDilay(os); + // send jpeg data of full screen. + // sendFullScreen("jpeg" ,os); + } else { + // send raw data of full screen. + + } + for (;;) { + LinkedList bufs = c.poll(); + int inputIndex = 0; + ByteBuffer header = bufs.get(inputIndex); + if (header==null) continue; + if (header.get(0)==CheckDelay) { + System.out.println("--------------------"); + //writeToClient(os, bufs, inputIndex); + } + if (header.get(0)==FramebufferUpdate) { + // System.out.println("client "+ myId); + } + writeToClient(os, bufs, inputIndex); + writerRunning.set(1); // yes my client is awaking. + } + } catch (IOException e) { + try { + writerRunning.set(2); + os.close(); + } catch (IOException e1) { + /* if socket closed cliList.remove(newCli); */ + } + } + } + + public void writeToClient(final OutputStream os, + LinkedList bufs, int inputIndex) + throws IOException { + while(inputIndex < bufs.size()) { + ByteBuffer b = bufs.get(inputIndex++); + os.write(b.array(), b.position(), b.limit()); + } + os.flush(); + } + }; + clients++; + new Thread(sender).start(); + + } + + void sendRfbVersion(OutputStream os) throws IOException { + os.write(versionMsg_3_855.getBytes()); +// os.write(versionMsg_3_8.getBytes()); + } + int readVersionMsg(InputStream is, OutputStream os) throws IOException { + + byte[] b = new byte[12]; + + is.read(b); + + if ((b[0] != 'R') || (b[1] != 'F') || (b[2] != 'B') || (b[3] != ' ') + || (b[4] < '0') || (b[4] > '9') || (b[5] < '0') || (b[5] > '9') + || (b[6] < '0') || (b[6] > '9') || (b[7] != '.') + || (b[8] < '0') || (b[8] > '9') || (b[9] < '0') || (b[9] > '9') + || (b[10] < '0') || (b[10] > '9') || (b[11] != '\n')) { + throw new IOException("Host " + host + " port " + port + + " is not an RFB server"); + } + + int rfbMajor = (b[4] - '0') * 100 + (b[5] - '0') * 10 + (b[6] - '0'); + int rfbMinor = (b[8] - '0') * 100 + (b[9] - '0') * 10 + (b[10] - '0'); + + if (rfbMajor < 3) { + throw new IOException( + "RFB server does not support protocol version 3"); + } + + if (rfbMinor == 855) { + sendProxyFlag(os); +// if(proxyFlag)sendPortNumber(os); + } + return rfbMinor; + + } void readVersionMsg(InputStream is) throws IOException { + + byte[] b = new byte[12]; + + is.read(b); + + if ((b[0] != 'R') || (b[1] != 'F') || (b[2] != 'B') || (b[3] != ' ') + || (b[4] < '0') || (b[4] > '9') || (b[5] < '0') || (b[5] > '9') + || (b[6] < '0') || (b[6] > '9') || (b[7] != '.') + || (b[8] < '0') || (b[8] > '9') || (b[9] < '0') || (b[9] > '9') + || (b[10] < '0') || (b[10] > '9') || (b[11] != '\n')) { + throw new IOException("Host " + host + " port " + port + + " is not an RFB server"); + } + + serverMajor = (b[4] - '0') * 100 + (b[5] - '0') * 10 + (b[6] - '0'); + serverMinor = (b[8] - '0') * 100 + (b[9] - '0') * 10 + (b[10] - '0'); + + if (serverMajor < 3) { + throw new IOException( + "RFB server does not support protocol version 3"); + } + + } + + void sendSecurityType(OutputStream os) throws IOException { + // number-of-security-types + os.write(1); + // security-types + // 1:None + os.write(1); + } + void readSecType(InputStream is) throws IOException { + byte[] b = new byte[1]; + is.read(b); + + } + + void sendSecResult(OutputStream os) throws IOException { + byte[] b = castIntByte(0); + os.write(b); + } + + void readClientInit(InputStream in) throws IOException { + byte[] b = new byte[0]; + in.read(b); + } + + void sendInitData(OutputStream os) throws IOException { + os.write(initData); + } + + void sendProxyFlag(OutputStream os) throws IOException { + if(proxyFlag) os.write(1); + else os.write(0); + } + + byte[] castIntByte(int len) { + byte[] b = new byte[4]; + b[0] = (byte) ((len >>> 24) & 0xFF); + b[1] = (byte) ((len >>> 16) & 0xFF); + b[2] = (byte) ((len >>> 8) & 0xFF); + b[3] = (byte) ((len >>> 0) & 0xFF); + return b; + } + + /** + * gzip byte arrays + * @param deflater + * @param inputs + * byte data[] + * @param inputIndex + * @param outputs + * byte data[] + * @return byte length in last byte array + * @throws IOException + */ + public int zip(Deflater deflater,LinkedList inputs, int inputIndex, LinkedList outputs) throws IOException { + int len = 0; + ByteBuffer c1= ByteBuffer.allocate(INFLATE_BUFSIZE); + while(inputIndex < inputs.size() ) { + ByteBuffer b1 = inputs.get(inputIndex++); + deflater.setInput(b1.array(),b1.position(),b1.remaining()); + /** + * If we finish() stream and reset() it, Deflater start new gzip stream, this makes continuous zlib reader unhappy. + * if we remove finish(), Deflater.deflate() never flushes its output. The original zlib deflate has flush flag. I'm pretty + * sure this a kind of bug of Java library. + */ + if (inputIndex==inputs.size()) + deflater.finish(); + int len1 = 0; + do { + len1 = deflater.deflate(c1.array(),c1.position(),c1.remaining()); + if (len1>0) { + len += len1; + c1.position(c1.position()+len1); + if (c1.remaining()==0) { + c1.flip(); outputs.addLast(c1); + c1 = ByteBuffer.allocate(INFLATE_BUFSIZE); + } + } + } while (len1 >0 || !deflater.needsInput()); // &&!deflater.finished()); + } + if (c1.position()!=0) { + c1.flip(); outputs.addLast(c1); + } + deflater.reset(); + return len; + } + + + /** + * gunzip byte arrays + * @param inflater + * @param inputs + * byte data[] + * @param outputs + * byte data[] + *@return number of total bytes + * @throws IOException + */ + public int unzip(Inflater inflater, LinkedList inputs, + int inputIndex, LinkedList outputs, int bufSize) + throws DataFormatException { + int len = 0; + ByteBuffer buf = ByteBuffer.allocate(bufSize); + while (inputIndex < inputs.size()) { + ByteBuffer input = inputs.get(inputIndex++); + inflater.setInput(input.array(), input.position(), input.limit()); + // if (inputIndex==inputs.size()) if inflater/deflater has symmetry, + // we need this + // inflater.end(); but this won't work + do { + int len0 = inflater.inflate(buf.array(), buf.position(), + buf.remaining()); + if (len0 > 0) { + buf.position(buf.position() + len0); + len += len0; + if (buf.remaining() == 0) { + buf.flip(); + outputs.addLast(buf); + buf = ByteBuffer.allocate(bufSize); + } + } + } while (!inflater.needsInput()); + } + if (buf.position() != 0) { + buf.flip(); + outputs.addLast(buf); + } + return len; + } + + void readSendData(int dataLen,Reader reader) throws IOException, DataFormatException, TransportException { + LinkedListbufs = new LinkedList(); + ByteBuffer header = ByteBuffer.allocate(16); + reader.readBytes(header.array(),0,16); + header.limit(16); + if (header.get(0)==FramebufferUpdate) { + int encoding = header.getInt(12); + if (encoding==EncodingType.ZRLE.getId()||encoding==EncodingType.ZLIB.getId()) { // ZRLEE is already recompressed + ByteBuffer len = ByteBuffer.allocate(4); + reader.readBytes(len.array(),0,4); len.limit(4); + ByteBuffer inputData = ByteBuffer.allocate(dataLen-20); + reader.readBytes(inputData.array(),0,inputData.capacity()); inputData.limit(dataLen-20); + LinkedListinputs = new LinkedList(); + inputs.add(inputData); + + header.putInt(12, EncodingType.ZRLEE.getId()); // means recompress every time + // using new Deflecter every time is incompatible with the protocol, clients have to be modified. + Deflater nDeflater = deflater; // new Deflater(); + LinkedList out = new LinkedList(); + unzip(inflater, inputs, 0 , out, INFLATE_BUFSIZE); + // dump32(inputs); + int len2 = zip(nDeflater, out, 0, bufs); + ByteBuffer blen = ByteBuffer.allocate(4); blen.putInt(len2); blen.flip(); + bufs.addFirst(blen); + + bufs.addFirst(header); + multicastqueue.put(bufs); + is.reset(); + return ; + } + } + bufs.add(header); + if (dataLen>16) { + ByteBuffer b = ByteBuffer.allocate(dataLen-16); + reader.readBytes(b.array(),0,dataLen-16); b.limit(dataLen-16); + bufs.add(b); + } + multicastqueue.put(bufs); + is.reset(); + + // It may be compressed. We can inflate here to avoid repeating clients decompressing here, + // but it may generate too many large data. It is better to do it in each client. + // But we have do inflation for all input data, so we have to do it here. + } + + + }