changeset 4:b32668b8e83c

create multicast function
author one
date Tue, 07 Aug 2012 12:15:02 +0900
parents e7ce2b2ffed8
children f3ad6c74c591
files src/main/java/ac/ryukyu/treevnc/MulticastQueue.java src/main/java/ac/ryukyu/treevnc/client/MyRfbProtoClient.java
diffstat 2 files changed, 418 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/ac/ryukyu/treevnc/MulticastQueue.java	Tue Jul 24 15:46:36 2012 +0900
+++ b/src/main/java/ac/ryukyu/treevnc/MulticastQueue.java	Tue Aug 07 12:15:02 2012 +0900
@@ -24,7 +24,7 @@
 		return new Client<T>(tail);
 	}
 	
-	static class Client<T>
+	public static class Client<T>
 	{
 		Node<T> node;
 		
--- a/src/main/java/ac/ryukyu/treevnc/client/MyRfbProtoClient.java	Tue Jul 24 15:46:36 2012 +0900
+++ b/src/main/java/ac/ryukyu/treevnc/client/MyRfbProtoClient.java	Tue Aug 07 12:15:02 2012 +0900
@@ -6,28 +6,50 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.BindException;
 import java.net.InetAddress;
+import java.net.ServerSocket;
 import java.net.Socket;
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
 
-
+import ac.ryukyu.*;
+import ac.ryukyu.test.*;
+import ac.ryukyu.treevnc.MulticastQueue;
 
 import com.glavsoft.exceptions.TransportException;
+import com.glavsoft.rfb.encoding.EncodingType;
 import com.glavsoft.transport.Reader;
 import com.glavsoft.viewer.ContainerManager;
 import com.glavsoft.viewer.swing.ParametersHandler.ConnectionParams;
 
 public class MyRfbProtoClient {
+	final static int FramebufferUpdate = 0;
+	final static int CheckDelay = 11;
+	final static String versionMsg_3_855 = "RFB 003.855\n";
+	private static final int INFLATE_BUFSIZE = 1024*100;
 	private Reader reader;
 	private EchoClient echoValue = new EchoClient();
 	private String host,treenum,parent,pHost,leaderflag;
-	private int echoPort,port;
+	private int echoPort,port,acceptPort;
 	Socket clientSocket,sock;
 	DataInputStream is;
 	OutputStream os;
 	private ContainerManager containerManager;
-
+	private ServerSocket servSock;
+	private byte initData[];
+	private int clients = 0;
+	private MulticastQueue<LinkedList<ByteBuffer>> multicastqueue = new MulticastQueue<LinkedList<ByteBuffer>>();
+	boolean proxyFlag = false;
+	int serverMajor, serverMinor;
+	int clientMajor, clientMinor;
 	
+	private Inflater inflater = new Inflater();
+	private Deflater deflater = new Deflater();
 	
 	public MyRfbProtoClient(Reader reader,String host,String port) {
 		this.reader = reader;
@@ -95,4 +117,396 @@
 		return sock;
 	}
 	
+	public Socket accept() throws IOException {
+		return servSock.accept();
+	}
+	
+	void initServSock(int port) throws IOException {
+		servSock = new ServerSocket(port);
+		acceptPort = port;
+	}
+	
+	public void selectPort(int p) {
+		int port = p;
+		while (true) {
+			try {
+				initServSock(port);
+				break;
+			} catch (BindException e) {
+				port++;
+				continue;
+			} catch (IOException e) {
+
+			}
+		}
+		System.out.println("accept port = " + port);
+	}
+	
+	public void newClient(AcceptThread acceptThread, final Socket newCli,
+			final OutputStream os, final InputStream is) throws IOException {
+		// createBimgFlag = true;
+		// rfb.addSockTmp(newCli);
+		//		addSock(newCli);
+		final int myId = clients; 
+		final MulticastQueue.Client <LinkedList<ByteBuffer>> c = multicastqueue.newClient();
+		final AtomicInteger writerRunning = new AtomicInteger();
+		writerRunning.set(1);
+		/**
+		 * Timeout thread. If a client is suspended, it has top of queue indefinitely, which caused memory
+		 * overflow. After the timeout, we poll the queue and discard it. Start long wait if writer is running.
+		 */
+		final Runnable timer = new Runnable() {
+			public void run() {
+				int count = 0;
+				for(;;) {
+					long timeout = 40000/8;
+					try {
+						synchronized(this) {
+							int state,flag;
+							writerRunning.set(0);
+							wait(timeout);
+							flag = 0;
+							while((state=writerRunning.get())==0) {
+								c.poll(); // discard, should be timeout
+								count++;
+								if (flag==0) {
+									System.out.println("Discarding "+myId + " count="+ count); flag = 1;
+								}
+								wait(10); // if this is too short, writer cannot take the poll, if this is too long, memory will overflow...
+							}
+							if (flag==1) System.out.println("Resuming "+myId + " count="+count);
+							if (state!=1) {
+								System.out.println("Client died "+myId);
+								break;
+							}
+						}
+					} catch (InterruptedException e) {
+					}
+				}
+			}
+		};
+		new Thread(timer).start();
+		/**
+		 * discard all incoming from clients
+		 */
+		final Runnable reader = new Runnable() {
+			public void run() {
+				byte b[] = new byte[4096];
+				for(;;) {
+					try {
+						int c = is.read(b);
+						if (c<=0) throw new IOException();
+						// System.out.println("client read "+c);
+					} catch (IOException e) {
+						try {
+							writerRunning.set(2);
+							os.close();
+							is.close();
+						} catch (IOException e1) {
+						}
+						return;
+					}
+				}
+			}
+		};
+		/**
+		 * send packets to a client
+		 */
+		Runnable sender = new Runnable() {
+			public void run() {
+				writerRunning.set(1);
+				try {
+					/**
+					 *  initial connection of RFB protocol
+					 */
+					sendRfbVersion(os);
+//					readVersionMsg(is);
+					int rfbMinor = readVersionMsg(is,os);
+					sendSecurityType(os);
+					readSecType(is);
+					sendSecResult(os);
+					readClientInit(is);
+					sendInitData(os);
+					new Thread(reader).start(); // discard incoming packet here after.
+					if(rfbMinor == 855){
+						//checkDilay(os);
+						// send jpeg data of full screen.
+						// sendFullScreen("jpeg" ,os);
+					} else {
+						// send raw data of full screen.
+						
+					}
+					for (;;) {
+						LinkedList<ByteBuffer> bufs = c.poll();
+						int inputIndex = 0;
+						ByteBuffer header = bufs.get(inputIndex);
+						if (header==null) continue;
+						if (header.get(0)==CheckDelay) {
+							System.out.println("--------------------");
+							//writeToClient(os, bufs, inputIndex);
+						}
+						if (header.get(0)==FramebufferUpdate) {
+							// System.out.println("client "+ myId);
+						}
+						writeToClient(os, bufs, inputIndex);
+						writerRunning.set(1);  // yes my client is awaking.
+					}
+				} catch (IOException e) {
+					try {
+						writerRunning.set(2);
+						os.close();
+					} catch (IOException e1) {
+						/* if socket closed 	cliList.remove(newCli); */
+					}
+				}
+			}
+
+			public void writeToClient(final OutputStream os,
+					LinkedList<ByteBuffer> bufs, int inputIndex)
+					throws IOException {
+				while(inputIndex < bufs.size()) {
+					ByteBuffer b = bufs.get(inputIndex++);
+					os.write(b.array(), b.position(), b.limit());
+				}
+				os.flush();
+			}
+		};
+		clients++;
+		new Thread(sender).start();
+
+	}
+	
+	void sendRfbVersion(OutputStream os) throws IOException {
+		os.write(versionMsg_3_855.getBytes());
+//		os.write(versionMsg_3_8.getBytes());
+	}
+	int readVersionMsg(InputStream is, OutputStream os) throws IOException {
+
+		byte[] b = new byte[12];
+
+		is.read(b);
+
+		if ((b[0] != 'R') || (b[1] != 'F') || (b[2] != 'B') || (b[3] != ' ')
+				|| (b[4] < '0') || (b[4] > '9') || (b[5] < '0') || (b[5] > '9')
+				|| (b[6] < '0') || (b[6] > '9') || (b[7] != '.')
+				|| (b[8] < '0') || (b[8] > '9') || (b[9] < '0') || (b[9] > '9')
+				|| (b[10] < '0') || (b[10] > '9') || (b[11] != '\n')) {
+			throw new IOException("Host " + host + " port " + port
+					+ " is not an RFB server");
+		}
+
+		int rfbMajor = (b[4] - '0') * 100 + (b[5] - '0') * 10 + (b[6] - '0');
+		int rfbMinor = (b[8] - '0') * 100 + (b[9] - '0') * 10 + (b[10] - '0');
+
+		if (rfbMajor < 3) {
+			throw new IOException(
+			"RFB server does not support protocol version 3");
+		}
+
+		if (rfbMinor == 855) {
+			sendProxyFlag(os);
+//			if(proxyFlag)sendPortNumber(os);
+		}
+		return rfbMinor;
+		
+	}	void readVersionMsg(InputStream is) throws IOException {
+
+		byte[] b = new byte[12];
+
+		is.read(b);
+
+		if ((b[0] != 'R') || (b[1] != 'F') || (b[2] != 'B') || (b[3] != ' ')
+				|| (b[4] < '0') || (b[4] > '9') || (b[5] < '0') || (b[5] > '9')
+				|| (b[6] < '0') || (b[6] > '9') || (b[7] != '.')
+				|| (b[8] < '0') || (b[8] > '9') || (b[9] < '0') || (b[9] > '9')
+				|| (b[10] < '0') || (b[10] > '9') || (b[11] != '\n')) {
+			throw new IOException("Host " + host + " port " + port
+					+ " is not an RFB server");
+		}
+
+		serverMajor = (b[4] - '0') * 100 + (b[5] - '0') * 10 + (b[6] - '0');
+		serverMinor = (b[8] - '0') * 100 + (b[9] - '0') * 10 + (b[10] - '0');
+
+		if (serverMajor < 3) {
+			throw new IOException(
+					"RFB server does not support protocol version 3");
+		}
+
+	}
+	
+	void sendSecurityType(OutputStream os) throws IOException {
+		// number-of-security-types
+		os.write(1);
+		// security-types
+		// 1:None
+		os.write(1);
+	}
+	void readSecType(InputStream is) throws IOException {
+		byte[] b = new byte[1];
+		is.read(b);
+
+	}
+	
+	void sendSecResult(OutputStream os) throws IOException {
+		byte[] b = castIntByte(0);
+		os.write(b);
+	}
+	
+	void readClientInit(InputStream in) throws IOException {
+		byte[] b = new byte[0];
+		in.read(b);
+	}
+	
+	void sendInitData(OutputStream os) throws IOException {
+		os.write(initData);
+	}
+	
+	void sendProxyFlag(OutputStream os) throws IOException {
+		if(proxyFlag) os.write(1);
+		else os.write(0);
+	}
+	
+	byte[] castIntByte(int len) {
+		byte[] b = new byte[4];
+		b[0] = (byte) ((len >>> 24) & 0xFF);
+		b[1] = (byte) ((len >>> 16) & 0xFF);
+		b[2] = (byte) ((len >>> 8) & 0xFF);
+		b[3] = (byte) ((len >>> 0) & 0xFF);
+		return b;
+	}
+	
+	/**
+	 * gzip byte arrays
+	 * @param deflater
+	 * @param inputs
+	 *            byte data[]
+	 * @param inputIndex 
+	 * @param outputs
+	 *            byte data[]
+	 * @return  byte length in last byte array
+	 * @throws IOException
+	 */
+	public int zip(Deflater deflater,LinkedList<ByteBuffer> inputs, int inputIndex, LinkedList<ByteBuffer> outputs) throws IOException {
+		int len = 0;
+		ByteBuffer c1= ByteBuffer.allocate(INFLATE_BUFSIZE);
+		while(inputIndex < inputs.size() ) {
+			ByteBuffer b1 = inputs.get(inputIndex++);
+			deflater.setInput(b1.array(),b1.position(),b1.remaining());
+			/**
+			 * If we finish() stream and reset() it, Deflater start new gzip stream, this makes continuous zlib reader unhappy.
+			 * if we remove finish(), Deflater.deflate() never flushes its output. The original zlib deflate has flush flag. I'm pretty
+			 * sure this a kind of bug of Java library.
+			 */
+			if (inputIndex==inputs.size())	
+				deflater.finish();
+			int len1 = 0;
+			do {
+				len1 = deflater.deflate(c1.array(),c1.position(),c1.remaining());
+				if (len1>0) {
+					len += len1;
+					c1.position(c1.position()+len1); 
+					if (c1.remaining()==0) {
+						c1.flip();	outputs.addLast(c1);
+						c1 = ByteBuffer.allocate(INFLATE_BUFSIZE);
+					}
+				}
+			} while (len1 >0 || !deflater.needsInput()); // &&!deflater.finished());
+		}
+		if (c1.position()!=0) {
+			c1.flip();	outputs.addLast(c1);
+		}
+		deflater.reset();
+		return len;
+	}
+	
+	
+	/**
+	 * gunzip byte arrays
+	 * @param inflater
+	 * @param inputs
+	 *            byte data[]
+	 * @param outputs
+	 *            byte data[]
+	 *@return  number of total bytes            
+	 * @throws IOException
+	 */
+	public int unzip(Inflater inflater, LinkedList<ByteBuffer> inputs,
+			int inputIndex, LinkedList<ByteBuffer> outputs, int bufSize)
+			throws DataFormatException {
+		int len = 0;
+		ByteBuffer buf = ByteBuffer.allocate(bufSize);
+		while (inputIndex < inputs.size()) {
+			ByteBuffer input = inputs.get(inputIndex++);
+			inflater.setInput(input.array(), input.position(), input.limit());
+			// if (inputIndex==inputs.size()) if inflater/deflater has symmetry,
+			// we need this
+			// inflater.end(); but this won't work
+			do {
+				int len0 = inflater.inflate(buf.array(), buf.position(),
+						buf.remaining());
+				if (len0 > 0) {
+					buf.position(buf.position() + len0);
+					len += len0;
+					if (buf.remaining() == 0) {
+						buf.flip();
+						outputs.addLast(buf);
+						buf = ByteBuffer.allocate(bufSize);
+					}
+				}
+			} while (!inflater.needsInput());
+		}
+		if (buf.position() != 0) {
+			buf.flip();
+			outputs.addLast(buf);
+		}
+		return len;
+	}
+	
+	void readSendData(int dataLen,Reader reader) throws IOException, DataFormatException, TransportException {
+		LinkedList<ByteBuffer>bufs = new LinkedList<ByteBuffer>();
+		ByteBuffer header = ByteBuffer.allocate(16);
+		reader.readBytes(header.array(),0,16); 
+		header.limit(16);
+		if (header.get(0)==FramebufferUpdate) {
+			int encoding = header.getInt(12);
+			if (encoding==EncodingType.ZRLE.getId()||encoding==EncodingType.ZLIB.getId()) { // ZRLEE is already recompressed
+				ByteBuffer len = ByteBuffer.allocate(4);
+				reader.readBytes(len.array(),0,4); len.limit(4);
+				ByteBuffer inputData = ByteBuffer.allocate(dataLen-20);
+				reader.readBytes(inputData.array(),0,inputData.capacity()); inputData.limit(dataLen-20);
+				LinkedList<ByteBuffer>inputs = new LinkedList<ByteBuffer>();
+				inputs.add(inputData);
+
+				header.putInt(12, EncodingType.ZRLEE.getId()); // means recompress every time
+				// using new Deflecter every time is incompatible with the protocol, clients have to be modified.
+				Deflater nDeflater = deflater; // new Deflater();
+				LinkedList<ByteBuffer> out = new LinkedList<ByteBuffer>();
+				unzip(inflater, inputs, 0 , out, INFLATE_BUFSIZE);
+				// dump32(inputs);
+				int len2 = zip(nDeflater, out, 0, bufs);
+				ByteBuffer blen = ByteBuffer.allocate(4); blen.putInt(len2); blen.flip();
+				bufs.addFirst(blen);
+
+				bufs.addFirst(header);
+				multicastqueue.put(bufs);
+				is.reset();
+				return ;
+			}
+		} 
+		bufs.add(header);
+		if (dataLen>16) {
+			ByteBuffer b = ByteBuffer.allocate(dataLen-16);
+			reader.readBytes(b.array(),0,dataLen-16); b.limit(dataLen-16);
+			bufs.add(b);
+		}
+		multicastqueue.put(bufs);
+		is.reset();
+
+		// It may be compressed. We can inflate here to avoid repeating clients decompressing here,
+		// but it may generate too many large data. It is better to do it in each client.
+		// But we have do inflation for all input data, so we have to do it here.
+	}
+
+
+	
 }