[Resource Elasticity] 3/3 Elastic Namenode Service
* implement simple autoscaling functionality
* renaming and small changes
* small style changes
diff --git a/client/src/main/java/org/apache/crail/conf/CrailConstants.java b/client/src/main/java/org/apache/crail/conf/CrailConstants.java
index 310abc1..0ee25eb 100644
--- a/client/src/main/java/org/apache/crail/conf/CrailConstants.java
+++ b/client/src/main/java/org/apache/crail/conf/CrailConstants.java
@@ -118,6 +118,25 @@
public static final String STORAGE_KEEPALIVE_KEY = "crail.storage.keepalive";
public static int STORAGE_KEEPALIVE = 2;
+ // elasticstore: autoscaling settings consumed by ElasticNameNodeService, FreeCapacityPolicy and PolicyRunner
+ public static final String ELASTICSTORE_SCALEUP_KEY = "crail.elasticstore.scaleup";
+ public static double ELASTICSTORE_SCALEUP = 0.4; // used-storage fraction above which FreeCapacityPolicy launches a datanode
+
+ public static final String ELASTICSTORE_SCALEDOWN_KEY = "crail.elasticstore.scaledown";
+ public static double ELASTICSTORE_SCALEDOWN = 0.1; // used-storage fraction below which FreeCapacityPolicy schedules a datanode for removal
+
+ public static final String ELASTICSTORE_MINNODES_KEY = "crail.elasticstore.minnodes";
+ public static int ELASTICSTORE_MINNODES = 1; // FreeCapacityPolicy scales down only while its datanode counter is above this
+
+ public static final String ELASTICSTORE_MAXNODES_KEY = "crail.elasticstore.maxnodes";
+ public static int ELASTICSTORE_MAXNODES = 10; // FreeCapacityPolicy scales up only while its datanode counter is below this
+
+ public static final String ELASTICSTORE_POLICYRUNNER_INTERVAL_KEY = "crail.elasticstore.policyrunner.interval";
+ public static int ELASTICSTORE_POLICYRUNNER_INTERVAL = 1000; // pause between policy checks; passed to Thread.sleep, i.e. milliseconds
+
+ public static final String ELASTICSTORE_LOGGING_KEY = "crail.elasticstore.logging";
+ public static boolean ELASTICSTORE_LOGGING = false; // when true, FreeCapacityPolicy logs usage figures on every check
+
public static void updateConstants(CrailConfiguration conf){
//general
if (conf.get(DIRECTORY_DEPTH_KEY) != null) {
@@ -191,6 +210,9 @@
if (conf.get(NAMENODE_RPC_TYPE_KEY) != null) {
NAMENODE_RPC_TYPE = conf.get(NAMENODE_RPC_TYPE_KEY);
}
+ if (conf.get(NAMENODE_RPC_SERVICE_KEY) != null) {
+ NAMENODE_RPC_SERVICE = conf.get(NAMENODE_RPC_SERVICE_KEY);
+ }
if (conf.get(NAMENODE_LOG_KEY) != null) {
NAMENODE_LOG = conf.get(NAMENODE_LOG_KEY);
}
@@ -210,6 +232,29 @@
if (conf.get(STORAGE_KEEPALIVE_KEY) != null) {
STORAGE_KEEPALIVE = Integer.parseInt(conf.get(STORAGE_KEEPALIVE_KEY));
}
+
+ //elasticstore
+ if (conf.get(ELASTICSTORE_SCALEUP_KEY) != null) {
+ ELASTICSTORE_SCALEUP = Double.parseDouble(conf.get(ELASTICSTORE_SCALEUP_KEY));
+ }
+
+ if (conf.get(ELASTICSTORE_SCALEDOWN_KEY) != null) {
+ ELASTICSTORE_SCALEDOWN = Double.parseDouble(conf.get(ELASTICSTORE_SCALEDOWN_KEY));
+ }
+
+ if (conf.get(ELASTICSTORE_MINNODES_KEY) != null) {
+ ELASTICSTORE_MINNODES = Integer.parseInt(conf.get(ELASTICSTORE_MINNODES_KEY));
+ }
+
+ if (conf.get(ELASTICSTORE_MAXNODES_KEY) != null) {
+ ELASTICSTORE_MAXNODES = Integer.parseInt(conf.get(ELASTICSTORE_MAXNODES_KEY));
+ }
+ if (conf.get(ELASTICSTORE_POLICYRUNNER_INTERVAL_KEY) != null) {
+ ELASTICSTORE_POLICYRUNNER_INTERVAL = Integer.parseInt(conf.get(ELASTICSTORE_POLICYRUNNER_INTERVAL_KEY));
+ }
+ if (conf.get(ELASTICSTORE_LOGGING_KEY) != null) {
+ ELASTICSTORE_LOGGING = Boolean.parseBoolean(conf.get(ELASTICSTORE_LOGGING_KEY));
+ }
}
public static void printConf(){
@@ -237,11 +282,18 @@
LOG.info(NAMENODE_BLOCKSELECTION_KEY + " " + NAMENODE_BLOCKSELECTION);
LOG.info(NAMENODE_FILEBLOCKS_KEY + " " + NAMENODE_FILEBLOCKS);
LOG.info(NAMENODE_RPC_TYPE_KEY + " " + NAMENODE_RPC_TYPE);
+ LOG.info(NAMENODE_RPC_SERVICE_KEY + " " + NAMENODE_RPC_SERVICE);
LOG.info(NAMENODE_LOG_KEY + " " + NAMENODE_LOG);
LOG.info(STORAGE_TYPES_KEY + " " + STORAGE_TYPES);
LOG.info(STORAGE_CLASSES_KEY + " " + STORAGE_CLASSES);
LOG.info(STORAGE_ROOTCLASS_KEY + " " + STORAGE_ROOTCLASS);
LOG.info(STORAGE_KEEPALIVE_KEY + " " + STORAGE_KEEPALIVE);
+ LOG.info(ELASTICSTORE_SCALEUP_KEY + " " + ELASTICSTORE_SCALEUP);
+ LOG.info(ELASTICSTORE_SCALEDOWN_KEY + " " + ELASTICSTORE_SCALEDOWN);
+ LOG.info(ELASTICSTORE_MAXNODES_KEY + " " + ELASTICSTORE_MAXNODES);
+ LOG.info(ELASTICSTORE_MINNODES_KEY + " " + ELASTICSTORE_MINNODES);
+ LOG.info(ELASTICSTORE_POLICYRUNNER_INTERVAL_KEY + " " + ELASTICSTORE_POLICYRUNNER_INTERVAL);
+ LOG.info(ELASTICSTORE_LOGGING_KEY + " " + ELASTICSTORE_LOGGING);
}
public static void verify() throws IOException {
diff --git a/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java b/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java
index fa32ffb..7c616a2 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java
@@ -20,7 +20,7 @@
import java.io.IOException;
import java.net.UnknownHostException;
-import java.util.ArrayList;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -97,7 +97,7 @@
// nevertheless target only one running datanode instance.
// Therefore we can iterate over all storageClasses to check whether
// the requested datanode is part of one of the storageClasses.
- for(StorageClass storageClass : storageClasses) {
+ for (StorageClass storageClass : storageClasses) {
if (storageClass.getDataNode(dn) != null) {
return storageClass.prepareForRemovalDatanode(dn);
}
@@ -107,6 +107,85 @@
return RpcErrors.ERR_DATANODE_NOT_REGISTERED;
}
+    /**
+     * Share of the registered blocks that is currently in use, summed over all
+     * storage classes. Despite the name this is a fraction (1.0 == everything
+     * used), not a value scaled to 100.
+     * @return used fraction; 1.0 if no blocks are registered at all
+     */
+    public double getStorageUsedPercentage() throws Exception {
+        long capacity = 0;
+        long unused = 0;
+        for (StorageClass sc : storageClasses) {
+            capacity += sc.getTotalBlockCount();
+            unused += sc.getFreeBlockCount();
+        }
+        if (capacity == 0) {
+            return 1.0; // no capacity at all counts as fully used
+        }
+        return 1.0 - ((double) unused / (double) capacity);
+    }
+
+    /** @return blocks currently in use (total minus free), summed over all storage classes */
+    public long getNumberOfBlocksUsed() throws Exception {
+        // long, not int: the per-class counts are long and compound += would silently narrow them
+        long total = 0;
+        for (StorageClass storageClass : storageClasses) {
+            total += (storageClass.getTotalBlockCount() - storageClass.getFreeBlockCount());
+        }
+        return total;
+    }
+
+    /** @return total number of blocks (used and free), summed over all storage classes */
+    public long getNumberOfBlocks() throws Exception {
+        // long, not int: the per-class counts are long and compound += would silently narrow them
+        long total = 0;
+        for (StorageClass storageClass : storageClasses) {
+            total += storageClass.getTotalBlockCount();
+        }
+        return total;
+    }
+
+    /** @return datanode count reported by the storage classes, summed over all of them */
+    public int getNumberOfRunningDatanodes() {
+        int running = 0;
+        for (StorageClass sc : storageClasses) {
+            running += sc.getNumberOfRunningDatanodes();
+        }
+
+        return running;
+    }
+
+    /**
+     * Picks the datanode to remove next when scaling down: among all datanodes not
+     * yet scheduled for removal, the one with the most free blocks (on a tie, the
+     * one encountered first).
+     *
+     * @return the removal candidate, or null if no datanode is registered or all of
+     *         them are already scheduled for removal
+     */
+    public DataNodeBlocks identifyRemoveCandidate() {
+        // One pass instead of the former descending sort: it selects the same datanode
+        // but reads every free-block count exactly once, so counts that change while
+        // the policy thread is running cannot make a comparator inconsistent mid-sort.
+        DataNodeBlocks best = null;
+        int bestFree = -1;
+        for (StorageClass storageClass : storageClasses) {
+            for (DataNodeBlocks candidate : storageClass.getDataNodeBlocks()) {
+                if (candidate.isScheduleForRemoval()) {
+                    continue;
+                }
+                int free = candidate.getBlockCount();
+                if (free > bestFree) {
+                    best = candidate;
+                    bestFree = free;
+                }
+            }
+        }
+
+        return best;
+    }
+
}
class StorageClass {
@@ -124,6 +203,8 @@
this.affinitySets = new ConcurrentHashMap<Integer, DataNodeArray>();
if (CrailConstants.NAMENODE_BLOCKSELECTION.equalsIgnoreCase("roundrobin")){
this.blockSelection = new RoundRobinBlockSelection();
+ } else if (CrailConstants.NAMENODE_BLOCKSELECTION.equalsIgnoreCase("sequential")) {
+ this.blockSelection = new SequentialBlockSelection();
} else {
this.blockSelection = new RandomBlockSelection();
}
@@ -222,6 +303,33 @@
//---------------
+    /** @return block capacity summed over every datanode in membership */
+    public long getTotalBlockCount() {
+        long blocks = 0;
+        for (DataNodeBlocks node : membership.values()) {
+            blocks += node.getTotalNumberOfBlocks();
+        }
+
+        return blocks;
+    }
+
+    /** @return free blocks summed over every datanode in membership */
+    public long getFreeBlockCount() {
+        long freeBlocks = 0;
+        for (DataNodeBlocks node : membership.values()) {
+            freeBlocks += node.getBlockCount();
+        }
+
+        return freeBlocks;
+    }
+
+ public Collection<DataNodeBlocks> getDataNodeBlocks() {
+ return this.membership.values(); // hands out membership's own values collection; no copy is made here
+ }
+
+ public int getNumberOfRunningDatanodes() {
+ return this.membership.size(); // counts every entry currently held in membership
+ }
private void _addDataNode(DataNodeBlocks dataNode){
LOG.info("adding datanode " + CrailUtils.getIPAddressFromBytes(dataNode.getIpAddress()) + ":" + dataNode.getPort() + " of type " + dataNode.getStorageType() + " to storage class " + storageClass);
@@ -273,7 +381,18 @@
public int getNext(int size) {
return ThreadLocalRandom.current().nextInt(size);
}
- }
+ }
+
+ public class SequentialBlockSelection implements BlockSelection { // selection strategy that always yields index 0
+ public SequentialBlockSelection(){
+ LOG.info("sequential block selection");
+ }
+
+ @Override
+ public int getNext(int size) {
+ return 0; // size is ignored; every call returns the first index
+ }
+ }
private class DataNodeArray {
private ArrayList<DataNodeBlocks> arrayList;
diff --git a/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java b/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java
index 58e4ab0..d3d82ea 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java
@@ -86,6 +86,10 @@
return this.scheduleForRemoval;
}
+ public long getTotalNumberOfBlocks() { // capacity counterpart of getBlockCount(), which reports freeBlocks.size()
+ return this.maxBlockCount;
+ }
+
public int getBlockCount() {
return this.freeBlocks.size();
}
diff --git a/namenode/src/main/java/org/apache/crail/namenode/ElasticNameNodeService.java b/namenode/src/main/java/org/apache/crail/namenode/ElasticNameNodeService.java
new file mode 100644
index 0000000..745916c
--- /dev/null
+++ b/namenode/src/main/java/org/apache/crail/namenode/ElasticNameNodeService.java
@@ -0,0 +1,19 @@
+package org.apache.crail.namenode;
+
+import java.io.IOException;
+
+import org.apache.crail.conf.CrailConstants;
+
+public class ElasticNameNodeService extends NameNodeService { // NameNodeService that additionally owns an autoscaling policy
+
+ PolicyRunner policyRunner; // policy created in the constructor; the PolicyRunner constructor starts its own thread
+
+ public ElasticNameNodeService() throws IOException {
+
+ this.policyRunner = new FreeCapacityPolicy(this, // thresholds and node limits are taken from the crail.elasticstore.* constants
+ CrailConstants.ELASTICSTORE_SCALEUP,
+ CrailConstants.ELASTICSTORE_SCALEDOWN,
+ CrailConstants.ELASTICSTORE_MINNODES,
+ CrailConstants.ELASTICSTORE_MAXNODES);
+ }
+}
diff --git a/namenode/src/main/java/org/apache/crail/namenode/FreeCapacityPolicy.java b/namenode/src/main/java/org/apache/crail/namenode/FreeCapacityPolicy.java
new file mode 100644
index 0000000..70615ac
--- /dev/null
+++ b/namenode/src/main/java/org/apache/crail/namenode/FreeCapacityPolicy.java
@@ -0,0 +1,94 @@
+package org.apache.crail.namenode;
+
+import org.apache.crail.conf.CrailConstants;
+import org.apache.crail.rpc.RpcNameNodeService;
+
+/**
+ * Autoscaling policy driven by storage usage: launches a datanode when the used
+ * fraction of blocks exceeds scaleUp and schedules one for removal when it falls
+ * below scaleDown. Only one launch/removal is outstanding at a time; the next one
+ * is issued only after the total block count has changed.
+ */
+public class FreeCapacityPolicy extends PolicyRunner {
+
+    double scaleUp; // used-storage fraction above which a datanode is launched
+    double scaleDown; // used-storage fraction below which a datanode is scheduled for removal
+    int minDataNodes; // no removal unless datanodes is above this value
+    int maxDataNodes; // no launch unless datanodes is below this value
+    int datanodes; // maintains the desired number of datanodes (i.e. a datanode might be still starting / terminating)
+    boolean updated; // shows whether a launch/terminate datanode operation returned
+    long lastCapacity; // maintains the capacity that was available when the launch/terminate datanode operation was issued
+
+    /**
+     * Creates the policy; the PolicyRunner constructor starts the policy thread.
+     *
+     * @param service      namenode service to observe and scale
+     * @param scaleUp      used-storage fraction that triggers a launch
+     * @param scaleDown    used-storage fraction that triggers a removal
+     * @param minDataNodes lower bound for the datanode counter
+     * @param maxDataNodes upper bound for the datanode counter
+     */
+    FreeCapacityPolicy(RpcNameNodeService service, double scaleUp, double scaleDown, int minDataNodes, int maxDataNodes) {
+        super(service);
+        this.scaleUp = scaleUp;
+        this.scaleDown = scaleDown;
+        this.minDataNodes = minDataNodes;
+        this.maxDataNodes = maxDataNodes;
+        this.datanodes = 0;
+        this.updated = true;
+        this.lastCapacity = 0;
+    }
+
+    /**
+     * Runs one policy evaluation: optionally logs usage, detects completion of the
+     * pending operation, then issues at most one scale-down or scale-up.
+     */
+    @Override
+    public void checkPolicy() {
+        try {
+            double usage = this.service.getStorageUsedPercentage();
+
+            if (CrailConstants.ELASTICSTORE_LOGGING) {
+                LOG.info("Current block usage: " + this.service.getNumberOfBlocksUsed() + "/" + this.service.getNumberOfBlocks());
+                LOG.info("Current storage usage: " + 100*usage + "%");
+                LOG.info("Current number of datanodes: " + this.datanodes);
+            }
+
+            // a changed total block count is taken as the sign that the pending
+            // launch/terminate operation has taken effect
+            if (!this.updated && this.lastCapacity != this.service.getNumberOfBlocks()) {
+                this.updated = true;
+            }
+
+            // NOTE(review): if a launch/removal never changes the block count, updated
+            // stays false and the policy stops scaling - confirm this is acceptable
+            if (!this.updated) {
+                return;
+            }
+
+            // else-if: a single pass must not issue a removal and a launch together,
+            // which overlapping thresholds (scaleDown > scaleUp) would otherwise allow
+            if (usage < scaleDown && this.datanodes > minDataNodes) {
+                LOG.info("Scale down detected");
+
+                DataNodeBlocks removeCandidate = this.service.identifyRemoveCandidate();
+
+                if (removeCandidate != null) {
+                    this.lastCapacity = this.service.getNumberOfBlocks();
+                    this.updated = false;
+                    this.service.prepareDataNodeForRemoval(removeCandidate);
+                    this.datanodes--;
+                }
+            } else if (usage > this.scaleUp && this.datanodes < maxDataNodes) {
+                LOG.info("Scale up detected");
+                this.lastCapacity = this.service.getNumberOfBlocks();
+                this.updated = false;
+                launchDatanode();
+                this.datanodes++;
+            }
+        } catch (Exception e) {
+            // keep the cause: this catch also covers failures of the removal call
+            LOG.error("Unable to retrieve storage usage information", e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java b/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java
index a5da526..beab3f1 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java
@@ -552,6 +552,26 @@
return RpcErrors.ERR_OK;
}
+ public double getStorageUsedPercentage() throws Exception { // forwards to blockStore; the result is a used fraction (1.0 == all blocks used), not scaled to 100
+ return this.blockStore.getStorageUsedPercentage();
+ }
+
+ public long getNumberOfBlocksUsed() throws Exception { // forwards to blockStore: total minus free blocks
+ return this.blockStore.getNumberOfBlocksUsed();
+ }
+
+ public long getNumberOfBlocks() throws Exception { // forwards to blockStore: total block count; FreeCapacityPolicy compares successive values to detect capacity changes
+ return this.blockStore.getNumberOfBlocks();
+ }
+
+ public int getNumberOfRunningDatanodes() { // forwards to blockStore: datanode count summed over the storage classes
+ return this.blockStore.getNumberOfRunningDatanodes();
+ }
+
+ public DataNodeBlocks identifyRemoveCandidate() { // forwards to blockStore; may return null when no datanode qualifies
+ return this.blockStore.identifyRemoveCandidate();
+ }
+
//------------------------
@Override
diff --git a/namenode/src/main/java/org/apache/crail/namenode/PolicyRunner.java b/namenode/src/main/java/org/apache/crail/namenode/PolicyRunner.java
new file mode 100644
index 0000000..8ce0ea2
--- /dev/null
+++ b/namenode/src/main/java/org/apache/crail/namenode/PolicyRunner.java
@@ -0,0 +1,80 @@
+package org.apache.crail.namenode;
+
+import org.apache.crail.conf.CrailConstants;
+import org.apache.crail.rpc.RpcNameNodeService;
+import org.apache.crail.utils.CrailUtils;
+import org.slf4j.Logger;
+
+/**
+ * Base class for elasticity policies. Constructing a policy starts a thread that
+ * calls checkPolicy() in an endless loop, sleeping
+ * CrailConstants.ELASTICSTORE_POLICYRUNNER_INTERVAL milliseconds between calls.
+ *
+ * NOTE(review): the thread is started inside this constructor, i.e. before a
+ * subclass constructor has assigned its own fields, so an early checkPolicy()
+ * call may see default field values - confirm this is acceptable.
+ */
+public abstract class PolicyRunner implements Runnable {
+
+    static final Logger LOG = CrailUtils.getLogger();
+    NameNodeService service;
+    int instances = 0; // datanodes launched so far; also the offset added to base port 50020
+
+    /**
+     * Stores the service and immediately starts the policy thread.
+     *
+     * @param service namenode service to monitor; cast to NameNodeService unconditionally
+     */
+    PolicyRunner(RpcNameNodeService service){
+        this.service = (NameNodeService) service;
+        Thread runner = new Thread(this);
+        runner.start();
+    }
+
+    /** Evaluates the policy once; called repeatedly from the policy thread. */
+    public abstract void checkPolicy();
+
+    /** Policy loop: check, sleep, repeat; ends once the thread is interrupted. */
+    public void run() {
+        while (true) {
+            checkPolicy();
+
+            try {
+                Thread.sleep(CrailConstants.ELASTICSTORE_POLICYRUNNER_INTERVAL);
+            } catch (InterruptedException e) {
+                // an interrupt is a request to stop: restore the flag and end the loop
+                // instead of printing a stack trace and carrying on as before
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+
+    /**
+     * Starts one more datanode as a child process:
+     * $CRAIL_HOME/bin/crail datanode -- -pPORT, with PORT = 50020 + number of
+     * datanodes launched so far. Failures are logged and not propagated.
+     */
+    public void launchDatanode() {
+        String crailHome = System.getenv("CRAIL_HOME");
+        if (crailHome == null) {
+            // otherwise the command would silently become "null/bin/crail"
+            LOG.error("Unable to launch datanode: CRAIL_HOME is not set");
+            return;
+        }
+
+        try {
+            String port = Integer.toString(50020 + this.instances);
+            // inheritIO(): nothing in this class reads the child's stdout/stderr, and
+            // an undrained pipe can block the child once the pipe buffer is full
+            new ProcessBuilder(crailHome + "/bin/crail", "datanode", "--", "-p" + port)
+                    .inheritIO()
+                    .start();
+
+            LOG.info("Launched new datanode instance");
+            this.instances++;
+        } catch (Exception e) {
+            LOG.error("Unable to launch datanode", e);
+        }
+    }
+}