diff src/main/java/jp/ac/u_ryukyu/treevnc/server/MyRfbProtoProxy.java @ 12:12c3a73be47f

rename package
author one
date Tue, 21 Aug 2012 14:24:38 +0900
parents src/main/java/ac/ryukyu/treevnc/server/MyRfbProtoProxy.java@57ae9fbb1245
children ff01665d26b4
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/jp/ac/u_ryukyu/treevnc/server/MyRfbProtoProxy.java	Tue Aug 21 14:24:38 2012 +0900
@@ -0,0 +1,664 @@
+package jp.ac.u_ryukyu.treevnc.server;
+
+import static org.junit.Assert.*;
+
+import java.awt.Graphics;
+import java.awt.Image;
+import java.awt.image.BufferedImage;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.BindException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import javax.imageio.ImageIO;
+
+import org.junit.Test;
+
+
+//import myVncProxy.MulticastQueue.Client;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+import java.io.OutputStream;
+
+import jp.ac.u_ryukyu.treevnc.MulticastQueue;
+
+public class MyRfbProtoProxy {
+	final static String versionMsg_3_855 = "RFB 003.855\n";
+	/**
+	 * CheckMillis is one of new msgType for RFB 3.855.
+	 */
+	final static byte SpeedCheckMillis = 4;
+
+	// Secyrity type of OS X
+	final static int SecTypeReqAccess = 32;
+
+	// Supported authentication types
+	final static int AuthAccess = 32;
+
+	private static final int INFLATE_BUFSIZE = 1024 * 100;
+	boolean printStatusFlag = false;
+	long startCheckTime;
+	private int messageType;
+	private int rectangles;
+	private int rectX;
+	private int rectY;
+	private int rectW;
+	private int rectH;
+	private int encoding;
+	private int zLen;
+	private boolean clicomp = false;
+
+	private ServerSocket servSock;
+	protected int acceptPort;
+	// private byte initData[];
+	byte initData[];
+	private LinkedList<Socket> cliListTmp;
+	private LinkedList<Socket> cliList;
+	boolean createBimgFlag;
+	boolean proxyFlag = true;
+
+	ExecutorService executor;
+
+	byte[] pngBytes;
+
+	// private MulticastQueue<LinkedList<ByteBuffer>> multicastqueue = new
+	// MostRecentMultiCast<LinkedList<ByteBuffer>>(10);
+	private MulticastQueue<LinkedList<ByteBuffer>> multicastqueue = new MulticastQueue<LinkedList<ByteBuffer>>();
+	private int clients = 0;
+	private Inflater inflater = new Inflater();
+	private Deflater deflater = new Deflater();
+	private CreateThread geth;
+	// private Thread requestThread;
+	private RequestScreenThread rThread;
+	private Thread requestThread;
+
+	public MyRfbProtoProxy() throws IOException {
+	}
+
+
+	void initServSock(int port) throws IOException {
+		servSock = new ServerSocket(port);
+		acceptPort = port;
+	}
+
+	void authenticationRequestAccess() throws IOException {
+
+		byte[] headBuf = new byte[2];
+		is.read(headBuf);
+		if (headBuf[1] == 2) {
+			byte[] b = new byte[258];
+			is.read(b);
+
+			byte[] outBuf = new byte[256];
+			os.write(outBuf);
+			os.flush();
+		} else if (headBuf[1] == 23) {
+			byte[] b = new byte[130];
+			is.read(b);
+			byte[] outBuf = new byte[192];
+			os.write(outBuf);
+			os.flush();
+		}
+
+		int result = readU32();
+		if (result != 0) {
+			System.out.println("faild authentication  ");
+			throw new IOException();
+		}
+
+	}
+
+	/*
+	 * default port number is 5999.
+	 */
+	public void selectPort(int p) {
+		if (servSock != null)
+			return;
+		int port = p;
+		while (true) {
+			try {
+				initServSock(port);
+				break;
+			} catch (BindException e) {
+				port++;
+				continue;
+			} catch (IOException e) {
+
+			}
+		}
+		System.out.println("accept port = " + port);
+	}
+
+	int getAcceptPort() {
+		return acceptPort;
+	}
+
+	void setSoTimeout(int num) throws IOException {
+		servSock.setSoTimeout(num);
+	}
+
+	public Socket accept() throws IOException {
+		return servSock.accept();
+	}
+
+	void addSock(Socket sock) {
+		cliList.add(sock);
+	}
+
+	void addSockTmp(Socket sock) {
+		System.out.println("connected " + sock.getInetAddress());
+		cliListTmp.add(sock);
+	}
+
+	boolean markSupported() {
+		return is.markSupported();
+	}
+
+	synchronized void changeStatusFlag() {
+		printStatusFlag = true;
+	}
+
+	void printMills() {
+		if (printStatusFlag) {
+
+			changeStatusFlag();
+		} else {
+			changeStatusFlag();
+		}
+	}
+
+
+	void requestThreadStart() {
+		requestThread.start();
+	}
+
+	public synchronized void requestThreadNotify() {
+		rThread.reStart();
+	}
+
+	/**
+	 * gzip byte arrays
+	 * 
+	 * @param deflater
+	 * @param inputs
+	 *            byte data[]
+	 * @param inputIndex
+	 * @param outputs
+	 *            byte data[]
+	 * @return byte length in last byte array
+	 * @throws IOException
+	 */
+	public int zip(Deflater deflater, LinkedList<ByteBuffer> inputs,
+			int inputIndex, LinkedList<ByteBuffer> outputs) throws IOException {
+		int len = 0;
+		ByteBuffer c1 = ByteBuffer.allocate(INFLATE_BUFSIZE);
+		while (inputIndex < inputs.size()) {
+			ByteBuffer b1 = inputs.get(inputIndex++);
+			deflater.setInput(b1.array(), b1.position(), b1.remaining());
+			/**
+			 * If we finish() stream and reset() it, Deflater start new gzip
+			 * stream, this makes continuous zlib reader unhappy. if we remove
+			 * finish(), Deflater.deflate() never flushes its output. The
+			 * original zlib deflate has flush flag. I'm pretty sure this a kind
+			 * of bug of Java library.
+			 */
+			if (inputIndex == inputs.size())
+				deflater.finish();
+			int len1 = 0;
+			do {
+				len1 = deflater.deflate(c1.array(), c1.position(),
+						c1.remaining());
+				if (len1 > 0) {
+					len += len1;
+					c1.position(c1.position() + len1);
+					if (c1.remaining() == 0) {
+						c1.flip();
+						outputs.addLast(c1);
+						c1 = ByteBuffer.allocate(INFLATE_BUFSIZE);
+					}
+				}
+			} while (len1 > 0 || !deflater.needsInput()); // &&!deflater.finished());
+		}
+		if (c1.position() != 0) {
+			c1.flip();
+			outputs.addLast(c1);
+		}
+		deflater.reset();
+		return len;
+	}
+
+	/**
+	 * gunzip byte arrays
+	 * 
+	 * @param inflater
+	 * @param inputs
+	 *            byte data[]
+	 * @param outputs
+	 *            byte data[]
+	 * @return number of total bytes
+	 * @throws IOException
+	 */
+	public int unzip(Inflater inflater, LinkedList<ByteBuffer> inputs,
+			int inputIndex, LinkedList<ByteBuffer> outputs, int bufSize)
+			throws DataFormatException {
+		int len = 0;
+		ByteBuffer buf = ByteBuffer.allocate(bufSize);
+		while (inputIndex < inputs.size()) {
+			ByteBuffer input = inputs.get(inputIndex++);
+			inflater.setInput(input.array(), input.position(), input.limit());
+			// if (inputIndex==inputs.size()) if inflater/deflater has symmetry,
+			// we need this
+			// inflater.end(); but this won't work
+			do {
+				int len0 = inflater.inflate(buf.array(), buf.position(),
+						buf.remaining());
+				if (len0 > 0) {
+					buf.position(buf.position() + len0);
+					len += len0;
+					if (buf.remaining() == 0) {
+						buf.flip();
+						outputs.addLast(buf);
+						buf = ByteBuffer.allocate(bufSize);
+					}
+				}
+			} while (!inflater.needsInput());
+		}
+		if (buf.position() != 0) {
+			buf.flip();
+			outputs.addLast(buf);
+		}
+		return len;
+	}
+
+	float maxMag = 1;
+
+	/**
+	 * send data to clients
+	 * 
+	 * @param dataLen
+	 * @throws IOException
+	 * @throws DataFormatException
+	 * 
+	 *             Zlibed packet is compressed in context dependent way, that
+	 *             is, it have to send from the beginning. But this is
+	 *             impossible. So we have to compress it again for each clients.
+	 *             Separate deflater for each clients is necessary.
+	 * 
+	 *             Java's deflater does not support flush. This means to get the
+	 *             result, we have to finish the compression. Reseting start new
+	 *             compression, but it is not accepted well in zlib continuous
+	 *             reading. So we need new Encoding ZRLEE which reset decoder
+	 *             for each packet. ZRLEE can be invisible from user, but it
+	 *             have to be implemented in the clients. ZRLEE compression is
+	 *             not context dependent, so no recompression is necessary.
+	 */
+
+	void readSendData(int dataLen) throws IOException, DataFormatException {
+		LinkedList<ByteBuffer> bufs = new LinkedList<ByteBuffer>();
+		ByteBuffer header = ByteBuffer.allocate(16);
+		readFully(header.array(), 0, 16);
+		header.limit(16);
+		if (header.get(0) == RfbProto.FramebufferUpdate) {
+			int encoding = header.getInt(12);
+			if (encoding == RfbProto.EncodingZRLE
+					|| encoding == RfbProto.EncodingZlib) { // ZRLEE is already
+															// recompressed
+				ByteBuffer len = ByteBuffer.allocate(4);
+				readFully(len.array(), 0, 4);
+				len.limit(4);
+				ByteBuffer inputData = ByteBuffer.allocate(dataLen - 20);
+
+				startTiming();
+				readFully(inputData.array(), 0, inputData.capacity());
+//				System.out.println(dataLen);
+				inputData.limit(dataLen - 20);
+				stopTiming();
+
+				LinkedList<ByteBuffer> inputs = new LinkedList<ByteBuffer>();
+				inputs.add(inputData);
+
+				header.putInt(12, RfbProto.EncodingZRLEE); // means recompress
+															// every time
+				// using new Deflecter every time is incompatible with the
+				// protocol, clients have to be modified.
+				Deflater nDeflater = deflater; // new Deflater();
+				LinkedList<ByteBuffer> out = new LinkedList<ByteBuffer>();
+				unzip(inflater, inputs, 0, out, INFLATE_BUFSIZE);
+				// dump32(inputs);
+				int len2 = zip(nDeflater, out, 0, bufs);
+				ByteBuffer blen = ByteBuffer.allocate(4);
+				blen.putInt(len2);
+				blen.flip();
+				bufs.addFirst(blen);
+
+				bufs.addFirst(header);
+			//	if(dataLen<=64000)
+					multicastqueue.put(bufs);
+				// is.reset();
+
+				/*
+				 * System.out.println("ZRLE = "+dataLen);
+				 * System.out.println("ZRLEE = "+(len2+20)); float mag =
+				 * (float)dataLen / (float)(len2 + 20);
+				 * System.out.println("ZRLE / ZRLEE = "+ mag); if(mag > maxMag)
+				 * maxMag = mag; System.out.println("maxMag = "+maxMag);
+				 */
+				return;
+			}
+			bufs.add(header);
+			if (dataLen > 16) {
+				ByteBuffer b = ByteBuffer.allocate(dataLen - 16);
+				startTiming();
+				readFully(b.array(), 0, dataLen - 16);
+				b.limit(dataLen - 16);
+				stopTiming();
+				bufs.add(b);
+			}
+			multicastqueue.put(bufs);
+			// is.reset();
+			return;
+		}
+		is.reset();
+
+		// It may be compressed. We can inflate here to avoid repeating clients
+		// decompressing here,
+		// but it may generate too many large data. It is better to do it in
+		// each client.
+		// But we have do inflation for all input data, so we have to do it
+		// here.
+	}
+
+	public void newClient(AcceptThread acceptThread, final Socket newCli,
+			final OutputStream os, final InputStream is) throws IOException {
+		// createBimgFlag = true;
+		// rfb.addSockTmp(newCli);
+		// addSock(newCli);
+		final int myId = clients;
+		final MulticastQueue.Client<LinkedList<ByteBuffer>> c = multicastqueue
+				.newClient();
+		final AtomicInteger writerRunning = new AtomicInteger();
+		writerRunning.set(1);
+		/**
+		 * Timeout thread. If a client is suspended, it has top of queue
+		 * indefinitely, which caused memory overflow. After the timeout, we
+		 * poll the queue and discard it. Start long wait if writer is running.
+		 */
+		final Runnable timer = new Runnable() {
+			public void run() {
+				int count = 0;
+				for (;;) {
+					long timeout = 50000 / 8;
+					try {
+						synchronized (this) {
+							int state, flag;
+							writerRunning.set(0);
+							wait(timeout);
+							flag = 0;
+							while ((state = writerRunning.get()) == 0) {
+								c.poll(); // discard, should be timeout
+								count++;
+								if (flag == 0) {
+									System.out.println("Discarding " + myId
+											+ " count=" + count);
+									flag = 1;
+								}
+								wait(10); // if this is too short, writer cannot
+											// take the poll, if this is too
+											// long, memory will overflow...
+							}
+							if (flag == 1)
+								System.out.println("Resuming " + myId
+										+ " count=" + count);
+							if (state != 1) {
+								System.out.println("Client died " + myId);
+								break;
+							}
+						}
+					} catch (InterruptedException e) {
+					}
+				}
+			}
+		};
+		new Thread(timer).start();
+		/**
+		 * discard all incoming from clients
+		 */
+		final Runnable reader = new Runnable() {
+			public void run() {
+				byte b[] = new byte[4096];
+				for (;;) {
+					try {
+						int c = is.read(b);
+						if (c <= 0)
+							throw new IOException();
+						// System.out.println("client read "+c);
+					} catch (IOException e) {
+						try {
+							writerRunning.set(2);
+							os.close();
+							is.close();
+						} catch (IOException e1) {
+						}
+						return;
+					}
+				}
+			}
+		};
+		/**
+		 * send packets to a client
+		 */
+		Runnable sender = new Runnable() {
+			public void run() {
+				writerRunning.set(1);
+				try {
+					requestThreadNotify();
+					// rThread.checkDelay();
+
+					/**
+					 * initial connection of RFB protocol
+					 */
+					sendRfbVersion(os);
+					// readVersionMsg(is);
+					int rfbMinor = readVersionMsg(is, os);
+					sendSecurityType(os);
+					readSecType(is);
+					sendSecResult(os);
+					readClientInit(is);
+					sendInitData(os);
+					new Thread(reader).start(); // discard incoming packet here
+												// after.
+					// writeFramebufferUpdateRequest(0,0, framebufferWidth,
+					// framebufferHeight, false );
+					int i  = 0;
+					for (;;) {
+						LinkedList<ByteBuffer> bufs = c.poll();
+						int inputIndex = 0;
+						ByteBuffer header = bufs.get(inputIndex);
+						if (header == null)
+							continue;
+						else if (header.get(0) == RfbProto.CheckDelay) {
+							writeToClient(os, bufs, inputIndex);
+							continue;
+						} else if (header.get(0) == RfbProto.FramebufferUpdate) {
+							// System.out.println("client "+ myId);
+						}
+						/*
+						if(i%20==0){
+							sendDataCheckDelay();
+						}
+						i++;
+						*/
+						writeToClient(os, bufs, inputIndex);
+						writerRunning.set(1); // yes my client is awaking.
+					}
+				} catch (IOException e) {
+					try {
+						writerRunning.set(2);
+						os.close();
+					} catch (IOException e1) {
+					}
+					/* if socket closed cliList.remove(newCli); */
+				}
+			}
+
+			public void writeToClient(final OutputStream os,
+					LinkedList<ByteBuffer> bufs, int inputIndex)
+					throws IOException {
+				while (inputIndex < bufs.size()) {
+					ByteBuffer b = bufs.get(inputIndex++);
+					os.write(b.array(), b.position(), b.limit());
+				}
+				os.flush();
+			}
+		};
+		clients++;
+		new Thread(sender).start();
+
+	}
+
+	public void dump32(LinkedList<ByteBuffer> bufs) {
+		int len = 0;
+		for (ByteBuffer b : bufs)
+			len += b.remaining();
+		ByteBuffer top = bufs.getFirst();
+		ByteBuffer end = bufs.getLast();
+		System.err.println("length: " + len);
+		System.err.print("head 0: ");
+		for (int i = 0; i < 16 && i < top.remaining(); i++) {
+			System.err.print(" " + top.get(i));
+		}
+		System.err.print("tail 0: ");
+		for (int i = 0; i < 16 && i < end.remaining(); i++) {
+			System.err.print(" " + end.get(i));
+		}
+		System.err.println();
+	}
+
+	@Test
+	public void test1() {
+		try {
+			LinkedList<ByteBuffer> in = new LinkedList<ByteBuffer>();
+			LinkedList<ByteBuffer> out = new LinkedList<ByteBuffer>();
+			LinkedList<ByteBuffer> out2 = new LinkedList<ByteBuffer>();
+			// if (false) {
+			// for(int i=0;i<10;i++) {
+			// in.add(ByteBuffer.wrap("test1".getBytes()));
+			// in.add(ByteBuffer.wrap("test2".getBytes()));
+			// in.add(ByteBuffer.wrap("test3".getBytes()));
+			// in.add(ByteBuffer.wrap("test44".getBytes()));
+			// }
+			// } else
+			{
+				String t = "";
+				for (int i = 0; i < 10; i++) {
+					t += "test1";
+					t += "test2";
+					t += "test3";
+					t += "test44";
+				}
+				in.add(ByteBuffer.wrap(t.getBytes()));
+			}
+
+			LinkedList<ByteBuffer> in1 = clone(in);
+
+			Deflater deflater = new Deflater();
+			zip(deflater, in, 0, out);
+			// LinkedList<ByteBuffer> out3 = clone(out); zipped result is depend
+			// on deflator's state
+			unzip(inflater, out, 0, out2, INFLATE_BUFSIZE);
+			// inflater.reset();
+			equalByteBuffers(in1, out2);
+			LinkedList<ByteBuffer> out4 = new LinkedList<ByteBuffer>();
+			deflater = new Deflater();
+			zip(deflater, out2, 0, out4);
+			LinkedList<ByteBuffer> out5 = new LinkedList<ByteBuffer>();
+			unzip(inflater, out4, 0, out5, INFLATE_BUFSIZE);
+			int len = equalByteBuffers(in1, out5);
+
+			System.out.println("Test Ok. " + len);
+		} catch (Exception e) {
+			assertEquals(0, 1);
+		}
+	}
+
+	private LinkedList<ByteBuffer> clone(LinkedList<ByteBuffer> in) {
+		LinkedList<ByteBuffer> copy = new LinkedList<ByteBuffer>();
+		for (ByteBuffer b : in) {
+			ByteBuffer c = b.duplicate();
+			copy.add(c);
+		}
+		return copy;
+	}
+
+	public int equalByteBuffers(LinkedList<ByteBuffer> in,
+			LinkedList<ByteBuffer> out2) {
+		int len = 0;
+		Iterable<Byte> i = byteBufferIterator(in);
+		Iterator<Byte> o = byteBufferIterator(out2).iterator();
+
+		for (int b : i) {
+			len++;
+			if (o.hasNext()) {
+				int c = o.next();
+				assertEquals(b, c);
+			} else
+				assertEquals(0, 1);
+		}
+		if (o.hasNext())
+			assertEquals(0, 1);
+		// System.out.println();
+		return len;
+	}
+
+	private Iterable<Byte> byteBufferIterator(final LinkedList<ByteBuffer> in) {
+		return new Iterable<Byte>() {
+			public Iterator<Byte> iterator() {
+				return new Iterator<Byte>() {
+					int bytes = 0;
+					int buffers = 0;
+
+					public boolean hasNext() {
+						for (;;) {
+							if (buffers >= in.size())
+								return false;
+							ByteBuffer b = in.get(buffers);
+							if (!(bytes < b.remaining())) {
+								buffers++;
+								bytes = 0;
+							} else
+								return true;
+						}
+					}
+
+					public Byte next() {
+						ByteBuffer bf = in.get(buffers);
+						byte b = bf.get(bytes++);
+						if (bf.remaining() <= bytes) {
+							buffers++;
+							bytes = 0;
+						}
+						// System.out.print(b);
+						return b;
+					}
+
+					public void remove() {
+					}
+				};
+			}
+		};
+	}
+
+}