changeset 492:57e0d052b126

add blockedReadSendData
author oshiro
date Fri, 01 Feb 2019 16:45:56 +0900
parents 13b08ea73237
children 6f21b6176984
files src/main/java/jp/ac/u_ryukyu/treevnc/MulticastQueue.java src/main/java/jp/ac/u_ryukyu/treevnc/TreeRFBProto.java
diffstat 2 files changed, 146 insertions(+), 2 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/jp/ac/u_ryukyu/treevnc/MulticastQueue.java	Thu Jan 31 19:09:26 2019 +0900
+++ b/src/main/java/jp/ac/u_ryukyu/treevnc/MulticastQueue.java	Fri Feb 01 16:45:56 2019 +0900
@@ -63,6 +63,19 @@
 		tail.set(next);
 		tail = next;
 	}
+
+	/**
+	 * waitput item to the queue
+	 *   all client threads start read it
+	 * @param item
+	 */
+	public synchronized void waitput(T item) throws InterruptedException
+	{
+		Node<T> next = new Node<T>(item);
+		tail.set(next);
+		tail = next;
+		wait(); //wait for send completion
+	}
 	
 	/**
 	 * register new clients. Clients read this queue, if all clients read the queue, item is removed
--- a/src/main/java/jp/ac/u_ryukyu/treevnc/TreeRFBProto.java	Thu Jan 31 19:09:26 2019 +0900
+++ b/src/main/java/jp/ac/u_ryukyu/treevnc/TreeRFBProto.java	Fri Feb 01 16:45:56 2019 +0900
@@ -72,6 +72,7 @@
     private byte[] originalInitData = null;
     private boolean childrenMulticast = true;
     private static int uniqueNodeId = 0; // uniquenodeid in all trees (less than MAX_UNIQUE_NODE_ID)
+    private int deflate_size = 65507;
 
     public TreeRFBProto(boolean isTreeManager, ViewerInterface viewer) {
         nets.setMyRfb(this);
@@ -683,8 +684,138 @@
                     if (addSerialNum) {
                         addSerialNumber(bufs);
                     }
-                    multicastqueue.put(bufs);
-                } catch (IOException e) {
+                    multicastqueue.waitput(bufs);
+                } catch (IOException | InterruptedException e) {
+                    throw new TransportException(e);
+                } catch (DataFormatException e) {
+                    throw new TransportException(e);
+                }
+                return;
+            }
+
+            //    ZRLEE is already compressed
+            bufs.add(header);
+            if (addSerialNum) {
+                this.addSerialNumber(bufs);
+            }
+            if (dataLen > headerLen) {
+                ByteBuffer b = multicastqueue.allocate(dataLen - headerLen);
+                reader.readBytes(b.array(), 0, dataLen - headerLen);
+                b.limit(dataLen - headerLen);
+                bufs.add(b);
+            }
+            multicastqueue.put(bufs);
+
+            return;
+        }
+        // It may be compressed. We can inflate here to avoid repeating clients
+        // decompressing here,
+        // but it may generate too many large data. It is better to do it in
+        // each client.
+        // But we have do inflation for all input data, so we have to do it
+        // here.
+    }
+
+    /**
+     * Multicast framebufferUpdate to children.
+     * read FrameBuffferUpdate. If it is ZLE, make it ZLEE which is self contained compressed packet.
+     * put the packet to the multicastqueue. Then normal rendering engine read the same stream using is.reset().
+     *
+     * @param dataLen
+     * @param reader
+     * @throws TransportException
+     * @throws UnsupportedEncodingException
+     */
+    public void blockedReadSendData(int dataLen, Reader reader, byte[] bytes, FramebufferUpdateRectangle rect)
+            throws TransportException, UnsupportedEncodingException {
+        LinkedList<ByteBuffer> bufs = new LinkedList<ByteBuffer>();
+        int BLOCKSIZE = 64 * 1024;
+        int headerLen = rect.getEncodingType() == EncodingType.CHECK_DELAY ? 24 : 16;
+        ByteBuffer header = multicastqueue.allocate(headerLen);
+        ByteBuffer serial = multicastqueue.allocate(4 + 8);
+        if (!isTreeManager() && addSerialNum) {
+            reader.readBytes(serial.array(), 0, 4 + 8);
+            serial.limit(4 + 8);
+        }
+        reader.mark(dataLen);
+        reader.readBytes(header.array(), 0, headerLen);
+        header.limit(headerLen);
+        if (header.get(0) == FramebufferUpdate) {
+            int encoding = header.getInt(12);
+
+            if (encoding == EncodingType.ZRLE.getId()
+                    || encoding == EncodingType.ZLIB.getId()) {
+                // recompress into ZREE
+                // uncompressed result is remain in bytes
+                ByteBuffer len = multicastqueue.allocate(4);
+                reader.readBytes(len.array(), 0, 4);
+                len.limit(4);
+                ByteBuffer inputData = multicastqueue.allocate(dataLen - 20);
+                reader.readBytes(inputData.array(), 0, inputData.capacity());
+                inputData.limit(dataLen - 20);
+                LinkedList<ByteBuffer> inputs = new LinkedList<ByteBuffer>();
+                inputs.add(inputData);
+                header.putInt(12, EncodingType.ZRLEE.getId()); // means
+                // recompress
+                // every time
+                // using new Deflecter every time is incompatible with the
+                // protocol, clients have to be modified.
+                Deflater nDeflater = deflater; // new Deflater();
+                LinkedList<ByteBuffer> out = new LinkedList<ByteBuffer>();
+                try {
+                    unzip(inflater, inputs, 0, bytes, INFLATE_BUFSIZE);
+                    // dump32(inputs);
+                    out.add(ByteBuffer.wrap(bytes));
+                    int len2 = 0;
+                    int inputIndex = 0;
+                    ByteBuffer c1 = multicastqueue.allocate(deflate_size);
+                    while (inputIndex < inputs.size()) {
+                        ByteBuffer b1 = inputs.get(inputIndex++);
+                        deflater.setInput(b1.array(), b1.position(), b1.remaining());
+                        /**
+                         * If we finish() stream and reset() it, Deflater start new gzip
+                         * stream, this makes continuous zlib reader unhappy. if we remove
+                         * finish(), Deflater.deflate() never flushes its output. The
+                         * original zlib deflate has flush flag. I'm pretty sure this a kind
+                         * of bug of Java library.
+                         */
+                        if (inputIndex == inputs.size())
+                            deflater.finish();
+                        int len1 = 0;
+                        do {
+                            len1 = deflater.deflate(c1.array(), c1.position(),
+                                    c1.remaining());
+                            if (len1 > 0) {
+                                len2 += len1;
+                                c1.position(c1.position() + len1);
+                                if (c1.remaining() == 0) {
+                                    c1.flip();
+                                    bufs.addLast(c1);
+                                    c1 = multicastqueue.allocate(deflate_size);
+                                }
+                            }
+                        } while (len1 > 0 || !deflater.needsInput()); // &&!deflater.finished());
+                    }
+                    if (c1.position() != 0) {
+                        c1.flip();
+                        bufs.addLast(c1);
+                    }
+                    deflater.reset();
+
+                    ByteBuffer blen = multicastqueue.allocate(4);
+                    blen.putInt(len2);
+                    blen.flip();
+                    bufs.addFirst(blen);
+                    if (checkDelay) {
+                        bufs = createCheckDelayHeader(bufs, header);
+                    } else {
+                        bufs.addFirst(header);
+                    }
+                    if (addSerialNum) {
+                        addSerialNumber(bufs);
+                    }
+                    multicastqueue.waitput(bufs);
+                } catch (InterruptedException e) {
                     throw new TransportException(e);
                 } catch (DataFormatException e) {
                     throw new TransportException(e);