comparison src/main/java/jp/ac/u_ryukyu/treevnc/client/MyRfbProtoClient.java @ 12:12c3a73be47f

rename package
author one
date Tue, 21 Aug 2012 14:24:38 +0900
parents src/main/java/ac/ryukyu/treevnc/client/MyRfbProtoClient.java@30bd62abf424
children 98519d16a8c3
comparison
equal deleted inserted replaced
11:57ae9fbb1245 12:12c3a73be47f
1 package jp.ac.u_ryukyu.treevnc.client;
2
3 import java.io.DataInputStream;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.io.OutputStream;
7 import java.net.BindException;
8 import java.net.ServerSocket;
9 import java.net.Socket;
10 import java.nio.ByteBuffer;
11 import java.util.LinkedList;
12 import java.util.concurrent.atomic.AtomicInteger;
13 import java.util.zip.DataFormatException;
14 import java.util.zip.Deflater;
15 import java.util.zip.Inflater;
16
17 import jp.ac.u_ryukyu.treevnc.MulticastQueue;
18
19
20 import com.glavsoft.exceptions.TransportException;
21 import com.glavsoft.rfb.encoding.EncodingType;
22 import com.glavsoft.transport.Reader;
23 import com.glavsoft.viewer.ContainerManager;
24 import com.glavsoft.viewer.swing.ParametersHandler.ConnectionParams;
25
26 public class MyRfbProtoClient {
/** Value of the first header byte checked for a framebuffer-update message. */
final static int FramebufferUpdate = 0;
/** Value of the first header byte checked for a check-delay message. */
final static int CheckDelay = 11;
/** Version banner written by {@code sendRfbVersion}. */
final static String versionMsg_3_855 = "RFB 003.855\n";
/** Capacity of each buffer allocated while compressing / decompressing. */
private static final int INFLATE_BUFSIZE = 100 * 1024;

/** Transport reader handed to the constructor. */
private Reader reader;
/** Echo client whose reply is copied into host/parent/treenum/leaderflag by getParentName(). */
private EchoClient echoValue = new EchoClient();
private String host;
private String treenum;
private String parent;
private String pHost;
private String leaderflag;
private int echoPort;
private int port;
private int acceptPort;
Socket clientSocket;
Socket sock;
DataInputStream is;
OutputStream os;
private ContainerManager containerManager;
/** Listening socket created by initServSock(). */
private ServerSocket servSock;
// NOTE(review): initData is never assigned inside this class - confirm where it is meant to be set.
private byte initData[];
/** Number of clients registered through newClient(); also used as the next client id. */
private int clients = 0;
/** Queue whose entries are forwarded to every registered client. */
private MulticastQueue<LinkedList<ByteBuffer>> multicastqueue = new MulticastQueue<LinkedList<ByteBuffer>>();
boolean proxyFlag = false;
int serverMajor;
int serverMinor;
int clientMajor;
int clientMinor;

/** Shared inflater used by readSendData(). */
private Inflater inflater = new Inflater();
/** Shared deflater used by readSendData(). */
private Deflater deflater = new Deflater();
49
/**
 * Creates a protocol client that reads from the given transport reader.
 *
 * @param reader reader stored for later use by readProxyFlag() and readEchoPort()
 * @param host   accepted for signature compatibility; not stored by this constructor
 * @param port   accepted for signature compatibility; not stored by this constructor
 */
public MyRfbProtoClient(Reader reader, String host, String port) {
    // Only the reader is retained; the host and port fields are left at their defaults.
    this.reader = reader;
}
53
54 public void setParam(ConnectionParams connectionParams) {
55 pHost = connectionParams.hostName;
56 echoPort = connectionParams.portNumber;
57 }
58
59 public boolean readProxyFlag() throws TransportException {
60 int flag = reader.readUInt8();
61 if(flag == 1)
62 return true;
63 else
64 return false;
65 }
66
67 public byte[] readEchoPort() throws Exception {
68 byte[] b = new byte[4];
69 reader.readBytes(b, 0, b.length);
70 //readFully(b);
71 return b;
72 }
73
74 public void getParentName() {
75 if (echoValue == null) {
76
77 if (clientSocket == null) {
78
79 // echo = new EchoClient(pHost, this);
80 echoValue = new EchoClient(pHost, echoPort);
81 echoValue.openport();
82
83 echoValue = echoValue.requestHostName("1");
84 } else {
85 echoValue = new EchoClient();
86 echoValue = echoValue.Interruption(clientSocket);
87 }
88 }
89 // proxyからの返信で接続先を決定する
90 host = echoValue.responseLine;
91 parent = echoValue.parent;
92 if (echoValue.treenum != null) {
93 treenum = echoValue.treenum;
94 } else {
95 treenum = echoValue.treenum;
96 }
97
98 if (echoValue.leaderflag != null) {
99 leaderflag = echoValue.leaderflag;
100 } else {
101 leaderflag = echoValue.leaderflag;
102 }
103 }
104
105 int castByteInt(byte[] b) {
106 ByteBuffer bb = ByteBuffer.wrap(b);
107 int value = bb.getInt();
108 return value;
109 }
110
111 Socket changeParent(String host, int port) throws IOException {
112 sock = new Socket(host, port);
113 return sock;
114 }
115
116 public Socket accept() throws IOException {
117 return servSock.accept();
118 }
119
120 void initServSock(int port) throws IOException {
121 servSock = new ServerSocket(port);
122 acceptPort = port;
123 }
124
125 public void selectPort(int p) {
126 int port = p;
127 while (true) {
128 try {
129 initServSock(port);
130 break;
131 } catch (BindException e) {
132 port++;
133 continue;
134 } catch (IOException e) {
135
136 }
137 }
138 System.out.println("accept port = " + port);
139 }
140
141 public void newClient(AcceptThread acceptThread, final Socket newCli,
142 final OutputStream os, final InputStream is) throws IOException {
143 // createBimgFlag = true;
144 // rfb.addSockTmp(newCli);
145 // addSock(newCli);
146 final int myId = clients;
147 final MulticastQueue.Client <LinkedList<ByteBuffer>> c = multicastqueue.newClient();
148 final AtomicInteger writerRunning = new AtomicInteger();
149 writerRunning.set(1);
150 /**
151 * Timeout thread. If a client is suspended, it has top of queue indefinitely, which caused memory
152 * overflow. After the timeout, we poll the queue and discard it. Start long wait if writer is running.
153 */
154 final Runnable timer = new Runnable() {
155 public void run() {
156 int count = 0;
157 for(;;) {
158 long timeout = 40000/8;
159 try {
160 synchronized(this) {
161 int state,flag;
162 writerRunning.set(0);
163 wait(timeout);
164 flag = 0;
165 while((state=writerRunning.get())==0) {
166 c.poll(); // discard, should be timeout
167 count++;
168 if (flag==0) {
169 System.out.println("Discarding "+myId + " count="+ count); flag = 1;
170 }
171 wait(10); // if this is too short, writer cannot take the poll, if this is too long, memory will overflow...
172 }
173 if (flag==1) System.out.println("Resuming "+myId + " count="+count);
174 if (state!=1) {
175 System.out.println("Client died "+myId);
176 break;
177 }
178 }
179 } catch (InterruptedException e) {
180 }
181 }
182 }
183 };
184 new Thread(timer).start();
185 /**
186 * discard all incoming from clients
187 */
188 final Runnable reader = new Runnable() {
189 public void run() {
190 byte b[] = new byte[4096];
191 for(;;) {
192 try {
193 int c = is.read(b);
194 if (c<=0) throw new IOException();
195 // System.out.println("client read "+c);
196 } catch (IOException e) {
197 try {
198 writerRunning.set(2);
199 os.close();
200 is.close();
201 } catch (IOException e1) {
202 }
203 return;
204 }
205 }
206 }
207 };
208 /**
209 * send packets to a client
210 */
211 Runnable sender = new Runnable() {
212 public void run() {
213 writerRunning.set(1);
214 try {
215 /**
216 * initial connection of RFB protocol
217 */
218 sendRfbVersion(os);
219 // readVersionMsg(is);
220 int rfbMinor = readVersionMsg(is,os);
221 sendSecurityType(os);
222 readSecType(is);
223 sendSecResult(os);
224 readClientInit(is);
225 sendInitData(os);
226 new Thread(reader).start(); // discard incoming packet here after.
227 if(rfbMinor == 855){
228 //checkDilay(os);
229 // send jpeg data of full screen.
230 // sendFullScreen("jpeg" ,os);
231 } else {
232 // send raw data of full screen.
233
234 }
235 for (;;) {
236 LinkedList<ByteBuffer> bufs = c.poll();
237 int inputIndex = 0;
238 ByteBuffer header = bufs.get(inputIndex);
239 if (header==null) continue;
240 if (header.get(0)==CheckDelay) {
241 System.out.println("--------------------");
242 //writeToClient(os, bufs, inputIndex);
243 }
244 if (header.get(0)==FramebufferUpdate) {
245 // System.out.println("client "+ myId);
246 }
247 writeToClient(os, bufs, inputIndex);
248 writerRunning.set(1); // yes my client is awaking.
249 }
250 } catch (IOException e) {
251 try {
252 writerRunning.set(2);
253 os.close();
254 } catch (IOException e1) {
255 /* if socket closed cliList.remove(newCli); */
256 }
257 }
258 }
259
260 public void writeToClient(final OutputStream os,
261 LinkedList<ByteBuffer> bufs, int inputIndex)
262 throws IOException {
263 while(inputIndex < bufs.size()) {
264 ByteBuffer b = bufs.get(inputIndex++);
265 os.write(b.array(), b.position(), b.limit());
266 }
267 os.flush();
268 }
269 };
270 clients++;
271 new Thread(sender).start();
272
273 }
274
275 void sendRfbVersion(OutputStream os) throws IOException {
276 os.write(versionMsg_3_855.getBytes());
277 // os.write(versionMsg_3_8.getBytes());
278 }
279 int readVersionMsg(InputStream is, OutputStream os) throws IOException {
280
281 byte[] b = new byte[12];
282
283 is.read(b);
284
285 if ((b[0] != 'R') || (b[1] != 'F') || (b[2] != 'B') || (b[3] != ' ')
286 || (b[4] < '0') || (b[4] > '9') || (b[5] < '0') || (b[5] > '9')
287 || (b[6] < '0') || (b[6] > '9') || (b[7] != '.')
288 || (b[8] < '0') || (b[8] > '9') || (b[9] < '0') || (b[9] > '9')
289 || (b[10] < '0') || (b[10] > '9') || (b[11] != '\n')) {
290 throw new IOException("Host " + host + " port " + port
291 + " is not an RFB server");
292 }
293
294 int rfbMajor = (b[4] - '0') * 100 + (b[5] - '0') * 10 + (b[6] - '0');
295 int rfbMinor = (b[8] - '0') * 100 + (b[9] - '0') * 10 + (b[10] - '0');
296
297 if (rfbMajor < 3) {
298 throw new IOException(
299 "RFB server does not support protocol version 3");
300 }
301
302 if (rfbMinor == 855) {
303 sendProxyFlag(os);
304 // if(proxyFlag)sendPortNumber(os);
305 }
306 return rfbMinor;
307
308 } void readVersionMsg(InputStream is) throws IOException {
309
310 byte[] b = new byte[12];
311
312 is.read(b);
313
314 if ((b[0] != 'R') || (b[1] != 'F') || (b[2] != 'B') || (b[3] != ' ')
315 || (b[4] < '0') || (b[4] > '9') || (b[5] < '0') || (b[5] > '9')
316 || (b[6] < '0') || (b[6] > '9') || (b[7] != '.')
317 || (b[8] < '0') || (b[8] > '9') || (b[9] < '0') || (b[9] > '9')
318 || (b[10] < '0') || (b[10] > '9') || (b[11] != '\n')) {
319 throw new IOException("Host " + host + " port " + port
320 + " is not an RFB server");
321 }
322
323 serverMajor = (b[4] - '0') * 100 + (b[5] - '0') * 10 + (b[6] - '0');
324 serverMinor = (b[8] - '0') * 100 + (b[9] - '0') * 10 + (b[10] - '0');
325
326 if (serverMajor < 3) {
327 throw new IOException(
328 "RFB server does not support protocol version 3");
329 }
330
331 }
332
333 void sendSecurityType(OutputStream os) throws IOException {
334 // number-of-security-types
335 os.write(1);
336 // security-types
337 // 1:None
338 os.write(1);
339 }
340 void readSecType(InputStream is) throws IOException {
341 byte[] b = new byte[1];
342 is.read(b);
343
344 }
345
346 void sendSecResult(OutputStream os) throws IOException {
347 byte[] b = castIntByte(0);
348 os.write(b);
349 }
350
351 void readClientInit(InputStream in) throws IOException {
352 byte[] b = new byte[0];
353 in.read(b);
354 }
355
356 void sendInitData(OutputStream os) throws IOException {
357 os.write(initData);
358 }
359
360 void sendProxyFlag(OutputStream os) throws IOException {
361 if(proxyFlag) os.write(1);
362 else os.write(0);
363 }
364
365 byte[] castIntByte(int len) {
366 byte[] b = new byte[4];
367 b[0] = (byte) ((len >>> 24) & 0xFF);
368 b[1] = (byte) ((len >>> 16) & 0xFF);
369 b[2] = (byte) ((len >>> 8) & 0xFF);
370 b[3] = (byte) ((len >>> 0) & 0xFF);
371 return b;
372 }
373
374 /**
375 * gzip byte arrays
376 * @param deflater
377 * @param inputs
378 * byte data[]
379 * @param inputIndex
380 * @param outputs
381 * byte data[]
382 * @return byte length in last byte array
383 * @throws IOException
384 */
385 public int zip(Deflater deflater,LinkedList<ByteBuffer> inputs, int inputIndex, LinkedList<ByteBuffer> outputs) throws IOException {
386 int len = 0;
387 ByteBuffer c1= ByteBuffer.allocate(INFLATE_BUFSIZE);
388 while(inputIndex < inputs.size() ) {
389 ByteBuffer b1 = inputs.get(inputIndex++);
390 deflater.setInput(b1.array(),b1.position(),b1.remaining());
391 /**
392 * If we finish() stream and reset() it, Deflater start new gzip stream, this makes continuous zlib reader unhappy.
393 * if we remove finish(), Deflater.deflate() never flushes its output. The original zlib deflate has flush flag. I'm pretty
394 * sure this a kind of bug of Java library.
395 */
396 if (inputIndex==inputs.size())
397 deflater.finish();
398 int len1 = 0;
399 do {
400 len1 = deflater.deflate(c1.array(),c1.position(),c1.remaining());
401 if (len1>0) {
402 len += len1;
403 c1.position(c1.position()+len1);
404 if (c1.remaining()==0) {
405 c1.flip(); outputs.addLast(c1);
406 c1 = ByteBuffer.allocate(INFLATE_BUFSIZE);
407 }
408 }
409 } while (len1 >0 || !deflater.needsInput()); // &&!deflater.finished());
410 }
411 if (c1.position()!=0) {
412 c1.flip(); outputs.addLast(c1);
413 }
414 deflater.reset();
415 return len;
416 }
417
418
419 /**
420 * gunzip byte arrays
421 * @param inflater
422 * @param inputs
423 * byte data[]
424 * @param outputs
425 * byte data[]
426 *@return number of total bytes
427 * @throws IOException
428 */
429 public int unzip(Inflater inflater, LinkedList<ByteBuffer> inputs,
430 int inputIndex, LinkedList<ByteBuffer> outputs, int bufSize)
431 throws DataFormatException {
432 int len = 0;
433 ByteBuffer buf = ByteBuffer.allocate(bufSize);
434 while (inputIndex < inputs.size()) {
435 ByteBuffer input = inputs.get(inputIndex++);
436 inflater.setInput(input.array(), input.position(), input.limit());
437 // if (inputIndex==inputs.size()) if inflater/deflater has symmetry,
438 // we need this
439 // inflater.end(); but this won't work
440 do {
441 int len0 = inflater.inflate(buf.array(), buf.position(),
442 buf.remaining());
443 if (len0 > 0) {
444 buf.position(buf.position() + len0);
445 len += len0;
446 if (buf.remaining() == 0) {
447 buf.flip();
448 outputs.addLast(buf);
449 buf = ByteBuffer.allocate(bufSize);
450 }
451 }
452 } while (!inflater.needsInput());
453 }
454 if (buf.position() != 0) {
455 buf.flip();
456 outputs.addLast(buf);
457 }
458 return len;
459 }
460
461 void readSendData(int dataLen,Reader reader) throws IOException, DataFormatException, TransportException {
462 LinkedList<ByteBuffer>bufs = new LinkedList<ByteBuffer>();
463 ByteBuffer header = ByteBuffer.allocate(16);
464 reader.readBytes(header.array(),0,16);
465 header.limit(16);
466 if (header.get(0)==FramebufferUpdate) {
467 int encoding = header.getInt(12);
468 if (encoding==EncodingType.ZRLE.getId()||encoding==EncodingType.ZLIB.getId()) { // ZRLEE is already recompressed
469 ByteBuffer len = ByteBuffer.allocate(4);
470 reader.readBytes(len.array(),0,4); len.limit(4);
471 ByteBuffer inputData = ByteBuffer.allocate(dataLen-20);
472 reader.readBytes(inputData.array(),0,inputData.capacity()); inputData.limit(dataLen-20);
473 LinkedList<ByteBuffer>inputs = new LinkedList<ByteBuffer>();
474 inputs.add(inputData);
475
476 header.putInt(12, EncodingType.ZRLEE.getId()); // means recompress every time
477 // using new Deflecter every time is incompatible with the protocol, clients have to be modified.
478 Deflater nDeflater = deflater; // new Deflater();
479 LinkedList<ByteBuffer> out = new LinkedList<ByteBuffer>();
480 unzip(inflater, inputs, 0 , out, INFLATE_BUFSIZE);
481 // dump32(inputs);
482 int len2 = zip(nDeflater, out, 0, bufs);
483 ByteBuffer blen = ByteBuffer.allocate(4); blen.putInt(len2); blen.flip();
484 bufs.addFirst(blen);
485
486 bufs.addFirst(header);
487 multicastqueue.put(bufs);
488 is.reset();
489 return ;
490 }
491 }
492 bufs.add(header);
493 if (dataLen>16) {
494 ByteBuffer b = ByteBuffer.allocate(dataLen-16);
495 reader.readBytes(b.array(),0,dataLen-16); b.limit(dataLen-16);
496 bufs.add(b);
497 }
498 multicastqueue.put(bufs);
499 is.reset();
500
501 // It may be compressed. We can inflate here to avoid repeating clients decompressing here,
502 // but it may generate too many large data. It is better to do it in each client.
503 // But we have do inflation for all input data, so we have to do it here.
504 }
505
506
507
508 }