view src/main/java/alice/datasegment/DataSegmentKey.java @ 547:e91a574b69de dispose

remove index
author Nozomi Teruya <e125769@ie.u-ryukyu.ac.jp>
date Tue, 18 Aug 2015 16:15:17 +0900
parents f3f7e256ee03
children
line wrap: on
line source

package alice.datasegment;

import java.util.ArrayList;
import java.util.Iterator;

/**
 * ここがコマンドの中身部分
 *
 * Synchronized DataSegment for each DataSegment key
 * @author kazz
 *
 */
public class DataSegmentKey {

    private ArrayList<DataSegmentValue> dataList = new ArrayList<DataSegmentValue>();
    private ArrayList<Command> waitList = new ArrayList<Command>();
    private int tailIndex = 1;//keyに対して存在するDSの数?

    public synchronized void runCommand(Command cmd) {
        switch (cmd.type) {
        case UPDATE:
            if (dataList.size() != 0) {
                dataList.remove(0);
            }
        case PUT:
            int index = tailIndex;
            tailIndex++;
            DataSegmentValue dsv = new DataSegmentValue(index, cmd.rData, cmd.reverseKey);
            dataList.add(dsv);
            // Process waiting peek and take commands
            for (Iterator<Command> iter = waitList.iterator(); iter.hasNext(); ) {
                Command waitCmd = iter.next();

                replyValue(waitCmd, dsv, cmd.getCompressFlag());
                iter.remove();
                if (waitCmd.type == CommandType.TAKE) { // someone is waiting for this put or update command
                    dataList.remove(dsv);
                    break;
                }

            }
            break;
        case PEEK:
            if (dataList.isEmpty()) {
                waitList.add(cmd);
                break;
            }

            replyValue(cmd, dataList.get(0), cmd.getCompressFlag());

            break;
        case TAKE:
            if (dataList.isEmpty()) {
                waitList.add(cmd);
                break;
            }
            replyValue(cmd, dataList.remove(0), cmd.getCompressFlag());
            break;
        case REMOVE:
            // TODO: implements later
            break;
        default:
        }

    }

    public void replyValue(Command cmd, DataSegmentValue data, boolean cFlag){
        Command rCmd = new Command(CommandType.REPLY, null, null, data.rData, cmd.seq, null, null, data.from);
        rCmd.setCompressFlag(cFlag);

        if (cmd.cs!=null){ // if cmd has cs-instance, it means Command from local.
            cmd.cs.ids.reply(cmd.receiver, rCmd);
        } else {
            try {
                if (!cmd.getQuickFlag()) {
                    cmd.connection.sendQueue.put(rCmd);
                } else {
                    cmd.connection.write(rCmd);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}