changeset 23:54bf607118ae

change method to create RemoteDSM
author kazz <kazz@cr.ie.u-ryukyu.ac.jp>
date Tue, 17 Jan 2012 01:10:29 +0900
parents 2ca2d961a8d2
children ebd91e607b63
files src/alice/daemon/Config.java src/alice/daemon/Connection.java src/alice/datasegment/DataSegment.java src/alice/datasegment/RemoteDataSegmentManager.java src/topology/manager/IncomingHosts.java
diffstat 5 files changed, 63 insertions(+), 3 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/alice/daemon/Config.java	Tue Jan 17 01:10:29 2012 +0900
@@ -0,0 +1,16 @@
+package alice.daemon;
+
+public class Config {
+
+	public int localPort = 10000;
+	
+	public Config(String[] args) {
+		for (int i = 0; i< args.length; i++) {
+			if ("-p".equals(args[i])) {
+				localPort = Integer.parseInt(args[++i]);
+				return;
+			}
+		}
+	}
+
+}
--- a/src/alice/daemon/Connection.java	Tue Jan 17 00:40:27 2012 +0900
+++ b/src/alice/daemon/Connection.java	Tue Jan 17 01:10:29 2012 +0900
@@ -14,6 +14,8 @@
 		this.socket = socket;
 	}
 	
+	public Connection() { }
+
 	public void sendCommand(Command cmd) {
 		try {
 			sendQueue.put(cmd);
--- a/src/alice/datasegment/DataSegment.java	Tue Jan 17 00:40:27 2012 +0900
+++ b/src/alice/datasegment/DataSegment.java	Tue Jan 17 01:10:29 2012 +0900
@@ -19,4 +19,10 @@
 		dataSegment.dataSegmentManageres.put(key, manager);
 	}
 	
+	public static RemoteDataSegmentManager connect(String key, String hostName, int port) {
+		RemoteDataSegmentManager manager = new RemoteDataSegmentManager(key, hostName, port);
+		regist(key, manager);
+		return manager;
+	}
+	
 }
--- a/src/alice/datasegment/RemoteDataSegmentManager.java	Tue Jan 17 00:40:27 2012 +0900
+++ b/src/alice/datasegment/RemoteDataSegmentManager.java	Tue Jan 17 01:10:29 2012 +0900
@@ -1,5 +1,9 @@
 package alice.datasegment;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+
 import org.msgpack.type.Value;
 
 import alice.codesegment.CodeSegment;
@@ -11,6 +15,8 @@
 	
 	Connection connection;
 	
+	// TODO: delete this constructor later
+	@Deprecated
 	public RemoteDataSegmentManager(Connection connection) {
 		this.connection = connection;
 		new IncomingTcpConnection(connection, this).start();
@@ -20,6 +26,34 @@
 		+ ":" + connection.socket.getPort()).start();
 	}
 	
+	public RemoteDataSegmentManager(String key, final String hostName, final int port) {
+		connection = new Connection();
+		final RemoteDataSegmentManager manager = this;
+		new Thread(replyThread, "RemoteDataSegmentManager-"
+				+ connection.socket.getInetAddress().getHostName()
+				+ ":" + connection.socket.getPort()).start();
+		new Thread("Connect-" + key) {
+			public void run() {
+				boolean connect = true;
+				do {
+					try {
+						SocketChannel sc = SocketChannel.open(new InetSocketAddress(hostName, port));
+						connection.socket = sc.socket();
+						connect = false;
+					} catch (IOException e) {
+						try {
+							Thread.sleep(500);
+						} catch (InterruptedException e1) {
+							e1.printStackTrace();
+						}
+					}
+				} while (connect);
+				new IncomingTcpConnection(connection, manager).start();
+				new OutboundTcpConnection(connection).start();
+			}
+		}.start();
+	}
+	
 	@Override
 	public void put(String key, Value val) {
 		connection.sendCommand(new Command(CommandType.PUT, null, key, val, 0, 0, null, null));
--- a/src/topology/manager/IncomingHosts.java	Tue Jan 17 00:40:27 2012 +0900
+++ b/src/topology/manager/IncomingHosts.java	Tue Jan 17 01:10:29 2012 +0900
@@ -6,9 +6,12 @@
 
 import org.apache.log4j.Logger;
 import org.msgpack.MessagePack;
+import org.msgpack.type.ValueFactory;
 
 import alice.codesegment.CodeSegment;
 import alice.datasegment.CommandType;
+import alice.datasegment.DataSegment;
+import alice.datasegment.DataSegmentManager;
 import alice.datasegment.DataSegmentReceiver;
 import alice.topology.HostMessage;
 
@@ -30,9 +33,8 @@
 		try {
 			HostMessage host = msgpack.convert(this.host.val, HostMessage.class);
 			String nodeName = nodeNames.poll();
-			
-			// TODO: send nodeName to node
-			
+			DataSegmentManager manager = DataSegment.connect(nodeName, host.name, host.port);
+			manager.put("host", ValueFactory.createRawValue(nodeName));
 		} catch (IOException e) {
 			logger.error("HostMessage format error");
 			e.printStackTrace();