Mercurial > hg > Database > Christie
view src/main/java/christie/datagear/RemoteDataGearManager.java @ 210:9a1d9c99e708
fix Command run to execute
author | akahori |
---|---|
date | Mon, 11 Mar 2019 16:45:37 +0900 |
parents | 2ecb3a93b8ae |
children | e486c13d9ea9 |
line wrap: on
line source
package christie.datagear; import christie.codegear.CodeGearManager; import christie.daemon.Connection; import christie.daemon.IncomingTcpConnection; import christie.daemon.OutboundTcpConnection; import christie.datagear.command.*; import christie.datagear.dg.DataGear; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; import java.util.concurrent.LinkedBlockingQueue; import static java.lang.Thread.MAX_PRIORITY; public class RemoteDataGearManager extends DataGearManager{ private Connection connection; private CodeGearManager cgm; boolean connect = false; Object lock = new Object(); public RemoteDataGearManager(final String dgmName, final String address, final int port, CodeGearManager cgm) { this.cgm = cgm; RemoteDataGearManager manager = this; new Thread("Connect-" + dgmName) { public void run() { do { try { SocketChannel sc = SocketChannel.open(new InetSocketAddress(address, port)); connection = new Connection(sc.socket(), cgm); connection.name = dgmName; connection.socket.setTcpNoDelay(true); synchronized (lock){ connect = true; lock.notify(); } } catch (IOException e) { try { Thread.sleep(50); } catch (InterruptedException e1) { e1.printStackTrace(); } } } while (!connect); IncomingTcpConnection in = new IncomingTcpConnection(connection); in.setManager(manager); in.setName(dgmName+"-IncomingTcp"); in.setPriority(MAX_PRIORITY); in.start(); OutboundTcpConnection out = new OutboundTcpConnection(connection); out.setName(dgmName + "-OutboundTcp"); out.setPriority(MAX_PRIORITY); out.start(); } }.start(); } @Override public void put(String key, Object data) { Command cm = new PutCommand(0, null, key, new DataGear(data)); // これ入れないと, connectionがnullの時があるのでしょうがなくwait. // コンストラクタで呼び出されるThreadをやめて実効すればいんだけどね... if(!connect) connectWait(); connection.write(cm); } @Override public void runCommand(Command cm) { waitList.add(cm); Command remoteCmd = null; switch (cm.type) { case PEEK: remoteCmd = new RemotePeekCommand(connection.name, cm, connection); break; case TAKE: remoteCmd = new RemoteTakeCommand(connection.name, cm, connection); break; } connection.write(remoteCmd); } @Override public void resolveWaitCommand(String key, DataGear dg) { Command cm = waitList.getAndRemoveCommand(key); cm.setDg(dg); cm.execute(); } @Override public void finish() { Command cmd = new FinishCommand(); connection.sendCommand(cmd); } @Override public void close() { Command cmd = new CloseCommand(); connection.sendCommand(cmd); } @Override public void shutdown() { connection.close(); LinkedBlockingQueue<Command> queue = connection.sendQueue; if (!queue.isEmpty()) queue.clear(); } public void connectWait(){ synchronized (lock){ while(!connect){ try { lock.wait(); } catch (InterruptedException e) { } } } } }