diff src/main/java/jp/ac/u_ryukyu/treevnc/test/MyRfbProto.java @ 28:18fad65bc447

Create MyRfbProto.java MyRfbProto.java is a summary of the intersection of MyRfbClient and MyRfbProxy.
author one
date Sat, 01 Sep 2012 19:24:07 +0900
parents
children 57eb5575e6c4
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/jp/ac/u_ryukyu/treevnc/test/MyRfbProto.java	Sat Sep 01 19:24:07 2012 +0900
@@ -0,0 +1,276 @@
+package jp.ac.u_ryukyu.treevnc.test;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import jp.ac.u_ryukyu.treevnc.MulticastQueue;
+import jp.ac.u_ryukyu.treevnc.server.AcceptThread;
+import jp.ac.u_ryukyu.treevnc.server.RequestScreenThread;
+
+import com.glavsoft.exceptions.TransportException;
+import com.glavsoft.rfb.protocol.Protocol;
+import com.glavsoft.rfb.protocol.ProtocolContext;
+import com.glavsoft.transport.Reader;
+import com.glavsoft.transport.Writer;
+
+public class MyRfbProto {
+	final static int CheckDelay = 11;
+	final static int FramebufferUpdate = 0;
+	private ProtocolContext context;
+	final static String versionMsg_3_855 = "RFB 003.855\n";
+	private int clients;
+	private MulticastQueue<LinkedList<ByteBuffer>> multicastqueue = new MulticastQueue<LinkedList<ByteBuffer>>();
+	private RequestScreenThread rThread;
+	private boolean proxyFlag = true;
+
+	
+	public void newClient(AcceptThread acceptThread, final Socket newCli,
+			final Writer os, final Reader is) throws IOException {
+		// createBimgFlag = true;
+		// rfb.addSockTmp(newCli);
+		// addSock(newCli);
+		final int myId = clients;
+		final MulticastQueue.Client<LinkedList<ByteBuffer>> c = multicastqueue.newClient();
+		final AtomicInteger writerRunning = new AtomicInteger();
+		writerRunning.set(1);
+		/**
+		 * Timeout thread. If a client is suspended, it has top of queue
+		 * indefinitely, which caused memory overflow. After the timeout, we
+		 * poll the queue and discard it. Start long wait if writer is running.
+		 */
+		final Runnable timer = new Runnable() {
+			public void run() {
+				int count = 0;
+				for (;;) {
+					long timeout = 50000 / 8;
+					try {
+						synchronized (this) {
+							int state, flag;
+							writerRunning.set(0);
+							wait(timeout);
+							flag = 0;
+							while ((state = writerRunning.get()) == 0) {
+								c.poll(); // discard, should be timeout
+								count++;
+								if (flag == 0) {
+									System.out.println("Discarding " + myId
+											+ " count=" + count);
+									flag = 1;
+								}
+								wait(10); // if this is too short, writer cannot
+											// take the poll, if this is too
+											// long, memory will overflow...
+							}
+							if (flag == 1)
+								System.out.println("Resuming " + myId
+										+ " count=" + count);
+							if (state != 1) {
+								System.out.println("Client died " + myId);
+								break;
+							}
+						}
+					} catch (InterruptedException e) {
+					}
+				}
+			}
+		};
+		new Thread(timer).start();
+		/**
+		 * discard all incoming from clients
+		 */
+		final Runnable reader = new Runnable() {
+			public void run() {
+				byte b[] = new byte[4096];
+				for (;;) {
+					try {
+						int c = is.readByte(b);
+						if (c <= 0)
+							throw new IOException();
+						// System.out.println("client read "+c);
+					} catch (IOException e) {
+						try {
+							writerRunning.set(2);
+							os.close();
+							is.close();
+						} catch (IOException e1) {
+						} catch (TransportException e1) {
+							e1.printStackTrace();
+						}
+						return;
+					} catch (TransportException e) {
+						e.printStackTrace();
+					}
+				}
+			}
+		};
+		/**
+		 * send packets to a client
+		 */
+		Runnable sender = new Runnable() {
+			public void run() {
+				writerRunning.set(1);
+				try {
+					requestThreadNotify();
+					// rThread.checkDelay();
+
+					/**
+					 * initial connection of RFB protocol
+					 */
+					sendRfbVersion(os);
+					// readVersionMsg(is);
+					readVersionMsg(is, os);
+					sendSecurityType(os);
+					readSecType(is);
+					sendSecResult(os);
+					readClientInit(is);
+					sendInitData(os);
+					new Thread(reader).start(); // discard incoming packet here
+												// after.
+					// writeFramebufferUpdateRequest(0,0, framebufferWidth,
+					// framebufferHeight, false );
+					for (;;) {
+						LinkedList<ByteBuffer> bufs = c.poll();
+						int inputIndex = 0;
+						ByteBuffer header = bufs.get(inputIndex);
+						if (header == null)
+							continue;
+						else if (header.get(0) == CheckDelay) {
+							writeToClient(os, bufs, inputIndex);
+							continue;
+						} else if (header.get(0) == FramebufferUpdate) {
+							// System.out.println("client "+ myId);
+						}
+						/*
+						 * if(i%20==0){ sendDataCheckDelay(); } i++;
+						 */
+						writeToClient(os, bufs, inputIndex);
+						writerRunning.set(1); // yes my client is awaking.
+					}
+				} catch (IOException e) {
+					try {
+						writerRunning.set(2);
+						os.close();
+					} catch (IOException e1) {
+					}
+					/* if socket closed cliList.remove(newCli); */
+				} catch (TransportException e) {
+					e.printStackTrace();
+				}
+			}
+
+			public void writeToClient(final Writer os,
+					LinkedList<ByteBuffer> bufs, int inputIndex)
+					throws TransportException {
+				while (inputIndex < bufs.size()) {
+					ByteBuffer b = bufs.get(inputIndex++);
+					os.write(b.array(), b.position(), b.limit());
+				}
+				os.flush();
+			}
+		};
+		clients++;
+		new Thread(sender).start();
+
+	}
+	
+	public synchronized void requestThreadNotify() {
+		rThread.reStart();
+	}
+	
+	private void sendRfbVersion(Writer writer) throws IOException, TransportException {
+		// os.write(versionMsg_3_8.getBytes());
+		writer.write(versionMsg_3_855.getBytes());
+	}
+	
+	private int readVersionMsg(Reader reader, Writer writer) throws IOException, TransportException {
+
+		byte[] b = new byte[12];
+
+		reader.readBytes(b);
+
+		if ((b[0] != 'R') || (b[1] != 'F') || (b[2] != 'B') || (b[3] != ' ')
+				|| (b[4] < '0') || (b[4] > '9') || (b[5] < '0') || (b[5] > '9')
+				|| (b[6] < '0') || (b[6] > '9') || (b[7] != '.')
+				|| (b[8] < '0') || (b[8] > '9') || (b[9] < '0') || (b[9] > '9')
+				|| (b[10] < '0') || (b[10] > '9') || (b[11] != '\n')) {
+			throw new IOException("this is not an RFB server");
+		}
+
+		int rfbMajor = (b[4] - '0') * 100 + (b[5] - '0') * 10 + (b[6] - '0');
+		int rfbMinor = (b[8] - '0') * 100 + (b[9] - '0') * 10 + (b[10] - '0');
+
+		if (rfbMajor < 3) {
+			throw new IOException(
+					"RFB server does not support protocol version 3");
+		}
+
+		if (rfbMinor == 855) {
+			sendProxyFlag(writer);
+			if (proxyFlag)
+				sendPortNumber(writer);
+		}
+		return rfbMinor;
+	}
+	
+	private void sendProxyFlag(Writer writer) throws TransportException {
+		if (proxyFlag)
+			writer.writeInt(1);
+		else
+			writer.writeInt(0);
+	}
+
+	private void sendPortNumber(Writer writer) throws TransportException {
+		byte[] b = new byte[4];
+		//b = castIntByte(getHost.getPort());
+		b = castIntByte(9999);
+		writer.write(b);
+	}
+	
+	private byte[] castIntByte(int len) {
+		byte[] b = new byte[4];
+		b[0] = (byte) ((len >>> 24) & 0xFF);
+		b[1] = (byte) ((len >>> 16) & 0xFF);
+		b[2] = (byte) ((len >>> 8) & 0xFF);
+		b[3] = (byte) ((len >>> 0) & 0xFF);
+		return b;
+	}
+	
+	private void readSecType(Reader reader) throws TransportException {
+		byte[] b = new byte[1];
+		reader.read(b);
+	}
+	
+	private void sendSecurityType(Writer os) throws TransportException {
+		// number-of-security-types
+		os.writeInt(1);
+		// security-types
+		// 1:None
+		os.writeInt(1);
+
+		/*
+		 * os.write(4); os.write(30); os.write(31); os.write(32); os.write(35);
+		 * os.flush();
+		 */
+	}
+	
+	private void sendSecResult(Writer os) throws TransportException {
+		byte[] b = castIntByte(0);
+		os.write(b);
+	}
+
+	private void readClientInit(Reader in) throws TransportException {
+		byte[] b = new byte[0];
+		in.readBytes(b);
+	}
+	
+	private void sendInitData(Writer os) throws TransportException {
+		os.write(context.getInitData());
+	}
+	
+    public void setProtocolContext(Protocol workingProtocol) {
+        context = workingProtocol;
+    }
+}