Support for multiple namenodes
diff --git a/client/src/main/java/com/ibm/crail/conf/CrailConstants.java b/client/src/main/java/com/ibm/crail/conf/CrailConstants.java
index d97b43f..8369d19 100644
--- a/client/src/main/java/com/ibm/crail/conf/CrailConstants.java
+++ b/client/src/main/java/com/ibm/crail/conf/CrailConstants.java
@@ -22,6 +22,7 @@
 package com.ibm.crail.conf;
 
 import java.io.IOException;
+
 import org.slf4j.Logger;
 
 import com.ibm.crail.utils.CrailUtils;
@@ -30,7 +31,7 @@
 	private static final Logger LOG = CrailUtils.getLogger();
 	
 	public static final String VERSION_KEY = "crail.version";
-	public static int VERSION = 2900;
+	public static int VERSION = 2990;
 	
 	public static final String DIRECTORY_DEPTH_KEY = "crail.directorydepth";
 	public static int DIRECTORY_DEPTH = 16;
@@ -102,6 +103,9 @@
 	public static final String NAMENODE_RPC_TYPE_KEY = "crail.namenode.rpctype";
 	public static String NAMENODE_RPC_TYPE = "com.ibm.crail.namenode.rpc.darpc.DaRPCNameNode";
 	
+	public static final String NAMENODE_RPC_SERVICE_KEY = "crail.namenode.rpcservice";
+	public static String NAMENODE_RPC_SERVICE = "com.ibm.crail.namenode.NameNodeService";	
+	
 	//storage interface
 	public static final String STORAGE_TYPES_KEY = "crail.storage.types";
 	public static String STORAGE_TYPES = "com.ibm.crail.storage.rdma.RdmaStorageTier";		
diff --git a/client/src/main/java/com/ibm/crail/core/CoreFileSystem.java b/client/src/main/java/com/ibm/crail/core/CoreFileSystem.java
index ae3a7c8..d178995 100644
--- a/client/src/main/java/com/ibm/crail/core/CoreFileSystem.java
+++ b/client/src/main/java/com/ibm/crail/core/CoreFileSystem.java
@@ -29,6 +29,7 @@
 import java.util.LinkedList;
 import java.util.StringTokenizer;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -53,6 +54,7 @@
 import com.ibm.crail.metadata.DataNodeInfo;
 import com.ibm.crail.metadata.FileInfo;
 import com.ibm.crail.metadata.FileName;
+import com.ibm.crail.rpc.RpcDispatcher;
 import com.ibm.crail.rpc.RpcErrors;
 import com.ibm.crail.rpc.RpcClient;
 import com.ibm.crail.rpc.RpcConnection;
@@ -77,8 +79,8 @@
 	private static AtomicInteger fsCount = new AtomicInteger(0);
 	
 	//namenode operations
-	private RpcClient rpcNameNode;
-	private RpcConnection namenodeClientRpc;
+	private RpcClient rpcClient;
+	private RpcConnection rpcConnection;
 	
 	//datanode operations
 	private EndpointCache datanodeEndpointCache;
@@ -121,11 +123,22 @@
 		
 		//Namenode
 		InetSocketAddress nnAddr = CrailUtils.getNameNodeAddress();
-		this.rpcNameNode = RpcClient.createInstance(CrailConstants.NAMENODE_RPC_TYPE);
-		rpcNameNode.init(conf, null);
-		rpcNameNode.printConf(LOG);		
-		this.namenodeClientRpc = rpcNameNode.connect(nnAddr);
-		LOG.info("connected to namenode at " + nnAddr);		
+		this.rpcClient = RpcClient.createInstance(CrailConstants.NAMENODE_RPC_TYPE);
+		rpcClient.init(conf, null);
+		rpcClient.printConf(LOG);	
+		ConcurrentLinkedQueue<InetSocketAddress> namenodeList = CrailUtils.getNameNodeList();
+		ConcurrentLinkedQueue<RpcConnection> connectionList = new ConcurrentLinkedQueue<RpcConnection>();
+		while(!namenodeList.isEmpty()){
+			InetSocketAddress address = namenodeList.poll();
+			RpcConnection connection = rpcClient.connect(address);
+			connectionList.add(connection);
+		}
+		if (connectionList.size() == 1){
+			this.rpcConnection = connectionList.poll();
+		} else {
+			this.rpcConnection = new RpcDispatcher(connectionList);
+		}
+		LOG.info("connected to namenode(s) " + rpcConnection);		
 		
 		//Client
 		this.fsId = fsCount.getAndIncrement();
@@ -159,7 +172,7 @@
 			LOG.info("createNode: name " + path + ", type " + type + ", storageAffinity " + storageClass + ", locationAffinity " + locationClass);
 		}
 
-		RpcFuture<RpcCreateFile> fileRes = namenodeClientRpc.createFile(name, type, storageClass.value(), locationClass.value());
+		RpcFuture<RpcCreateFile> fileRes = rpcConnection.createFile(name, type, storageClass.value(), locationClass.value());
 		return new CreateNodeFuture(this, path, type, fileRes);
 	}	
 	
@@ -214,7 +227,7 @@
 			LOG.info("lookupDirectory: path " + path);
 		}
 		
-		RpcFuture<RpcGetFile> fileRes = namenodeClientRpc.getFile(name, false);
+		RpcFuture<RpcGetFile> fileRes = rpcConnection.getFile(name, false);
 		return new LookupNodeFuture(this, path, fileRes);
 	}	
 	
@@ -251,7 +264,7 @@
 			LOG.info("rename: srcname " + src + ", dstname " + dst);
 		}
 		
-		RpcFuture<RpcRenameFile> renameRes = namenodeClientRpc.renameFile(srcPath, dstPath);
+		RpcFuture<RpcRenameFile> renameRes = rpcConnection.renameFile(srcPath, dstPath);
 		return new RenameNodeFuture(this, src, dst, renameRes);
 	}
 	
@@ -319,7 +332,7 @@
 			LOG.info("delete: name " + path + ", recursive " + recursive);
 		}
 
-		RpcFuture<RpcDeleteFile> fileRes = namenodeClientRpc.removeFile(name, recursive);
+		RpcFuture<RpcDeleteFile> fileRes = rpcConnection.removeFile(name, recursive);
 		return new DeleteNodeFuture(this, path, recursive, fileRes);
 	}	
 	
@@ -364,7 +377,7 @@
 			LOG.info("getDirectoryList: " + name);
 		}
 
-		RpcGetFile fileRes = namenodeClientRpc.getFile(directory, false).get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
+		RpcGetFile fileRes = rpcConnection.getFile(directory, false).get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
 		if (fileRes.getError() != RpcErrors.ERR_OK) {
 			LOG.info("getDirectoryList: " + RpcErrors.messages[fileRes.getError()]);
 			throw new FileNotFoundException(RpcErrors.messages[fileRes.getError()]);
@@ -408,7 +421,7 @@
 		HashMap<Long, DataNodeInfo> offset2DataNode = new HashMap<Long, DataNodeInfo>();
 	
 		for (long current = CrailUtils.blockStartAddress(start); current < start + len; current += CrailConstants.BLOCK_SIZE){
-			RpcGetLocation getLocationRes = namenodeClientRpc.getLocation(name, current).get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
+			RpcGetLocation getLocationRes = rpcConnection.getLocation(name, current).get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
 			if (getLocationRes.getError() != RpcErrors.ERR_OK) {
 				LOG.info("location: " + RpcErrors.messages[getLocationRes.getError()]);
 				throw new IOException(RpcErrors.messages[getLocationRes.getError()]);
@@ -475,11 +488,11 @@
 	}
 	
 	public void dumpNameNode() throws Exception {
-		namenodeClientRpc.dumpNameNode().get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
+		rpcConnection.dumpNameNode().get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
 	}	
 	
 	public void ping() throws Exception {
-		RpcPing pingRes = namenodeClientRpc.pingNameNode().get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
+		RpcPing pingRes = rpcConnection.pingNameNode().get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
 		if (pingRes.getError() != RpcErrors.ERR_OK) {
 			LOG.info("Ping: " + RpcErrors.messages[pingRes.getError()]);
 			throw new IOException(RpcErrors.messages[pingRes.getError()]);
@@ -541,13 +554,14 @@
 	
 		bufferCache.close();
 		datanodeEndpointCache.close();
-		rpcNameNode.close();
+		rpcConnection.close();
+		rpcClient.close();
 		this.isOpen = false;
 	}
 
 	public void closeFile(FileInfo fileInfo) throws Exception {
 		if (fileInfo.getToken() > 0){
-			namenodeClientRpc.setFile(fileInfo, true).get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);				
+			rpcConnection.setFile(fileInfo, true).get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);				
 		}
 	}
 
@@ -633,7 +647,7 @@
 	}	
 
 	RpcConnection getNamenodeClientRpc() {
-		return namenodeClientRpc;
+		return rpcConnection;
 	}
 
 	EndpointCache getDatanodeEndpointCache() {
diff --git a/client/src/main/java/com/ibm/crail/metadata/DataNodeStatistics.java b/client/src/main/java/com/ibm/crail/metadata/DataNodeStatistics.java
index 4e9ea3b..8662cb0 100644
--- a/client/src/main/java/com/ibm/crail/metadata/DataNodeStatistics.java
+++ b/client/src/main/java/com/ibm/crail/metadata/DataNodeStatistics.java
@@ -25,24 +25,24 @@
 import java.nio.ByteBuffer;
 
 public class DataNodeStatistics {
-	public static final int CSIZE = 4;
+	public static final int CSIZE = 12;
 	
+	private long serviceId;
 	private int freeBlockCount;
 	
 	public DataNodeStatistics(){
+		this.serviceId = 0;
 		this.freeBlockCount = 0;
 	}
 	
-	public DataNodeStatistics(int freeBlockCount){
-		this.freeBlockCount = freeBlockCount;
-	}
-	
 	public int write(ByteBuffer buffer){
+		buffer.putLong(serviceId);
 		buffer.putInt(freeBlockCount);
 		return CSIZE;
 	}
 	
 	public void update(ByteBuffer buffer) throws UnknownHostException {
+		this.serviceId = buffer.getLong();
 		this.freeBlockCount = buffer.getInt();
 	}
 
@@ -55,6 +55,15 @@
 	}
 
 	public void setStatistics(DataNodeStatistics statistics) {
+		this.serviceId = statistics.getServiceId();
 		this.freeBlockCount = statistics.getFreeBlockCount();
+	}
+
+	public void setServiceId(long serviceId) {
+		this.serviceId = serviceId;
 	}	
+	
+	public long getServiceId(){
+		return serviceId;
+	}
 }
diff --git a/client/src/main/java/com/ibm/crail/rpc/RpcConnection.java b/client/src/main/java/com/ibm/crail/rpc/RpcConnection.java
index e4e37c6..3a4c619 100644
--- a/client/src/main/java/com/ibm/crail/rpc/RpcConnection.java
+++ b/client/src/main/java/com/ibm/crail/rpc/RpcConnection.java
@@ -63,6 +63,8 @@
 	public abstract RpcFuture<RpcPing> pingNameNode()
 			throws Exception;
 	
+	public abstract void close() throws Exception;
+	
 	@SuppressWarnings("unchecked")
 	public static RpcConnection createInstance(String name) throws Exception {
 		Class<?> nodeClass = Class.forName(name);
diff --git a/client/src/main/java/com/ibm/crail/rpc/RpcDispatcher.java b/client/src/main/java/com/ibm/crail/rpc/RpcDispatcher.java
new file mode 100644
index 0000000..ad92993
--- /dev/null
+++ b/client/src/main/java/com/ibm/crail/rpc/RpcDispatcher.java
@@ -0,0 +1,137 @@
+package com.ibm.crail.rpc;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.slf4j.Logger;
+
+import com.ibm.crail.CrailNodeType;
+import com.ibm.crail.metadata.BlockInfo;
+import com.ibm.crail.metadata.DataNodeInfo;
+import com.ibm.crail.metadata.FileInfo;
+import com.ibm.crail.metadata.FileName;
+import com.ibm.crail.utils.CrailUtils;
+
+public class RpcDispatcher implements RpcConnection {
+	private static final Logger LOG = CrailUtils.getLogger();
+	
+	private RpcConnection[] connections;
+	private int setBlockIndex;
+	private int getDataNodeIndex;
+
+	public RpcDispatcher(ConcurrentLinkedQueue<RpcConnection> connectionList) {
+		connections = new RpcConnection[connectionList.size()];
+		for (int i = 0; i < connections.length; i++){
+			connections[i] = connectionList.poll();
+		}
+		this.setBlockIndex = 0;
+		this.getDataNodeIndex = 0;
+	}
+
+	@Override
+	public RpcFuture<RpcCreateFile> createFile(FileName filename,
+			CrailNodeType type, int storageClass, int locationClass)
+			throws IOException {
+		int index = filename.getComponent(0) % connections.length;
+//		LOG.info("issuing create file for filename [" + filename.toString() + "], on index " + index);
+		return connections[index].createFile(filename, type, storageClass, locationClass);
+	}
+
+	@Override
+	public RpcFuture<RpcGetFile> getFile(FileName filename, boolean writeable)
+			throws IOException {
+		int index = filename.getComponent(0) % connections.length;
+//		LOG.info("issuing get file for filename [" + filename.toString() + "], on index " + index);
+		return connections[index].getFile(filename, writeable);
+	}
+
+	@Override
+	public RpcFuture<RpcVoid> setFile(FileInfo fileInfo, boolean close)
+			throws IOException {
+		long connectionsLength = (long) connections.length;
+		long _index = fileInfo.getFd() % connectionsLength;
+		int index = (int) _index;
+//		LOG.info("issuing set file for fd [" + fileInfo.getFd() + "], on index " + index);
+		return connections[index].setFile(fileInfo, close);
+	}
+
+	@Override
+	public RpcFuture<RpcDeleteFile> removeFile(FileName filename,
+			boolean recursive) throws IOException {
+		int index = filename.getComponent(0) % connections.length;
+//		LOG.info("issuing remove file for filename [" + filename.toString() + "], on index " + index);		
+		return connections[index].removeFile(filename, recursive);
+	}
+
+	@Override
+	public RpcFuture<RpcRenameFile> renameFile(FileName srcHash,
+			FileName dstHash) throws IOException {
+		int srcIndex = srcHash.getComponent(0) % connections.length;
+		int dstIndex = srcHash.getComponent(0) % connections.length;
+//		LOG.info("issuing remove file for src [" + srcHash.toString() + "," + dstHash.toString() + "], on index " + srcIndex);	
+		if (srcIndex != dstIndex){
+			throw new IOException("Rename not supported across namenode domains");
+		} else {
+			return connections[srcIndex].renameFile(srcHash, dstHash);
+		}
+	}
+
+	@Override
+	public RpcFuture<RpcGetBlock> getBlock(long fd, long token, long position,
+			long capacity) throws IOException {
+		long connectionsLength = (long) connections.length;
+		long _index = fd % connectionsLength;
+		int index = (int) _index;
+//		LOG.info("issuing get block for fd [" + fd + "], on index " + index);		
+		return connections[index].getBlock(fd, token, position, capacity);
+	}
+
+	@Override
+	public RpcFuture<RpcGetLocation> getLocation(FileName fileName,
+			long position) throws IOException {
+		int index = fileName.getComponent(0) % connections.length;
+//		LOG.info("issuing get location for filename [" + fileName.toString() + "], on index " + index);			
+		return connections[index].getLocation(fileName, position);
+	}
+
+	@Override
+	public RpcFuture<RpcVoid> setBlock(BlockInfo blockInfo) throws Exception {
+//		LOG.info("issuing set block on index " + setBlockIndex);
+		RpcFuture<RpcVoid> res = connections[setBlockIndex].setBlock(blockInfo);
+		setBlockIndex = (setBlockIndex + 1) % connections.length;
+		return res;
+	}
+
+	@Override
+	public RpcFuture<RpcGetDataNode> getDataNode(DataNodeInfo dnInfo)
+			throws Exception {
+//		LOG.info("issuing get datanode on index " + getDataNodeIndex);
+		RpcFuture<RpcGetDataNode> res = connections[getDataNodeIndex].getDataNode(dnInfo);
+		getDataNodeIndex = (getDataNodeIndex + 1) % connections.length;
+		return res;
+	}
+
+	@Override
+	public RpcFuture<RpcVoid> dumpNameNode() throws Exception {
+		return connections[0].dumpNameNode();
+	}
+
+	@Override
+	public RpcFuture<RpcPing> pingNameNode() throws Exception {
+		return connections[0].pingNameNode();
+	}
+
+	@Override
+	public String toString() {
+		String address = "";
+		for (RpcConnection connection : connections){
+			address = address + ", " + connection.toString();
+		}
+		
+		return address;
+	}
+
+	@Override
+	public void close() throws Exception {
+		connections[0].close();
+	}
+}
diff --git a/client/src/main/java/com/ibm/crail/utils/CrailUtils.java b/client/src/main/java/com/ibm/crail/utils/CrailUtils.java
index 25baafa..bf77510 100644
--- a/client/src/main/java/com/ibm/crail/utils/CrailUtils.java
+++ b/client/src/main/java/com/ibm/crail/utils/CrailUtils.java
@@ -30,6 +30,8 @@
 import java.nio.ByteBuffer;
 import java.util.StringTokenizer;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
@@ -54,11 +56,81 @@
 	}	
 	
 	public static InetSocketAddress getNameNodeAddress() {
-		URI uri = URI.create(CrailConstants.NAMENODE_ADDRESS);
-		InetSocketAddress nnAddr = createSocketAddrForHost(uri.getHost(), uri.getPort());
+		StringTokenizer tupleTokenizer = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
+		LinkedBlockingQueue<URI> namenodes = new LinkedBlockingQueue<URI>();
+		while(tupleTokenizer.hasMoreTokens()){
+			String address = tupleTokenizer.nextToken();
+			URI uri = URI.create(address);
+			namenodes.add(uri);
+		}
+		
+		URI master = namenodes.poll();
+		InetSocketAddress nnAddr = createSocketAddrForHost(master.getHost(), master.getPort());
 		return nnAddr;
 	}
 	
+	public static URI getPrimaryNameNode() {
+		StringTokenizer tupleTokenizer = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
+		LinkedBlockingQueue<URI> namenodes = new LinkedBlockingQueue<URI>();
+		while(tupleTokenizer.hasMoreTokens()){
+			String address = tupleTokenizer.nextToken();
+			URI uri = URI.create(address);
+			namenodes.add(uri);
+		}
+		
+		URI master = namenodes.poll();
+		return master;
+	}	
+	
+	public static boolean verifyNamenode(String namenode) {
+		StringTokenizer tupleTokenizer = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
+		ConcurrentHashMap<String, Object> namenodes = new ConcurrentHashMap<String, Object>();
+		while(tupleTokenizer.hasMoreTokens()){
+			String address = tupleTokenizer.nextToken();
+			URI uri = URI.create(address);
+			String node = uri.getHost() + ":" + uri.getPort();
+			namenodes.put(node, node);
+		}		
+		
+		URI uri = URI.create(namenode);
+		String node = uri.getHost() + ":" + uri.getPort();
+		return namenodes.containsKey(node);
+	}
+
+	public static ConcurrentLinkedQueue<InetSocketAddress> getNameNodeList() {
+		StringTokenizer tupleTokenizer = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
+		ConcurrentLinkedQueue<InetSocketAddress> namenodes = new ConcurrentLinkedQueue<InetSocketAddress>();
+		while(tupleTokenizer.hasMoreTokens()){
+			String token = tupleTokenizer.nextToken();
+			URI uri = URI.create(token);
+			InetSocketAddress address = createSocketAddrForHost(uri.getHost(), uri.getPort());
+			namenodes.add(address);
+		}
+		return namenodes;
+	}
+	
+	public static long getServiceId(String namenode) {
+		StringTokenizer tupleTokenizer = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
+		ConcurrentHashMap<String, Long> namenodes = new ConcurrentHashMap<String, Long>();
+		long serviceId = 0;
+		while(tupleTokenizer.hasMoreTokens()){
+			String address = tupleTokenizer.nextToken();
+			URI uri = URI.create(address);
+			String node = uri.getHost() + ":" + uri.getPort();
+			namenodes.put(node, serviceId++);
+		}	
+		
+		URI uri = URI.create(namenode);
+		String node = uri.getHost() + ":" + uri.getPort();		
+		long id = namenodes.get(node);
+		return id;
+	}
+
+	public static long getServiceSize() {
+		StringTokenizer tupleTokenizer = new StringTokenizer(CrailConstants.NAMENODE_ADDRESS, ",");
+		return tupleTokenizer.countTokens();
+	}	
+
 	public static final long blockStartAddress(long offset) {
 		long blockCount = offset / CrailConstants.BLOCK_SIZE;
 		return blockCount*CrailConstants.BLOCK_SIZE;
@@ -205,4 +277,5 @@
 		}
 		return address;
 	}
+
 }
diff --git a/namenode/src/main/java/com/ibm/crail/namenode/AbstractNode.java b/namenode/src/main/java/com/ibm/crail/namenode/AbstractNode.java
index b70f23e..8c0bb94 100644
--- a/namenode/src/main/java/com/ibm/crail/namenode/AbstractNode.java
+++ b/namenode/src/main/java/com/ibm/crail/namenode/AbstractNode.java
@@ -21,24 +21,19 @@
 
 package com.ibm.crail.namenode;
 
-import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import com.ibm.crail.CrailNodeType;
-import com.ibm.crail.CrailStorageClass;
 import com.ibm.crail.conf.CrailConstants;
 import com.ibm.crail.metadata.BlockInfo;
 import com.ibm.crail.metadata.FileInfo;
-import com.ibm.crail.metadata.FileName;
 
 public abstract class AbstractNode extends FileInfo implements Delayed {
-	private static AtomicLong fdcount = new AtomicLong(0);
-	
+//	private static AtomicLong fdcount = new AtomicLong(0);
 	private int fileComponent;
 	private AtomicLong dirOffsetCounter;
 	private ConcurrentHashMap<Integer, AbstractNode> children;
@@ -46,22 +41,8 @@
 	private int storageClass;
 	private int locationClass;
 	
-	public static AbstractNode createRoot() throws IOException {
-		return new DirectoryBlocks(new FileName("/").getFileComponent(), CrailNodeType.DIRECTORY, CrailConstants.STORAGE_ROOTCLASS, 0);
-	}
-	
-	public static AbstractNode createNode(int fileComponent, CrailNodeType type, int storageClass, int locationClass) throws IOException {
-		if (type == CrailNodeType.DIRECTORY){
-			return new DirectoryBlocks(fileComponent, CrailNodeType.DIRECTORY, storageClass, locationClass);
-		} else if (type == CrailNodeType.MULTIFILE){
-			return new DirectoryBlocks(fileComponent, CrailNodeType.MULTIFILE, storageClass, locationClass);
-		} else {
-			return new FileBlocks(fileComponent, CrailNodeType.DATAFILE, storageClass, locationClass);
-		}
-	}
-	
-	public AbstractNode(int fileComponent, CrailNodeType type, int storageClass, int locationAffinity){
-		super(fdcount.incrementAndGet(), type);
+	public AbstractNode(long fd, int fileComponent, CrailNodeType type, int storageClass, int locationAffinity){
+		super(fd, type);
 		
 		this.fileComponent = fileComponent;
 		this.storageClass = storageClass;
diff --git a/namenode/src/main/java/com/ibm/crail/namenode/DirectoryBlocks.java b/namenode/src/main/java/com/ibm/crail/namenode/DirectoryBlocks.java
index 002ab48..bfcfd54 100644
--- a/namenode/src/main/java/com/ibm/crail/namenode/DirectoryBlocks.java
+++ b/namenode/src/main/java/com/ibm/crail/namenode/DirectoryBlocks.java
@@ -31,8 +31,8 @@
 public class DirectoryBlocks extends AbstractNode {
 	private ConcurrentHashMap<Integer, BlockInfo> blocks;
 	
-	DirectoryBlocks(int fileComponent, CrailNodeType type, int storageClass, int locationClass) {
-		super(fileComponent, type, storageClass, locationClass);
+	DirectoryBlocks(long fd, int fileComponent, CrailNodeType type, int storageClass, int locationClass) {
+		super(fd, fileComponent, type, storageClass, locationClass);
 		this.blocks = new ConcurrentHashMap<Integer, BlockInfo>();
 	}
 
diff --git a/namenode/src/main/java/com/ibm/crail/namenode/FileBlocks.java b/namenode/src/main/java/com/ibm/crail/namenode/FileBlocks.java
index 9b2b348..1884143 100644
--- a/namenode/src/main/java/com/ibm/crail/namenode/FileBlocks.java
+++ b/namenode/src/main/java/com/ibm/crail/namenode/FileBlocks.java
@@ -37,8 +37,8 @@
 	private final Lock readLock;
 	private final Lock writeLock;
 	
-	public FileBlocks(int fileComponent, CrailNodeType type, int storageClass, int locationClass) {
-		super(fileComponent, type, storageClass, locationClass);
+	public FileBlocks(long fd, int fileComponent, CrailNodeType type, int storageClass, int locationClass) {
+		super(fd, fileComponent, type, storageClass, locationClass);
 		this.blocks = new ArrayList<BlockInfo>(CrailConstants.NAMENODE_FILEBLOCKS);
 		this.lock = new ReentrantReadWriteLock();
 		this.readLock = lock.readLock();
diff --git a/namenode/src/main/java/com/ibm/crail/namenode/FileStore.java b/namenode/src/main/java/com/ibm/crail/namenode/FileStore.java
index 7d44631..30d2687 100644
--- a/namenode/src/main/java/com/ibm/crail/namenode/FileStore.java
+++ b/namenode/src/main/java/com/ibm/crail/namenode/FileStore.java
@@ -23,18 +23,31 @@
 
 import java.io.IOException;
 
+import com.ibm.crail.CrailNodeType;
 import com.ibm.crail.conf.CrailConstants;
 import com.ibm.crail.metadata.FileName;
 import com.ibm.crail.rpc.RpcErrors;
 import com.ibm.crail.rpc.RpcNameNodeState;
 
 public class FileStore {
+	private Sequencer sequencer;
 	private AbstractNode root;
 	
-	public FileStore() throws IOException { 
-		this.root = DirectoryBlocks.createRoot();
+	public FileStore(Sequencer sequencer) throws IOException { 
+		this.sequencer = sequencer;
+		this.root = createNode(new FileName("/").getFileComponent(), CrailNodeType.DIRECTORY, CrailConstants.STORAGE_ROOTCLASS, 0);
 	}
 	
+	public AbstractNode createNode(int fileComponent, CrailNodeType type, int storageClass, int locationClass) throws IOException {
+		if (type == CrailNodeType.DIRECTORY){
+			return new DirectoryBlocks(sequencer.getNextId(), fileComponent, CrailNodeType.DIRECTORY, storageClass, locationClass);
+		} else if (type == CrailNodeType.MULTIFILE){
+			return new DirectoryBlocks(sequencer.getNextId(), fileComponent, CrailNodeType.MULTIFILE, storageClass, locationClass);
+		} else {
+			return new FileBlocks(sequencer.getNextId(), fileComponent, CrailNodeType.DATAFILE, storageClass, locationClass);
+		}
+	}	
+	
 	public AbstractNode retrieveFile(FileName filename, RpcNameNodeState error) throws Exception{
 		return retrieveFileInternal(filename, filename.getLength(), error);
 	}
diff --git a/namenode/src/main/java/com/ibm/crail/namenode/NameNode.java b/namenode/src/main/java/com/ibm/crail/namenode/NameNode.java
index 157fad6..f02dc00 100644
--- a/namenode/src/main/java/com/ibm/crail/namenode/NameNode.java
+++ b/namenode/src/main/java/com/ibm/crail/namenode/NameNode.java
@@ -21,38 +21,73 @@
 
 package com.ibm.crail.namenode;
 
-import java.util.concurrent.DelayQueue;
-
+import java.net.URI;
+import java.util.Arrays;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.slf4j.Logger;
 
 import com.ibm.crail.conf.CrailConfiguration;
 import com.ibm.crail.conf.CrailConstants;
 import com.ibm.crail.rpc.RpcBinding;
+import com.ibm.crail.rpc.RpcNameNodeService;
 import com.ibm.crail.utils.CrailUtils;
 
 public class NameNode {
 	private static final Logger LOG = CrailUtils.getLogger();
 	
 	public static void main(String args[]) throws Exception {
-		CrailConfiguration conf = new CrailConfiguration();
-		
 		LOG.info("initalizing namenode ");		
+		CrailConfiguration conf = new CrailConfiguration();
 		CrailConstants.updateConstants(conf);
+		
+		URI uri = CrailUtils.getPrimaryNameNode();
+		String address = uri.getHost();
+		int port = uri.getPort();
+		
+		if (args != null) {
+			Option addressOption = Option.builder("a").desc("ip address namenode is started on").hasArg().build();
+			Option portOption = Option.builder("p").desc("port namenode is started on").hasArg().build();
+			Options options = new Options();
+			options.addOption(portOption);
+			options.addOption(addressOption);
+			CommandLineParser parser = new DefaultParser();
+			
+			try {
+				CommandLine line = parser.parse(options, Arrays.copyOfRange(args, 0, args.length));
+				if (line.hasOption(addressOption.getOpt())) {
+					address = line.getOptionValue(addressOption.getOpt());
+				}					
+				if (line.hasOption(portOption.getOpt())) {
+					port = Integer.parseInt(line.getOptionValue(portOption.getOpt()));
+				}				
+			} catch (ParseException e) {
+				HelpFormatter formatter = new HelpFormatter();
+				formatter.printHelp("Namenode", options);
+				System.exit(-1);
+			}
+		}		
+		
+		String namenode = "crail://" + address + ":" + port;
+		long serviceId = CrailUtils.getServiceId(namenode);
+		long serviceSize = CrailUtils.getServiceSize();
+		if (!CrailUtils.verifyNamenode(namenode)){
+			throw new Exception("Namenode address/port [" + namenode + "] has to be listed in crail.namenode.address " + CrailConstants.NAMENODE_ADDRESS);
+		}
+		
+		CrailConstants.NAMENODE_ADDRESS = namenode + "?id=" + serviceId + "&size=" + serviceSize;
 		CrailConstants.printConf();
 		CrailConstants.verify();
 		
-		DelayQueue<AbstractNode> deleteQueue = new DelayQueue<AbstractNode>();
-		NameNodeService service = new NameNodeService(deleteQueue);
-		
+		RpcNameNodeService service = RpcNameNodeService.createInstance(CrailConstants.NAMENODE_RPC_SERVICE);
 		RpcBinding rpcBinding = RpcBinding.createInstance(CrailConstants.NAMENODE_RPC_TYPE);
 		rpcBinding.init(conf, null);
 		rpcBinding.printConf(LOG);
-		
-		GCServer gcServer = new GCServer(service, deleteQueue);
-		
-		Thread gc = new Thread(gcServer);
-		gc.start();
-		
 		rpcBinding.run(service);
 		System.exit(0);;
 	}
diff --git a/namenode/src/main/java/com/ibm/crail/namenode/NameNodeService.java b/namenode/src/main/java/com/ibm/crail/namenode/NameNodeService.java
index b405de5..3aac619 100644
--- a/namenode/src/main/java/com/ibm/crail/namenode/NameNodeService.java
+++ b/namenode/src/main/java/com/ibm/crail/namenode/NameNodeService.java
@@ -22,8 +22,11 @@
 package com.ibm.crail.namenode;
 
 import java.io.IOException;
+import java.net.URI;
+import java.util.StringTokenizer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.DelayQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 
@@ -41,26 +44,42 @@
 import com.ibm.crail.rpc.RpcResponseMessage;
 import com.ibm.crail.utils.CrailUtils;
 
-public class NameNodeService implements RpcNameNodeService {
+public class NameNodeService implements RpcNameNodeService, Sequencer {
 	private static final Logger LOG = CrailUtils.getLogger();
 	
 	//data structures for datanodes, blocks, files
+	private long serviceId;
+	private long serviceSize;
+	private AtomicLong sequenceId;
 	private BlockStore blockStore;
 	private DelayQueue<AbstractNode> deleteQueue;
 	private FileStore fileTree;
 	private ConcurrentHashMap<Long, AbstractNode> fileTable;	
-
+	private GCServer gcServer;
 	
-	NameNodeService(DelayQueue<AbstractNode> deleteQueue) throws IOException {
+	public NameNodeService() throws IOException {
+		URI uri = URI.create(CrailConstants.NAMENODE_ADDRESS);
+		String query = uri.getRawQuery();
+		StringTokenizer tokenizer = new StringTokenizer(query, "&");
+		this.serviceId = Long.parseLong(tokenizer.nextToken().substring(3));
+		this.serviceSize = Long.parseLong(tokenizer.nextToken().substring(5));
+		this.sequenceId = new AtomicLong(serviceId);
 		this.blockStore = new BlockStore();
-		this.deleteQueue = deleteQueue;
-		this.fileTree = new FileStore();
+		this.deleteQueue = new DelayQueue<AbstractNode>();
+		this.fileTree = new FileStore(this);
 		this.fileTable = new ConcurrentHashMap<Long, AbstractNode>();
+		this.gcServer = new GCServer(this, deleteQueue);
 		
 		AbstractNode root = fileTree.getRoot();
 		fileTable.put(root.getFd(), root);
+		Thread gc = new Thread(gcServer);
+		gc.start();				
 	}
 	
+	public long getNextId(){
+		return sequenceId.getAndAdd(serviceSize);
+	}
+
 	@Override
 	public short createFile(RpcRequestMessage.CreateFileReq request, RpcResponseMessage.CreateFileRes response, RpcNameNodeState errorState) throws Exception {
 		//check protocol
@@ -99,7 +118,7 @@
 			locationClass = parentInfo.getLocationClass();
 		}
 		
-		AbstractNode fileInfo = FileBlocks.createNode(fileHash.getFileComponent(), type, storageClass, locationClass);
+		AbstractNode fileInfo = fileTree.createNode(fileHash.getFileComponent(), type, storageClass, locationClass);
 		if (!parentInfo.addChild(fileInfo)){
 			return RpcErrors.ERR_FILE_EXISTS;
 		}
@@ -387,6 +406,7 @@
 			return RpcErrors.ERR_DATANODE_NOT_REGISTERED;
 		}
 		
+		response.setServiceId(serviceId);
 		response.setFreeBlockCount(dnInfoNn.getBlockCount());
 		
 		return RpcErrors.ERR_OK;
diff --git a/namenode/src/main/java/com/ibm/crail/namenode/Sequencer.java b/namenode/src/main/java/com/ibm/crail/namenode/Sequencer.java
new file mode 100644
index 0000000..96de5a8
--- /dev/null
+++ b/namenode/src/main/java/com/ibm/crail/namenode/Sequencer.java
@@ -0,0 +1,5 @@
+package com.ibm.crail.namenode;
+
+public interface Sequencer {
+	long getNextId();
+}
diff --git a/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNode.java b/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNode.java
index afd1bb3..5a00ff5 100644
--- a/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNode.java
+++ b/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNode.java
@@ -21,61 +21,27 @@
 
 package com.ibm.crail.namenode.rpc.darpc;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import org.slf4j.Logger;
-import com.ibm.crail.conf.CrailConfiguration;
 import com.ibm.crail.rpc.RpcBinding;
-import com.ibm.crail.rpc.RpcConnection;
 import com.ibm.crail.rpc.RpcNameNodeService;
 import com.ibm.crail.utils.CrailUtils;
-import com.ibm.darpc.DaRPCClientEndpoint;
-import com.ibm.darpc.DaRPCClientGroup;
 import com.ibm.darpc.DaRPCServerEndpoint;
 import com.ibm.darpc.DaRPCServerGroup;
 import com.ibm.disni.rdma.RdmaServerEndpoint;
 
-public class DaRPCNameNode implements RpcBinding {
+public class DaRPCNameNode extends DaRPCNameNodeClient implements RpcBinding {
 	private static final Logger LOG = CrailUtils.getLogger();
 	
-	private DaRPCClientGroup<DaRPCNameNodeRequest, DaRPCNameNodeResponse> namenodeClientGroup;
-	private DaRPCClientEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> namenodeClientEp;
-	
 	private DaRPCServerGroup<DaRPCNameNodeRequest, DaRPCNameNodeResponse> namenodeServerGroup;
 	private RdmaServerEndpoint<DaRPCServerEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse>> namenodeServerEp;
 	
 	public DaRPCNameNode(){
-		this.namenodeClientEp = null;
-		this.namenodeClientGroup = null;
 		this.namenodeServerEp = null;
 		this.namenodeServerGroup = null;
 	}
 	
-	public void init(CrailConfiguration conf, String[] args) throws IOException{
-		DaRPCConstants.updateConstants(conf);
-		DaRPCConstants.verify();		
-	}
-	
-	public void printConf(Logger logger){
-		DaRPCConstants.printConf(logger);
-	}
-
-	@Override
-	public RpcConnection connect(InetSocketAddress address) throws Exception {
-		DaRPCNameNodeProtocol namenodeProtocol = new DaRPCNameNodeProtocol();
-		this.namenodeClientGroup = DaRPCClientGroup.createClientGroup(namenodeProtocol, 100, DaRPCConstants.NAMENODE_DARPC_MAXINLINE, DaRPCConstants.NAMENODE_DARPC_RECVQUEUE, DaRPCConstants.NAMENODE_DARPC_SENDQUEUE);
-		LOG.info("rpc group started, recvQueue " + namenodeClientGroup.recvQueueSize());
-		this.namenodeClientEp = namenodeClientGroup.createEndpoint();
-		InetSocketAddress nnAddr = CrailUtils.getNameNodeAddress();
-		LOG.info("connecting to namenode at " + nnAddr);
-		URI uri = URI.create("rdma://" + nnAddr.getAddress().getHostAddress() + ":" + nnAddr.getPort());
-		namenodeClientEp.connect(uri);
-		DaRPCNameNodeClient namenodeClientRpc = new DaRPCNameNodeClient(namenodeClientEp);
-		return namenodeClientRpc;
-		
-	}
-
 	@Override
 	public void run(RpcNameNodeService service) {
 		try {
@@ -109,14 +75,6 @@
 	@Override
 	public void close() {
 		try {
-			if (namenodeClientEp != null){
-				namenodeClientEp.close();
-				namenodeClientEp = null;
-			}
-			if (namenodeClientGroup != null){
-				namenodeClientGroup.close();
-				namenodeClientGroup = null;
-			}
 			if (namenodeServerEp != null){
 				namenodeServerEp.close();
 				namenodeServerEp = null;
diff --git a/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNodeClient.java b/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNodeClient.java
index a457c51..688dbb4 100644
--- a/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNodeClient.java
+++ b/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNodeClient.java
@@ -1,284 +1,59 @@
-/*
- * Crail: A Multi-tiered Distributed Direct Access File System
- *
- * Author: Patrick Stuedi <stu@zurich.ibm.com>
- *
- * Copyright (C) 2016, IBM Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
 package com.ibm.crail.namenode.rpc.darpc;
 
 import java.io.IOException;
-
+import java.net.InetSocketAddress;
+import java.net.URI;
 import org.slf4j.Logger;
-
-import com.ibm.crail.CrailNodeType;
-import com.ibm.crail.conf.CrailConstants;
-import com.ibm.crail.metadata.BlockInfo;
-import com.ibm.crail.metadata.DataNodeInfo;
-import com.ibm.crail.metadata.FileInfo;
-import com.ibm.crail.metadata.FileName;
+import com.ibm.crail.conf.CrailConfiguration;
+import com.ibm.crail.rpc.RpcClient;
 import com.ibm.crail.rpc.RpcConnection;
-import com.ibm.crail.rpc.RpcCreateFile;
-import com.ibm.crail.rpc.RpcDeleteFile;
-import com.ibm.crail.rpc.RpcGetBlock;
-import com.ibm.crail.rpc.RpcGetDataNode;
-import com.ibm.crail.rpc.RpcGetFile;
-import com.ibm.crail.rpc.RpcGetLocation;
-import com.ibm.crail.rpc.RpcFuture;
-import com.ibm.crail.rpc.RpcPing;
-import com.ibm.crail.rpc.RpcProtocol;
-import com.ibm.crail.rpc.RpcRenameFile;
-import com.ibm.crail.rpc.RpcRequestMessage;
-import com.ibm.crail.rpc.RpcResponseMessage;
-import com.ibm.crail.rpc.RpcVoid;
 import com.ibm.crail.utils.CrailUtils;
 import com.ibm.darpc.DaRPCClientEndpoint;
-import com.ibm.darpc.DaRPCFuture;
-import com.ibm.darpc.DaRPCStream;
+import com.ibm.darpc.DaRPCClientGroup;
 
-public class DaRPCNameNodeClient implements RpcConnection {
+public class DaRPCNameNodeClient implements RpcClient {
 	private static final Logger LOG = CrailUtils.getLogger();
 	
-	private DaRPCClientEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> rpcEndpoint;
-	private DaRPCStream<DaRPCNameNodeRequest, DaRPCNameNodeResponse> stream;
+	private DaRPCClientGroup<DaRPCNameNodeRequest, DaRPCNameNodeResponse> namenodeClientGroup;
 	
-	public DaRPCNameNodeClient(DaRPCClientEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> endpoint) throws IOException {
-		this.rpcEndpoint = endpoint;
-		this.stream = endpoint.createStream();
-	}	
-	
-	@Override
-	public RpcFuture<RpcCreateFile> createFile(FileName filename, CrailNodeType type, int storageClass, int locationClass) throws IOException {
-		if (CrailConstants.DEBUG){
-			LOG.debug("RPC: createFile, fileType " + type + ", storageClass " + storageClass + ", locationClass " + locationClass);
-		}
-		
-		RpcRequestMessage.CreateFileReq createFileReq = new RpcRequestMessage.CreateFileReq(filename, type, storageClass, locationClass);
-		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(createFileReq);
-		request.setCommand(RpcProtocol.CMD_CREATE_FILE);
-		
-		RpcResponseMessage.CreateFileRes fileRes = new RpcResponseMessage.CreateFileRes();
-		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(fileRes);
-		
-		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-		
-		DaRPCNameNodeFuture<RpcCreateFile> nameNodeFuture = new DaRPCNameNodeFuture<RpcCreateFile>(future, fileRes);
-		
-		return nameNodeFuture;
+	public DaRPCNameNodeClient(){
+		this.namenodeClientGroup = null;
 	}
 	
-	@Override
-	public RpcFuture<RpcGetFile> getFile(FileName filename, boolean writeable) throws IOException {
-		if (CrailConstants.DEBUG){
-			LOG.debug("RPC: getFile, writeable " + writeable);
-		}
-		
-		RpcRequestMessage.GetFileReq getFileReq = new RpcRequestMessage.GetFileReq(filename, writeable);
-		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getFileReq);
-		request.setCommand(RpcProtocol.CMD_GET_FILE);
+	public void init(CrailConfiguration conf, String[] args) throws IOException{
+		DaRPCConstants.updateConstants(conf);
+		DaRPCConstants.verify();		
+	}
+	
+	public void printConf(Logger logger){
+		DaRPCConstants.printConf(logger);
+	}
 
-		RpcResponseMessage.GetFileRes fileRes = new RpcResponseMessage.GetFileRes();
-		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(fileRes);
-		
-		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-		
-		DaRPCNameNodeFuture<RpcGetFile> nameNodeFuture = new DaRPCNameNodeFuture<RpcGetFile>(future, fileRes);
-		
-		return nameNodeFuture;
-	}
-	
 	@Override
-	public DaRPCNameNodeFuture<RpcVoid> setFile(FileInfo fileInfo, boolean close) throws IOException {
-		if (CrailConstants.DEBUG){
-			LOG.debug("RPC: setFile, id " + fileInfo.getFd() + ", close " + close);
-		}
+	public RpcConnection connect(InetSocketAddress address) throws Exception {
+		DaRPCNameNodeProtocol namenodeProtocol = new DaRPCNameNodeProtocol();
+		this.namenodeClientGroup = DaRPCClientGroup.createClientGroup(namenodeProtocol, 100, DaRPCConstants.NAMENODE_DARPC_MAXINLINE, DaRPCConstants.NAMENODE_DARPC_RECVQUEUE, DaRPCConstants.NAMENODE_DARPC_SENDQUEUE);
+		LOG.info("rpc group started, recvQueue " + namenodeClientGroup.recvQueueSize());
+		DaRPCClientEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> namenodeEndopoint = namenodeClientGroup.createEndpoint();
+//		LOG.info("connecting to namenode at " + address);
+		URI uri = URI.create("rdma://" + address.getAddress().getHostAddress() + ":" + address.getPort());
+		namenodeEndopoint.connect(uri);
+		DaRPCNameNodeConnection connection = new DaRPCNameNodeConnection(namenodeEndopoint);
+		return connection;
 		
-		RpcRequestMessage.SetFileReq setFileReq = new RpcRequestMessage.SetFileReq(fileInfo, close);
-		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(setFileReq);
-		request.setCommand(RpcProtocol.CMD_SET_FILE);
-		
-		RpcResponseMessage.VoidRes voidRes = new RpcResponseMessage.VoidRes();
-		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(voidRes);
-		
-		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-		
-		DaRPCNameNodeFuture<RpcVoid> nameNodeFuture = new DaRPCNameNodeFuture<RpcVoid>(future, voidRes);
-		
-		return nameNodeFuture;		
 	}
-	
-	@Override
-	public DaRPCNameNodeFuture<RpcDeleteFile> removeFile(FileName filename, boolean recursive) throws IOException {
-		if (CrailConstants.DEBUG){
-			LOG.debug("RPC: removeFile");
-		}
-		
-		RpcRequestMessage.RemoveFileReq removeReq = new RpcRequestMessage.RemoveFileReq(filename, recursive);
-		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(removeReq);
-		request.setCommand(RpcProtocol.CMD_REMOVE_FILE);
-		
-		RpcResponseMessage.DeleteFileRes fileRes = new RpcResponseMessage.DeleteFileRes();
-		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(fileRes);
-		
-		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-		
-		DaRPCNameNodeFuture<RpcDeleteFile> nameNodeFuture = new DaRPCNameNodeFuture<RpcDeleteFile>(future, fileRes);
-		
-		return nameNodeFuture;			
-	}
-	
-	@Override
-	public DaRPCNameNodeFuture<RpcRenameFile> renameFile(FileName srcHash, FileName dstHash) throws IOException {
-		if (CrailConstants.DEBUG){
-			LOG.debug("RPC: renameFile");
-		}
-		
-		RpcRequestMessage.RenameFileReq renameReq = new RpcRequestMessage.RenameFileReq(srcHash, dstHash);
-		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(renameReq);
-		request.setCommand(RpcProtocol.CMD_RENAME_FILE);
-		
-		RpcResponseMessage.RenameRes renameRes = new RpcResponseMessage.RenameRes();
-		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(renameRes);
-		
-		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-		
-		DaRPCNameNodeFuture<RpcRenameFile> nameNodeFuture = new DaRPCNameNodeFuture<RpcRenameFile>(future, renameRes);
-		
-		return nameNodeFuture;	
-	}
-	
-	@Override
-	public DaRPCNameNodeFuture<RpcGetBlock> getBlock(long fd, long token, long position, long capacity) throws IOException {
-		if (CrailConstants.DEBUG){
-			LOG.debug("RPC: getBlock, fd " + fd + ", token " + token + ", position " + position + ", capacity " + capacity);
-		}
-		
-		RpcRequestMessage.GetBlockReq getBlockReq = new RpcRequestMessage.GetBlockReq(fd, token, position, capacity);
-		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getBlockReq);
-		request.setCommand(RpcProtocol.CMD_GET_BLOCK);
-		
-		RpcResponseMessage.GetBlockRes getBlockRes = new RpcResponseMessage.GetBlockRes();
-		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(getBlockRes);
-		
-		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-		
-		DaRPCNameNodeFuture<RpcGetBlock> nameNodeFuture = new DaRPCNameNodeFuture<RpcGetBlock>(future, getBlockRes);
-		
-		return nameNodeFuture;	
-	}
-	
-	@Override
-	public DaRPCNameNodeFuture<RpcGetLocation> getLocation(FileName fileName, long position) throws IOException {
-		if (CrailConstants.DEBUG){
-			LOG.debug("RPC: getLocation, position " + position);
-		}		
-		
-		RpcRequestMessage.GetLocationReq getLocationReq = new RpcRequestMessage.GetLocationReq(fileName, position);
-		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getLocationReq);
-		request.setCommand(RpcProtocol.CMD_GET_LOCATION);
 
-		RpcResponseMessage.GetLocationRes getLocationRes = new RpcResponseMessage.GetLocationRes();
-		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(getLocationRes);
-		
-		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-		
-		DaRPCNameNodeFuture<RpcGetLocation> nameNodeFuture = new DaRPCNameNodeFuture<RpcGetLocation>(future, getLocationRes);
-		
-		return nameNodeFuture;			
-	}	
-	
 	@Override
-	public DaRPCNameNodeFuture<RpcVoid> setBlock(BlockInfo blockInfo) throws Exception {
-		if (CrailConstants.DEBUG){
-			LOG.debug("RPC: setBlock, ");
-		}		
-		
-		RpcRequestMessage.SetBlockReq setBlockReq = new RpcRequestMessage.SetBlockReq(blockInfo);
-		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(setBlockReq);
-		request.setCommand(RpcProtocol.CMD_SET_BLOCK);
-		
-		RpcResponseMessage.VoidRes voidRes = new RpcResponseMessage.VoidRes();
-		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(voidRes);
-		
-		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-		
-		DaRPCNameNodeFuture<RpcVoid> nameNodeFuture = new DaRPCNameNodeFuture<RpcVoid>(future, voidRes);
-		
-		return nameNodeFuture;	
-	}
-	
-	@Override
-	public DaRPCNameNodeFuture<RpcGetDataNode> getDataNode(DataNodeInfo dnInfo) throws Exception {
-		RpcRequestMessage.GetDataNodeReq getDataNodeReq = new RpcRequestMessage.GetDataNodeReq(dnInfo);
-		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getDataNodeReq);
-		request.setCommand(RpcProtocol.CMD_GET_DATANODE);
-		
-		RpcResponseMessage.GetDataNodeRes getDataNodeRes = new RpcResponseMessage.GetDataNodeRes();
-		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(getDataNodeRes);
-		
-		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-		
-		DaRPCNameNodeFuture<RpcGetDataNode> nameNodeFuture = new DaRPCNameNodeFuture<RpcGetDataNode>(future, getDataNodeRes);
-		
-		return nameNodeFuture;	
-	}	
-	
-	@Override
-	public DaRPCNameNodeFuture<RpcVoid> dumpNameNode() throws Exception {
-		
-		
-		RpcRequestMessage.DumpNameNodeReq dumpNameNodeReq = new RpcRequestMessage.DumpNameNodeReq();
-		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(dumpNameNodeReq);
-		request.setCommand(RpcProtocol.CMD_DUMP_NAMENODE);
-
-		RpcResponseMessage.VoidRes voidRes = new RpcResponseMessage.VoidRes();
-		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(voidRes);	
-		
-		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-		
-		DaRPCNameNodeFuture<RpcVoid> nameNodeFuture = new DaRPCNameNodeFuture<RpcVoid>(future, voidRes);
-		
-		return nameNodeFuture;	
-	}	
-	
-	@Override
-	public DaRPCNameNodeFuture<RpcPing> pingNameNode() throws Exception {
-		
-		RpcRequestMessage.PingNameNodeReq pingReq = new RpcRequestMessage.PingNameNodeReq();
-		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(pingReq);
-		request.setCommand(RpcProtocol.CMD_PING_NAMENODE);
-
-		RpcResponseMessage.PingNameNodeRes pingRes = new RpcResponseMessage.PingNameNodeRes();
-		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(pingRes);
-		
-		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
-		
-		DaRPCNameNodeFuture<RpcPing> nameNodeFuture = new DaRPCNameNodeFuture<RpcPing>(future, pingRes);
-		
-		return nameNodeFuture;	
-	}
-	
-	private DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> issueRPC(DaRPCNameNodeRequest request, DaRPCNameNodeResponse response) throws IOException{
+	public void close() {
 		try {
-			DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = stream.request(request, response, false);
-			return future;
-		} catch(IOException e){
-			LOG.info("ERROR: RPC failed, messagesSend " + rpcEndpoint.getMessagesSent() + ", messagesReceived " + rpcEndpoint.getMessagesReceived() + ", isConnected " + rpcEndpoint.isConnected() + ", qpNum " + rpcEndpoint.getQp().getQp_num());
-			throw e;
+			if (namenodeClientGroup != null){
+				namenodeClientGroup.close();
+				namenodeClientGroup = null;
+			}
+		} catch(Exception e){
+			e.printStackTrace();
+			LOG.info("Error while closing " + e.getMessage());
 		}
 	}
+
 }
diff --git a/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java b/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java
new file mode 100644
index 0000000..96a2ea3
--- /dev/null
+++ b/rpc-darpc/src/main/java/com/ibm/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java
@@ -0,0 +1,301 @@
+/*
+ * Crail: A Multi-tiered Distributed Direct Access File System
+ *
+ * Author: Patrick Stuedi <stu@zurich.ibm.com>
+ *
+ * Copyright (C) 2016, IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.ibm.crail.namenode.rpc.darpc;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+
+import com.ibm.crail.CrailNodeType;
+import com.ibm.crail.conf.CrailConstants;
+import com.ibm.crail.metadata.BlockInfo;
+import com.ibm.crail.metadata.DataNodeInfo;
+import com.ibm.crail.metadata.FileInfo;
+import com.ibm.crail.metadata.FileName;
+import com.ibm.crail.rpc.RpcConnection;
+import com.ibm.crail.rpc.RpcCreateFile;
+import com.ibm.crail.rpc.RpcDeleteFile;
+import com.ibm.crail.rpc.RpcGetBlock;
+import com.ibm.crail.rpc.RpcGetDataNode;
+import com.ibm.crail.rpc.RpcGetFile;
+import com.ibm.crail.rpc.RpcGetLocation;
+import com.ibm.crail.rpc.RpcFuture;
+import com.ibm.crail.rpc.RpcPing;
+import com.ibm.crail.rpc.RpcProtocol;
+import com.ibm.crail.rpc.RpcRenameFile;
+import com.ibm.crail.rpc.RpcRequestMessage;
+import com.ibm.crail.rpc.RpcResponseMessage;
+import com.ibm.crail.rpc.RpcVoid;
+import com.ibm.crail.utils.CrailUtils;
+import com.ibm.darpc.DaRPCClientEndpoint;
+import com.ibm.darpc.DaRPCFuture;
+import com.ibm.darpc.DaRPCStream;
+
+public class DaRPCNameNodeConnection implements RpcConnection {
+	private static final Logger LOG = CrailUtils.getLogger();
+	
+	private DaRPCClientEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> rpcEndpoint;
+	private DaRPCStream<DaRPCNameNodeRequest, DaRPCNameNodeResponse> stream;
+	
+	public DaRPCNameNodeConnection(DaRPCClientEndpoint<DaRPCNameNodeRequest, DaRPCNameNodeResponse> endpoint) throws IOException {
+		this.rpcEndpoint = endpoint;
+		this.stream = endpoint.createStream();
+	}	
+	
+	@Override
+	public RpcFuture<RpcCreateFile> createFile(FileName filename, CrailNodeType type, int storageClass, int locationClass) throws IOException {
+		if (CrailConstants.DEBUG){
+			LOG.debug("RPC: createFile, fileType " + type + ", storageClass " + storageClass + ", locationClass " + locationClass);
+		}
+		
+		RpcRequestMessage.CreateFileReq createFileReq = new RpcRequestMessage.CreateFileReq(filename, type, storageClass, locationClass);
+		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(createFileReq);
+		request.setCommand(RpcProtocol.CMD_CREATE_FILE);
+		
+		RpcResponseMessage.CreateFileRes fileRes = new RpcResponseMessage.CreateFileRes();
+		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(fileRes);
+		
+		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+		
+		DaRPCNameNodeFuture<RpcCreateFile> nameNodeFuture = new DaRPCNameNodeFuture<RpcCreateFile>(future, fileRes);
+		
+		return nameNodeFuture;
+	}
+	
+	@Override
+	public RpcFuture<RpcGetFile> getFile(FileName filename, boolean writeable) throws IOException {
+		if (CrailConstants.DEBUG){
+			LOG.debug("RPC: getFile, writeable " + writeable);
+		}
+		
+		RpcRequestMessage.GetFileReq getFileReq = new RpcRequestMessage.GetFileReq(filename, writeable);
+		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getFileReq);
+		request.setCommand(RpcProtocol.CMD_GET_FILE);
+
+		RpcResponseMessage.GetFileRes fileRes = new RpcResponseMessage.GetFileRes();
+		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(fileRes);
+		
+		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+		
+		DaRPCNameNodeFuture<RpcGetFile> nameNodeFuture = new DaRPCNameNodeFuture<RpcGetFile>(future, fileRes);
+		
+		return nameNodeFuture;
+	}
+	
+	@Override
+	public DaRPCNameNodeFuture<RpcVoid> setFile(FileInfo fileInfo, boolean close) throws IOException {
+		if (CrailConstants.DEBUG){
+			LOG.debug("RPC: setFile, id " + fileInfo.getFd() + ", close " + close);
+		}
+		
+		RpcRequestMessage.SetFileReq setFileReq = new RpcRequestMessage.SetFileReq(fileInfo, close);
+		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(setFileReq);
+		request.setCommand(RpcProtocol.CMD_SET_FILE);
+		
+		RpcResponseMessage.VoidRes voidRes = new RpcResponseMessage.VoidRes();
+		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(voidRes);
+		
+		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+		
+		DaRPCNameNodeFuture<RpcVoid> nameNodeFuture = new DaRPCNameNodeFuture<RpcVoid>(future, voidRes);
+		
+		return nameNodeFuture;		
+	}
+	
+	@Override
+	public DaRPCNameNodeFuture<RpcDeleteFile> removeFile(FileName filename, boolean recursive) throws IOException {
+		if (CrailConstants.DEBUG){
+			LOG.debug("RPC: removeFile");
+		}
+		
+		RpcRequestMessage.RemoveFileReq removeReq = new RpcRequestMessage.RemoveFileReq(filename, recursive);
+		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(removeReq);
+		request.setCommand(RpcProtocol.CMD_REMOVE_FILE);
+		
+		RpcResponseMessage.DeleteFileRes fileRes = new RpcResponseMessage.DeleteFileRes();
+		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(fileRes);
+		
+		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+		
+		DaRPCNameNodeFuture<RpcDeleteFile> nameNodeFuture = new DaRPCNameNodeFuture<RpcDeleteFile>(future, fileRes);
+		
+		return nameNodeFuture;			
+	}
+	
+	@Override
+	public DaRPCNameNodeFuture<RpcRenameFile> renameFile(FileName srcHash, FileName dstHash) throws IOException {
+		if (CrailConstants.DEBUG){
+			LOG.debug("RPC: renameFile");
+		}
+		
+		RpcRequestMessage.RenameFileReq renameReq = new RpcRequestMessage.RenameFileReq(srcHash, dstHash);
+		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(renameReq);
+		request.setCommand(RpcProtocol.CMD_RENAME_FILE);
+		
+		RpcResponseMessage.RenameRes renameRes = new RpcResponseMessage.RenameRes();
+		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(renameRes);
+		
+		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+		
+		DaRPCNameNodeFuture<RpcRenameFile> nameNodeFuture = new DaRPCNameNodeFuture<RpcRenameFile>(future, renameRes);
+		
+		return nameNodeFuture;	
+	}
+	
+	@Override
+	public DaRPCNameNodeFuture<RpcGetBlock> getBlock(long fd, long token, long position, long capacity) throws IOException {
+		if (CrailConstants.DEBUG){
+			LOG.debug("RPC: getBlock, fd " + fd + ", token " + token + ", position " + position + ", capacity " + capacity);
+		}
+		
+		RpcRequestMessage.GetBlockReq getBlockReq = new RpcRequestMessage.GetBlockReq(fd, token, position, capacity);
+		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getBlockReq);
+		request.setCommand(RpcProtocol.CMD_GET_BLOCK);
+		
+		RpcResponseMessage.GetBlockRes getBlockRes = new RpcResponseMessage.GetBlockRes();
+		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(getBlockRes);
+		
+		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+		
+		DaRPCNameNodeFuture<RpcGetBlock> nameNodeFuture = new DaRPCNameNodeFuture<RpcGetBlock>(future, getBlockRes);
+		
+		return nameNodeFuture;	
+	}
+	
+	@Override
+	public DaRPCNameNodeFuture<RpcGetLocation> getLocation(FileName fileName, long position) throws IOException {
+		if (CrailConstants.DEBUG){
+			LOG.debug("RPC: getLocation, position " + position);
+		}		
+		
+		RpcRequestMessage.GetLocationReq getLocationReq = new RpcRequestMessage.GetLocationReq(fileName, position);
+		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getLocationReq);
+		request.setCommand(RpcProtocol.CMD_GET_LOCATION);
+
+		RpcResponseMessage.GetLocationRes getLocationRes = new RpcResponseMessage.GetLocationRes();
+		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(getLocationRes);
+		
+		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+		
+		DaRPCNameNodeFuture<RpcGetLocation> nameNodeFuture = new DaRPCNameNodeFuture<RpcGetLocation>(future, getLocationRes);
+		
+		return nameNodeFuture;			
+	}	
+	
+	@Override
+	public DaRPCNameNodeFuture<RpcVoid> setBlock(BlockInfo blockInfo) throws Exception {
+		if (CrailConstants.DEBUG){
+			LOG.debug("RPC: setBlock, ");
+		}		
+		
+		RpcRequestMessage.SetBlockReq setBlockReq = new RpcRequestMessage.SetBlockReq(blockInfo);
+		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(setBlockReq);
+		request.setCommand(RpcProtocol.CMD_SET_BLOCK);
+		
+		RpcResponseMessage.VoidRes voidRes = new RpcResponseMessage.VoidRes();
+		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(voidRes);
+		
+		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+		
+		DaRPCNameNodeFuture<RpcVoid> nameNodeFuture = new DaRPCNameNodeFuture<RpcVoid>(future, voidRes);
+		
+		return nameNodeFuture;	
+	}
+	
+	@Override
+	public DaRPCNameNodeFuture<RpcGetDataNode> getDataNode(DataNodeInfo dnInfo) throws Exception {
+		RpcRequestMessage.GetDataNodeReq getDataNodeReq = new RpcRequestMessage.GetDataNodeReq(dnInfo);
+		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(getDataNodeReq);
+		request.setCommand(RpcProtocol.CMD_GET_DATANODE);
+		
+		RpcResponseMessage.GetDataNodeRes getDataNodeRes = new RpcResponseMessage.GetDataNodeRes();
+		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(getDataNodeRes);
+		
+		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+		
+		DaRPCNameNodeFuture<RpcGetDataNode> nameNodeFuture = new DaRPCNameNodeFuture<RpcGetDataNode>(future, getDataNodeRes);
+		
+		return nameNodeFuture;	
+	}	
+	
+	@Override
+	public DaRPCNameNodeFuture<RpcVoid> dumpNameNode() throws Exception {
+		
+		
+		RpcRequestMessage.DumpNameNodeReq dumpNameNodeReq = new RpcRequestMessage.DumpNameNodeReq();
+		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(dumpNameNodeReq);
+		request.setCommand(RpcProtocol.CMD_DUMP_NAMENODE);
+
+		RpcResponseMessage.VoidRes voidRes = new RpcResponseMessage.VoidRes();
+		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(voidRes);	
+		
+		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+		
+		DaRPCNameNodeFuture<RpcVoid> nameNodeFuture = new DaRPCNameNodeFuture<RpcVoid>(future, voidRes);
+		
+		return nameNodeFuture;	
+	}	
+	
+	@Override
+	public DaRPCNameNodeFuture<RpcPing> pingNameNode() throws Exception {
+		
+		RpcRequestMessage.PingNameNodeReq pingReq = new RpcRequestMessage.PingNameNodeReq();
+		DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(pingReq);
+		request.setCommand(RpcProtocol.CMD_PING_NAMENODE);
+
+		RpcResponseMessage.PingNameNodeRes pingRes = new RpcResponseMessage.PingNameNodeRes();
+		DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(pingRes);
+		
+		DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response);
+		
+		DaRPCNameNodeFuture<RpcPing> nameNodeFuture = new DaRPCNameNodeFuture<RpcPing>(future, pingRes);
+		
+		return nameNodeFuture;	
+	}
+	
+	@Override
+	public void close() throws Exception {
+		if (rpcEndpoint != null){
+			rpcEndpoint.close();
+			rpcEndpoint = null;
+		}		
+	}
+
+	private DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> issueRPC(DaRPCNameNodeRequest request, DaRPCNameNodeResponse response) throws IOException{
+		try {
+			DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = stream.request(request, response, false);
+			return future;
+		} catch(IOException e){
+			LOG.info("ERROR: RPC failed, messagesSend " + rpcEndpoint.getMessagesSent() + ", messagesReceived " + rpcEndpoint.getMessagesReceived() + ", isConnected " + rpcEndpoint.isConnected() + ", qpNum " + rpcEndpoint.getQp().getQp_num());
+			throw e;
+		}
+	}
+
+	@Override
+	public String toString() {
+		try {
+			return rpcEndpoint.getDstAddr().toString();
+		} catch(Exception e){
+			return "Unknown";
+		}
+	}
+}
diff --git a/rpc/src/main/java/com/ibm/crail/rpc/RpcNameNodeService.java b/rpc/src/main/java/com/ibm/crail/rpc/RpcNameNodeService.java
index b5ba5f9..62d339f 100644
--- a/rpc/src/main/java/com/ibm/crail/rpc/RpcNameNodeService.java
+++ b/rpc/src/main/java/com/ibm/crail/rpc/RpcNameNodeService.java
@@ -22,6 +22,7 @@
 package com.ibm.crail.rpc;
 
 public interface RpcNameNodeService {
+	
 	public abstract short createFile(RpcRequestMessage.CreateFileReq request,
 			RpcResponseMessage.CreateFileRes response, RpcNameNodeState errorState)
 			throws Exception;
@@ -67,4 +68,16 @@
 	public abstract short ping(RpcRequestMessage.PingNameNodeReq request,
 			RpcResponseMessage.PingNameNodeRes response, RpcNameNodeState errorState)
 			throws Exception;
+	
+	@SuppressWarnings("unchecked")
+	public static RpcNameNodeService createInstance(String name) throws Exception {
+		Class<?> serviceClass = Class.forName(name);
+		if (RpcNameNodeService.class.isAssignableFrom(serviceClass)){
+			Class<? extends RpcNameNodeService> rpcService = (Class<? extends RpcNameNodeService>) serviceClass;
+			RpcNameNodeService service = rpcService.newInstance();
+			return service;
+		} else {
+			throw new Exception("Cannot instantiate RPC service of type " + name);
+		}
+	}	
 }
diff --git a/rpc/src/main/java/com/ibm/crail/rpc/RpcResponseMessage.java b/rpc/src/main/java/com/ibm/crail/rpc/RpcResponseMessage.java
index b189a4f..0e5b338 100644
--- a/rpc/src/main/java/com/ibm/crail/rpc/RpcResponseMessage.java
+++ b/rpc/src/main/java/com/ibm/crail/rpc/RpcResponseMessage.java
@@ -24,7 +24,6 @@
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-
 import com.ibm.crail.metadata.BlockInfo;
 import com.ibm.crail.metadata.DataNodeStatistics;
 import com.ibm.crail.metadata.FileInfo;
@@ -592,6 +591,10 @@
 		
 		public short getError(){
 			return 0;
+		}
+
+		public void setServiceId(long serviceId) {
+			this.statistics.setServiceId(serviceId);
 		}		
 	}	
 	
diff --git a/storage/src/main/java/com/ibm/crail/storage/StorageServer.java b/storage/src/main/java/com/ibm/crail/storage/StorageServer.java
index c03d23b..fef5458 100644
--- a/storage/src/main/java/com/ibm/crail/storage/StorageServer.java
+++ b/storage/src/main/java/com/ibm/crail/storage/StorageServer.java
@@ -25,6 +25,7 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -41,6 +42,7 @@
 import com.ibm.crail.metadata.DataNodeStatistics;
 import com.ibm.crail.rpc.RpcClient;
 import com.ibm.crail.rpc.RpcConnection;
+import com.ibm.crail.rpc.RpcDispatcher;
 import com.ibm.crail.utils.CrailUtils;
 
 public interface StorageServer {
@@ -122,29 +124,63 @@
 		storageTier.init(conf, extraParams);
 		storageTier.printConf(LOG);		
 		
-		InetSocketAddress nnAddr = CrailUtils.getNameNodeAddress();
 		RpcClient rpcClient = RpcClient.createInstance(CrailConstants.NAMENODE_RPC_TYPE);
 		rpcClient.init(conf, args);
 		rpcClient.printConf(LOG);					
-		RpcConnection rpcConnection = rpcClient.connect(nnAddr);
-		LOG.info("connected to namenode at " + nnAddr);				
+		
+		ConcurrentLinkedQueue<InetSocketAddress> namenodeList = CrailUtils.getNameNodeList();
+		ConcurrentLinkedQueue<RpcConnection> connectionList = new ConcurrentLinkedQueue<RpcConnection>();
+		while(!namenodeList.isEmpty()){
+			InetSocketAddress address = namenodeList.poll();
+			RpcConnection connection = rpcClient.connect(address);
+			connectionList.add(connection);
+		}
+		RpcConnection rpcConnection = connectionList.peek();
+		if (connectionList.size() > 1){
+			rpcConnection = new RpcDispatcher(connectionList);
+		}		
+		LOG.info("connected to namenode(s) " + rpcConnection.toString());				
 		
 		StorageServer server = storageTier.launchServer();
 		StorageRpcClient storageRpc = new StorageRpcClient(storageType, CrailStorageClass.get(storageClass), server.getAddress(), rpcConnection);
 		
+		HashMap<Long, Long> blockCount = new HashMap<Long, Long>();
+		long sumCount = 0;
 		while (server.isAlive()) {
 			StorageResource resource = server.allocateResource();
 			if (resource == null){
 				break;
 			} else {
 				storageRpc.setBlock(resource.getAddress(), resource.getLength(), resource.getKey());
-				LOG.info("datanode statistics, freeBlocks " + storageRpc.getDataNode().getFreeBlockCount());		
+				DataNodeStatistics stats = storageRpc.getDataNode();
+				long newCount = stats.getFreeBlockCount();
+				long serviceId = stats.getServiceId();
+				
+				long oldCount = 0;
+				if (blockCount.containsKey(serviceId)){
+					oldCount = blockCount.get(serviceId);
+				}
+				long diffCount = newCount - oldCount;
+				blockCount.put(serviceId, newCount);
+				sumCount += diffCount;
+				LOG.info("datanode statistics, freeBlocks " + sumCount);		
 			}
 		}
 		
 		while (server.isAlive()) {
-			DataNodeStatistics statistics = storageRpc.getDataNode();
-			LOG.info("datanode statistics, freeBlocks " + statistics.getFreeBlockCount());
+			DataNodeStatistics stats = storageRpc.getDataNode();
+			long newCount = stats.getFreeBlockCount();
+			long serviceId = stats.getServiceId();
+			
+			long oldCount = 0;
+			if (blockCount.containsKey(serviceId)){
+				oldCount = blockCount.get(serviceId);
+			}
+			long diffCount = newCount - oldCount;
+			blockCount.put(serviceId, newCount);
+			sumCount += diffCount;			
+			
+			LOG.info("datanode statistics, freeBlocks " + sumCount);
 			Thread.sleep(2000);
 		}			
 	}