[Resource Elasticity] 2/3 Datanode removal mechanism

* implement mechanism to remove running datanodes

* minor changes

* rename private method

* small changes

* separate removal method into prepare and remove parts

* add exit statement
diff --git a/bin/crail b/bin/crail
index 8b9e97b..16a5637 100755
--- a/bin/crail
+++ b/bin/crail
@@ -27,6 +27,7 @@
   echo "       where COMMAND is one of:"
   echo "  namenode             run the Crail namenode"
   echo "  datanode             run a Crail datanode"
+  echo "  removeDatanode       remove a Crail datanode"
   echo "  fsck                 run a Crail file check command"
   echo "  fs                   run a Crail shell command"
   echo "  iobench              run a Crail benchmark/test"
@@ -53,6 +54,8 @@
   CLASS=org.apache.crail.namenode.NameNode
 elif [ "$COMMAND" = "datanode" ] ; then
   CLASS=org.apache.crail.storage.StorageServer
+elif [ "$COMMAND" = "removeDatanode" ] ; then
+  CLASS=org.apache.crail.tools.RemoveDataNode
 elif [ "$COMMAND" = "fsck" ] ; then
   CLASS=org.apache.crail.tools.CrailFsck
 elif [ "$COMMAND" = "fs" ] ; then
diff --git a/client/src/main/java/org/apache/crail/tools/RemoveDataNode.java b/client/src/main/java/org/apache/crail/tools/RemoveDataNode.java
new file mode 100644
index 0000000..ac3a2c7
--- /dev/null
+++ b/client/src/main/java/org/apache/crail/tools/RemoveDataNode.java
@@ -0,0 +1,82 @@
+package org.apache.crail.tools;
+
+import org.apache.commons.cli.*;
+import org.apache.crail.conf.CrailConfiguration;
+import org.apache.crail.conf.CrailConstants;
+import org.apache.crail.rpc.*;
+import org.apache.crail.utils.CrailUtils;
+import org.slf4j.Logger;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+
+public class RemoveDataNode {
+
+    public static void main(String[] args) throws Exception {
+
+        InetAddress ipaddr = null;
+        int port = -1;
+
+        Option ipOption = Option.builder("i").desc("IP address of the datanode").hasArg().required().build();
+        Option portOption = Option.builder("p").desc("Port of the datanode").hasArg().required().build();
+
+        Options options = new Options();
+        options.addOption(ipOption);
+        options.addOption(portOption);
+
+        HelpFormatter formatter = new HelpFormatter();
+        CommandLineParser parser = new DefaultParser();
+
+        try {
+            CommandLine line = parser.parse(options, args);
+            ipaddr = InetAddress.getByName(line.getOptionValue(ipOption.getOpt()));
+            port = Integer.parseInt(line.getOptionValue(portOption.getOpt()));
+        } catch(Exception e) {
+            formatter.printHelp("RemoveDataNode", options);
+            System.exit(-1);
+        }
+
+        Logger LOG = CrailUtils.getLogger();
+        CrailConfiguration conf = CrailConfiguration.createConfigurationFromFile();
+        CrailConstants.updateConstants(conf);
+        CrailConstants.printConf();
+        CrailConstants.verify();
+        RpcClient rpcClient = RpcClient.createInstance(CrailConstants.NAMENODE_RPC_TYPE);
+
+        rpcClient.init(conf, null);
+        rpcClient.printConf(LOG);
+
+        ConcurrentLinkedQueue<InetSocketAddress> namenodeList = CrailUtils.getNameNodeList();
+        ConcurrentLinkedQueue<RpcConnection> connectionList = new ConcurrentLinkedQueue<RpcConnection>();
+        while(!namenodeList.isEmpty()){
+            InetSocketAddress address = namenodeList.poll();
+            RpcConnection connection = rpcClient.connect(address);
+            connectionList.add(connection);
+        }
+        RpcConnection rpcConnection = connectionList.peek();
+        if (connectionList.size() > 1){
+            rpcConnection = new RpcDispatcher(connectionList);
+        }
+        LOG.info("connected to namenode(s) " + rpcConnection.toString());
+
+        LOG.info("Trying to remove datanode at " + ipaddr.getHostName() + ":" + port);
+
+        Future<RpcRemoveDataNode> res = rpcConnection.removeDataNode(ipaddr, port);
+        short response = res.get().getRpcStatus();
+
+        rpcConnection.close();
+
+        if(response == RpcErrors.ERR_DATANODE_NOT_REGISTERED) {
+            LOG.info("Datanode running at " + ipaddr.getHostAddress() + ":" + port + " is not registered at NameNode");
+        } else if(response == RpcErrors.ERR_OK) {
+            LOG.info("Datanode running at " + ipaddr.getHostAddress() + ":" + port + " was scheduled for removal");
+        } else {
+            throw new Exception("Unexpected error code " + response + " in RPC response");
+        }
+
+
+    }
+}
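With the launcher entry above, the tool is started as `./bin/crail removeDatanode -i <ip> -p <port>`. The same request can also be issued programmatically; the condensed sketch below is only an illustration, assumes the Crail client jars and a valid configuration are on the classpath, and uses an example address (192.168.0.10:50020) that is made up — the class and method names come from this patch.

```java
// Hypothetical condensed sketch of the RPC round trip performed by RemoveDataNode.
import java.net.InetAddress;
import java.util.concurrent.Future;

import org.apache.crail.conf.CrailConfiguration;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.rpc.RpcClient;
import org.apache.crail.rpc.RpcConnection;
import org.apache.crail.rpc.RpcErrors;
import org.apache.crail.rpc.RpcRemoveDataNode;
import org.apache.crail.utils.CrailUtils;

public class RemoveDataNodeSketch {
    public static void main(String[] args) throws Exception {
        CrailConfiguration conf = CrailConfiguration.createConfigurationFromFile();
        CrailConstants.updateConstants(conf);

        RpcClient rpcClient = RpcClient.createInstance(CrailConstants.NAMENODE_RPC_TYPE);
        rpcClient.init(conf, null);

        // connect to the first configured namenode and request removal of one datanode
        RpcConnection connection = rpcClient.connect(CrailUtils.getNameNodeList().poll());
        Future<RpcRemoveDataNode> res =
                connection.removeDataNode(InetAddress.getByName("192.168.0.10"), 50020);
        short status = res.get().getRpcStatus();
        connection.close();

        System.out.println(status == RpcErrors.ERR_OK
                ? "datanode scheduled for removal"
                : "removal request failed, status " + status);
    }
}
```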
diff --git a/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java b/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java
index 0b98981..fa32ffb 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java
@@ -18,6 +18,7 @@
 
 package org.apache.crail.namenode;
 
+import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
@@ -84,7 +85,28 @@
 		int storageClass = dnInfo.getStorageClass();
 		return storageClasses[storageClass].getDataNode(dnInfo);
 	}
-	
+
+	short removeDataNode(DataNodeInfo dn) throws IOException {
+		int storageClass = dn.getStorageClass();
+		return storageClasses[storageClass].removeDatanode(dn);
+	}
+
+	short prepareDataNodeForRemoval(DataNodeInfo dn) throws IOException {
+
+		// Although there may be several storage classes, an (IP address, port) pair
+		// identifies at most one running datanode instance. We can therefore iterate
+		// over all storage classes and check whether the requested datanode belongs
+		// to one of them.
+		for(StorageClass storageClass : storageClasses) {
+			if (storageClass.getDataNode(dn) != null) {
+				return storageClass.prepareForRemovalDatanode(dn);
+			}
+		}
+
+		LOG.error("DataNode: " + dn.toString() + " not found");
+		return RpcErrors.ERR_DATANODE_NOT_REGISTERED;
+	}
+
 }
 
 class StorageClass {
@@ -171,9 +193,36 @@
 		return RpcErrors.ERR_OK;
 
 	}
-	
+
+	short prepareForRemovalDatanode(DataNodeInfo dn) throws IOException {
+
+		// only mark the datanode for removal; it is actually removed later, once drained
+		DataNodeBlocks toBeRemoved = membership.get(dn.key());
+		if (toBeRemoved == null) {
+			LOG.error("DataNode: " + dn.toString() + " not found");
+			return RpcErrors.ERR_DATANODE_NOT_REGISTERED;
+		} else {
+			toBeRemoved.scheduleForRemoval();
+			return RpcErrors.ERR_OK;
+		}
+	}
+
+	short removeDatanode(DataNodeInfo dn) throws IOException {
+
+		// remove the datanode from the membership map; callers must first verify that it no longer stores any blocks
+		DataNodeBlocks toBeRemoved = membership.get(dn.key());
+		if (toBeRemoved == null) {
+			LOG.error("DataNode: " + dn.toString() + " not found");
+			return RpcErrors.ERR_DATANODE_NOT_REGISTERED;
+		} else {
+			membership.remove(toBeRemoved.key());
+			return RpcErrors.ERR_OK;
+		}
+	}
+
 	//---------------
-	
+
+
 	private void _addDataNode(DataNodeBlocks dataNode){
 		LOG.info("adding datanode " + CrailUtils.getIPAddressFromBytes(dataNode.getIpAddress()) + ":" + dataNode.getPort() + " of type " + dataNode.getStorageType() + " to storage class " + storageClass);
 		DataNodeArray hostMap = affinitySets.get(dataNode.getLocationClass());
@@ -256,7 +305,7 @@
 					for (int i = 0; i < size; i++){
 						int index = (startIndex + i) % size;
 						DataNodeBlocks anyDn = arrayList.get(index);
-						if (anyDn.isOnline()){
+						if (anyDn.isOnline() && !anyDn.isScheduleForRemoval()){
 							block = anyDn.getFreeBlock();
 						}
 						if (block != null){
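Removal in the BlockStore is deliberately split into two phases: prepareDataNodeForRemoval only marks the node (and the allocator above now skips marked nodes), while removeDataNode drops it from the membership map once it is drained. A minimal, self-contained sketch of that split, using simplified stand-in types and error codes rather than Crail's:

```java
// Simplified stand-in for the two-phase removal in BlockStore/StorageClass; the Node
// class and error codes below are illustrative only, not Crail types.
import java.util.concurrent.ConcurrentHashMap;

public class TwoPhaseRemovalSketch {
    static final short ERR_OK = 0;
    static final short ERR_DATANODE_NOT_REGISTERED = 1;

    static class Node {
        volatile boolean scheduledForRemoval = false;
    }

    private final ConcurrentHashMap<Long, Node> membership = new ConcurrentHashMap<>();

    // phase 1: mark the node so the block allocator stops handing out its free blocks
    short prepareForRemoval(long key) {
        Node node = membership.get(key);
        if (node == null) {
            return ERR_DATANODE_NOT_REGISTERED; // unknown (IP, port) pair
        }
        node.scheduledForRemoval = true;
        return ERR_OK;
    }

    // phase 2: drop the node from membership once it no longer stores any blocks
    short remove(long key) {
        return membership.remove(key) == null ? ERR_DATANODE_NOT_REGISTERED : ERR_OK;
    }
}
```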
diff --git a/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java b/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java
index c91468f..58e4ab0 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java
@@ -36,7 +36,9 @@
 	private ConcurrentHashMap<Long, BlockInfo> regions;
 	private LinkedBlockingQueue<NameNodeBlockInfo> freeBlocks;
 	private long token;
-	
+	private long maxBlockCount;
+	private boolean scheduleForRemoval;
+
 	public static DataNodeBlocks fromDataNodeInfo(DataNodeInfo dnInfo) throws UnknownHostException{
 		DataNodeBlocks dnInfoNn = new DataNodeBlocks(dnInfo.getStorageType(), dnInfo.getStorageClass(), dnInfo.getLocationClass(), dnInfo.getIpAddress(), dnInfo.getPort());
 		return dnInfoNn;
@@ -46,20 +48,46 @@
 		super(storageType, getStorageClass, locationClass, ipAddress, port);
 		this.regions = new ConcurrentHashMap<Long, BlockInfo>();
 		this.freeBlocks = new LinkedBlockingQueue<NameNodeBlockInfo>();
+		this.scheduleForRemoval = false;
+		this.maxBlockCount = 0;
+	}
+
+	private void updateBlockCount(){
+
+		// When a datanode registers with the namenode, all of the capacity it offers is
+		// added in the form of free blocks. Tracking the largest free-block count ever
+		// observed therefore tells us the datanode's total capacity: the datanode is only
+		// safe to remove once the number of free blocks equals that total again.
+		if(freeBlocks.size() > this.maxBlockCount) {
+			this.maxBlockCount = freeBlocks.size();
+		}
 	}
 	
 	public void addFreeBlock(NameNodeBlockInfo nnBlock) {
 		regions.put(nnBlock.getRegion().getLba(), nnBlock.getRegion());
 		freeBlocks.add(nnBlock);
+		updateBlockCount();
 	}
 
 	public NameNodeBlockInfo getFreeBlock() throws InterruptedException {
 		NameNodeBlockInfo block = this.freeBlocks.poll();
 		return block;
 	}
-	
+
+	public void scheduleForRemoval() {
+		this.scheduleForRemoval = true;
+	}
+
+	public boolean safeForRemoval() {
+		return this.maxBlockCount == this.freeBlocks.size();
+	}
+
+	public boolean isScheduleForRemoval(){
+		return this.scheduleForRemoval;
+	}
+
 	public int getBlockCount() {
-		return freeBlocks.size();
+		return this.freeBlocks.size();
 	}
 
 	public boolean regionExists(BlockInfo region) {
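The high-water mark kept in maxBlockCount is what makes safeForRemoval work: capacity is only ever announced as free blocks, so the largest free-block count ever seen equals the total block count. A self-contained toy version of that bookkeeping (plain longs instead of NameNodeBlockInfo, not Crail code):

```java
// Toy model of DataNodeBlocks' removal bookkeeping.
import java.util.concurrent.LinkedBlockingQueue;

public class RemovalSafetySketch {
    private final LinkedBlockingQueue<Long> freeBlocks = new LinkedBlockingQueue<>();
    private long maxBlockCount = 0;
    private boolean scheduledForRemoval = false;

    void addFreeBlock(long lba) {
        freeBlocks.add(lba);
        // the free-block count only exceeds the previous maximum while capacity is being
        // registered, so the maximum equals the datanode's total capacity
        if (freeBlocks.size() > maxBlockCount) {
            maxBlockCount = freeBlocks.size();
        }
    }

    Long getFreeBlock() { return freeBlocks.poll(); }

    void scheduleForRemoval() { scheduledForRemoval = true; }

    boolean isScheduledForRemoval() { return scheduledForRemoval; }

    boolean safeForRemoval() { return maxBlockCount == freeBlocks.size(); }

    public static void main(String[] args) {
        RemovalSafetySketch dn = new RemovalSafetySketch();
        dn.addFreeBlock(0); dn.addFreeBlock(1);  // capacity registered: two blocks
        Long inUse = dn.getFreeBlock();          // one block handed out to a file
        dn.scheduleForRemoval();
        System.out.println(dn.isScheduledForRemoval() && dn.safeForRemoval()); // false: one block still in use
        dn.addFreeBlock(inUse);                  // block released back to the datanode
        System.out.println(dn.isScheduledForRemoval() && dn.safeForRemoval()); // true: safe to drop the node
    }
}
```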
diff --git a/namenode/src/main/java/org/apache/crail/namenode/LogDispatcher.java b/namenode/src/main/java/org/apache/crail/namenode/LogDispatcher.java
index 7928341..11dda79 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/LogDispatcher.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/LogDispatcher.java
@@ -28,6 +28,7 @@
 import org.apache.crail.rpc.RpcRequestMessage.GetFileReq;
 import org.apache.crail.rpc.RpcRequestMessage.GetLocationReq;
 import org.apache.crail.rpc.RpcRequestMessage.PingNameNodeReq;
+import org.apache.crail.rpc.RpcRequestMessage.RemoveDataNodeReq;
 import org.apache.crail.rpc.RpcRequestMessage.RemoveFileReq;
 import org.apache.crail.rpc.RpcRequestMessage.RenameFileReq;
 import org.apache.crail.rpc.RpcRequestMessage.SetBlockReq;
@@ -39,6 +40,7 @@
 import org.apache.crail.rpc.RpcResponseMessage.GetFileRes;
 import org.apache.crail.rpc.RpcResponseMessage.GetLocationRes;
 import org.apache.crail.rpc.RpcResponseMessage.PingNameNodeRes;
+import org.apache.crail.rpc.RpcResponseMessage.RemoveDataNodeRes;
 import org.apache.crail.rpc.RpcResponseMessage.RenameRes;
 import org.apache.crail.rpc.RpcResponseMessage.VoidRes;
 
@@ -135,5 +137,11 @@
 			RpcNameNodeState errorState) throws Exception {
 		return service.ping(request, response, errorState);
 	}
-	
+
+	@Override
+	public short removeDataNode(RemoveDataNodeReq request, RemoveDataNodeRes response,
+			RpcNameNodeState errorState) throws Exception {
+		return service.removeDataNode(request, response, errorState);
+	}
+
 }
diff --git a/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java b/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java
index 29d54d2..a5da526 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java
@@ -27,10 +27,7 @@
 
 import org.apache.crail.CrailNodeType;
 import org.apache.crail.conf.CrailConstants;
-import org.apache.crail.metadata.BlockInfo;
-import org.apache.crail.metadata.DataNodeInfo;
-import org.apache.crail.metadata.FileInfo;
-import org.apache.crail.metadata.FileName;
+import org.apache.crail.metadata.*;
 import org.apache.crail.rpc.RpcErrors;
 import org.apache.crail.rpc.RpcNameNodeService;
 import org.apache.crail.rpc.RpcNameNodeState;
@@ -412,7 +409,18 @@
 		if (dnInfoNn == null){
 			return RpcErrors.ERR_DATANODE_NOT_REGISTERED;
 		}
-		
+
+		if(dnInfoNn.isScheduleForRemoval()){
+			// verify that datanode does not store any remaining blocks
+			if(dnInfoNn.safeForRemoval()){
+				// remove the datanode from internal data structures and prepare the response
+				blockStore.removeDataNode(dnInfo);
+				response.setServiceId(serviceId);
+				response.setStatus(DataNodeStatus.STATUS_DATANODE_STOP);
+				return RpcErrors.ERR_OK;
+			}
+		}
+
 		dnInfoNn.touch();
 		response.setServiceId(serviceId);
 		response.setFreeBlockCount(dnInfoNn.getBlockCount());
@@ -570,10 +578,26 @@
 		
 		return RpcErrors.ERR_OK;
 	}
-	
+
+	@Override
+	public short removeDataNode(RpcRequestMessage.RemoveDataNodeReq request, RpcResponseMessage.RemoveDataNodeRes response, RpcNameNodeState errorState) throws Exception {
+
+		DataNodeInfo dnInfo = new DataNodeInfo(0, 0, 0, request.getIPAddress().getAddress(), request.port());
+
+		short res = prepareDataNodeForRemoval(dnInfo);
+		response.setRpcStatus(res);
+
+		return RpcErrors.ERR_OK;
+	}
+
 	
 	//--------------- helper functions
-	
+
+	public short prepareDataNodeForRemoval(DataNodeInfo dn) throws Exception {
+		LOG.info("Preparing datanode for removal: " + dn);
+		return blockStore.prepareDataNodeForRemoval(dn);
+	}
+
 	void appendToDeleteQueue(AbstractNode fileInfo) throws Exception {
 		if (fileInfo != null) {
 			fileInfo.setDelay(CrailConstants.TOKEN_EXPIRATION);
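Taken together, the namenode now drives removal off the regular statistics/keepalive RPC: the removeDataNode RPC only marks the node, and the next statistics call from a drained, marked datanode is answered with STATUS_DATANODE_STOP after the node has been dropped from the block store. A compact, self-contained sketch of just that decision; the constants and the Node type are simplified stand-ins, not Crail definitions:

```java
// Illustrative decision function mirroring the statistics/keepalive branch added above.
public class KeepaliveDecisionSketch {
    static final short STATUS_DATANODE_RUNNING = 0;
    static final short STATUS_DATANODE_STOP = 1;

    static class Node {
        boolean scheduledForRemoval;
        boolean drained; // all blocks returned, i.e. safeForRemoval()
    }

    static short onStatistics(Node dn) {
        if (dn.scheduledForRemoval && dn.drained) {
            // at this point the real code removes the node from the BlockStore
            return STATUS_DATANODE_STOP;   // instruct the datanode to shut down
        }
        return STATUS_DATANODE_RUNNING;    // normal keepalive: report free blocks
    }

    public static void main(String[] args) {
        Node dn = new Node();
        dn.scheduledForRemoval = true;
        System.out.println(onStatistics(dn)); // 0: still draining, keep running
        dn.drained = true;
        System.out.println(onStatistics(dn)); // 1: drained, stop the datanode
    }
}
```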
diff --git a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCServiceDispatcher.java b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCServiceDispatcher.java
index 89a1e2c..f518708 100644
--- a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCServiceDispatcher.java
+++ b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCServiceDispatcher.java
@@ -119,6 +119,9 @@
 				error = this.stats(request.pingNameNode(), response.pingNameNode(), response);
 				error = service.ping(request.pingNameNode(), response.pingNameNode(), response);
 				break;
+			case RpcProtocol.CMD_REMOVE_DATANODE:
+				error = service.removeDataNode(request.removeDataNode(), response.removeDataNode(), response);
+				break;
 			default:
 				error = RpcErrors.ERR_INVALID_RPC_CMD;
 				LOG.info("Rpc command not valid, opcode " + request.getCmd());
diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcDispatcher.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcDispatcher.java
index 4e2358c..4807294 100644
--- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcDispatcher.java
+++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcDispatcher.java
@@ -81,6 +81,9 @@
 			case RpcProtocol.CMD_PING_NAMENODE:
 				error = service.ping(request.pingNameNode(), response.pingNameNode(), response);
 				break;
+			case RpcProtocol.CMD_REMOVE_DATANODE:
+				error = service.removeDataNode(request.removeDataNode(), response.removeDataNode(), response);
+				break;
 			default:
 				error = RpcErrors.ERR_INVALID_RPC_CMD;
 				LOG.info("Rpc command not valid, opcode " + request.getCmd());
diff --git a/rpc/src/main/java/org/apache/crail/rpc/RpcNameNodeService.java b/rpc/src/main/java/org/apache/crail/rpc/RpcNameNodeService.java
index 54dc7a8..26ac55c 100644
--- a/rpc/src/main/java/org/apache/crail/rpc/RpcNameNodeService.java
+++ b/rpc/src/main/java/org/apache/crail/rpc/RpcNameNodeService.java
@@ -65,7 +65,10 @@
 	public abstract short ping(RpcRequestMessage.PingNameNodeReq request,
 			RpcResponseMessage.PingNameNodeRes response, RpcNameNodeState errorState)
 			throws Exception;
-	
+
+	public abstract short removeDataNode(RpcRequestMessage.RemoveDataNodeReq request,
+			 RpcResponseMessage.RemoveDataNodeRes response, RpcNameNodeState errorState) throws Exception;
+
 	@SuppressWarnings("unchecked")
 	public static RpcNameNodeService createInstance(String name) throws Exception {
 		Class<?> serviceClass = Class.forName(name);
diff --git a/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java b/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java
index b0924d3..7d64812 100644
--- a/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java
+++ b/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java
@@ -101,6 +101,20 @@
 	}
 
 	@Override
+	public void prepareToShutDown(){
+
+		LOG.info("Preparing TCP-Storage server for shutdown");
+		this.alive = false;
+		
+		try {
+			serverEndpoint.close();
+			serverGroup.close();
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
+
+	@Override
 	public void run() {
 		try {
 			LOG.info("running TCP storage server, address " + address);
@@ -110,7 +124,11 @@
 				LOG.info("new connection " + endpoint.address());
 			}
 		} catch(Exception e){
-			e.printStackTrace();
+			// if the StorageServer is still marked as running, report the stack trace;
+			// otherwise the exception is expected because the server is shutting down
+			if(this.alive) {
+				e.printStackTrace();
+			}
 		}
 	}
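The shutdown pattern used here (and, with their own endpoint types, in the NVMf and RDMA servers below) is to flip the alive flag, close the listening endpoint so the blocking accept loop fails fast, and then treat the resulting exception as expected. A self-contained illustration with plain java.net sockets rather than the narpc endpoint classes:

```java
// Plain-socket illustration of the prepareToShutDown pattern; Crail's storage servers
// apply the same flag-then-close idea to their RPC endpoints instead of a ServerSocket.
import java.net.ServerSocket;

public class AcceptShutdownSketch {
    private volatile boolean alive = true;
    private final ServerSocket serverSocket;

    AcceptShutdownSketch() throws Exception {
        serverSocket = new ServerSocket(0); // bind to an ephemeral port
    }

    void run() {
        try {
            while (alive) {
                serverSocket.accept().close(); // blocks until a client connects or the socket closes
            }
        } catch (Exception e) {
            if (alive) {
                e.printStackTrace();           // unexpected failure while still running
            }                                  // otherwise expected: we are shutting down
        }
    }

    void prepareToShutDown() throws Exception {
        alive = false;
        serverSocket.close();                  // unblocks accept() with a SocketException
    }

    public static void main(String[] args) throws Exception {
        AcceptShutdownSketch server = new AcceptShutdownSketch();
        Thread t = new Thread(server::run);
        t.start();
        Thread.sleep(100);
        server.prepareToShutDown();
        t.join();
        System.out.println("accept loop exited cleanly");
    }
}
```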
 
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
index f901f56..cc7fe02 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
@@ -96,8 +96,11 @@
 				Thread.sleep(NvmfStorageConstants.KEEP_ALIVE_INTERVAL_MS);
 				controller.keepAlive();
 			} catch (Exception e) {
-				e.printStackTrace();
-				isAlive = false;
+				// if the StorageServer is still marked as running, report the stack trace;
+				// otherwise the exception is expected because the server is shutting down
+				if(this.isAlive){
+					e.printStackTrace();
+				}
 			}
 		}
 	}
@@ -126,4 +129,18 @@
 	public boolean isAlive() {
 		return isAlive;
 	}
+
+	public void prepareToShutDown(){
+
+		LOG.info("Preparing Nvmf-Storage server for shutdown");
+
+		this.isAlive = false;
+
+		// stop jnvmf controller
+		try {
+			this.controller.free();
+		} catch(Exception e) {
+			e.printStackTrace();
+		}
+	}
 }
diff --git a/storage-rdma/src/main/java/org/apache/crail/storage/rdma/RdmaStorageServer.java b/storage-rdma/src/main/java/org/apache/crail/storage/rdma/RdmaStorageServer.java
index e808864..324f5ca 100644
--- a/storage-rdma/src/main/java/org/apache/crail/storage/rdma/RdmaStorageServer.java
+++ b/storage-rdma/src/main/java/org/apache/crail/storage/rdma/RdmaStorageServer.java
@@ -64,7 +64,7 @@
 			LOG.info("Configured network interface " + RdmaConstants.STORAGE_RDMA_INTERFACE + " cannot be found..exiting!!!");
 			return;
 		}
-		this.datanodeGroup = new RdmaActiveEndpointGroup<RdmaStorageServerEndpoint>(-1, false, 1, 1, 1);
+		this.datanodeGroup = new RdmaActiveEndpointGroup<RdmaStorageServerEndpoint>(1000, false, 1, 1, 1);
 		this.datanodeServerEndpoint = datanodeGroup.createServerEndpoint();
 		datanodeGroup.init(new RdmaStorageEndpointFactory(datanodeGroup, this));
 		datanodeServerEndpoint.bind(serverAddr, RdmaConstants.STORAGE_RDMA_BACKLOG);
@@ -130,9 +130,12 @@
 				LOG.info("accepting client connection, conncount " + allEndpoints.size());
 			}			
 		} catch(Exception e){
-			e.printStackTrace();
+			// if the StorageServer is still marked as running, report the stack trace;
+			// otherwise the exception is expected because the server is shutting down
+			if(this.isAlive) {
+				e.printStackTrace();
+			}
 		}
-		this.isAlive = false;
 	}
 
 	@Override
@@ -144,4 +147,29 @@
 	public boolean isAlive() {
 		return isAlive;
 	}
+
+	public void prepareToShutDown(){
+
+		LOG.info("Preparing RDMA-Storage server for shutdown");
+		this.isAlive = false;
+
+		try {
+
+			// close all open clientEndpoints
+			for(RdmaEndpoint endpoint : this.allEndpoints.values()) {
+				this.close(endpoint);
+			}
+
+			// close datanodeServerEndpoint and notify to exit blocking accept
+			this.datanodeServerEndpoint.close();
+			synchronized(this.datanodeServerEndpoint) {this.datanodeServerEndpoint.notifyAll();}
+
+			// close datanodeGroup
+			this.datanodeGroup.close();
+
+		} catch(Exception e) {
+			e.printStackTrace();
+		}
+
+	}
 }
diff --git a/storage/src/main/java/org/apache/crail/storage/StorageServer.java b/storage/src/main/java/org/apache/crail/storage/StorageServer.java
index 8345f44..404b207 100644
--- a/storage/src/main/java/org/apache/crail/storage/StorageServer.java
+++ b/storage/src/main/java/org/apache/crail/storage/StorageServer.java
@@ -36,15 +36,18 @@
 import org.apache.crail.conf.CrailConfiguration;
 import org.apache.crail.conf.CrailConstants;
 import org.apache.crail.metadata.DataNodeStatistics;
+import org.apache.crail.metadata.DataNodeStatus;
 import org.apache.crail.rpc.RpcClient;
 import org.apache.crail.rpc.RpcConnection;
 import org.apache.crail.rpc.RpcDispatcher;
+import org.apache.crail.rpc.RpcErrors;
 import org.apache.crail.utils.CrailUtils;
 import org.slf4j.Logger;
 
 public interface StorageServer extends Configurable, Runnable {
 	public abstract StorageResource allocateResource() throws Exception;
 	public abstract boolean isAlive();
+	public abstract void prepareToShutDown();
 	public abstract InetSocketAddress getAddress();
 	
 	public static void main(String[] args) throws Exception {
@@ -175,6 +178,7 @@
 			DataNodeStatistics stats = storageRpc.getDataNode();
 			long newCount = stats.getFreeBlockCount();
 			long serviceId = stats.getServiceId();
+			short status = stats.getStatus().getStatus();
 			
 			long oldCount = 0;
 			if (blockCount.containsKey(serviceId)){
@@ -185,7 +189,22 @@
 			sumCount += diffCount;			
 			
 			LOG.info("datanode statistics, freeBlocks " + sumCount);
+			processStatus(server, rpcConnection, thread, status);
 			Thread.sleep(CrailConstants.STORAGE_KEEPALIVE*1000);
 		}			
 	}
+
+	public static void processStatus(StorageServer server, RpcConnection rpc, Thread thread, short status) throws Exception {
+		if (status == DataNodeStatus.STATUS_DATANODE_STOP) {
+			server.prepareToShutDown();
+			rpc.close();
+			
+			// interrupt sleeping thread
+			try {
+				thread.interrupt();
+			} catch(Exception e) {
+				e.printStackTrace();
+			}
+		}
+	}
 }
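On the datanode side, the keepalive loop now inspects the status field of every statistics reply and, on STATUS_DATANODE_STOP, shuts the server down, closes the RPC connection, and interrupts the sleeping main thread. A self-contained sketch of that loop with a stubbed-out RPC call (not Crail code; the status constants are stand-ins):

```java
// Minimal model of the datanode keepalive loop reacting to a stop status; pollNamenode()
// is a stand-in for the getDataNode() statistics RPC used in StorageServer.main().
public class KeepaliveLoopSketch {
    static final short STATUS_DATANODE_RUNNING = 0;
    static final short STATUS_DATANODE_STOP = 1;

    private volatile boolean alive = true;
    private int polls = 0;

    // stand-in for storageRpc.getDataNode(): report "stop" on the third poll
    short pollNamenode() {
        return ++polls < 3 ? STATUS_DATANODE_RUNNING : STATUS_DATANODE_STOP;
    }

    void keepaliveLoop() throws InterruptedException {
        while (alive) {
            short status = pollNamenode();
            System.out.println("poll " + polls + ", status " + status);
            if (status == STATUS_DATANODE_STOP) {
                alive = false;      // corresponds to prepareToShutDown() in the patch
                // the real code also closes the RPC connection and interrupts the
                // sleeping main thread so it does not wait out the keepalive interval
                break;
            }
            Thread.sleep(100);      // keepalive interval, shortened for the example
        }
        System.out.println("storage server stopped");
    }

    public static void main(String[] args) throws InterruptedException {
        new KeepaliveLoopSketch().keepaliveLoop();
    }
}
```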