blob: f40e8cd4cafab300bb3a379b25b2168ce255ace4 [file] [log] [blame]
package edu.uci.ics.hyracks.control.cc.scheduler.naive;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.constraints.expressions.BelongsToExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
import edu.uci.ics.hyracks.api.constraints.expressions.EnumeratedCollectionExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.RelationalExpression;
import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobAttempt;
import edu.uci.ics.hyracks.control.cc.job.JobStageAttempt;
import edu.uci.ics.hyracks.control.cc.scheduler.IJobAttemptSchedulerState;
import edu.uci.ics.hyracks.control.cc.scheduler.ISchedule;
import edu.uci.ics.hyracks.control.cc.scheduler.IScheduler;
public class NaiveScheduler implements IScheduler {
private static final Logger LOGGER = Logger.getLogger(NaiveScheduler.class.getName());
private final ClusterControllerService ccs;
public NaiveScheduler(ClusterControllerService ccs) {
this.ccs = ccs;
}
@Override
public IJobAttemptSchedulerState createJobAttemptState(JobAttempt ja) {
return new JobAttemptState(ja.getJobRun().getConstraints());
}
@Override
public void schedule(Set<JobStageAttempt> runnableStageAttempts) throws HyracksException {
for (JobStageAttempt jsa : runnableStageAttempts) {
Set<OperatorDescriptorId> operators = new HashSet<OperatorDescriptorId>();
for (ActivityNodeId aid : jsa.getJobStage().getTasks()) {
operators.add(aid.getOperatorDescriptorId());
}
jsa.setSchedule(computeSchedule(jsa, operators));
}
}
private ISchedule computeSchedule(JobStageAttempt jsa, Set<OperatorDescriptorId> operators) throws HyracksException {
Set<String> nodeSet = ccs.getNodeMap().keySet();
if (nodeSet.isEmpty()) {
throw new HyracksException("0 usable nodes found");
}
String[] liveNodes = ccs.getNodeMap().keySet().toArray(new String[nodeSet.size()]);
JobAttempt ja = jsa.getJobAttempt();
final JobAttemptState jas = (JobAttemptState) ja.getSchedulerState();
List<PartitionLocationExpression> rrAssignment = new ArrayList<PartitionLocationExpression>();
for (OperatorDescriptorId oid : operators) {
String[] opParts = null;
if (!jas.allocations.containsKey(oid)) {
Set<ConstraintExpression> opConstraints = jas.opConstraints.get(oid);
System.err.println("Constraints: " + opConstraints);
for (ConstraintExpression ce : opConstraints) {
int nParts = getNumPartitions(oid, ce);
if (nParts != -1) {
opParts = new String[nParts];
break;
}
}
if (opParts == null) {
throw new HyracksException("Unable to satisfy constraints for operator : " + oid);
}
jas.allocations.put(oid, opParts);
BitSet unassignedPartsIds = new BitSet(opParts.length);
unassignedPartsIds.set(0, opParts.length);
for (ConstraintExpression ce : opConstraints) {
if (ce.getTag() == ConstraintExpression.ExpressionTag.BELONGS_TO) {
BelongsToExpression bE = (BelongsToExpression) ce;
if (bE.getItemExpression().getTag() == ConstraintExpression.ExpressionTag.PARTITION_LOCATION) {
PartitionLocationExpression plE = (PartitionLocationExpression) bE.getItemExpression();
if (plE.getOperatorDescriptorId().equals(oid)) {
int part = plE.getPartition();
if (bE.getSetExpression().getTag() == ConstraintExpression.ExpressionTag.ENUMERATED_SET) {
EnumeratedCollectionExpression ecE = (EnumeratedCollectionExpression) bE
.getSetExpression();
for (ConstraintExpression value : ecE.getMembers()) {
if (value.getTag() == ConstraintExpression.ExpressionTag.CONSTANT) {
ConstantExpression nodeConst = (ConstantExpression) value;
String nodeId = (String) nodeConst.getValue();
if (nodeSet.contains(nodeId)) {
opParts[part] = nodeId;
unassignedPartsIds.clear(part);
LOGGER.info("Assigned: " + oid + ":" + part + ": " + nodeId);
break;
}
}
}
}
if (unassignedPartsIds.get(part)) {
throw new HyracksException("Unsatisfiable constraint for operator: " + oid);
}
}
}
}
}
if (!unassignedPartsIds.isEmpty()) {
// Do round robin assignment.
for (int i = unassignedPartsIds.nextSetBit(0); i >= 0; i = unassignedPartsIds.nextSetBit(i + 1)) {
rrAssignment.add(new PartitionLocationExpression(oid, i));
}
}
}
}
int n = rrAssignment.size();
for (int i = 0; i < n; ++i) {
PartitionLocationExpression plE = rrAssignment.get(i);
String[] opParts = jas.allocations.get(plE.getOperatorDescriptorId());
String node = liveNodes[i % liveNodes.length];
LOGGER.info("Assigned: " + plE.getOperatorDescriptorId() + ":" + plE.getPartition() + ": " + node);
opParts[plE.getPartition()] = node;
}
return new ISchedule() {
@Override
public String[] getPartitions(ActivityNodeId aid) {
return jas.allocations.get(aid.getOperatorDescriptorId());
}
};
}
private int getNumPartitions(OperatorDescriptorId oid, ConstraintExpression ce) {
System.err.println(ce);
if (ce.getTag() == ExpressionTag.RELATIONAL) {
RelationalExpression re = (RelationalExpression) ce;
if (re.getOperator() == RelationalExpression.Operator.EQUAL) {
System.err.println("Left: " + re.getLeft());
System.err.println("Right: " + re.getRight());
if (re.getLeft().getTag() == ConstraintExpression.ExpressionTag.PARTITION_COUNT) {
return getNumPartitions(oid, (PartitionCountExpression) re.getLeft(), re.getRight());
} else if (re.getRight().getTag() == ConstraintExpression.ExpressionTag.PARTITION_COUNT) {
return getNumPartitions(oid, (PartitionCountExpression) re.getRight(), re.getLeft());
}
}
}
return -1;
}
private int getNumPartitions(OperatorDescriptorId oid, PartitionCountExpression pce, ConstraintExpression ce) {
if (pce.getOperatorDescriptorId().equals(oid)) {
if (ce.getTag() == ConstraintExpression.ExpressionTag.CONSTANT) {
ConstantExpression constExpr = (ConstantExpression) ce;
Integer n = (Integer) constExpr.getValue();
return n.intValue();
}
}
return -1;
}
private static class JobAttemptState implements IJobAttemptSchedulerState {
final Map<OperatorDescriptorId, String[]> allocations;
final Map<OperatorDescriptorId, Set<ConstraintExpression>> opConstraints;
public JobAttemptState(Set<ConstraintExpression> constraints) {
allocations = new HashMap<OperatorDescriptorId, String[]>();
opConstraints = new HashMap<OperatorDescriptorId, Set<ConstraintExpression>>();
List<ConstraintExpression> ceList = new ArrayList<ConstraintExpression>();
for (ConstraintExpression ce : constraints) {
ceList.clear();
ceList.add(ce);
getAllConstraints(ceList);
for (ConstraintExpression ce2 : ceList) {
switch (ce2.getTag()) {
case PARTITION_COUNT:
addToOpConstraints(opConstraints,
((PartitionCountExpression) ce2).getOperatorDescriptorId(), ce);
break;
case PARTITION_LOCATION:
addToOpConstraints(opConstraints,
((PartitionLocationExpression) ce2).getOperatorDescriptorId(), ce);
break;
}
}
}
}
private static void addToOpConstraints(Map<OperatorDescriptorId, Set<ConstraintExpression>> opc,
OperatorDescriptorId opId, ConstraintExpression ce) {
Set<ConstraintExpression> opSet = opc.get(opId);
if (opSet == null) {
opSet = new HashSet<ConstraintExpression>();
opc.put(opId, opSet);
}
opSet.add(ce);
}
private static void getAllConstraints(List<ConstraintExpression> ceList) {
for (int i = 0; i < ceList.size(); ++i) {
ConstraintExpression cE = ceList.get(i);
cE.getChildren(ceList);
}
}
}
}