comparison src/myVncProxy/MyRfbProto.java @ 113:8424f64dd736

time out and discarding. kill time out thread after client death.
author Shinji KONO <kono@ie.u-ryukyu.ac.jp>
date Fri, 05 Aug 2011 14:33:42 +0900
parents 1a5afcf100a5
children 02016fcb9105
comparison
equal deleted inserted replaced
112:1a5afcf100a5 113:8424f64dd736
25 25
26 import myVncProxy.MulticastQueue.Client; 26 import myVncProxy.MulticastQueue.Client;
27 27
28 import java.util.concurrent.ExecutorService; 28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.atomic.AtomicBoolean; 29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.zip.DataFormatException; 31 import java.util.zip.DataFormatException;
31 import java.util.zip.Deflater; 32 import java.util.zip.Deflater;
32 import java.util.zip.Inflater; 33 import java.util.zip.Inflater;
33 import java.io.OutputStream; 34 import java.io.OutputStream;
34 35
308 rectX = readU16(); // 4 309 rectX = readU16(); // 4
309 rectY = readU16(); // 6 310 rectY = readU16(); // 6
310 rectW = readU16(); // 8 311 rectW = readU16(); // 8
311 rectH = readU16(); // 10 312 rectH = readU16(); // 10
312 encoding = readU32(); // 12 313 encoding = readU32(); // 12
313 System.out.println("encoding = "+encoding); 314 // System.out.println("encoding = "+encoding);
314 if (encoding == EncodingZRLE|| encoding==EncodingZRLEE||encoding==EncodingZlib) 315 if (encoding == EncodingZRLE|| encoding==EncodingZRLEE||encoding==EncodingZlib)
315 zLen = readU32(); 316 zLen = readU32();
316 else 317 else
317 zLen = 0; 318 zLen = 0;
318 is.reset(); 319 is.reset();
451 long accTime = System.currentTimeMillis(); 452 long accTime = System.currentTimeMillis();
452 long time = accTime - startCheckTime; 453 long time = accTime - startCheckTime;
453 System.out.println("checkMillis: " + time); 454 System.out.println("checkMillis: " + time);
454 } 455 }
455 456
456 void printStatus() {
457 System.out.println();
458 }
459 457
460 synchronized void changeStatusFlag() { 458 synchronized void changeStatusFlag() {
461 printStatusFlag = true; 459 printStatusFlag = true;
462 } 460 }
463 461
640 // createBimgFlag = true; 638 // createBimgFlag = true;
641 // rfb.addSockTmp(newCli); 639 // rfb.addSockTmp(newCli);
642 // addSock(newCli); 640 // addSock(newCli);
643 final int myId = clients; 641 final int myId = clients;
644 final Client <LinkedList<ByteBuffer>> c = multicastqueue.newClient(); 642 final Client <LinkedList<ByteBuffer>> c = multicastqueue.newClient();
645 final AtomicBoolean writerRunning = new AtomicBoolean(); 643 final AtomicInteger writerRunning = new AtomicInteger();
644 writerRunning.set(1);
646 /** 645 /**
647 * Timeout thread. If a client is suspended, it has top of queue indefinitely, which caused memory 646 * Timeout thread. If a client is suspended, it has top of queue indefinitely, which caused memory
648 * overflow. After the timeout, we poll the queue and discard it. Start long wait if writer is running. 647 * overflow. After the timeout, we poll the queue and discard it. Start long wait if writer is running.
649 */ 648 */
650 final Runnable timer = new Runnable() { 649 final Runnable timer = new Runnable() {
651 public void run() { 650 public void run() {
651 int count = 0;
652 for(;;) { 652 for(;;) {
653 long timeout = 30000; 653 long timeout = 30000/4;
654 try { 654 try {
655 synchronized(this) { 655 synchronized(this) {
656 int state,flag;
657 writerRunning.set(0);
656 wait(timeout); 658 wait(timeout);
657 writerRunning.set(false); 659 flag = 0;
658 while (!writerRunning.get()) { 660 while((state=writerRunning.get())==0) {
659 c.poll(); // discard, should be timeout 661 c.poll(); // discard, should be timeout
660 System.out.println("Discarded "+myId); 662 count++;
661 wait(10); // if this is too short, writer cannot take the poll, if this is too long, memory will overflow... 663 if (flag==0) {
664 System.out.println("Discarding "+myId + " count="+ count); flag = 1;
665 }
666 wait(10); // if this is too short, writer cannot take the poll, if this is too long, memory will overflow...
667 }
668 if (flag==1) System.out.println("Resuming "+myId + " count="+count);
669 if (state!=1) {
670 System.out.println("Client died "+myId);
671 break;
662 } 672 }
663 } 673 }
664 } catch (InterruptedException e) { 674 } catch (InterruptedException e) {
665 } 675 }
666 } 676 }
675 byte b[] = new byte[4096]; 685 byte b[] = new byte[4096];
676 for(;;) { 686 for(;;) {
677 try { 687 try {
678 int c = is.read(b); 688 int c = is.read(b);
679 if (c<=0) throw new IOException(); 689 if (c<=0) throw new IOException();
680 System.out.println("client read "+c); 690 // System.out.println("client read "+c);
681 } catch (IOException e) { 691 } catch (IOException e) {
682 try { 692 try {
693 writerRunning.set(2);
683 os.close(); 694 os.close();
684 is.close(); 695 is.close();
685 } catch (IOException e1) { 696 } catch (IOException e1) {
686 } 697 }
687 return; 698 return;
692 /** 703 /**
693 * send packets to a client 704 * send packets to a client
694 */ 705 */
695 Runnable sender = new Runnable() { 706 Runnable sender = new Runnable() {
696 public void run() { 707 public void run() {
708 writerRunning.set(1);
697 try { 709 try {
698 /** 710 /**
699 * initial connection of RFB protocol 711 * initial connection of RFB protocol
700 */ 712 */
701 sendRfbVersion(os); 713 sendRfbVersion(os);
710 LinkedList<ByteBuffer> bufs = c.poll(); 722 LinkedList<ByteBuffer> bufs = c.poll();
711 int inputIndex = 0; 723 int inputIndex = 0;
712 ByteBuffer header = bufs.get(inputIndex); 724 ByteBuffer header = bufs.get(inputIndex);
713 if (header==null) continue; 725 if (header==null) continue;
714 if (header.get(0)==RfbProto.FramebufferUpdate) { 726 if (header.get(0)==RfbProto.FramebufferUpdate) {
715 System.out.println("client "+ myId); 727 // System.out.println("client "+ myId);
716 } 728 }
717 writeToClient(os, bufs, inputIndex); 729 writeToClient(os, bufs, inputIndex);
718 writerRunning.set(true); // yes my client is awaking. 730 writerRunning.set(1); // yes my client is awaking.
719 } 731 }
720 } catch (IOException e) { 732 } catch (IOException e) {
721 try { 733 try {
734 writerRunning.set(2);
722 os.close(); 735 os.close();
723 } catch (IOException e1) { 736 } catch (IOException e1) {
724 } 737 }
725 /* if socket closed cliList.remove(newCli); */ 738 /* if socket closed cliList.remove(newCli); */
726 } 739 }