[Resource Elasticity] 2/3 Remove datanode mechanism
* implement mechanism to remove running datanodes
* minor changes
* rename private method
* small changes
* seperate removal method into prepare and remove parts
* add exit statement
diff --git a/bin/crail b/bin/crail
index 8b9e97b..16a5637 100755
--- a/bin/crail
+++ b/bin/crail
@@ -27,6 +27,7 @@
echo " where COMMAND is one of:"
echo " namenode run the Crail namenode"
echo " datanode run a Crail datanode"
+ echo " removeDatanode remove a Crail datanode"
echo " fsck run a Crail file check command"
echo " fs run a Crail shell command"
echo " iobench run a Crail benchmark/test"
@@ -53,6 +54,8 @@
CLASS=org.apache.crail.namenode.NameNode
elif [ "$COMMAND" = "datanode" ] ; then
CLASS=org.apache.crail.storage.StorageServer
+elif [ "$COMMAND" = "removeDatanode" ] ; then
+ CLASS=org.apache.crail.tools.RemoveDataNode
elif [ "$COMMAND" = "fsck" ] ; then
CLASS=org.apache.crail.tools.CrailFsck
elif [ "$COMMAND" = "fs" ] ; then
diff --git a/client/src/main/java/org/apache/crail/tools/RemoveDataNode.java b/client/src/main/java/org/apache/crail/tools/RemoveDataNode.java
new file mode 100644
index 0000000..ac3a2c7
--- /dev/null
+++ b/client/src/main/java/org/apache/crail/tools/RemoveDataNode.java
@@ -0,0 +1,82 @@
+package org.apache.crail.tools;
+
+import org.apache.commons.cli.*;
+import org.apache.crail.conf.CrailConfiguration;
+import org.apache.crail.conf.CrailConstants;
+import org.apache.crail.rpc.*;
+import org.apache.crail.utils.CrailUtils;
+import org.slf4j.Logger;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+
+public class RemoveDataNode {
+
+ public static void main(String[] args) throws Exception {
+
+ InetAddress ipaddr = null;
+ int port = -1;
+
+ Option ipOption = Option.builder("i").desc("Ip address").hasArg().required().build();
+ Option portOption = Option.builder("p").desc("port").hasArg().required().build();
+
+ Options options = new Options();
+ options.addOption(ipOption);
+ options.addOption(portOption);
+
+ HelpFormatter formatter = new HelpFormatter();
+ CommandLineParser parser = new DefaultParser();
+ CommandLine line = parser.parse(options, args);
+
+ try {
+ ipaddr = InetAddress.getByName(line.getOptionValue(ipOption.getOpt()));
+ port = Integer.parseInt(line.getOptionValue(portOption.getOpt()));
+ } catch(Exception e) {
+ formatter.printHelp("RemoveDataNode", options);
+ System.exit(-1);
+ }
+
+ Logger LOG = CrailUtils.getLogger();
+ CrailConfiguration conf = CrailConfiguration.createConfigurationFromFile();
+ CrailConstants.updateConstants(conf);
+ CrailConstants.printConf();
+ CrailConstants.verify();
+ RpcClient rpcClient = RpcClient.createInstance(CrailConstants.NAMENODE_RPC_TYPE);
+
+ rpcClient.init(conf, null);
+ rpcClient.printConf(LOG);
+
+ ConcurrentLinkedQueue<InetSocketAddress> namenodeList = CrailUtils.getNameNodeList();
+ ConcurrentLinkedQueue<RpcConnection> connectionList = new ConcurrentLinkedQueue<RpcConnection>();
+ while(!namenodeList.isEmpty()){
+ InetSocketAddress address = namenodeList.poll();
+ RpcConnection connection = rpcClient.connect(address);
+ connectionList.add(connection);
+ }
+ RpcConnection rpcConnection = connectionList.peek();
+ if (connectionList.size() > 1){
+ rpcConnection = new RpcDispatcher(connectionList);
+ }
+ LOG.info("connected to namenode(s) " + rpcConnection.toString());
+
+ LOG.info("Trying to remove datanode at " + ipaddr.getHostName() + ":" + port);
+
+ Future<RpcRemoveDataNode> res = rpcConnection.removeDataNode(ipaddr, port);
+ short response = res.get().getRpcStatus();
+
+ rpcConnection.close();
+
+ if(response == RpcErrors.ERR_DATANODE_NOT_REGISTERED) {
+ LOG.info("Datanode running at " + ipaddr.getHostAddress() + ":" + port + " is not registered at NameNode");
+ } else if(response == RpcErrors.ERR_OK) {
+ LOG.info("Datanode running at " + ipaddr.getHostAddress() + ":" + port + " was scheduled for removal");
+ } else {
+ throw new Exception("Unexpected error code in RPC response");
+ }
+
+
+ }
+}
diff --git a/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java b/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java
index 0b98981..fa32ffb 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java
@@ -18,6 +18,7 @@
package org.apache.crail.namenode;
+import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
@@ -84,7 +85,28 @@
int storageClass = dnInfo.getStorageClass();
return storageClasses[storageClass].getDataNode(dnInfo);
}
-
+
+ short removeDataNode(DataNodeInfo dn) throws IOException {
+ int storageClass = dn.getStorageClass();
+ return storageClasses[storageClass].removeDatanode(dn);
+ }
+
+ short prepareDataNodeForRemoval(DataNodeInfo dn) throws IOException {
+
+ // Despite several potential storageClasses the pair (Ip-addr,port) should
+ // nevertheless target only one running datanode instance.
+ // Therefore we can iterate over all storageClasses to check whether
+ // the requested datanode is part of one of the storageClasses.
+ for(StorageClass storageClass : storageClasses) {
+ if (storageClass.getDataNode(dn) != null) {
+ return storageClass.prepareForRemovalDatanode(dn);
+ }
+ }
+
+ LOG.error("DataNode: " + dn.toString() + " not found");
+ return RpcErrors.ERR_DATANODE_NOT_REGISTERED;
+ }
+
}
class StorageClass {
@@ -171,9 +193,36 @@
return RpcErrors.ERR_OK;
}
-
+
+ short prepareForRemovalDatanode(DataNodeInfo dn) throws IOException {
+
+ // this will only mark the datanode for removal
+ DataNodeBlocks toBeRemoved = membership.get(dn.key());
+ if (toBeRemoved == null) {
+ LOG.error("DataNode: " + dn.toString() + " not found");
+ return RpcErrors.ERR_DATANODE_NOT_REGISTERED;
+ } else {
+ toBeRemoved.scheduleForRemoval();
+ return RpcErrors.ERR_OK;
+ }
+ }
+
+ short removeDatanode(DataNodeInfo dn) throws IOException {
+
+ // this will remove the datanode once it does not store any remaining data blocks
+ DataNodeBlocks toBeRemoved = membership.get(dn.key());
+ if (toBeRemoved == null) {
+ LOG.error("DataNode: " + dn.toString() + " not found");
+ return RpcErrors.ERR_DATANODE_NOT_REGISTERED;
+ } else {
+ membership.remove(toBeRemoved.key());
+ return RpcErrors.ERR_OK;
+ }
+ }
+
//---------------
-
+
+
private void _addDataNode(DataNodeBlocks dataNode){
LOG.info("adding datanode " + CrailUtils.getIPAddressFromBytes(dataNode.getIpAddress()) + ":" + dataNode.getPort() + " of type " + dataNode.getStorageType() + " to storage class " + storageClass);
DataNodeArray hostMap = affinitySets.get(dataNode.getLocationClass());
@@ -256,7 +305,7 @@
for (int i = 0; i < size; i++){
int index = (startIndex + i) % size;
DataNodeBlocks anyDn = arrayList.get(index);
- if (anyDn.isOnline()){
+ if (anyDn.isOnline() && !anyDn.isScheduleForRemoval()){
block = anyDn.getFreeBlock();
}
if (block != null){
diff --git a/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java b/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java
index c91468f..58e4ab0 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java
@@ -36,7 +36,9 @@
private ConcurrentHashMap<Long, BlockInfo> regions;
private LinkedBlockingQueue<NameNodeBlockInfo> freeBlocks;
private long token;
-
+ private long maxBlockCount;
+ private boolean scheduleForRemoval;
+
public static DataNodeBlocks fromDataNodeInfo(DataNodeInfo dnInfo) throws UnknownHostException{
DataNodeBlocks dnInfoNn = new DataNodeBlocks(dnInfo.getStorageType(), dnInfo.getStorageClass(), dnInfo.getLocationClass(), dnInfo.getIpAddress(), dnInfo.getPort());
return dnInfoNn;
@@ -46,20 +48,46 @@
super(storageType, getStorageClass, locationClass, ipAddress, port);
this.regions = new ConcurrentHashMap<Long, BlockInfo>();
this.freeBlocks = new LinkedBlockingQueue<NameNodeBlockInfo>();
+ this.scheduleForRemoval = false;
+ this.maxBlockCount = 0;
+ }
+
+ private void updateBlockCount(){
+
+ // When a datanode connects for the first time to the namenode, all of the offered storage capacities
+ // are added in the form of free blocks. By keeping track of this number (which grows block for block), we
+ // learn the maximum available capacity in this datanode. Only when the number of free blocks equals the number
+ // of all blocks, the datanode is safe to be removed.
+ if(freeBlocks.size() > this.maxBlockCount) {
+ this.maxBlockCount = freeBlocks.size();
+ }
}
public void addFreeBlock(NameNodeBlockInfo nnBlock) {
regions.put(nnBlock.getRegion().getLba(), nnBlock.getRegion());
freeBlocks.add(nnBlock);
+ updateBlockCount();
}
public NameNodeBlockInfo getFreeBlock() throws InterruptedException {
NameNodeBlockInfo block = this.freeBlocks.poll();
return block;
}
-
+
+ public void scheduleForRemoval() {
+ this.scheduleForRemoval = true;
+ }
+
+ public boolean safeForRemoval() {
+ return this.maxBlockCount == this.freeBlocks.size();
+ }
+
+ public boolean isScheduleForRemoval(){
+ return this.scheduleForRemoval;
+ }
+
public int getBlockCount() {
- return freeBlocks.size();
+ return this.freeBlocks.size();
}
public boolean regionExists(BlockInfo region) {
diff --git a/namenode/src/main/java/org/apache/crail/namenode/LogDispatcher.java b/namenode/src/main/java/org/apache/crail/namenode/LogDispatcher.java
index 7928341..11dda79 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/LogDispatcher.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/LogDispatcher.java
@@ -28,6 +28,7 @@
import org.apache.crail.rpc.RpcRequestMessage.GetFileReq;
import org.apache.crail.rpc.RpcRequestMessage.GetLocationReq;
import org.apache.crail.rpc.RpcRequestMessage.PingNameNodeReq;
+import org.apache.crail.rpc.RpcRequestMessage.RemoveDataNodeReq;
import org.apache.crail.rpc.RpcRequestMessage.RemoveFileReq;
import org.apache.crail.rpc.RpcRequestMessage.RenameFileReq;
import org.apache.crail.rpc.RpcRequestMessage.SetBlockReq;
@@ -39,6 +40,7 @@
import org.apache.crail.rpc.RpcResponseMessage.GetFileRes;
import org.apache.crail.rpc.RpcResponseMessage.GetLocationRes;
import org.apache.crail.rpc.RpcResponseMessage.PingNameNodeRes;
+import org.apache.crail.rpc.RpcResponseMessage.RemoveDataNodeRes;
import org.apache.crail.rpc.RpcResponseMessage.RenameRes;
import org.apache.crail.rpc.RpcResponseMessage.VoidRes;
@@ -135,5 +137,11 @@
RpcNameNodeState errorState) throws Exception {
return service.ping(request, response, errorState);
}
-
+
+ @Override
+ public short removeDataNode(RemoveDataNodeReq request, RemoveDataNodeRes response,
+ RpcNameNodeState errorState) throws Exception {
+ return service.removeDataNode(request, response, errorState);
+ }
+
}
diff --git a/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java b/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java
index 29d54d2..a5da526 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java
@@ -27,10 +27,7 @@
import org.apache.crail.CrailNodeType;
import org.apache.crail.conf.CrailConstants;
-import org.apache.crail.metadata.BlockInfo;
-import org.apache.crail.metadata.DataNodeInfo;
-import org.apache.crail.metadata.FileInfo;
-import org.apache.crail.metadata.FileName;
+import org.apache.crail.metadata.*;
import org.apache.crail.rpc.RpcErrors;
import org.apache.crail.rpc.RpcNameNodeService;
import org.apache.crail.rpc.RpcNameNodeState;
@@ -412,7 +409,18 @@
if (dnInfoNn == null){
return RpcErrors.ERR_DATANODE_NOT_REGISTERED;
}
-
+
+ if(dnInfoNn.isScheduleForRemoval()){
+ // verify that datanode does not store any remaining blocks
+ if(dnInfoNn.safeForRemoval()){
+ // remove datanode from internal datastructures and prepare response
+ blockStore.removeDataNode(dnInfo);
+ response.setServiceId(serviceId);
+ response.setStatus(DataNodeStatus.STATUS_DATANODE_STOP);
+ return RpcErrors.ERR_OK;
+ }
+ }
+
dnInfoNn.touch();
response.setServiceId(serviceId);
response.setFreeBlockCount(dnInfoNn.getBlockCount());
@@ -570,10 +578,26 @@
return RpcErrors.ERR_OK;
}
-
+
+ @Override
+ public short removeDataNode(RpcRequestMessage.RemoveDataNodeReq request, RpcResponseMessage.RemoveDataNodeRes response, RpcNameNodeState errorState) throws Exception {
+
+ DataNodeInfo dn_info = new DataNodeInfo(0,0,0,request.getIPAddress().getAddress(), request.port());
+
+ short res = prepareDataNodeForRemoval(dn_info);
+ response.setRpcStatus(res);
+
+ return RpcErrors.ERR_OK;
+ }
+
//--------------- helper functions
-
+
+ public short prepareDataNodeForRemoval(DataNodeInfo dn) throws Exception {
+ LOG.info("Removing data node: " + dn);
+ return blockStore.prepareDataNodeForRemoval(dn);
+ }
+
void appendToDeleteQueue(AbstractNode fileInfo) throws Exception {
if (fileInfo != null) {
fileInfo.setDelay(CrailConstants.TOKEN_EXPIRATION);
diff --git a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCServiceDispatcher.java b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCServiceDispatcher.java
index 89a1e2c..f518708 100644
--- a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCServiceDispatcher.java
+++ b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCServiceDispatcher.java
@@ -119,6 +119,9 @@
error = this.stats(request.pingNameNode(), response.pingNameNode(), response);
error = service.ping(request.pingNameNode(), response.pingNameNode(), response);
break;
+ case RpcProtocol.CMD_REMOVE_DATANODE:
+ error = service.removeDataNode(request.removeDataNode(), response.removeDataNode(), response);
+ break;
default:
error = RpcErrors.ERR_INVALID_RPC_CMD;
LOG.info("Rpc command not valid, opcode " + request.getCmd());
diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcDispatcher.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcDispatcher.java
index 4e2358c..4807294 100644
--- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcDispatcher.java
+++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcDispatcher.java
@@ -81,6 +81,9 @@
case RpcProtocol.CMD_PING_NAMENODE:
error = service.ping(request.pingNameNode(), response.pingNameNode(), response);
break;
+ case RpcProtocol.CMD_REMOVE_DATANODE:
+ error = service.removeDataNode(request.removeDataNode(), response.removeDataNode(), response);
+ break;
default:
error = RpcErrors.ERR_INVALID_RPC_CMD;
LOG.info("Rpc command not valid, opcode " + request.getCmd());
diff --git a/rpc/src/main/java/org/apache/crail/rpc/RpcNameNodeService.java b/rpc/src/main/java/org/apache/crail/rpc/RpcNameNodeService.java
index 54dc7a8..26ac55c 100644
--- a/rpc/src/main/java/org/apache/crail/rpc/RpcNameNodeService.java
+++ b/rpc/src/main/java/org/apache/crail/rpc/RpcNameNodeService.java
@@ -65,7 +65,10 @@
public abstract short ping(RpcRequestMessage.PingNameNodeReq request,
RpcResponseMessage.PingNameNodeRes response, RpcNameNodeState errorState)
throws Exception;
-
+
+ public abstract short removeDataNode(RpcRequestMessage.RemoveDataNodeReq request,
+ RpcResponseMessage.RemoveDataNodeRes response, RpcNameNodeState errorState) throws Exception;
+
@SuppressWarnings("unchecked")
public static RpcNameNodeService createInstance(String name) throws Exception {
Class<?> serviceClass = Class.forName(name);
diff --git a/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java b/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java
index b0924d3..7d64812 100644
--- a/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java
+++ b/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java
@@ -101,6 +101,20 @@
}
@Override
+ public void prepareToShutDown(){
+
+ LOG.info("Preparing TCP-Storage server for shutdown");
+ this.alive = false;
+
+ try {
+ serverEndpoint.close();
+ serverGroup.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
public void run() {
try {
LOG.info("running TCP storage server, address " + address);
@@ -110,7 +124,11 @@
LOG.info("new connection " + endpoint.address());
}
} catch(Exception e){
- e.printStackTrace();
+ // if StorageServer is still marked as running output stacktrace
+ // otherwise this is expected behaviour
+ if(this.alive) {
+ e.printStackTrace();
+ }
}
}
diff --git a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
index f901f56..cc7fe02 100644
--- a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
+++ b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageServer.java
@@ -96,8 +96,11 @@
Thread.sleep(NvmfStorageConstants.KEEP_ALIVE_INTERVAL_MS);
controller.keepAlive();
} catch (Exception e) {
- e.printStackTrace();
- isAlive = false;
+ // if StorageServer is still marked as running output stacktrace
+ // otherwise this is expected behaviour
+ if(this.isAlive){
+ e.printStackTrace();
+ }
}
}
}
@@ -126,4 +129,18 @@
public boolean isAlive() {
return isAlive;
}
+
+ public void prepareToShutDown(){
+
+ LOG.info("Preparing Nvmf-Storage server for shutdown");
+
+ this.isAlive = false;
+
+ // stop jnvmf controller
+ try {
+ this.controller.free();
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
}
diff --git a/storage-rdma/src/main/java/org/apache/crail/storage/rdma/RdmaStorageServer.java b/storage-rdma/src/main/java/org/apache/crail/storage/rdma/RdmaStorageServer.java
index e808864..324f5ca 100644
--- a/storage-rdma/src/main/java/org/apache/crail/storage/rdma/RdmaStorageServer.java
+++ b/storage-rdma/src/main/java/org/apache/crail/storage/rdma/RdmaStorageServer.java
@@ -64,7 +64,7 @@
LOG.info("Configured network interface " + RdmaConstants.STORAGE_RDMA_INTERFACE + " cannot be found..exiting!!!");
return;
}
- this.datanodeGroup = new RdmaActiveEndpointGroup<RdmaStorageServerEndpoint>(-1, false, 1, 1, 1);
+ this.datanodeGroup = new RdmaActiveEndpointGroup<RdmaStorageServerEndpoint>(1000, false, 1, 1, 1);
this.datanodeServerEndpoint = datanodeGroup.createServerEndpoint();
datanodeGroup.init(new RdmaStorageEndpointFactory(datanodeGroup, this));
datanodeServerEndpoint.bind(serverAddr, RdmaConstants.STORAGE_RDMA_BACKLOG);
@@ -130,9 +130,12 @@
LOG.info("accepting client connection, conncount " + allEndpoints.size());
}
} catch(Exception e){
- e.printStackTrace();
+ // if StorageServer is still marked as running output stacktrace
+ // otherwise this is expected behaviour
+ if(this.isAlive) {
+ e.printStackTrace();
+ }
}
- this.isAlive = false;
}
@Override
@@ -144,4 +147,29 @@
public boolean isAlive() {
return isAlive;
}
+
+ public void prepareToShutDown(){
+
+ LOG.info("Preparing RDMA-Storage server for shutdown");
+ this.isAlive = false;
+
+ try {
+
+ // close all open clientEndpoints
+ for(RdmaEndpoint endpoint : this.allEndpoints.values()) {
+ this.close(endpoint);
+ }
+
+ // close datanodeServerEndpoint and notify to exit blocking accept
+ this.datanodeServerEndpoint.close();
+ synchronized(this.datanodeServerEndpoint) {this.datanodeServerEndpoint.notifyAll();}
+
+ // close datanodeGroup
+ this.datanodeGroup.close();
+
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+
+ }
}
diff --git a/storage/src/main/java/org/apache/crail/storage/StorageServer.java b/storage/src/main/java/org/apache/crail/storage/StorageServer.java
index 8345f44..404b207 100644
--- a/storage/src/main/java/org/apache/crail/storage/StorageServer.java
+++ b/storage/src/main/java/org/apache/crail/storage/StorageServer.java
@@ -36,15 +36,18 @@
import org.apache.crail.conf.CrailConfiguration;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.metadata.DataNodeStatistics;
+import org.apache.crail.metadata.DataNodeStatus;
import org.apache.crail.rpc.RpcClient;
import org.apache.crail.rpc.RpcConnection;
import org.apache.crail.rpc.RpcDispatcher;
+import org.apache.crail.rpc.RpcErrors;
import org.apache.crail.utils.CrailUtils;
import org.slf4j.Logger;
public interface StorageServer extends Configurable, Runnable {
public abstract StorageResource allocateResource() throws Exception;
public abstract boolean isAlive();
+ public abstract void prepareToShutDown();
public abstract InetSocketAddress getAddress();
public static void main(String[] args) throws Exception {
@@ -175,6 +178,7 @@
DataNodeStatistics stats = storageRpc.getDataNode();
long newCount = stats.getFreeBlockCount();
long serviceId = stats.getServiceId();
+ short status = stats.getStatus().getStatus();
long oldCount = 0;
if (blockCount.containsKey(serviceId)){
@@ -185,7 +189,22 @@
sumCount += diffCount;
LOG.info("datanode statistics, freeBlocks " + sumCount);
+ processStatus(server, rpcConnection, thread, status);
Thread.sleep(CrailConstants.STORAGE_KEEPALIVE*1000);
}
}
+
+ public static void processStatus(StorageServer server, RpcConnection rpc, Thread thread, short status) throws Exception {
+ if (status == DataNodeStatus.STATUS_DATANODE_STOP) {
+ server.prepareToShutDown();
+ rpc.close();
+
+ // interrupt sleeping thread
+ try {
+ thread.interrupt();
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
}