154
|
1 package test.channeltest;
|
|
2
|
|
3 import java.io.IOException;
|
157
|
4 import java.net.SocketAddress;
|
154
|
5 import java.util.LinkedList;
|
157
|
6 import java.net.InetSocketAddress;
|
159
|
7 import java.nio.channels.SelectionKey;
|
|
8 import java.nio.channels.Selector;
|
154
|
9
|
157
|
10 import rep.channel.REPLogger;
|
205
|
11 import rep.channel.REPPack;
|
|
12 import rep.channel.REPSelectionKey;
|
159
|
13 import rep.channel.REPSelector;
|
|
14 import rep.channel.REPServerSocketChannel;
|
|
15 import rep.channel.REPSocketChannel;
|
154
|
16
|
|
17 public class testSeMaSlave extends Thread{
|
|
18
|
157
|
19 SocketAddress ownIP;
|
|
20 SocketAddress masterIP;
|
154
|
21 boolean running=true;
|
161
|
22 REPLogger ns;
|
154
|
23 LinkedList<ClientInfo> cis;
|
|
24
|
157
|
25 public testSeMaSlave(String name, String oname,int oport, String mname,int mport){
|
154
|
26 super(name);
|
157
|
27 ownIP = new InetSocketAddress(oname,oport);
|
|
28 masterIP = new InetSocketAddress(mname,mport);
|
154
|
29 cis = new LinkedList<ClientInfo>();
|
174
|
30 ns = REPLogger.singleton();
|
154
|
31 }
|
|
32 public void init(){
|
|
33
|
|
34 }
|
|
35
|
|
36 public void run() {
|
205
|
37 REPSelector<String> selector;
|
159
|
38 REPSocketChannel<String> masterCH ;
|
|
39 try {
|
205
|
40 REPPack<String> pack = new StringPacker();
|
|
41 REPServerSocketChannel<String> scs = REPServerSocketChannel.<String>open(pack);
|
159
|
42 selector = REPSelector.create();
|
|
43 masterCH = connectToMaster(selector);
|
|
44 scs.socket().bind(ownIP);
|
206
|
45 scs.configureBlocking(false);
|
180
|
46 scs.register(selector, SelectionKey.OP_ACCEPT,null);
|
206
|
47 masterCH.configureBlocking(false);
|
180
|
48 masterCH.register(selector, SelectionKey.OP_READ,null);
|
154
|
49
|
159
|
50
|
208
|
51 ns.writeLog("Slave SessionManager starts main routin.", 1);
|
154
|
52
|
159
|
53 /* Main Loop */
|
|
54 while(running){
|
154
|
55
|
248
|
56 selector.select();
|
154
|
57
|
205
|
58 for(REPSelectionKey<String> key : selector.selectedKeys1()){
|
154
|
59
|
159
|
60 if(key.isAcceptable()){
|
205
|
61 REPSocketChannel<String> channel = key.accept(pack);
|
|
62 if(channel==null) continue;
|
|
63 channel.configureBlocking(false);
|
248
|
64 //selector.register(channel, SelectionKey.OP_READ,null);
|
|
65 channel.register(selector, SelectionKey.OP_READ, null);
|
174
|
66 ns.writeLog("accepts a client.", 1);
|
159
|
67
|
|
68 }else if(key.isReadable()){
|
268
|
69 try {
|
308
|
70 REPSocketChannel<String> channel = key.channel1();
|
268
|
71 String packet = channel.read();
|
|
72 if (packet==null) continue;
|
|
73 //if (channel==masterCH){
|
|
74 if (packet.matches("^SeMa\\d.*")){
|
|
75 ns.writeLog("receives String from master ==> `"+packet+"\'", 1);
|
|
76 for (ClientInfo ci: cis){
|
|
77 if (!packet.matches(".*"+ci.str+".*")) continue;
|
|
78 ci.channel.write(this.getName()+": loopback master ==>`"+packet+"\'");
|
|
79 }
|
|
80 }else{
|
|
81 ns.writeLog("receives String==> `"+packet+"\'", 1);
|
|
82 //channlel.write("from "+this.getName()+": save");
|
|
83 masterCH.write(this.getName()+": pass packet`"+packet+"\'");
|
|
84 cis.add(new ClientInfo(packet, channel));
|
159
|
85 }
|
268
|
86 }catch (IOException e1) {
|
|
87 ns.writeLog("channel "+ns+" closed.");
|
|
88 key.cancel();
|
154
|
89 }
|
|
90 }
|
|
91 }
|
|
92 }
|
159
|
93 } catch (IOException e1) {
|
|
94 e1.printStackTrace();
|
154
|
95 }
|
159
|
96
|
154
|
97 }
|
159
|
98
|
|
99 private REPSocketChannel<String> connectToMaster(Selector _selector) throws IOException {
|
187
|
100 REPSocketChannel<String> channel = REPSocketChannel.<String>create(new StringPacker());
|
174
|
101 ns.writeLog("is connecting to masterSeMa whose ip is"+masterIP, 1);
|
154
|
102 while(!channel.connect(masterIP)){
|
174
|
103 ns.writeLog("SeMa not listen to socket yet, wait", 1);
|
154
|
104 Thread.yield();
|
|
105 }
|
174
|
106 ns.writeLog("connecting was successful.", 1);
|
154
|
107
|
|
108 return channel;
|
|
109 }
|
|
110 }
|
|
111 class ClientInfo{
|
|
112 String str;
|
159
|
113 REPSocketChannel<String> channel;
|
|
114 ClientInfo(String _str, REPSocketChannel<String> _channel){
|
154
|
115 str = _str;
|
|
116 channel = _channel;
|
|
117 }
|
|
118 } |