# HG changeset patch # User oshiro # Date 1549007156 -32400 # Node ID 57e0d052b126a64a0409de5f3aca7e082f4e2b4a # Parent 13b08ea73237bdc9c4868343ce177aaa3ce3b51e add blockedReadSendData diff -r 13b08ea73237 -r 57e0d052b126 src/main/java/jp/ac/u_ryukyu/treevnc/MulticastQueue.java --- a/src/main/java/jp/ac/u_ryukyu/treevnc/MulticastQueue.java Thu Jan 31 19:09:26 2019 +0900 +++ b/src/main/java/jp/ac/u_ryukyu/treevnc/MulticastQueue.java Fri Feb 01 16:45:56 2019 +0900 @@ -63,6 +63,19 @@ tail.set(next); tail = next; } + + /** + * waitput item to the queue + * all client threads start read it + * @param item + */ + public synchronized void waitput(T item) throws InterruptedException + { + Node next = new Node(item); + tail.set(next); + tail = next; + wait(); //wait for send completion + } /** * register new clients. Clients read this queue, if all clients read the queue, item is removed diff -r 13b08ea73237 -r 57e0d052b126 src/main/java/jp/ac/u_ryukyu/treevnc/TreeRFBProto.java --- a/src/main/java/jp/ac/u_ryukyu/treevnc/TreeRFBProto.java Thu Jan 31 19:09:26 2019 +0900 +++ b/src/main/java/jp/ac/u_ryukyu/treevnc/TreeRFBProto.java Fri Feb 01 16:45:56 2019 +0900 @@ -72,6 +72,7 @@ private byte[] originalInitData = null; private boolean childrenMulticast = true; private static int uniqueNodeId = 0; // uniquenodeid in all trees (less than MAX_UNIQUE_NODE_ID) + private int deflate_size = 65507; public TreeRFBProto(boolean isTreeManager, ViewerInterface viewer) { nets.setMyRfb(this); @@ -683,8 +684,138 @@ if (addSerialNum) { addSerialNumber(bufs); } - multicastqueue.put(bufs); - } catch (IOException e) { + multicastqueue.waitput(bufs); + } catch (IOException | InterruptedException e) { + throw new TransportException(e); + } catch (DataFormatException e) { + throw new TransportException(e); + } + return; + } + + // ZRLEE is already compressed + bufs.add(header); + if (addSerialNum) { + this.addSerialNumber(bufs); + } + if (dataLen > headerLen) { + ByteBuffer b = multicastqueue.allocate(dataLen - headerLen); + reader.readBytes(b.array(), 0, dataLen - headerLen); + b.limit(dataLen - headerLen); + bufs.add(b); + } + multicastqueue.put(bufs); + + return; + } + // It may be compressed. We can inflate here to avoid repeating clients + // decompressing here, + // but it may generate too many large data. It is better to do it in + // each client. + // But we have do inflation for all input data, so we have to do it + // here. + } + + /** + * Multicast framebufferUpdate to children. + * read FrameBuffferUpdate. If it is ZLE, make it ZLEE which is self contained compressed packet. + * put the packet to the multicastqueue. Then normal rendering engine read the same stream using is.reset(). + * + * @param dataLen + * @param reader + * @throws TransportException + * @throws UnsupportedEncodingException + */ + public void blockedReadSendData(int dataLen, Reader reader, byte[] bytes, FramebufferUpdateRectangle rect) + throws TransportException, UnsupportedEncodingException { + LinkedList bufs = new LinkedList(); + int BLOCKSIZE = 64 * 1024; + int headerLen = rect.getEncodingType() == EncodingType.CHECK_DELAY ? 24 : 16; + ByteBuffer header = multicastqueue.allocate(headerLen); + ByteBuffer serial = multicastqueue.allocate(4 + 8); + if (!isTreeManager() && addSerialNum) { + reader.readBytes(serial.array(), 0, 4 + 8); + serial.limit(4 + 8); + } + reader.mark(dataLen); + reader.readBytes(header.array(), 0, headerLen); + header.limit(headerLen); + if (header.get(0) == FramebufferUpdate) { + int encoding = header.getInt(12); + + if (encoding == EncodingType.ZRLE.getId() + || encoding == EncodingType.ZLIB.getId()) { + // recompress into ZREE + // uncompressed result is remain in bytes + ByteBuffer len = multicastqueue.allocate(4); + reader.readBytes(len.array(), 0, 4); + len.limit(4); + ByteBuffer inputData = multicastqueue.allocate(dataLen - 20); + reader.readBytes(inputData.array(), 0, inputData.capacity()); + inputData.limit(dataLen - 20); + LinkedList inputs = new LinkedList(); + inputs.add(inputData); + header.putInt(12, EncodingType.ZRLEE.getId()); // means + // recompress + // every time + // using new Deflecter every time is incompatible with the + // protocol, clients have to be modified. + Deflater nDeflater = deflater; // new Deflater(); + LinkedList out = new LinkedList(); + try { + unzip(inflater, inputs, 0, bytes, INFLATE_BUFSIZE); + // dump32(inputs); + out.add(ByteBuffer.wrap(bytes)); + int len2 = 0; + int inputIndex = 0; + ByteBuffer c1 = multicastqueue.allocate(deflate_size); + while (inputIndex < inputs.size()) { + ByteBuffer b1 = inputs.get(inputIndex++); + deflater.setInput(b1.array(), b1.position(), b1.remaining()); + /** + * If we finish() stream and reset() it, Deflater start new gzip + * stream, this makes continuous zlib reader unhappy. if we remove + * finish(), Deflater.deflate() never flushes its output. The + * original zlib deflate has flush flag. I'm pretty sure this a kind + * of bug of Java library. + */ + if (inputIndex == inputs.size()) + deflater.finish(); + int len1 = 0; + do { + len1 = deflater.deflate(c1.array(), c1.position(), + c1.remaining()); + if (len1 > 0) { + len2 += len1; + c1.position(c1.position() + len1); + if (c1.remaining() == 0) { + c1.flip(); + bufs.addLast(c1); + c1 = multicastqueue.allocate(deflate_size); + } + } + } while (len1 > 0 || !deflater.needsInput()); // &&!deflater.finished()); + } + if (c1.position() != 0) { + c1.flip(); + bufs.addLast(c1); + } + deflater.reset(); + + ByteBuffer blen = multicastqueue.allocate(4); + blen.putInt(len2); + blen.flip(); + bufs.addFirst(blen); + if (checkDelay) { + bufs = createCheckDelayHeader(bufs, header); + } else { + bufs.addFirst(header); + } + if (addSerialNum) { + addSerialNumber(bufs); + } + multicastqueue.waitput(bufs); + } catch (InterruptedException e) { throw new TransportException(e); } catch (DataFormatException e) { throw new TransportException(e);