Mercurial > hg > Database > Alice
view src/main/java/alice/datasegment/ReceiveData.java @ 527:bfec2c3ff1b8 dispose
change unzip
author | Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 30 Apr 2015 18:14:02 +0900 |
parents | 928907206d21 |
children | 6ebddfac7ff6 |
line wrap: on
line source
package alice.datasegment; import java.io.*; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.LinkedList; import java.util.zip.*; import alice.daemon.CommandMessage; import org.apache.log4j.Logger; import org.msgpack.MessagePack; import org.msgpack.type.Value; import alice.codesegment.SingletonMessage; import javax.xml.bind.DatatypeConverter; import java.io.ByteArrayInputStream; import static java.util.Arrays.*; /** * 送られてきたDSを一時的に取っておくクラス。inputでも使用。 */ public class ReceiveData { private Object val;//for Object DS private byte[] messagePack;//for byteArray(serialized) DS private byte[] zMessagePack;//for byteArray(compressed) DS private Class<?> clazz; public long time;//測定用 public boolean setTime = false; public int depth = 1; private Deflater deflater = new Deflater(); private Inflater inflater = new Inflater(); private ByteBuffer buf = null; private MessagePack msg = SingletonMessage.getInstance(); /** * コンストラクタ。Object型のDSと圧縮のメタ情報を受け取る。 * put/update/reply用? * @param obj DS本体(Object) */ public ReceiveData(Object obj) { clazz = obj.getClass(); val = obj; } /** * コンストラクタ。byteArray型のDSと圧縮のメタ情報を受け取り、byteArrayフラグを立てる。 * * @param messagePack DS本体(byteArray) */ public ReceiveData(byte[] messagePack) { clazz = messagePack.getClass(); if (messagePack[0] == 0xc1){ System.out.println("ReceiveData is zMessagePack"); this.zMessagePack = messagePack; } else { System.out.println("ReceiveData is MessagePack"); this.messagePack = messagePack; } } public boolean isByteArray(){ return messagePack != null | zMessagePack != null; } public boolean compressed(){ return zMessagePack != null; } public boolean serialized(){ return val == null; } public Object getObj(){ return asClass(Object.class); } public String asString(){ return asClass(String.class); } public int asInteger(){ return asClass(Integer.class); } public Float asFloat(){ return asClass(Float.class); } public Value getVal(){///get DS as Value type if (val == null){///val != null return asClass(Value.class); } else { try { return SingletonMessage.getInstance().unconvert(val);///convert to Value type by MassagePack } catch (IOException e) { e.printStackTrace(); } return null; } } /** * DSを任意の型で取得するメソッド。 * DSがbyteArrayでなければ指定された型に変換して返す。 * DSがbyteArrayなら解凍状態にして指定された型に変換して返す。 * * @param clazz * @param <T> * @return */ public <T> T asClass(Class<T> clazz) {///javasist System.out.println("in asClass val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack); try { if (val != null) { return (T) val; } if (zMessagePack != null && messagePack == null) {//ToDo:fix messagePack = unzip(zMessagePack); System.out.println("unzip messagePack:" + messagePack); //zMessagePack = null;? } return SingletonMessage.getInstance().read(messagePack, clazz); } catch (IOException e) {// | DataFormatException e e.printStackTrace(); return null; } } public void setCompressFlag(boolean cFlag) {///compress LinkedList<ByteBuffer> input = new LinkedList<ByteBuffer>(); LinkedList<ByteBuffer> output = new LinkedList<ByteBuffer>(); if (cFlag){ System.out.println("in setCompressFlag val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack); //messagePack = asByteArray();///ToDo:fix if (val != null){ try { messagePack = msg.write(val); } catch (IOException e) { e.printStackTrace(); } } else { messagePack = unzip(zMessagePack); } System.out.println("no zip messagePack:" + messagePack); System.out.print("no zip messagePack: "); for (int i = 0; i < messagePack.length; i++) { System.out.print(Integer.toHexString(messagePack[i] & 0xff)); } System.out.print("\n"); System.out.println("no zip messagePack length:" + messagePack.length); try { //System.out.println("in zip"); input.add(ByteBuffer.wrap(messagePack)); int len = zip(input, 0, output); byte[] ziped = new byte[len + 8]; ziped[0] = (byte) 0xc1;///set compressedFlag to header ziped[1] = ziped[2] = ziped[3] = (byte) 0x00; System.arraycopy(intToByteArray(messagePack.length), 0, ziped, 4, 4);///set data length to header System.out.println("zipedlen: " + len); //System.out.println("limit: " + output.get(0).limit()); //System.out.println("remaining: " + output.get(0).remaining()); int tmp = 0; for (int i = 0; i < output.size(); i++){///Is this copy OK??? System.arraycopy(output.get(i).array(), 0, ziped, 8 + tmp, output.get(i).limit());//limit? remaining? tmp += output.get(i).limit(); } System.out.print("ziped: "); for (int i = 0; i < ziped.length; i++) { System.out.print(Integer.toHexString(ziped[i] & 0xff)); } System.out.print("\n"); zMessagePack = ziped; val = null; messagePack = null; } catch (IOException e) { e.printStackTrace(); } } } public ByteBuffer setMPHeader(CommandMessage cm, CommandType type){ System.out.println("in setMPHeader val:" + val + ", MP:" + messagePack + ", zMP:" + zMessagePack); try { byte[] header = null;//DSのメタデータ用byteArray byte[] data = null;//DS本体用byteArray byte[] dataSize = null;//DSのサイズ if (type == CommandType.REPLY){ if (val != null) {//純粋なオブジェクトの場合シリアライズ data = msg.write(val); System.out.print("header MP data: "); for (int i = 0; i < data.length; i++) { System.out.print(Integer.toHexString(data[i] & 0xff)); } System.out.print("\n"); } else { // rData is RAW ByteArray or already serialized data = messagePack; } if (setTime) {//AliceVNCの計測用(消してもいい) cm.setTime = true; cm.time = time; cm.depth = depth + 1; } //MessagePackでDSを作成(ヘッダー・データ本体のサイズ・データ本体) header = msg.write(cm); dataSize = msg.write(data.length); buf = ByteBuffer.allocate(header.length+dataSize.length+data.length); buf.put(header); buf.put(dataSize); buf.put(data); } else { header = msg.write(cm); buf = ByteBuffer.allocate(header.length); buf.put(header); } buf.flip(); } catch (IOException e) { e.printStackTrace(); } messagePack = buf.array(); System.out.print("MP with header: "); for (int i = 0; i < messagePack.length; i++) { System.out.print(Integer.toHexString(messagePack[i] & 0xff)); } System.out.print("\n"); return buf; } public int zip(LinkedList<ByteBuffer> inputs, int inputIndex, LinkedList<ByteBuffer> outputs) throws IOException { int len = 0; int INFLATE_BUFSIZE = 1024 * 100; ByteBuffer c1 = allocate(INFLATE_BUFSIZE);//for output while (inputIndex < inputs.size()) { ByteBuffer b1 = inputs.get(inputIndex++); deflater.setInput(b1.array(), b1.position(), b1.remaining()); /** * If we finish() stream and reset() it, Deflater start new gzip * stream, this makes continuous zlib reader unhappy. if we remove * finish(), Deflater.deflate() never flushes its output. The * original zlib deflate has flush flag. I'm pretty sure this a kind * of bug of Java library. */ if (inputIndex == inputs.size()){ deflater.finish(); } int len1 = 0; do { len1 = deflater.deflate(c1.array(), c1.position(), c1.remaining());///Bytearray for ziped data、start offset、length if (len1 > 0) { len += len1; c1.position(c1.position() + len1); if (c1.remaining() == 0) { c1.flip(); outputs.addLast(c1); c1 = allocate(INFLATE_BUFSIZE); } } } while (len1 > 0 || !deflater.needsInput());//needsInput()...true if setInput is empty } if (c1.position() != 0) { c1.flip(); outputs.addLast(c1); } deflater.reset(); return len;///return length of ziped data } protected byte[] unzip(byte[] input) {///read header & unzip int length = input.length; int zippedLength = byteArrayToInt(copyOfRange(input, 4, 8));///read header...Is this copy OK?? byte [] output = new byte [zippedLength];///byteArray for unziped data inflater.setInput(input, 8, length - 8);///set unzip data without header try { System.out.println("return:" + inflater.inflate(output, 0, zippedLength));///unzip } catch (DataFormatException e) { e.printStackTrace(); } inflater.reset(); System.out.print("unziped: "); for (int i = 0; i < output.length; i++) { System.out.print(Integer.toHexString(output[i] & 0xff)); } System.out.print("\n"); return output; } public ByteBuffer allocate(int size) { ByteBuffer b = null; while(true){ try { b = ByteBuffer.allocate(size); } catch (OutOfMemoryError e) { b = null; System.err.println("multicastqueue : wait for heap : " + e); } if (b!=null) { break; } try { wait(); } catch (InterruptedException e) { System.out.println("thread has interrupted the current thread."); } } return b; } public byte[] asByteArray() throws IOException{///ToDo : delete ByteArrayOutputStream buff = new ByteArrayOutputStream(); ObjectOutput out = new ObjectOutputStream(buff); out.writeObject(this.val); byte[] bytes = buff.toByteArray(); out.close(); buff.close(); return bytes; } public static int byteArrayToInt(byte[] b) { return b[3] & 0xFF | (b[2] & 0xFF) << 8 | (b[1] & 0xFF) << 16 | (b[0] & 0xFF) << 24; } public static byte[] intToByteArray(int a) { return new byte[] { (byte) ((a >> 24) & 0xFF), (byte) ((a >> 16) & 0xFF), (byte) ((a >> 8) & 0xFF), (byte) (a & 0xFF) }; } }