Mercurial > hg > Members > nobuyasu > tightVNCProxy
comparison src/myVncProxy/MyRfbProto.java @ 113:8424f64dd736
Add timeout and discarding. Kill the timeout thread after client death.
author | Shinji KONO <kono@ie.u-ryukyu.ac.jp> |
---|---|
date | Fri, 05 Aug 2011 14:33:42 +0900 |
parents | 1a5afcf100a5 |
children | 02016fcb9105 |
comparison
legend: equal | deleted | inserted | replaced
112:1a5afcf100a5 | 113:8424f64dd736 |
---|---|
25 | 25 |
26 import myVncProxy.MulticastQueue.Client; | 26 import myVncProxy.MulticastQueue.Client; |
27 | 27 |
28 import java.util.concurrent.ExecutorService; | 28 import java.util.concurrent.ExecutorService; |
29 import java.util.concurrent.atomic.AtomicBoolean; | 29 import java.util.concurrent.atomic.AtomicBoolean; |
30 import java.util.concurrent.atomic.AtomicInteger; | |
30 import java.util.zip.DataFormatException; | 31 import java.util.zip.DataFormatException; |
31 import java.util.zip.Deflater; | 32 import java.util.zip.Deflater; |
32 import java.util.zip.Inflater; | 33 import java.util.zip.Inflater; |
33 import java.io.OutputStream; | 34 import java.io.OutputStream; |
34 | 35 |
308 rectX = readU16(); // 4 | 309 rectX = readU16(); // 4 |
309 rectY = readU16(); // 6 | 310 rectY = readU16(); // 6 |
310 rectW = readU16(); // 8 | 311 rectW = readU16(); // 8 |
311 rectH = readU16(); // 10 | 312 rectH = readU16(); // 10 |
312 encoding = readU32(); // 12 | 313 encoding = readU32(); // 12 |
313 System.out.println("encoding = "+encoding); | 314 // System.out.println("encoding = "+encoding); |
314 if (encoding == EncodingZRLE|| encoding==EncodingZRLEE||encoding==EncodingZlib) | 315 if (encoding == EncodingZRLE|| encoding==EncodingZRLEE||encoding==EncodingZlib) |
315 zLen = readU32(); | 316 zLen = readU32(); |
316 else | 317 else |
317 zLen = 0; | 318 zLen = 0; |
318 is.reset(); | 319 is.reset(); |
451 long accTime = System.currentTimeMillis(); | 452 long accTime = System.currentTimeMillis(); |
452 long time = accTime - startCheckTime; | 453 long time = accTime - startCheckTime; |
453 System.out.println("checkMillis: " + time); | 454 System.out.println("checkMillis: " + time); |
454 } | 455 } |
455 | 456 |
456 void printStatus() { | |
457 System.out.println(); | |
458 } | |
459 | 457 |
460 synchronized void changeStatusFlag() { | 458 synchronized void changeStatusFlag() { |
461 printStatusFlag = true; | 459 printStatusFlag = true; |
462 } | 460 } |
463 | 461 |
640 // createBimgFlag = true; | 638 // createBimgFlag = true; |
641 // rfb.addSockTmp(newCli); | 639 // rfb.addSockTmp(newCli); |
642 // addSock(newCli); | 640 // addSock(newCli); |
643 final int myId = clients; | 641 final int myId = clients; |
644 final Client <LinkedList<ByteBuffer>> c = multicastqueue.newClient(); | 642 final Client <LinkedList<ByteBuffer>> c = multicastqueue.newClient(); |
645 final AtomicBoolean writerRunning = new AtomicBoolean(); | 643 final AtomicInteger writerRunning = new AtomicInteger(); |
644 writerRunning.set(1); | |
646 /** | 645 /** |
647 * Timeout thread. If a client is suspended, it has top of queue indefinitely, which caused memory | 646 * Timeout thread. If a client is suspended, it has top of queue indefinitely, which caused memory |
648 * overflow. After the timeout, we poll the queue and discard it. Start long wait if writer is running. | 647 * overflow. After the timeout, we poll the queue and discard it. Start long wait if writer is running. |
649 */ | 648 */ |
650 final Runnable timer = new Runnable() { | 649 final Runnable timer = new Runnable() { |
651 public void run() { | 650 public void run() { |
651 int count = 0; | |
652 for(;;) { | 652 for(;;) { |
653 long timeout = 30000; | 653 long timeout = 30000/4; |
654 try { | 654 try { |
655 synchronized(this) { | 655 synchronized(this) { |
656 int state,flag; | |
657 writerRunning.set(0); | |
656 wait(timeout); | 658 wait(timeout); |
657 writerRunning.set(false); | 659 flag = 0; |
658 while (!writerRunning.get()) { | 660 while((state=writerRunning.get())==0) { |
659 c.poll(); // discard, should be timeout | 661 c.poll(); // discard, should be timeout |
660 System.out.println("Discarded "+myId); | 662 count++; |
661 wait(10); // if this is too short, writer cannot take the poll, if this is too long, memory will overflow... | 663 if (flag==0) { |
664 System.out.println("Discarding "+myId + " count="+ count); flag = 1; | |
665 } | |
666 wait(10); // if this is too short, writer cannot take the poll, if this is too long, memory will overflow... | |
667 } | |
668 if (flag==1) System.out.println("Resuming "+myId + " count="+count); | |
669 if (state!=1) { | |
670 System.out.println("Client died "+myId); | |
671 break; | |
662 } | 672 } |
663 } | 673 } |
664 } catch (InterruptedException e) { | 674 } catch (InterruptedException e) { |
665 } | 675 } |
666 } | 676 } |
675 byte b[] = new byte[4096]; | 685 byte b[] = new byte[4096]; |
676 for(;;) { | 686 for(;;) { |
677 try { | 687 try { |
678 int c = is.read(b); | 688 int c = is.read(b); |
679 if (c<=0) throw new IOException(); | 689 if (c<=0) throw new IOException(); |
680 System.out.println("client read "+c); | 690 // System.out.println("client read "+c); |
681 } catch (IOException e) { | 691 } catch (IOException e) { |
682 try { | 692 try { |
693 writerRunning.set(2); | |
683 os.close(); | 694 os.close(); |
684 is.close(); | 695 is.close(); |
685 } catch (IOException e1) { | 696 } catch (IOException e1) { |
686 } | 697 } |
687 return; | 698 return; |
692 /** | 703 /** |
693 * send packets to a client | 704 * send packets to a client |
694 */ | 705 */ |
695 Runnable sender = new Runnable() { | 706 Runnable sender = new Runnable() { |
696 public void run() { | 707 public void run() { |
708 writerRunning.set(1); | |
697 try { | 709 try { |
698 /** | 710 /** |
699 * initial connection of RFB protocol | 711 * initial connection of RFB protocol |
700 */ | 712 */ |
701 sendRfbVersion(os); | 713 sendRfbVersion(os); |
710 LinkedList<ByteBuffer> bufs = c.poll(); | 722 LinkedList<ByteBuffer> bufs = c.poll(); |
711 int inputIndex = 0; | 723 int inputIndex = 0; |
712 ByteBuffer header = bufs.get(inputIndex); | 724 ByteBuffer header = bufs.get(inputIndex); |
713 if (header==null) continue; | 725 if (header==null) continue; |
714 if (header.get(0)==RfbProto.FramebufferUpdate) { | 726 if (header.get(0)==RfbProto.FramebufferUpdate) { |
715 System.out.println("client "+ myId); | 727 // System.out.println("client "+ myId); |
716 } | 728 } |
717 writeToClient(os, bufs, inputIndex); | 729 writeToClient(os, bufs, inputIndex); |
718 writerRunning.set(true); // yes my client is awaking. | 730 writerRunning.set(1); // yes my client is awaking. |
719 } | 731 } |
720 } catch (IOException e) { | 732 } catch (IOException e) { |
721 try { | 733 try { |
734 writerRunning.set(2); | |
722 os.close(); | 735 os.close(); |
723 } catch (IOException e1) { | 736 } catch (IOException e1) { |
724 } | 737 } |
725 /* if socket closed cliList.remove(newCli); */ | 738 /* if socket closed cliList.remove(newCli); */ |
726 } | 739 } |