view src/main/java/alice/datasegment/ReceiveData.java @ 527:bfec2c3ff1b8 dispose

change unzip
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Thu, 30 Apr 2015 18:14:02 +0900
parents 928907206d21
children 6ebddfac7ff6
line wrap: on
line source

package alice.datasegment;

import java.io.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.zip.*;

import alice.daemon.CommandMessage;
import org.apache.log4j.Logger;
import org.msgpack.MessagePack;
import org.msgpack.type.Value;
import alice.codesegment.SingletonMessage;

import javax.xml.bind.DatatypeConverter;
import java.io.ByteArrayInputStream;

import static java.util.Arrays.*;

/**
 * 送られてきたDSを一時的に取っておくクラス。inputでも使用。
 */
public class ReceiveData {
    private Object val;//for Object DS
    private byte[] messagePack;//for byteArray(serialized) DS
    private byte[] zMessagePack;//for byteArray(compressed) DS
    private Class<?> clazz;

    public long time;//測定用
    public boolean setTime = false;
    public int depth = 1;

    private Deflater deflater = new Deflater();
    private Inflater inflater = new Inflater();
    private ByteBuffer buf = null;
    private MessagePack msg = SingletonMessage.getInstance();

    /**
     * コンストラクタ。Object型のDSと圧縮のメタ情報を受け取る。
     * put/update/reply用?
     * @param obj DS本体(Object)
     */
    public ReceiveData(Object obj) {
        clazz = obj.getClass();
        val = obj;
    }

    /**
     * コンストラクタ。byteArray型のDSと圧縮のメタ情報を受け取り、byteArrayフラグを立てる。
     *
     * @param messagePack DS本体(byteArray)
     */
    public ReceiveData(byte[] messagePack) {
        clazz = messagePack.getClass();

        if (messagePack[0] == 0xc1){
            System.out.println("ReceiveData is zMessagePack");
            this.zMessagePack = messagePack;
        } else {
            System.out.println("ReceiveData is MessagePack");
            this.messagePack = messagePack;
        }
    }


    public boolean isByteArray(){
        return messagePack != null | zMessagePack != null;
    }

    public boolean compressed(){
        return zMessagePack != null;
    }

    public boolean serialized(){
        return val == null;
    }

    public Object getObj(){
        return asClass(Object.class);
    }

    public String asString(){
        return asClass(String.class);
    }

    public int asInteger(){
        return asClass(Integer.class);
    }

    public Float asFloat(){
        return asClass(Float.class);
    }

    public Value getVal(){///get DS as Value type
        if (val == null){///val != null
            return asClass(Value.class);
        } else {
            try {
                return SingletonMessage.getInstance().unconvert(val);///convert to Value type by MassagePack
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null;
        }
    }

    /**
     * DSを任意の型で取得するメソッド。
     * DSがbyteArrayでなければ指定された型に変換して返す。
     * DSがbyteArrayなら解凍状態にして指定された型に変換して返す。
     *
     * @param clazz
     * @param <T>
     * @return
     */
    public <T> T asClass(Class<T> clazz) {///javasist
        System.out.println("in asClass val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack);

        try {
            if (val != null) {
                return (T) val;
            }

            if (zMessagePack != null && messagePack == null) {//ToDo:fix
                messagePack = unzip(zMessagePack);
                System.out.println("unzip messagePack:" + messagePack);
                //zMessagePack = null;?
            }

            return SingletonMessage.getInstance().read(messagePack, clazz);

        } catch (IOException e) {// | DataFormatException e
            e.printStackTrace();
            return null;
        }
    }

    public void setCompressFlag(boolean cFlag) {///compress
        LinkedList<ByteBuffer> input = new LinkedList<ByteBuffer>();
        LinkedList<ByteBuffer> output = new LinkedList<ByteBuffer>();

        if (cFlag){
            System.out.println("in setCompressFlag  val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack);
            //messagePack =  asByteArray();///ToDo:fix
            if (val != null){
                try {
                    messagePack = msg.write(val);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } else {
                messagePack = unzip(zMessagePack);
            }
            System.out.println("no zip messagePack:" + messagePack);
            System.out.print("no zip messagePack: ");
            for (int i = 0; i < messagePack.length; i++) {
                System.out.print(Integer.toHexString(messagePack[i] & 0xff));
            }
            System.out.print("\n");
            System.out.println("no zip messagePack length:" + messagePack.length);

            try {
                //System.out.println("in zip");
                input.add(ByteBuffer.wrap(messagePack));
                int len = zip(input, 0, output);

                byte[] ziped = new byte[len + 8];
                ziped[0] = (byte) 0xc1;///set compressedFlag to header
                ziped[1] = ziped[2] = ziped[3] = (byte) 0x00;
                System.arraycopy(intToByteArray(messagePack.length), 0, ziped, 4, 4);///set data length to header

                System.out.println("zipedlen: " + len);
                //System.out.println("limit: " + output.get(0).limit());
                //System.out.println("remaining: " + output.get(0).remaining());
                int tmp = 0;
                for (int i = 0; i < output.size(); i++){///Is this copy OK???
                    System.arraycopy(output.get(i).array(), 0, ziped, 8 + tmp, output.get(i).limit());//limit? remaining?
                    tmp += output.get(i).limit();
                }

                System.out.print("ziped: ");
                for (int i = 0; i < ziped.length; i++) {
                    System.out.print(Integer.toHexString(ziped[i] & 0xff));
                }
                System.out.print("\n");

                zMessagePack = ziped;
                val = null;
                messagePack = null;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public ByteBuffer setMPHeader(CommandMessage cm, CommandType type){

        System.out.println("in setMPHeader  val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack);

        try {
            byte[] header = null;//DSのメタデータ用byteArray
            byte[] data = null;//DS本体用byteArray
            byte[] dataSize = null;//DSのサイズ

            if (type == CommandType.REPLY){
                if (val != null) {//純粋なオブジェクトの場合シリアライズ
                    data = msg.write(val);
                    System.out.print("header MP data: ");
                    for (int i = 0; i < data.length; i++) {
                        System.out.print(Integer.toHexString(data[i] & 0xff));
                    }
                    System.out.print("\n");
                } else { // rData is RAW ByteArray or already serialized
                    data = messagePack;
                }

                if (setTime) {//AliceVNCの計測用(消してもいい)
                    cm.setTime = true;
                    cm.time = time;
                    cm.depth = depth + 1;
                }

                //MessagePackでDSを作成(ヘッダー・データ本体のサイズ・データ本体)
                header = msg.write(cm);
                dataSize = msg.write(data.length);
                buf = ByteBuffer.allocate(header.length+dataSize.length+data.length);
                buf.put(header);
                buf.put(dataSize);
                buf.put(data);
            } else {
                header = msg.write(cm);
                buf = ByteBuffer.allocate(header.length);
                buf.put(header);
            }

            buf.flip();
        } catch (IOException e) {
            e.printStackTrace();
        }

        messagePack = buf.array();

        System.out.print("MP with header: ");
        for (int i = 0; i < messagePack.length; i++) {
            System.out.print(Integer.toHexString(messagePack[i] & 0xff));
        }
        System.out.print("\n");

        return buf;
    }


    public int zip(LinkedList<ByteBuffer> inputs, int inputIndex, LinkedList<ByteBuffer> outputs) throws IOException {
        int len = 0;
        int INFLATE_BUFSIZE = 1024 * 100;
        ByteBuffer c1 = allocate(INFLATE_BUFSIZE);//for output

        while (inputIndex < inputs.size()) {
            ByteBuffer b1 = inputs.get(inputIndex++);
            deflater.setInput(b1.array(), b1.position(), b1.remaining());
            /**
             * If we finish() stream and reset() it, Deflater start new gzip
             * stream, this makes continuous zlib reader unhappy. if we remove
             * finish(), Deflater.deflate() never flushes its output. The
             * original zlib deflate has flush flag. I'm pretty sure this a kind
             * of bug of Java library.
             */
            if (inputIndex == inputs.size()){
                deflater.finish();
            }

            int len1 = 0;
            do {
                len1 = deflater.deflate(c1.array(), c1.position(), c1.remaining());///Bytearray for ziped data、start offset、length
                if (len1 > 0) {
                    len += len1;
                    c1.position(c1.position() + len1);
                    if (c1.remaining() == 0) {
                        c1.flip();
                        outputs.addLast(c1);
                        c1 = allocate(INFLATE_BUFSIZE);
                    }
                }
            } while (len1 > 0 || !deflater.needsInput());//needsInput()...true if setInput is empty
        }
        if (c1.position() != 0) {
            c1.flip();
            outputs.addLast(c1);
        }
        deflater.reset();
        return len;///return length of ziped data
    }

    protected byte[] unzip(byte[] input) {///read header & unzip
        int length = input.length;
        int zippedLength = byteArrayToInt(copyOfRange(input, 4, 8));///read header...Is this copy OK??

        byte [] output = new byte [zippedLength];///byteArray for unziped data
        inflater.setInput(input, 8, length - 8);///set unzip data without header

        try {
            System.out.println("return:" + inflater.inflate(output, 0, zippedLength));///unzip
        } catch (DataFormatException e) {
            e.printStackTrace();
        }

        inflater.reset();

        System.out.print("unziped: ");
        for (int i = 0; i < output.length; i++) {
            System.out.print(Integer.toHexString(output[i] & 0xff));
        }
        System.out.print("\n");

        return output;
 	}


    public ByteBuffer allocate(int size)
    {
        ByteBuffer b = null;
        while(true){
            try {
                b = ByteBuffer.allocate(size);
            } catch (OutOfMemoryError e) {
                b = null;
                System.err.println("multicastqueue : wait for heap : " + e);
            }
            if (b!=null) {
                break;
            }
            try {
                wait();
            } catch (InterruptedException e) {
                System.out.println("thread has interrupted the current thread.");
            }
        }
        return b;
    }

    public byte[] asByteArray() throws IOException{///ToDo : delete
        ByteArrayOutputStream buff = new ByteArrayOutputStream();
        ObjectOutput out = new ObjectOutputStream(buff);
        out.writeObject(this.val);
        byte[] bytes = buff.toByteArray();
        out.close();
        buff.close();
        return bytes;
    }

    public static int byteArrayToInt(byte[] b)
    {
        return   b[3] & 0xFF |
                (b[2] & 0xFF) << 8 |
                (b[1] & 0xFF) << 16 |
                (b[0] & 0xFF) << 24;
    }

    public static byte[] intToByteArray(int a)
    {
        return new byte[] {
                (byte) ((a >> 24) & 0xFF),
                (byte) ((a >> 16) & 0xFF),
                (byte) ((a >> 8) & 0xFF),
                (byte) (a & 0xFF)
        };
    }

}