Mercurial > hg > Database > Alice
diff src/main/java/alice/daemon/IncomingTcpConnection.java @ 419:aefbe41fcf12 dispose
change tab to space
author | sugi |
---|---|
date | Tue, 15 Jul 2014 16:00:22 +0900 |
parents | aadea6a59376 |
children | 2f2623484b77 |
line wrap: on
line diff
--- a/src/main/java/alice/daemon/IncomingTcpConnection.java Tue Jul 15 06:15:53 2014 +0900 +++ b/src/main/java/alice/daemon/IncomingTcpConnection.java Tue Jul 15 16:00:22 2014 +0900 @@ -15,89 +15,89 @@ import alice.topology.manager.keeparive.RespondData; public class IncomingTcpConnection extends Thread { - - public Connection connection; - public DataSegmentManager manager; - public String reverseKey; - private LocalDataSegmentManager lmanager = DataSegment.getLocal(); + + public Connection connection; + public DataSegmentManager manager; + public String reverseKey; + private LocalDataSegmentManager lmanager = DataSegment.getLocal(); + + public IncomingTcpConnection(DataSegmentManager manager) { + this.manager = manager; + } + + public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { + this.manager = manager; + this.connection = connection; + this.reverseKey = reverseKey; + } + + public LocalDataSegmentManager getLocalDataSegmentManager(){ + return lmanager; + } - public IncomingTcpConnection(DataSegmentManager manager) { - this.manager = manager; - } - - public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { - this.manager = manager; - this.connection = connection; - this.reverseKey = reverseKey; - } - - public LocalDataSegmentManager getLocalDataSegmentManager(){ - return lmanager; - } - - /** - * pipeline thread for receiving - */ - public void run() { - Unpacker unpacker = null; - try { - unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream()); - } catch (IOException e) { - e.printStackTrace(); - } - if (unpacker == null) { - return; - } - while (true) { - try { - CommandMessage msg = unpacker.read(CommandMessage.class); - CommandType type = CommandType.getCommandTypeFromId(msg.type); - switch (type) { - case UPDATE: - lmanager.getDataSegmentKey(msg.key) - .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); - break; - case PUT: - lmanager.getDataSegmentKey(msg.key) - .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); - break; - case PEEK: - lmanager.getDataSegmentKey(msg.key) - .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); - break; - case TAKE: - lmanager.getDataSegmentKey(msg.key) - .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); - break; - case REMOVE: - lmanager.getDataSegmentKey(msg.key) - .runCommand(new Command(type, null, null, null, 0, 0, null, null, null)); - break; - case REPLY: - Command cmd = manager.getAndRemoveCmd(msg.seq); - cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); - cmd=null; - break; - case PING: - DataSegment.get(reverseKey).response(msg.key); - break; - case RESPONSE: - DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis())); - break; - default: - break; - } - } catch (ClosedChannelException e) { - connection.putConnectionInfo(); - connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); - return; - } catch (EOFException e) { - connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); - return; - } catch (IOException e) { - e.printStackTrace(); - } - } - } + /** + * pipeline thread for receiving + */ + public void run() { + Unpacker unpacker = null; + try { + unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream()); + } catch (IOException e) { + e.printStackTrace(); + } + if (unpacker == null) { + return; + } + while (true) { + try { + CommandMessage msg = unpacker.read(CommandMessage.class); + CommandType type = CommandType.getCommandTypeFromId(msg.type); + switch (type) { + case UPDATE: + lmanager.getDataSegmentKey(msg.key) + .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); + break; + case PUT: + lmanager.getDataSegmentKey(msg.key) + .runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); + break; + case PEEK: + lmanager.getDataSegmentKey(msg.key) + .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); + break; + case TAKE: + lmanager.getDataSegmentKey(msg.key) + .runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection, null, null, msg.flag)); + break; + case REMOVE: + lmanager.getDataSegmentKey(msg.key) + .runCommand(new Command(type, null, null, null, 0, 0, null, null, null)); + break; + case REPLY: + Command cmd = manager.getAndRemoveCmd(msg.seq); + cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); + cmd=null; + break; + case PING: + DataSegment.get(reverseKey).response(msg.key); + break; + case RESPONSE: + DataSegment.getLocal().put(msg.key, new RespondData(reverseKey, System.currentTimeMillis())); + break; + default: + break; + } + } catch (ClosedChannelException e) { + connection.putConnectionInfo(); + connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); + return; + } catch (EOFException e) { + connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); + return; + } catch (IOException e) { + e.printStackTrace(); + } + } + } }