154
|
1 package test.channeltest;
|
|
2
|
|
3 import java.io.IOException;
|
157
|
4 import java.net.SocketAddress;
|
154
|
5 import java.util.LinkedList;
|
157
|
6 import java.net.InetSocketAddress;
|
159
|
7 import java.nio.channels.SelectionKey;
|
|
8 import java.nio.channels.Selector;
|
154
|
9
|
157
|
10 import rep.channel.REPLogger;
|
205
|
11 import rep.channel.REPPack;
|
|
12 import rep.channel.REPSelectionKey;
|
159
|
13 import rep.channel.REPSelector;
|
|
14 import rep.channel.REPServerSocketChannel;
|
|
15 import rep.channel.REPSocketChannel;
|
154
|
16
|
|
17 public class testSeMaSlave extends Thread{
|
|
18
|
157
|
19 SocketAddress ownIP;
|
|
20 SocketAddress masterIP;
|
154
|
21 boolean running=true;
|
161
|
22 REPLogger ns;
|
154
|
23 LinkedList<ClientInfo> cis;
|
|
24
|
157
|
25 public testSeMaSlave(String name, String oname,int oport, String mname,int mport){
|
154
|
26 super(name);
|
157
|
27 ownIP = new InetSocketAddress(oname,oport);
|
|
28 masterIP = new InetSocketAddress(mname,mport);
|
154
|
29 cis = new LinkedList<ClientInfo>();
|
174
|
30 ns = REPLogger.singleton();
|
154
|
31 }
|
|
32 public void init(){
|
|
33
|
|
34 }
|
|
35
|
205
|
36 @SuppressWarnings("unchecked")
|
154
|
37 public void run() {
|
205
|
38 REPSelector<String> selector;
|
159
|
39 REPSocketChannel<String> masterCH ;
|
|
40 try {
|
205
|
41 REPPack<String> pack = new StringPacker();
|
|
42 REPServerSocketChannel<String> scs = REPServerSocketChannel.<String>open(pack);
|
159
|
43 selector = REPSelector.create();
|
|
44 masterCH = connectToMaster(selector);
|
|
45 scs.socket().bind(ownIP);
|
206
|
46 scs.configureBlocking(false);
|
180
|
47 scs.register(selector, SelectionKey.OP_ACCEPT,null);
|
206
|
48 masterCH.configureBlocking(false);
|
180
|
49 masterCH.register(selector, SelectionKey.OP_READ,null);
|
154
|
50
|
159
|
51
|
208
|
52 ns.writeLog("Slave SessionManager starts main routin.", 1);
|
154
|
53
|
159
|
54 /* Main Loop */
|
|
55 while(running){
|
154
|
56
|
248
|
57 selector.select();
|
154
|
58
|
205
|
59 for(REPSelectionKey<String> key : selector.selectedKeys1()){
|
154
|
60
|
159
|
61 if(key.isAcceptable()){
|
205
|
62 REPSocketChannel<String> channel = key.accept(pack);
|
|
63 if(channel==null) continue;
|
|
64 channel.configureBlocking(false);
|
248
|
65 //selector.register(channel, SelectionKey.OP_READ,null);
|
|
66 channel.register(selector, SelectionKey.OP_READ, null);
|
174
|
67 ns.writeLog("accepts a client.", 1);
|
159
|
68
|
|
69 }else if(key.isReadable()){
|
268
|
70 try {
|
273
|
71 REPSocketChannel<String> channel = (REPSocketChannel<String>) key.channel();
|
268
|
72 String packet = channel.read();
|
|
73 if (packet==null) continue;
|
|
74 //if (channel==masterCH){
|
|
75 if (packet.matches("^SeMa\\d.*")){
|
|
76 ns.writeLog("receives String from master ==> `"+packet+"\'", 1);
|
|
77 for (ClientInfo ci: cis){
|
|
78 if (!packet.matches(".*"+ci.str+".*")) continue;
|
|
79 ci.channel.write(this.getName()+": loopback master ==>`"+packet+"\'");
|
|
80 }
|
|
81 }else{
|
|
82 ns.writeLog("receives String==> `"+packet+"\'", 1);
|
|
83 //channlel.write("from "+this.getName()+": save");
|
|
84 masterCH.write(this.getName()+": pass packet`"+packet+"\'");
|
|
85 cis.add(new ClientInfo(packet, channel));
|
159
|
86 }
|
268
|
87 }catch (IOException e1) {
|
|
88 ns.writeLog("channel "+ns+" closed.");
|
|
89 key.cancel();
|
154
|
90 }
|
|
91 }
|
|
92 }
|
|
93 }
|
159
|
94 } catch (IOException e1) {
|
|
95 e1.printStackTrace();
|
154
|
96 }
|
159
|
97
|
154
|
98 }
|
159
|
99
|
|
100 private REPSocketChannel<String> connectToMaster(Selector _selector) throws IOException {
|
187
|
101 REPSocketChannel<String> channel = REPSocketChannel.<String>create(new StringPacker());
|
174
|
102 ns.writeLog("is connecting to masterSeMa whose ip is"+masterIP, 1);
|
154
|
103 while(!channel.connect(masterIP)){
|
174
|
104 ns.writeLog("SeMa not listen to socket yet, wait", 1);
|
154
|
105 Thread.yield();
|
|
106 }
|
174
|
107 ns.writeLog("connecting was successful.", 1);
|
154
|
108
|
|
109 return channel;
|
|
110 }
|
|
111 }
|
|
112 class ClientInfo{
|
|
113 String str;
|
159
|
114 REPSocketChannel<String> channel;
|
|
115 ClientInfo(String _str, REPSocketChannel<String> _channel){
|
154
|
116 str = _str;
|
|
117 channel = _channel;
|
|
118 }
|
|
119 } |