Mercurial > hg > Database > Alice
annotate src/alice/daemon/IncomingTcpConnection.java @ 240:d9c9076d6b47
no use SEDA in IncomingTcpConnection
author | sugi |
---|---|
date | Tue, 09 Apr 2013 11:26:17 +0900 |
parents | 409d7679cf7b |
children | 88be2824a989 |
rev | line source |
---|---|
13 | 1 package alice.daemon; |
2 | |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
3 import java.io.EOFException; |
13 | 4 import java.io.IOException; |
42 | 5 import java.nio.channels.ClosedChannelException; |
13 | 6 |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
7 import org.msgpack.unpacker.Unpacker; |
13 | 8 |
126 | 9 import alice.codesegment.SingletonMessage; |
13 | 10 import alice.datasegment.Command; |
11 import alice.datasegment.CommandType; | |
12 import alice.datasegment.DataSegment; | |
13 import alice.datasegment.DataSegmentKey; | |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
14 import alice.datasegment.DataSegmentManager; |
13 | 15 import alice.datasegment.LocalDataSegmentManager; |
16 | |
17 public class IncomingTcpConnection extends Thread { | |
18 | |
19 public Connection connection; | |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
20 public DataSegmentManager manager; |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
21 public String reverseKey; |
127 | 22 private LocalDataSegmentManager lmanager = DataSegment.getLocal(); |
69 | 23 |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
24 public IncomingTcpConnection(Connection connection, DataSegmentManager manager, String reverseKey) { |
14
e3f1b21718b0
implements RemoteDataSegment
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
13
diff
changeset
|
25 this.manager = manager; |
13 | 26 this.connection = connection; |
28
98ab26e09a98
Configuration Manager work and implements reverseKey
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
17
diff
changeset
|
27 this.reverseKey = reverseKey; |
13 | 28 } |
29 | |
58 | 30 /** |
31 * pipeline thread for receiving | |
32 */ | |
13 | 33 public void run() { |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
34 Unpacker unpacker = null; |
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
35 try { |
126 | 36 unpacker = SingletonMessage.getInstance().createUnpacker(connection.socket.getInputStream()); |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
37 } catch (IOException e2) { |
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
38 e2.printStackTrace(); |
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
39 } |
13 | 40 while (true) { |
41 try { | |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
42 CommandMessage msg = unpacker.read(CommandMessage.class); |
13 | 43 CommandType type = CommandType.getCommandTypeFromId(msg.type); |
44 switch (type) { | |
45 case UPDATE: | |
240 | 46 getDataSegmentKey(msg).runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); |
13 | 47 break; |
48 case PUT: | |
240 | 49 getDataSegmentKey(msg).runCommand(new Command(type, null, null, msg.val, 0, 0, null, null, reverseKey)); |
13 | 50 break; |
51 case PEEK: | |
240 | 52 getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); |
13 | 53 break; |
54 case TAKE: | |
240 | 55 getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, msg.index, msg.seq, connection.sendQueue, null, null)); |
13 | 56 break; |
57 case REMOVE: | |
240 | 58 getDataSegmentKey(msg).runCommand(new Command(type, null, null, null, 0, 0, null, null, null)); |
13 | 59 break; |
60 case REPLY: | |
240 | 61 Command cmd = manager.getAndRemoveCmd(msg.seq); |
62 cmd.cs.ids.reply(cmd.receiver, new Command(type, null, null, msg.val, msg.index, msg.seq, null, null, null)); | |
63 cmd=null; | |
13 | 64 break; |
65 default: | |
66 break; | |
67 } | |
42 | 68 } catch (ClosedChannelException e) { |
69 | 69 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); |
42 | 70 return; |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
71 } catch (EOFException e) { |
69 | 72 connection.sendCommand(new Command(CommandType.CLOSE, null, null, null, 0, 0, null, null, null)); |
51
919389066887
change protocol header: remove an integer representing message length (work?)
kazz <kazz@cr.ie.u-ryukyu.ac.jp>
parents:
43
diff
changeset
|
73 return; |
13 | 74 } catch (IOException e) { |
75 e.printStackTrace(); | |
76 } | |
77 } | |
78 } | |
64 | 79 private DataSegmentKey getDataSegmentKey(CommandMessage msg) { |
80 return lmanager.getDataSegmentKey(msg.key); | |
81 } | |
13 | 82 } |