blob: 70615aca2435a3562c7e23c0853a807ef56c0bd8 [file] [log] [blame]
package org.apache.crail.namenode;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.rpc.RpcNameNodeService;
public class FreeCapacityPolicy extends PolicyRunner {
double scaleUp;
double scaleDown;
int minDataNodes;
int maxDataNodes;
int datanodes; // maintains the desired number of datanodes (i.e. a datanode might be still starting / terminating)
boolean updated; // shows whether a launch/terminate datanode operation returned
long lastCapacity; // maintains the capacity that was available when the launch/terminate datanode operation was issued
FreeCapacityPolicy(RpcNameNodeService service, double scaleUp, double scaleDown, int minDataNodes, int maxDataNodes) {
super(service);
this.scaleUp = scaleUp;
this.scaleDown = scaleDown;
this.minDataNodes = minDataNodes;
this.maxDataNodes = maxDataNodes;
this.datanodes = 0;
this.updated = true;
this.lastCapacity = 0;
}
@Override
public void checkPolicy() {
try {
// log current usage information
double usage = this.service.getStorageUsedPercentage();
if (CrailConstants.ELASTICSTORE_LOGGING) {
LOG.info("Current block usage: " + this.service.getNumberOfBlocksUsed() + "/" + this.service.getNumberOfBlocks());
LOG.info("Current storage usage: " + 100*usage + "%");
LOG.info("Current number of datanodes: " + this.datanodes);
}
// check whether datanode launch/terminate operation finished
if (!this.updated && this.lastCapacity != this.service.getNumberOfBlocks()) {
this.updated = true;
}
// check whether scaling up or down is possible
if (this.updated) {
if (usage < scaleDown && this.datanodes > minDataNodes) {
LOG.info("Scale down detected");
DataNodeBlocks removeCandidate = this.service.identifyRemoveCandidate();
if (removeCandidate != null) {
this.lastCapacity = this.service.getNumberOfBlocks();
this.updated = false;
this.service.prepareDataNodeForRemoval(removeCandidate);
this.datanodes--;
}
}
if (usage > this.scaleUp && this.datanodes < maxDataNodes) {
LOG.info("Scale up detected");
this.lastCapacity = this.service.getNumberOfBlocks();
this.updated = false;
launchDatanode();
this.datanodes++;
}
}
} catch (Exception e) {
LOG.error("Unable to retrieve storage usage information");
}
}
}