view src/main/java/ac/ryukyu/treevnc/client/MyRfbProtoClient.java @ 7:30bd62abf424

add files
author one
date Wed, 08 Aug 2012 18:58:05 +0900
parents b32668b8e83c
children
line wrap: on
line source

package ac.ryukyu.treevnc.client;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

import ac.ryukyu.*;
import ac.ryukyu.treevnc.MulticastQueue;

import com.glavsoft.exceptions.TransportException;
import com.glavsoft.rfb.encoding.EncodingType;
import com.glavsoft.transport.Reader;
import com.glavsoft.viewer.ContainerManager;
import com.glavsoft.viewer.swing.ParametersHandler.ConnectionParams;

/**
 * RFB protocol helper for a TreeVNC node.
 *
 * <p>The class plays two roles that are visible in this file:
 * <ul>
 * <li>towards its upstream it reads messages through a {@link Reader}
 *     ({@link #readSendData(int, Reader)}), recompresses ZRLE/ZLIB rectangles and
 *     places the resulting buffers on an internal {@link MulticastQueue};</li>
 * <li>towards downstream viewers it listens on a server socket
 *     ({@link #selectPort(int)}, {@link #accept()}) and, for every accepted
 *     connection, runs the server side of the RFB handshake and then forwards
 *     everything that appears on the queue
 *     ({@link #newClient(AcceptThread, Socket, OutputStream, InputStream)}).</li>
 * </ul>
 */
public class MyRfbProtoClient {
	/** Message-type byte of a FramebufferUpdate message. */
	final static int FramebufferUpdate = 0;
	/** Message-type byte that the sender in newClient only logs before forwarding. */
	final static int CheckDelay = 11;
	/** ProtocolVersion string announced to downstream clients. */
	final static String versionMsg_3_855 = "RFB 003.855\n";
	/** Size of each output chunk allocated by zip() and passed to unzip() by readSendData(). */
	private static final int INFLATE_BUFSIZE = 1024*100;
	private Reader reader;
	// NOTE(review): because this is initialised eagerly, the "echoValue == null"
	// branch in getParentName() can never run - confirm whether that is intended.
	private EchoClient echoValue = new EchoClient();
	private String host,treenum,parent,pHost,leaderflag;
	private int echoPort,port,acceptPort;
	Socket clientSocket,sock;
	DataInputStream is;
	OutputStream os;
	private ContainerManager containerManager;
	private ServerSocket servSock;
	// NOTE(review): never assigned anywhere in this class; sendInitData() fails until it is.
	private byte initData[];
	private int clients = 0;
	private MulticastQueue<LinkedList<ByteBuffer>> multicastqueue = new MulticastQueue<LinkedList<ByteBuffer>>();
	boolean proxyFlag = false;
	int serverMajor, serverMinor;
	int clientMajor, clientMinor;
	
	private Inflater inflater = new Inflater();
	private Deflater deflater = new Deflater();
	
	/**
	 * Creates the helper.
	 *
	 * @param reader source of upstream RFB data
	 * @param host   host name, kept for diagnostics (overwritten by {@link #getParentName()})
	 * @param port   port number as text, kept for diagnostics; anything that is not a
	 *               decimal integer is stored as 0
	 */
	public MyRfbProtoClient(Reader reader,String host,String port) {
		this.reader = reader;
		// The original dropped both values, so error messages always said "Host null port 0".
		this.host = host;
		this.port = parsePortOrZero(port);
	}
	
	/** Parses a decimal port number, returning 0 for null or malformed text. */
	private static int parsePortOrZero(String text) {
		if (text == null) {
			return 0;
		}
		try {
			return Integer.parseInt(text.trim());
		} catch (NumberFormatException ignored) {
			// The value is only used in diagnostics, so a bad port must not abort construction.
			return 0;
		}
	}
	
	/**
	 * Reads an exactly buf.length bytes from the stream.
	 * A single InputStream.read() may legally return fewer bytes than requested.
	 *
	 * @throws IOException on I/O failure or if the stream ends early
	 */
	private static void readFully(InputStream in, byte[] buf) throws IOException {
		int filled = 0;
		while (filled < buf.length) {
			int n = in.read(buf, filled, buf.length - filled);
			if (n < 0) {
				throw new IOException("unexpected end of stream after " + filled
						+ " of " + buf.length + " bytes");
			}
			filled += n;
		}
	}
	
	/** Closes a stream, ignoring null and any IOException raised by close(). */
	private static void closeQuietly(java.io.Closeable resource) {
		if (resource == null) {
			return;
		}
		try {
			resource.close();
		} catch (IOException ignored) {
			// nothing useful can be done when closing an already broken connection
		}
	}
	
	/** Remembers the host name and port number of the given connection parameters. */
	public void setParam(ConnectionParams connectionParams) {
		pHost = connectionParams.hostName;
		echoPort = connectionParams.portNumber;
	}
	
	/**
	 * Reads one unsigned byte from the upstream reader.
	 *
	 * @return true if that byte equals 1
	 */
	public boolean readProxyFlag() throws TransportException {
		return reader.readUInt8() == 1;
	}
	
	/**
	 * Reads four raw bytes from the upstream reader.
	 *
	 * @return the four bytes, in the order received (see {@link #castByteInt(byte[])})
	 */
	public byte[] readEchoPort() throws Exception {
		byte[] b = new byte[4];
		reader.readBytes(b, 0, b.length);
		return b;
	}
	
	/**
	 * Copies the connection target (host, parent, tree number, leader flag) out of
	 * the current EchoClient value into this object's fields.
	 */
	public void getParentName() {
		// NOTE(review): echoValue is initialised at declaration and never set to null
		// in this class, so this branch is currently unreachable.
		if (echoValue == null) {

			if (clientSocket == null) {

				// echo = new EchoClient(pHost, this);
				echoValue = new EchoClient(pHost, echoPort);
				echoValue.openport();

				echoValue = echoValue.requestHostName("1");
			} else {
				echoValue = new EchoClient();
				echoValue = echoValue.Interruption(clientSocket);
			}
		}
		// decide the connection target from the proxy's reply
		host = echoValue.responseLine;
		parent = echoValue.parent;
		// Both branches of the original null checks assigned the same value.
		treenum = echoValue.treenum;
		leaderflag = echoValue.leaderflag;
	}
	
	/** Interprets the first four bytes of b as a big-endian int. */
	int castByteInt(byte[] b) {
		return ByteBuffer.wrap(b).getInt();
	}
	
	/**
	 * Opens a new socket to host:port, stores it in {@link #sock} and returns it.
	 * A previously stored socket is not closed here.
	 */
	Socket changeParent(String host, int port) throws IOException {
		sock = new Socket(host, port);
		return sock;
	}
	
	/** Blocks until a downstream client connects to the server socket. */
	public Socket accept() throws IOException {
		return servSock.accept();
	}
	
	/** Opens the server socket on the given port and records the port. */
	void initServSock(int port) throws IOException {
		servSock = new ServerSocket(port);
		acceptPort = port;
	}
	
	/**
	 * Opens the server socket on the first usable port at or above p.
	 *
	 * <p>Ports that cannot be opened are skipped. Once the candidate leaves the valid
	 * port range, ServerSocket itself raises IllegalArgumentException, which ends the search.
	 *
	 * @param p first port number to try
	 */
	public void selectPort(int p) {
		int candidate = p;
		for (;;) {
			try {
				initServSock(candidate);
				break;
			} catch (BindException e) {
				// port is taken - try the next one
				candidate++;
			} catch (IOException e) {
				// The original swallowed this and retried the same port forever.
				System.err.println("cannot listen on port " + candidate + ": " + e);
				candidate++;
			}
		}
		System.out.println("accept port = " + candidate);
	}
	
	/**
	 * Starts serving one downstream client.
	 *
	 * <p>Three threads are involved; they share the state word writerRunning:
	 * 0 = timer armed and no write completed since, 1 = writer is making progress,
	 * 2 = connection is dead.
	 * <ul>
	 * <li>timer: periodically arms the state; if the writer has not reset it after the
	 *     timeout, queue entries for this client are discarded so a suspended client
	 *     cannot hold the head of the queue indefinitely;</li>
	 * <li>sender: performs the RFB handshake, then forwards every queue entry;</li>
	 * <li>discarder: started after the handshake, reads and drops everything the client sends.</li>
	 * </ul>
	 *
	 * @param acceptThread not used by this method
	 * @param newCli       not used by this method
	 * @param os           stream to the client
	 * @param is           stream from the client
	 */
	public void newClient(AcceptThread acceptThread, final Socket newCli,
			final OutputStream os, final InputStream is) throws IOException {
		final int myId = clients; 
		final MulticastQueue.Client <LinkedList<ByteBuffer>> c = multicastqueue.newClient();
		final AtomicInteger writerRunning = new AtomicInteger(1);
		final Runnable timer = new Runnable() {
			public void run() {
				int count = 0;
				for(;;) {
					long timeout = 40000/8;
					try {
						synchronized(this) {
							// Arm only from "running". An unconditional set(0) could erase the
							// "dead" state and leave this thread discarding forever.
							writerRunning.compareAndSet(1, 0);
							wait(timeout);
							boolean discarded = false;
							int state;
							while((state=writerRunning.get())==0) {
								c.poll(); // discard, should be timeout
								count++;
								if (!discarded) {
									System.out.println("Discarding "+myId + " count="+ count);
									discarded = true;
								}
								wait(10); // if this is too short, writer cannot take the poll, if this is too long, memory will overflow...
							}
							if (discarded) System.out.println("Resuming "+myId + " count="+count);
							if (state!=1) {
								System.out.println("Client died "+myId);
								break;
							}
						}
					} catch (InterruptedException e) {
						// Swallowing the interrupt inside an endless loop made the thread unstoppable.
						Thread.currentThread().interrupt();
						return;
					}
				}
			}
		};
		new Thread(timer).start();
		// Reads and drops whatever the client sends; ends when the stream fails or reaches EOF.
		final Runnable discarder = new Runnable() {
			public void run() {
				byte[] scratch = new byte[4096];
				try {
					while (is.read(scratch) > 0) {
						// incoming client data is intentionally ignored
					}
				} catch (IOException e) {
					// treated the same as end of stream
				}
				writerRunning.set(2);
				// Close independently so a failure on one stream does not leak the other.
				closeQuietly(os);
				closeQuietly(is);
			}
		};
		// Performs the handshake and then forwards queue entries to the client.
		Runnable sender = new Runnable() {
			public void run() {
				writerRunning.set(1);
				try {
					// initial connection of RFB protocol
					sendRfbVersion(os);
					readVersionMsg(is,os);
					sendSecurityType(os);
					readSecType(is);
					sendSecResult(os);
					readClientInit(is);
					sendInitData(os);
					new Thread(discarder).start(); // discard incoming packet here after.
					for (;;) {
						LinkedList<ByteBuffer> bufs = c.poll();
						int inputIndex = 0;
						ByteBuffer header = bufs.get(inputIndex);
						if (header==null) continue;
						if (header.get(0)==CheckDelay) {
							System.out.println("--------------------");
						}
						writeToClient(os, bufs, inputIndex);
						// Tell the timer the client is awake, but never resurrect a dead connection.
						writerRunning.compareAndSet(0, 1);
					}
				} catch (IOException e) {
					writerRunning.set(2);
					closeQuietly(os);
					closeQuietly(is);
				}
			}

			/** Writes bufs[inputIndex..] to the stream and flushes. */
			public void writeToClient(final OutputStream os,
					LinkedList<ByteBuffer> bufs, int inputIndex)
					throws IOException {
				while(inputIndex < bufs.size()) {
					ByteBuffer b = bufs.get(inputIndex++);
					// The third argument is a length, so it must be remaining(), not limit().
					os.write(b.array(), b.arrayOffset() + b.position(), b.remaining());
				}
				os.flush();
			}
		};
		clients++;
		new Thread(sender).start();

	}
	
	/** Sends the ProtocolVersion string to the client. */
	void sendRfbVersion(OutputStream os) throws IOException {
		// The version string is pure ASCII; do not depend on the platform charset.
		os.write(versionMsg_3_855.getBytes("US-ASCII"));
	}

	/**
	 * Reads and validates a 12-byte "RFB xxx.yyy\n" version message.
	 *
	 * @return two-element array {major, minor}
	 * @throws IOException if the stream ends early or the message is malformed
	 */
	private int[] readVersionNumbers(InputStream in) throws IOException {
		byte[] b = new byte[12];
		readFully(in, b);

		boolean wellFormed = b[0] == 'R' && b[1] == 'F' && b[2] == 'B' && b[3] == ' '
				&& b[7] == '.' && b[11] == '\n';
		final int[] digitAt = { 4, 5, 6, 8, 9, 10 };
		for (int i = 0; wellFormed && i < digitAt.length; i++) {
			byte d = b[digitAt[i]];
			wellFormed = d >= '0' && d <= '9';
		}
		if (!wellFormed) {
			throw new IOException("Host " + host + " port " + port
					+ " is not an RFB server");
		}

		int major = (b[4] - '0') * 100 + (b[5] - '0') * 10 + (b[6] - '0');
		int minor = (b[8] - '0') * 100 + (b[9] - '0') * 10 + (b[10] - '0');
		return new int[] { major, minor };
	}

	/**
	 * Reads the peer's version message; if its minor version is 855 the proxy flag
	 * is sent back on os.
	 *
	 * @return the minor version number of the peer
	 * @throws IOException if the message is malformed or the major version is below 3
	 */
	int readVersionMsg(InputStream is, OutputStream os) throws IOException {
		int[] version = readVersionNumbers(is);
		int rfbMajor = version[0];
		int rfbMinor = version[1];

		if (rfbMajor < 3) {
			throw new IOException(
			"RFB server does not support protocol version 3");
		}

		if (rfbMinor == 855) {
			sendProxyFlag(os);
		}
		return rfbMinor;
	}

	/**
	 * Reads the peer's version message into {@link #serverMajor} and {@link #serverMinor}.
	 *
	 * @throws IOException if the message is malformed or the major version is below 3
	 */
	void readVersionMsg(InputStream is) throws IOException {
		int[] version = readVersionNumbers(is);
		serverMajor = version[0];
		serverMinor = version[1];

		if (serverMajor < 3) {
			throw new IOException(
					"RFB server does not support protocol version 3");
		}
	}
	
	/** Offers exactly one security type, 1 (None), to the client. */
	void sendSecurityType(OutputStream os) throws IOException {
		// number-of-security-types
		os.write(1);
		// security-types
		// 1:None
		os.write(1);
	}

	/** Consumes the one-byte security type chosen by the client; the value is ignored. */
	void readSecType(InputStream is) throws IOException {
		readFully(is, new byte[1]);
	}
	
	/** Sends a four-byte zero as the security result. */
	void sendSecResult(OutputStream os) throws IOException {
		byte[] b = castIntByte(0);
		os.write(b);
	}
	
	/**
	 * Consumes the one-byte ClientInit message; the value is ignored.
	 * The original read into a zero-length array and therefore consumed nothing.
	 */
	void readClientInit(InputStream in) throws IOException {
		readFully(in, new byte[1]);
	}
	
	/**
	 * Sends the stored init data to the client.
	 *
	 * @throws IOException if no init data has been stored or the write fails
	 */
	void sendInitData(OutputStream os) throws IOException {
		if (initData == null) {
			// Raised as IOException so the sender thread's existing handler closes the
			// connection instead of the thread dying on a NullPointerException.
			throw new IOException("init data is not available");
		}
		os.write(initData);
	}
	
	/** Sends one byte: 1 if {@link #proxyFlag} is set, otherwise 0. */
	void sendProxyFlag(OutputStream os) throws IOException {
		os.write(proxyFlag ? 1 : 0);
	}
	
	/** Encodes len as four bytes, most significant byte first. */
	byte[] castIntByte(int len) {
		return ByteBuffer.allocate(4).putInt(len).array();
	}
	
	/**
	 * Deflates a list of buffers into a list of output buffers.
	 *
	 * <p>finish() is applied while the last input buffer is processed and the deflater
	 * is reset afterwards, so every call produces one complete stream.
	 *
	 * @param deflater   deflater to use; reset before returning
	 * @param inputs     buffers to compress (backing array from position, remaining() bytes)
	 * @param inputIndex index of the first buffer in inputs to compress
	 * @param outputs    receives the compressed data, flipped, in chunks of at most INFLATE_BUFSIZE bytes
	 * @return total number of compressed bytes appended to outputs
	 * @throws IOException declared for callers; not raised by the current implementation
	 */
	public int zip(Deflater deflater,LinkedList<ByteBuffer> inputs, int inputIndex, LinkedList<ByteBuffer> outputs) throws IOException {
		int len = 0;
		ByteBuffer c1= ByteBuffer.allocate(INFLATE_BUFSIZE);
		while(inputIndex < inputs.size() ) {
			ByteBuffer b1 = inputs.get(inputIndex++);
			deflater.setInput(b1.array(),b1.position(),b1.remaining());
			/**
			 * If we finish() stream and reset() it, Deflater start new gzip stream, this makes continuous zlib reader unhappy.
			 * if we remove finish(), Deflater.deflate() never flushes its output. The original zlib deflate has flush flag. I'm pretty
			 * sure this a kind of bug of Java library.
			 */
			if (inputIndex==inputs.size())	
				deflater.finish();
			int len1 = 0;
			do {
				len1 = deflater.deflate(c1.array(),c1.position(),c1.remaining());
				if (len1>0) {
					len += len1;
					c1.position(c1.position()+len1); 
					if (c1.remaining()==0) {
						// chunk is full: hand it over and continue in a fresh one
						c1.flip();	outputs.addLast(c1);
						c1 = ByteBuffer.allocate(INFLATE_BUFSIZE);
					}
				}
			} while (len1 >0 || !deflater.needsInput()); // &&!deflater.finished());
		}
		if (c1.position()!=0) {
			c1.flip();	outputs.addLast(c1);
		}
		deflater.reset();
		return len;
	}
	
	
	/**
	 * Inflates a list of buffers into a list of output buffers.
	 * The inflater is not reset, so its state carries over to the next call.
	 *
	 * @param inflater   inflater to use
	 * @param inputs     buffers holding compressed data (backing array from position, remaining() bytes)
	 * @param inputIndex index of the first buffer in inputs to decompress
	 * @param outputs    receives the decompressed data, flipped, in chunks of at most bufSize bytes
	 * @param bufSize    capacity of each output chunk
	 * @return total number of decompressed bytes appended to outputs
	 * @throws DataFormatException if the compressed data is invalid
	 */
	public int unzip(Inflater inflater, LinkedList<ByteBuffer> inputs,
			int inputIndex, LinkedList<ByteBuffer> outputs, int bufSize)
			throws DataFormatException {
		int len = 0;
		ByteBuffer buf = ByteBuffer.allocate(bufSize);
		while (inputIndex < inputs.size()) {
			ByteBuffer input = inputs.get(inputIndex++);
			// The third argument is a length, so it must be remaining(), not limit().
			inflater.setInput(input.array(), input.position(), input.remaining());
			// if (inputIndex==inputs.size()) if inflater/deflater has symmetry,
			// we need this
			// inflater.end(); but this won't work
			do {
				int len0 = inflater.inflate(buf.array(), buf.position(),
						buf.remaining());
				if (len0 > 0) {
					buf.position(buf.position() + len0);
					len += len0;
					if (buf.remaining() == 0) {
						buf.flip();
						outputs.addLast(buf);
						buf = ByteBuffer.allocate(bufSize);
					}
				} else if (inflater.finished() || inflater.needsDictionary()) {
					// No further output is possible in these states; without this
					// check the loop would spin forever on leftover input.
					break;
				}
			} while (!inflater.needsInput());
		}
		if (buf.position() != 0) {
			buf.flip();
			outputs.addLast(buf);
		}
		return len;
	}
	
	/**
	 * Reads one upstream message of dataLen bytes and queues it for all downstream clients.
	 *
	 * <p>The first 16 bytes are read as a header. If the first byte is FramebufferUpdate
	 * and the int at offset 12 is the ZRLE or ZLIB encoding id, the payload is inflated
	 * with the shared inflater, deflated again as a self-contained stream, and the header
	 * is rewritten to the ZRLEE encoding id. Every other message is queued unchanged.
	 *
	 * @param dataLen total length of the message in bytes, header included
	 * @param reader  source to read the message from
	 * @throws IOException if dataLen is too small for a compressed rectangle, or on stream failure
	 * @throws DataFormatException if the compressed payload is invalid
	 */
	void readSendData(int dataLen,Reader reader) throws IOException, DataFormatException, TransportException {
		LinkedList<ByteBuffer>bufs = new LinkedList<ByteBuffer>();
		ByteBuffer header = ByteBuffer.allocate(16);
		reader.readBytes(header.array(),0,16); 
		header.limit(16);
		if (header.get(0)==FramebufferUpdate) {
			int encoding = header.getInt(12);
			if (encoding==EncodingType.ZRLE.getId()||encoding==EncodingType.ZLIB.getId()) { // ZRLEE is already recompressed
				// 16 header bytes + 4 length bytes precede the compressed payload.
				if (dataLen < 20) {
					throw new IOException("compressed rectangle too short: dataLen=" + dataLen);
				}
				// The upstream length field is consumed but not used; the payload size comes from dataLen.
				reader.readBytes(new byte[4],0,4);
				ByteBuffer inputData = ByteBuffer.allocate(dataLen-20);
				reader.readBytes(inputData.array(),0,inputData.capacity());
				LinkedList<ByteBuffer>inputs = new LinkedList<ByteBuffer>();
				inputs.add(inputData);

				header.putInt(12, EncodingType.ZRLEE.getId()); // means recompress every time
				// using new Deflecter every time is incompatible with the protocol, clients have to be modified.
				LinkedList<ByteBuffer> out = new LinkedList<ByteBuffer>();
				unzip(inflater, inputs, 0 , out, INFLATE_BUFSIZE);
				int len2 = zip(deflater, out, 0, bufs);
				ByteBuffer blen = ByteBuffer.allocate(4); blen.putInt(len2); blen.flip();
				// Resulting order on the queue: header, new length, compressed chunks.
				bufs.addFirst(blen);
				bufs.addFirst(header);
				multicastqueue.put(bufs);
				// NOTE(review): no mark() on, or assignment of, this field is visible in this class - confirm it is set up by the caller.
				is.reset();
				return ;
			}
		} 
		bufs.add(header);
		if (dataLen>16) {
			ByteBuffer b = ByteBuffer.allocate(dataLen-16);
			reader.readBytes(b.array(),0,dataLen-16); b.limit(dataLen-16);
			bufs.add(b);
		}
		multicastqueue.put(bufs);
		// NOTE(review): no mark() on, or assignment of, this field is visible in this class - confirm it is set up by the caller.
		is.reset();

		// It may be compressed. We can inflate here to avoid repeating clients decompressing here,
		// but it may generate too many large data. It is better to do it in each client.
		// But we have do inflation for all input data, so we have to do it here.
	}



}