[Resource Elasticity] 3/3 Elastic Namenode Service 

* implement simple autoscaling functionality

* renaming and small changes

* small style changes
diff --git a/client/src/main/java/org/apache/crail/conf/CrailConstants.java b/client/src/main/java/org/apache/crail/conf/CrailConstants.java
index 310abc1..0ee25eb 100644
--- a/client/src/main/java/org/apache/crail/conf/CrailConstants.java
+++ b/client/src/main/java/org/apache/crail/conf/CrailConstants.java
@@ -118,6 +118,25 @@
 	public static final String STORAGE_KEEPALIVE_KEY = "crail.storage.keepalive";
 	public static int STORAGE_KEEPALIVE = 2;
 
+	// elasticstore
+	public static final String ELASTICSTORE_SCALEUP_KEY = "crail.elasticstore.scaleup";
+	public static double ELASTICSTORE_SCALEUP = 0.4; // FreeCapacityPolicy launches a datanode when the used-storage fraction exceeds this
+
+	public static final String ELASTICSTORE_SCALEDOWN_KEY = "crail.elasticstore.scaledown";
+	public static double ELASTICSTORE_SCALEDOWN = 0.1; // FreeCapacityPolicy schedules a datanode for removal when the used-storage fraction is below this
+
+	public static final String ELASTICSTORE_MINNODES_KEY = "crail.elasticstore.minnodes";
+	public static int ELASTICSTORE_MINNODES = 1; // no scale-down unless the policy's datanode count is above this
+	
+	public static final String ELASTICSTORE_MAXNODES_KEY = "crail.elasticstore.maxnodes";
+	public static int ELASTICSTORE_MAXNODES = 10; // no scale-up unless the policy's datanode count is below this
+
+	public static final String ELASTICSTORE_POLICYRUNNER_INTERVAL_KEY = "crail.elasticstore.policyrunner.interval";
+	public static int ELASTICSTORE_POLICYRUNNER_INTERVAL = 1000; // milliseconds between policy checks (passed to Thread.sleep)
+
+	public static final String ELASTICSTORE_LOGGING_KEY = "crail.elasticstore.logging";
+	public static boolean ELASTICSTORE_LOGGING = false; // when true, FreeCapacityPolicy logs usage on every policy check
+
 	public static void updateConstants(CrailConfiguration conf){
 		//general
 		if (conf.get(DIRECTORY_DEPTH_KEY) != null) {
@@ -191,6 +210,9 @@
 		if (conf.get(NAMENODE_RPC_TYPE_KEY) != null) {
 			NAMENODE_RPC_TYPE = conf.get(NAMENODE_RPC_TYPE_KEY);
 		}
+		if (conf.get(NAMENODE_RPC_SERVICE_KEY) != null) {
+			NAMENODE_RPC_SERVICE = conf.get(NAMENODE_RPC_SERVICE_KEY);
+		}
 		if (conf.get(NAMENODE_LOG_KEY) != null) {
 			NAMENODE_LOG = conf.get(NAMENODE_LOG_KEY);
 		}
@@ -210,6 +232,29 @@
 		if (conf.get(STORAGE_KEEPALIVE_KEY) != null) {
 			STORAGE_KEEPALIVE = Integer.parseInt(conf.get(STORAGE_KEEPALIVE_KEY));
 		}
+
+		//elasticstore
+		if (conf.get(ELASTICSTORE_SCALEUP_KEY) != null) {
+			ELASTICSTORE_SCALEUP = Double.parseDouble(conf.get(ELASTICSTORE_SCALEUP_KEY));
+		}
+
+		if (conf.get(ELASTICSTORE_SCALEDOWN_KEY) != null) {
+			ELASTICSTORE_SCALEDOWN = Double.parseDouble(conf.get(ELASTICSTORE_SCALEDOWN_KEY));
+		}
+
+		if (conf.get(ELASTICSTORE_MINNODES_KEY) != null) {
+			ELASTICSTORE_MINNODES = Integer.parseInt(conf.get(ELASTICSTORE_MINNODES_KEY));
+		}
+
+		if (conf.get(ELASTICSTORE_MAXNODES_KEY) != null) {
+			ELASTICSTORE_MAXNODES = Integer.parseInt(conf.get(ELASTICSTORE_MAXNODES_KEY));
+		}
+		if (conf.get(ELASTICSTORE_POLICYRUNNER_INTERVAL_KEY) != null) {
+			ELASTICSTORE_POLICYRUNNER_INTERVAL = Integer.parseInt(conf.get(ELASTICSTORE_POLICYRUNNER_INTERVAL_KEY));
+		}
+		if (conf.get(ELASTICSTORE_LOGGING_KEY) != null) {
+			ELASTICSTORE_LOGGING = Boolean.parseBoolean(conf.get(ELASTICSTORE_LOGGING_KEY));
+		}
 	}
 
 	public static void printConf(){
@@ -237,11 +282,18 @@
 		LOG.info(NAMENODE_BLOCKSELECTION_KEY + " " + NAMENODE_BLOCKSELECTION);
 		LOG.info(NAMENODE_FILEBLOCKS_KEY + " " + NAMENODE_FILEBLOCKS);
 		LOG.info(NAMENODE_RPC_TYPE_KEY + " " + NAMENODE_RPC_TYPE);
+		LOG.info(NAMENODE_RPC_SERVICE_KEY + " " + NAMENODE_RPC_SERVICE);
 		LOG.info(NAMENODE_LOG_KEY + " " + NAMENODE_LOG);
 		LOG.info(STORAGE_TYPES_KEY + " " + STORAGE_TYPES);
 		LOG.info(STORAGE_CLASSES_KEY + " " + STORAGE_CLASSES);
 		LOG.info(STORAGE_ROOTCLASS_KEY + " " + STORAGE_ROOTCLASS);
 		LOG.info(STORAGE_KEEPALIVE_KEY + " " + STORAGE_KEEPALIVE);
+		LOG.info(ELASTICSTORE_SCALEUP_KEY + " " + ELASTICSTORE_SCALEUP);
+		LOG.info(ELASTICSTORE_SCALEDOWN_KEY + " " + ELASTICSTORE_SCALEDOWN);
+		LOG.info(ELASTICSTORE_MAXNODES_KEY + " " + ELASTICSTORE_MAXNODES);
+		LOG.info(ELASTICSTORE_MINNODES_KEY + " " + ELASTICSTORE_MINNODES);
+		LOG.info(ELASTICSTORE_POLICYRUNNER_INTERVAL_KEY + " " + ELASTICSTORE_POLICYRUNNER_INTERVAL);
+		LOG.info(ELASTICSTORE_LOGGING_KEY + " " + ELASTICSTORE_LOGGING);
 	}
 
 	public static void verify() throws IOException {
diff --git a/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java b/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java
index fa32ffb..7c616a2 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java
@@ -20,7 +20,7 @@
 
 import java.io.IOException;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -97,7 +97,7 @@
 		// nevertheless target only one running datanode instance.
 		// Therefore we can iterate over all storageClasses to check whether
 		// the requested datanode is part of one of the storageClasses.
-		for(StorageClass storageClass : storageClasses) {
+		for (StorageClass storageClass : storageClasses) {
 			if (storageClass.getDataNode(dn) != null) {
 				return storageClass.prepareForRemovalDatanode(dn);
 			}
@@ -107,6 +107,85 @@
 		return RpcErrors.ERR_DATANODE_NOT_REGISTERED;
 	}
 
+	public double getStorageUsedPercentage() throws Exception {
+		long totalBlocks = 0;
+		long freeBlocks = 0;
+		for (StorageClass sc : storageClasses) {
+			totalBlocks += sc.getTotalBlockCount();
+			freeBlocks += sc.getFreeBlockCount();
+		}
+
+		// With no capacity at all (zero total blocks), report 1.0,
+		// i.e. treat all storage as used.
+		if (totalBlocks == 0) {
+			return 1.0;
+		}
+
+		// Used share is one minus the free share.
+		double freeShare = (double) freeBlocks / (double) totalBlocks;
+		return 1.0 - freeShare;
+	}
+
+	// Returns the number of non-free blocks (total minus free) summed over all storage classes.
+	public long getNumberOfBlocksUsed() throws Exception {
+		// long accumulator: the per-class counts are long, and "+=" into an int would silently narrow
+		long used = 0;
+		for (StorageClass storageClass : storageClasses) {
+			used += storageClass.getTotalBlockCount() - storageClass.getFreeBlockCount();
+		}
+		return used;
+	}
+
+	// Returns the sum of getTotalBlockCount() over all storage classes.
+	public long getNumberOfBlocks() throws Exception {
+		// long accumulator: "+=" of a long into an int would silently narrow the sum
+		long blocks = 0;
+		for (StorageClass storageClass : storageClasses) {
+			blocks += storageClass.getTotalBlockCount();
+		}
+		return blocks;
+	}
+
+	public int getNumberOfRunningDatanodes() {
+		// add up the per-storage-class datanode counts
+		int running = 0;
+		for (StorageClass sc : storageClasses) {
+			int inClass = sc.getNumberOfRunningDatanodes();
+			running += inClass;
+		}
+		return running;
+	}
+
+	/**
+	 * Picks the datanode to remove next: the one with the most free blocks
+	 * among all datanodes that are not yet scheduled for removal.
+	 *
+	 * @return the removal candidate, or null if every datanode is already
+	 *         scheduled for removal (or none is registered)
+	 */
+	public DataNodeBlocks identifyRemoveCandidate() {
+		DataNodeBlocks best = null;
+		int bestFree = -1;
+
+		// Single pass instead of sorting: free-block counts may change while
+		// we look at them, and a sort comparator fed with moving values can
+		// violate its contract. Each count is therefore read exactly once.
+		for (StorageClass storageClass : storageClasses) {
+			for (DataNodeBlocks candidate : storageClass.getDataNodeBlocks()) {
+				if (candidate.isScheduleForRemoval()) {
+					continue;
+				}
+				int free = candidate.getBlockCount();
+				// strict ">" keeps the first datanode on ties, like the stable sort did
+				if (free > bestFree) {
+					best = candidate;
+					bestFree = free;
+				}
+			}
+		}
+		return best;
+	}
+
 }
 
 class StorageClass {
@@ -124,6 +203,8 @@
 		this.affinitySets = new ConcurrentHashMap<Integer, DataNodeArray>();
 		if (CrailConstants.NAMENODE_BLOCKSELECTION.equalsIgnoreCase("roundrobin")){
 			this.blockSelection = new RoundRobinBlockSelection();
+		} else if (CrailConstants.NAMENODE_BLOCKSELECTION.equalsIgnoreCase("sequential")) {
+			this.blockSelection = new SequentialBlockSelection();
 		} else {
 			this.blockSelection = new RandomBlockSelection();
 		}
@@ -222,6 +303,33 @@
 
 	//---------------
 
+	// Sum of getTotalNumberOfBlocks() over all datanodes in this storage class.
+	public long getTotalBlockCount() {
+		long sum = 0;
+		Iterator<DataNodeBlocks> members = membership.values().iterator();
+		while (members.hasNext()) {
+			sum += members.next().getTotalNumberOfBlocks();
+		}
+		return sum;
+	}
+
+	// Sum of the free-block counts of all datanodes in this storage class.
+	public long getFreeBlockCount() {
+		long freeBlocks = 0;
+		for (DataNodeBlocks node : membership.values()) {
+			int nodeFree = node.getBlockCount();
+			freeBlocks += nodeFree;
+		}
+		return freeBlocks;
+	}
+
+	public Collection<DataNodeBlocks> getDataNodeBlocks() { // datanodes of this storage class
+		return this.membership.values(); // returned directly from the membership map, not copied
+	}
+
+	public int getNumberOfRunningDatanodes() { // number of entries in this storage class's membership map
+		return this.membership.size();
+	}
 
 	private void _addDataNode(DataNodeBlocks dataNode){
 		LOG.info("adding datanode " + CrailUtils.getIPAddressFromBytes(dataNode.getIpAddress()) + ":" + dataNode.getPort() + " of type " + dataNode.getStorageType() + " to storage class " + storageClass);
@@ -273,7 +381,18 @@
 		public int getNext(int size) {
 			return ThreadLocalRandom.current().nextInt(size);
 		}
-	}	
+	}
+	
+	public class SequentialBlockSelection implements BlockSelection { // used when NAMENODE_BLOCKSELECTION is "sequential"
+		public SequentialBlockSelection(){
+			LOG.info("sequential block selection");
+		}
+		// Always returns 0, whatever size is.
+		@Override
+		public int getNext(int size) {
+			return 0;
+		}
+	}
 	
 	private class DataNodeArray {
 		private ArrayList<DataNodeBlocks> arrayList;
diff --git a/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java b/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java
index 58e4ab0..d3d82ea 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java
@@ -86,6 +86,10 @@
 		return this.scheduleForRemoval;
 	}
 
+	public long getTotalNumberOfBlocks() { // returns maxBlockCount; getBlockCount() below returns the free-block count instead
+		return this.maxBlockCount;
+	}
+
 	public int getBlockCount() {
 		return this.freeBlocks.size();
 	}
diff --git a/namenode/src/main/java/org/apache/crail/namenode/ElasticNameNodeService.java b/namenode/src/main/java/org/apache/crail/namenode/ElasticNameNodeService.java
new file mode 100644
index 0000000..745916c
--- /dev/null
+++ b/namenode/src/main/java/org/apache/crail/namenode/ElasticNameNodeService.java
@@ -0,0 +1,19 @@
+package org.apache.crail.namenode;
+
+import java.io.IOException;
+
+import org.apache.crail.conf.CrailConstants;
+
+public class ElasticNameNodeService extends NameNodeService {
+    // Scaling policy for this name node; the PolicyRunner constructor already starts its policy thread.
+    PolicyRunner policyRunner;
+
+    public ElasticNameNodeService() throws IOException {
+        // Configure the free-capacity policy from the ELASTICSTORE_* constants.
+        this.policyRunner = new FreeCapacityPolicy(this,
+                CrailConstants.ELASTICSTORE_SCALEUP,
+                CrailConstants.ELASTICSTORE_SCALEDOWN,
+                CrailConstants.ELASTICSTORE_MINNODES,
+                CrailConstants.ELASTICSTORE_MAXNODES);
+    }
+}
diff --git a/namenode/src/main/java/org/apache/crail/namenode/FreeCapacityPolicy.java b/namenode/src/main/java/org/apache/crail/namenode/FreeCapacityPolicy.java
new file mode 100644
index 0000000..70615ac
--- /dev/null
+++ b/namenode/src/main/java/org/apache/crail/namenode/FreeCapacityPolicy.java
@@ -0,0 +1,107 @@
+package org.apache.crail.namenode;
+
+import org.apache.crail.conf.CrailConstants;
+import org.apache.crail.rpc.RpcNameNodeService;
+
+/**
+ * Scaling policy based on the used fraction of storage blocks. A datanode is
+ * launched when usage exceeds scaleUp and one is scheduled for removal when
+ * usage falls below scaleDown, keeping the desired datanode count within
+ * [minDataNodes, maxDataNodes]. A new operation is only issued once the total
+ * block count has changed since the previous one was issued.
+ */
+public class FreeCapacityPolicy extends PolicyRunner {
+
+    double scaleUp;
+    double scaleDown;
+    int minDataNodes;
+    int maxDataNodes;
+    int datanodes; // maintains the desired number of datanodes (i.e. a datanode might be still starting / terminating)
+    boolean updated; // shows whether a launch/terminate datanode operation returned
+    long lastCapacity; // maintains the capacity that was available when the launch/terminate datanode operation was issued
+
+    // Set as the last step of the constructor. The PolicyRunner constructor starts the
+    // policy thread, so checkPolicy() may be invoked before the fields above have been
+    // assigned. No initializer on purpose: the default (false) is what early calls must see.
+    private volatile boolean initialized;
+
+    FreeCapacityPolicy(RpcNameNodeService service, double scaleUp, double scaleDown, int minDataNodes, int maxDataNodes) {
+        super(service);
+        this.scaleUp = scaleUp;
+        this.scaleDown = scaleDown;
+        this.minDataNodes = minDataNodes;
+        this.maxDataNodes = maxDataNodes;
+        this.datanodes = 0;
+        this.updated = true;
+        this.lastCapacity = 0;
+        // volatile write: publishes all assignments above to the policy thread
+        this.initialized = true;
+    }
+
+    /**
+     * Runs one policy evaluation: logs usage (if enabled), detects completion of the
+     * previous launch/removal, and issues at most one new scale-up or scale-down.
+     */
+    @Override
+    public void checkPolicy() {
+        if (!this.initialized) {
+            // constructor has not finished yet; the next interval will retry
+            return;
+        }
+
+        try {
+            // log current usage information
+            double usage = this.service.getStorageUsedPercentage();
+
+            if (CrailConstants.ELASTICSTORE_LOGGING) {
+                LOG.info("Current block usage: " + this.service.getNumberOfBlocksUsed() + "/" + this.service.getNumberOfBlocks());
+                LOG.info("Current storage usage: " + 100*usage + "%");
+                LOG.info("Current number of datanodes: " + this.datanodes);
+            }
+
+            // check whether datanode launch/terminate operation finished
+            if (!this.updated && this.lastCapacity != this.service.getNumberOfBlocks()) {
+                this.updated = true;
+            }
+
+            // a launch/terminate operation is still pending: do not issue another one
+            if (!this.updated) {
+                return;
+            }
+
+            if (usage < scaleDown && this.datanodes > minDataNodes) {
+                LOG.info("Scale down detected");
+
+                DataNodeBlocks removeCandidate = this.service.identifyRemoveCandidate();
+
+                if (removeCandidate != null) {
+                    // read the capacity first, and only record the operation as pending
+                    // once the removal request went through without throwing
+                    long capacityBefore = this.service.getNumberOfBlocks();
+                    this.service.prepareDataNodeForRemoval(removeCandidate);
+                    this.lastCapacity = capacityBefore;
+                    this.updated = false;
+                    this.datanodes--;
+                }
+            } else if (usage > this.scaleUp && this.datanodes < maxDataNodes) {
+                // "else": never scale down and up within the same evaluation
+                LOG.info("Scale up detected");
+
+                long capacityBefore = this.service.getNumberOfBlocks();
+                int launchedBefore = this.instances;
+                launchDatanode();
+
+                // launchDatanode() logs failures itself and only increments instances on
+                // success. After a failed launch the capacity would never change, so do not
+                // mark an operation as pending; the next evaluation simply tries again.
+                if (this.instances != launchedBefore) {
+                    this.lastCapacity = capacityBefore;
+                    this.updated = false;
+                    this.datanodes++;
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Unable to retrieve storage usage information", e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java b/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java
index a5da526..beab3f1 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java
@@ -552,6 +552,26 @@
 		return RpcErrors.ERR_OK;
 	}
 
+	public double getStorageUsedPercentage() throws Exception { // delegates to the block store
+		return this.blockStore.getStorageUsedPercentage();
+	}
+
+	public long getNumberOfBlocksUsed() throws Exception { // delegates to the block store
+		return this.blockStore.getNumberOfBlocksUsed();
+	}
+
+	public long getNumberOfBlocks() throws Exception { // delegates to the block store
+		return this.blockStore.getNumberOfBlocks();
+	}
+
+	public int getNumberOfRunningDatanodes() { // delegates to the block store
+		return this.blockStore.getNumberOfRunningDatanodes();
+	}
+
+	public DataNodeBlocks identifyRemoveCandidate() { // delegates to the block store; may return null (no candidate)
+		return  this.blockStore.identifyRemoveCandidate();
+	}
+
 	//------------------------
 	
 	@Override
diff --git a/namenode/src/main/java/org/apache/crail/namenode/PolicyRunner.java b/namenode/src/main/java/org/apache/crail/namenode/PolicyRunner.java
new file mode 100644
index 0000000..8ce0ea2
--- /dev/null
+++ b/namenode/src/main/java/org/apache/crail/namenode/PolicyRunner.java
@@ -0,0 +1,76 @@
+package org.apache.crail.namenode;
+
+import org.apache.crail.conf.CrailConstants;
+import org.apache.crail.rpc.RpcNameNodeService;
+import org.apache.crail.utils.CrailUtils;
+import org.slf4j.Logger;
+
+/**
+ * Base class for elasticity policies. Constructing an instance starts a thread that
+ * alternates between calling checkPolicy() and sleeping for
+ * CrailConstants.ELASTICSTORE_POLICYRUNNER_INTERVAL milliseconds.
+ */
+public abstract class PolicyRunner implements Runnable {
+
+    static final Logger LOG = CrailUtils.getLogger();
+    NameNodeService service;
+    // number of datanodes launched successfully so far; also the offset added to the base port
+    int instances = 0;
+
+    /**
+     * Stores the service and immediately starts the policy thread. Because the thread is
+     * started here, checkPolicy() can be called before a subclass constructor has finished.
+     *
+     * @param service the name node service; it is cast to NameNodeService
+     */
+    PolicyRunner(RpcNameNodeService service){
+        this.service = (NameNodeService) service;
+        Thread runner = new Thread(this);
+        runner.start();
+    }
+
+    /** Evaluates the policy once; called repeatedly from the policy thread. */
+    public abstract void checkPolicy();
+
+    /** Policy loop: evaluate, sleep, repeat; ends when the thread is interrupted. */
+    @Override
+    public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+            checkPolicy();
+
+            try {
+                Thread.sleep(CrailConstants.ELASTICSTORE_POLICYRUNNER_INTERVAL);
+            } catch (InterruptedException e) {
+                // sleep() cleared the interrupt flag; set it again so the loop terminates
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Starts a datanode through $CRAIL_HOME/bin/crail, passing port 50020 plus the number
+     * of datanodes launched so far. Failures are logged and leave instances unchanged.
+     */
+    public void launchDatanode() {
+        String crailHome = System.getenv("CRAIL_HOME");
+        if (crailHome == null) {
+            LOG.error("Unable to launch datanode: CRAIL_HOME is not set");
+            return;
+        }
+
+        try {
+            String port = Integer.toString(50020+this.instances);
+            // inheritIO(): nothing here reads the child's stdout/stderr pipes, and a child
+            // writing to an unread pipe can block once the pipe buffer is full
+            new ProcessBuilder(crailHome + "/bin/crail", "datanode", "--",  "-p" + port)
+                    .inheritIO()
+                    .start();
+
+            LOG.info("Launched new datanode instance");
+            this.instances++;
+
+        } catch(Exception e) {
+            LOG.error("Unable to launch datanode", e);
+        }
+    }
+}