blob: e6e6c2f7908f66ab12df86cbc102482b588781d5 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.control.cc.scheduler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang3.tuple.Pair;
import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
import edu.uci.ics.hyracks.control.cc.job.ActivityClusterPlan;
import edu.uci.ics.hyracks.control.cc.job.ActivityPlan;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.Task;
import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
import edu.uci.ics.hyracks.control.cc.job.TaskClusterId;
/**
 * Builds the execution plan for a single {@link ActivityCluster}. Planning consists of:
 * <ol>
 * <li>solving the partition count of every activity in the cluster;</li>
 * <li>creating one {@link Task} per partition and wiring partition-wise dependencies on blocker activities;</li>
 * <li>assigning an {@link IConnectorPolicy} to every output connector of the cluster's activities;</li>
 * <li>grouping the tasks into {@link TaskCluster}s and linking those clusters through produced/required
 * partitions and dependencies.</li>
 * </ol>
 * The planner also remembers, across every cluster it plans, which task cluster produces each partition that
 * crosses a task-cluster boundary (see {@link #getPartitionProducingTaskClusterMap()}).
 */
public class ActivityClusterPlanner {
    /**
     * When true, tasks joined by a connector whose policy requires producer/consumer co-scheduling are placed in
     * the same task cluster. When false, all tasks of an activity cluster form a single task cluster.
     */
    private static final boolean USE_CONNECTOR_POLICY_IN_TASK_CLUSTER_CONSTRUCTION = true;

    private static final Logger LOGGER = Logger.getLogger(ActivityClusterPlanner.class.getName());

    private final JobScheduler scheduler;

    /** Maps every partition that crosses a task-cluster boundary to the task cluster that produces it. */
    private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;

    public ActivityClusterPlanner(JobScheduler newJobScheduler) {
        this.scheduler = newJobScheduler;
        partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
    }

    /**
     * Plans {@code ac} and attaches the resulting {@link ActivityClusterPlan} to it.
     *
     * @param ac
     *            the activity cluster to plan. Activity clusters containing its blocker activities must already
     *            have been planned.
     * @throws HyracksException
     *             if a partition count cannot be resolved, or if a blocker activity has no plan yet
     */
    public void planActivityCluster(ActivityCluster ac) throws HyracksException {
        JobRun jobRun = scheduler.getJobRun();
        Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
        Map<ActivityId, ActivityPlan> activityPlanMap = buildActivityPlanMap(ac, jobRun, pcMap);
        assignConnectorPolicy(ac, activityPlanMap);
        TaskCluster[] taskClusters = computeTaskClusters(ac, jobRun, activityPlanMap);
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Plan for " + ac);
            LOGGER.info("Built " + taskClusters.length + " Task Clusters");
            for (TaskCluster tc : taskClusters) {
                LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
            }
        }
        ac.setPlan(new ActivityClusterPlan(taskClusters, activityPlanMap));
    }

    /**
     * Creates one {@link Task} per partition of every activity in {@code ac} and wires dependencies partition-wise.
     * Task {@code i} of an activity depends on task {@code i} of each of its blocker activities, and is registered
     * as a dependent of that task.
     *
     * @throws HyracksException
     *             if a blocker activity has not been planned yet
     */
    private Map<ActivityId, ActivityPlan> buildActivityPlanMap(ActivityCluster ac, JobRun jobRun,
            Map<ActivityId, ActivityPartitionDetails> pcMap) throws HyracksException {
        Map<ActivityId, ActivityPlan> activityPlanMap = new HashMap<ActivityId, ActivityPlan>();
        Set<ActivityId> depAnIds = new HashSet<ActivityId>();
        for (ActivityId anId : ac.getActivities()) {
            depAnIds.clear();
            getDependencyActivityIds(depAnIds, anId);
            ActivityPartitionDetails apd = pcMap.get(anId);
            Task[] tasks = new Task[apd.getPartitionCount()];
            ActivityPlan activityPlan = new ActivityPlan(apd);
            for (int i = 0; i < tasks.length; ++i) {
                TaskId tid = new TaskId(anId, i);
                tasks[i] = new Task(tid, activityPlan);
                for (ActivityId danId : depAnIds) {
                    Task[] dATasks = getPlannedTasks(jobRun, danId);
                    assert dATasks.length == tasks.length : "Dependency activity partitioned differently from dependent: "
                            + dATasks.length + " != " + tasks.length;
                    Task dTask = dATasks[i];
                    tasks[i].getDependencies().add(dTask.getTaskId());
                    dTask.getDependents().add(tid);
                }
            }
            activityPlan.setTasks(tasks);
            activityPlanMap.put(anId, activityPlan);
        }
        return activityPlanMap;
    }

    /**
     * Returns the tasks already planned for {@code anId}, which must belong to a previously planned activity
     * cluster.
     * <p>
     * This is an explicit check rather than an {@code assert}, so it also fires when assertions are disabled.
     * Without it, a missing plan would surface as a context-free NullPointerException.
     *
     * @throws HyracksException
     *             if the activity's cluster, that cluster's plan, or the activity's task array is missing
     */
    private static Task[] getPlannedTasks(JobRun jobRun, ActivityId anId) throws HyracksException {
        ActivityCluster dAC = jobRun.getActivityClusterMap().get(anId);
        ActivityClusterPlan dACP = dAC == null ? null : dAC.getPlan();
        ActivityPlan dAP = dACP == null ? null : dACP.getActivityPlanMap().get(anId);
        Task[] dATasks = dAP == null ? null : dAP.getTasks();
        if (dATasks == null) {
            throw new HyracksException(
                    "IllegalStateEncountered: Dependent AC is being planned without a plan for dependency AC: Encountered no plan for ActivityID "
                            + anId);
        }
        return dATasks;
    }

    /**
     * Groups the tasks of {@code ac} into task clusters and links the clusters together.
     * <ul>
     * <li>Every connector edge whose producer and consumer tasks land in different task clusters becomes a
     * partition. The producing cluster records it as produced, the consuming cluster records it as required, and
     * it is entered in {@link #partitionProducingTaskClusterMap}.</li>
     * <li>Every task dependency becomes a dependency between the owning task clusters.</li>
     * </ul>
     */
    private TaskCluster[] computeTaskClusters(ActivityCluster ac, JobRun jobRun,
            Map<ActivityId, ActivityPlan> activityPlanMap) {
        Set<ActivityId> activities = ac.getActivities();
        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = computeTaskConnectivity(jobRun,
                activityPlanMap, activities);
        TaskCluster[] taskClusters = USE_CONNECTOR_POLICY_IN_TASK_CLUSTER_CONSTRUCTION ? buildConnectorPolicyAwareTaskClusters(
                ac, activityPlanMap, taskConnectivity) : buildConnectorPolicyUnawareTaskClusters(ac, activityPlanMap);
        for (TaskCluster tc : taskClusters) {
            Set<TaskCluster> tcDependencyTaskClusters = tc.getDependencyTaskClusters();
            for (Task ts : tc.getTasks()) {
                TaskId tid = ts.getTaskId();
                List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(tid);
                if (cInfoList != null) {
                    for (Pair<TaskId, ConnectorDescriptorId> p : cInfoList) {
                        TaskId targetTid = p.getLeft();
                        Task targetTS = activityPlanMap.get(targetTid.getActivityId()).getTasks()[targetTid
                                .getPartition()];
                        TaskCluster targetTC = targetTS.getTaskCluster();
                        if (targetTC != tc) {
                            // Data crosses a task-cluster boundary, so it is exchanged as a partition.
                            ConnectorDescriptorId cdId = p.getRight();
                            PartitionId pid = new PartitionId(jobRun.getJobId(), cdId, tid.getPartition(),
                                    targetTid.getPartition());
                            tc.getProducedPartitions().add(pid);
                            targetTC.getRequiredPartitions().add(pid);
                            partitionProducingTaskClusterMap.put(pid, tc);
                        }
                    }
                }
                for (TaskId dTid : ts.getDependencies()) {
                    TaskCluster dTC = getTaskCluster(dTid);
                    dTC.getDependentTaskClusters().add(tc);
                    tcDependencyTaskClusters.add(dTC);
                }
            }
        }
        return taskClusters;
    }

    /**
     * Puts every task of every activity in {@code ac} into one single task cluster, ignoring connector policies.
     */
    private TaskCluster[] buildConnectorPolicyUnawareTaskClusters(ActivityCluster ac,
            Map<ActivityId, ActivityPlan> activityPlanMap) {
        List<Task> taskStates = new ArrayList<Task>();
        for (ActivityId anId : ac.getActivities()) {
            ActivityPlan ap = activityPlanMap.get(anId);
            Task[] tasks = ap.getTasks();
            for (Task t : tasks) {
                taskStates.add(t);
            }
        }
        TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getActivityClusterId(), 0), ac,
                taskStates.toArray(new Task[taskStates.size()]));
        for (Task t : tc.getTasks()) {
            t.setTaskCluster(tc);
        }
        return new TaskCluster[] { tc };
    }

    /**
     * Computes the consumer tasks of every producer task of the activities in {@code activities}.
     * <p>
     * The result maps each producer task to the list of (consumer task, connector) pairs it sends data to, as
     * reported by {@link IConnectorDescriptor#indicateTargetPartitions}. Only producer tasks of activities that
     * have output connectors get an entry.
     */
    private Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> computeTaskConnectivity(JobRun jobRun,
            Map<ActivityId, ActivityPlan> activityPlanMap, Set<ActivityId> activities) {
        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
        JobActivityGraph jag = jobRun.getJobActivityGraph();
        BitSet targetBitmap = new BitSet();
        for (ActivityId ac1 : activities) {
            Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
            int nProducers = ac1TaskStates.length;
            List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
            if (outputConns != null) {
                for (IConnectorDescriptor c : outputConns) {
                    ConnectorDescriptorId cdId = c.getConnectorId();
                    ActivityId ac2 = jag.getConsumerActivity(cdId);
                    Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
                    int nConsumers = ac2TaskStates.length;
                    for (int i = 0; i < nProducers; ++i) {
                        // The bitmap is reused across producers. Clear it so the result does not depend on the
                        // connector implementation clearing it itself.
                        targetBitmap.clear();
                        c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
                        TaskId producerTid = ac1TaskStates[i].getTaskId();
                        List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(producerTid);
                        if (cInfoList == null) {
                            cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
                            taskConnectivity.put(producerTid, cInfoList);
                        }
                        for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
                            TaskId targetTID = ac2TaskStates[j].getTaskId();
                            cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
                        }
                    }
                }
            }
        }
        return taskConnectivity;
    }

    /**
     * Groups the tasks of {@code ac} into task clusters, following connector policies.
     * <p>
     * A producer task and a consumer task end up in the same task cluster whenever the connector between them has
     * a policy that requires producer/consumer co-scheduling. The grouping is transitive: each task cluster is a
     * connected component of the co-scheduling relation.
     */
    private TaskCluster[] buildConnectorPolicyAwareTaskClusters(ActivityCluster ac,
            Map<ActivityId, ActivityPlan> activityPlanMap,
            Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity) {
        Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
        for (ActivityId anId : ac.getActivities()) {
            ActivityPlan ap = activityPlanMap.get(anId);
            Task[] tasks = ap.getTasks();
            for (Task t : tasks) {
                Set<TaskId> cluster = new HashSet<TaskId>();
                TaskId tid = t.getTaskId();
                cluster.add(tid);
                taskClusterMap.put(tid, cluster);
            }
        }
        JobRun jobRun = ac.getJobRun();
        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
        for (Map.Entry<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> e : taskConnectivity.entrySet()) {
            Set<TaskId> cluster = taskClusterMap.get(e.getKey());
            for (Pair<TaskId, ConnectorDescriptorId> p : e.getValue()) {
                IConnectorPolicy cPolicy = connectorPolicies.get(p.getRight());
                if (cPolicy.requiresProducerConsumerCoscheduling()) {
                    cluster.add(p.getLeft());
                }
            }
        }
        /*
         * taskClusterMap contains, for every TID x, x -> { co-scheduled consumer TIDs of x } U { x }.
         * Viewed as an undirected graph, each connected component is the largest set of tasks that need to be
         * co-scheduled, i.e. one task cluster.
         */
        int counter = 0;
        TaskId[] ordinalList = new TaskId[taskClusterMap.size()];
        Map<TaskId, Integer> ordinalMap = new HashMap<TaskId, Integer>();
        for (TaskId tid : taskClusterMap.keySet()) {
            ordinalList[counter] = tid;
            ordinalMap.put(tid, counter);
            ++counter;
        }
        int n = ordinalList.length;
        BitSet[] adjacency = new BitSet[n];
        for (int i = 0; i < n; ++i) {
            adjacency[i] = new BitSet(n);
        }
        for (Map.Entry<TaskId, Set<TaskId>> e : taskClusterMap.entrySet()) {
            int i = ordinalMap.get(e.getKey());
            for (TaskId ttid : e.getValue()) {
                int j = ordinalMap.get(ttid);
                // Record the edge in both directions so the relation is symmetric.
                adjacency[i].set(j);
                adjacency[j].set(i);
            }
        }
        List<TaskCluster> tcSet = new ArrayList<TaskCluster>();
        counter = 0;
        for (BitSet component : computeConnectedComponents(adjacency)) {
            List<Task> taskStates = new ArrayList<Task>();
            for (int j = component.nextSetBit(0); j >= 0; j = component.nextSetBit(j + 1)) {
                TaskId tid = ordinalList[j];
                taskStates.add(activityPlanMap.get(tid.getActivityId()).getTasks()[tid.getPartition()]);
            }
            Task[] clusterTasks = taskStates.toArray(new Task[taskStates.size()]);
            TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getActivityClusterId(), counter++), ac,
                    clusterTasks);
            tcSet.add(tc);
            for (Task t : clusterTasks) {
                t.setTaskCluster(tc);
            }
        }
        return tcSet.toArray(new TaskCluster[tcSet.size()]);
    }

    /**
     * Splits the vertices of an undirected graph into connected components.
     * <p>
     * The components are found with an explicit-stack depth-first traversal. This avoids computing a full
     * transitive closure, which is cubic in the number of vertices in the worst case.
     *
     * @param adjacency
     *            symmetric adjacency rows. {@code adjacency[i]} has bit {@code j} set when vertex {@code i} is linked
     *            to vertex {@code j}, and every row must be non-null.
     * @return the components, ordered by their smallest vertex index. Each component is a BitSet of its member
     *         vertex indices, so iterating its set bits yields members in ascending order.
     */
    private static List<BitSet> computeConnectedComponents(BitSet[] adjacency) {
        int n = adjacency.length;
        List<BitSet> components = new ArrayList<BitSet>();
        BitSet visited = new BitSet(n);
        // Each vertex is pushed at most once, because it is marked visited when pushed.
        int[] stack = new int[n];
        for (int root = visited.nextClearBit(0); root < n; root = visited.nextClearBit(root + 1)) {
            BitSet component = new BitSet(n);
            int top = 0;
            stack[top++] = root;
            visited.set(root);
            component.set(root);
            while (top > 0) {
                BitSet neighbors = adjacency[stack[--top]];
                for (int w = neighbors.nextSetBit(0); w >= 0; w = neighbors.nextSetBit(w + 1)) {
                    if (!visited.get(w)) {
                        visited.set(w);
                        component.set(w);
                        stack[top++] = w;
                    }
                }
            }
            components.add(component);
        }
        return components;
    }

    /**
     * Looks up the task cluster that owns {@code tid}.
     * <p>
     * The owning activity cluster must already have a plan. The lookup goes through the job run's activity
     * cluster map.
     */
    private TaskCluster getTaskCluster(TaskId tid) {
        ActivityCluster ac = scheduler.getJobRun().getActivityClusterMap().get(tid.getActivityId());
        ActivityClusterPlan acp = ac.getPlan();
        Task[] tasks = acp.getActivityPlanMap().get(tid.getActivityId()).getTasks();
        Task task = tasks[tid.getPartition()];
        assert task.getTaskId().equals(tid);
        return task.getTaskCluster();
    }

    /**
     * Adds to {@code depAnIds} the blocker activities of {@code anId}, as recorded in the job activity graph's
     * blocked-to-blocker map. Nothing is added if {@code anId} has no blockers.
     */
    private void getDependencyActivityIds(Set<ActivityId> depAnIds, ActivityId anId) {
        JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
        Set<ActivityId> blockers = jag.getBlocked2BlockerMap().get(anId);
        if (blockers != null) {
            depAnIds.addAll(blockers);
        }
    }

    /**
     * Chooses a connector policy for every output connector of the activities in {@code ac}.
     * <p>
     * The choice is based on the producer count, the consumer count, and each producer's fan-out. The chosen
     * policies are added to the job run's connector policy map.
     */
    private void assignConnectorPolicy(ActivityCluster ac, Map<ActivityId, ActivityPlan> taskMap) {
        JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
        Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
        Set<ActivityId> activities = ac.getActivities();
        BitSet targetBitmap = new BitSet();
        for (ActivityId a1 : activities) {
            Task[] ac1TaskStates = taskMap.get(a1).getTasks();
            int nProducers = ac1TaskStates.length;
            List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(a1);
            if (outputConns != null) {
                for (IConnectorDescriptor c : outputConns) {
                    ConnectorDescriptorId cdId = c.getConnectorId();
                    ActivityId a2 = jag.getConsumerActivity(cdId);
                    Task[] ac2TaskStates = taskMap.get(a2).getTasks();
                    int nConsumers = ac2TaskStates.length;
                    int[] fanouts = new int[nProducers];
                    for (int i = 0; i < nProducers; ++i) {
                        // The bitmap is reused across producers. Clear it so each fan-out counts only this
                        // producer's targets.
                        targetBitmap.clear();
                        c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
                        fanouts[i] = targetBitmap.cardinality();
                    }
                    IConnectorPolicy cp = assignConnectorPolicy(c, nProducers, nConsumers, fanouts);
                    cPolicyMap.put(cdId, cp);
                }
            }
        }
        ac.getJobRun().getConnectorPolicyMap().putAll(cPolicyMap);
    }

    /**
     * Returns the policy for connector {@code c}.
     * <p>
     * If the job specification has a connector policy assignment policy, that policy decides. Otherwise a
     * {@link PipeliningConnectorPolicy} is returned.
     */
    private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
        IConnectorPolicyAssignmentPolicy cpap = scheduler.getJobRun().getJobActivityGraph().getJobSpecification()
                .getConnectorPolicyAssignmentPolicy();
        if (cpap != null) {
            return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
        }
        return new PipeliningConnectorPolicy();
    }

    /**
     * Solves the partition-count constraints of every operator in {@code ac}.
     * <p>
     * For each activity, the result holds its partition count plus the partition counts of the activities on the
     * other end of its input and output connectors. The input and output arrays are null when the activity has no
     * such connectors.
     *
     * @throws HyracksException
     *             if a partition count is unbound, not a number, or not positive
     */
    private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
            throws HyracksException {
        PartitionConstraintSolver solver = scheduler.getSolver();
        JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
        Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
        for (ActivityId anId : ac.getActivities()) {
            lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
        }
        solver.solve(lValues);
        Map<OperatorDescriptorId, Integer> nPartMap = new HashMap<OperatorDescriptorId, Integer>();
        for (LValueConstraintExpression lv : lValues) {
            Object value = solver.getValue(lv);
            if (value == null) {
                throw new HyracksException("No value found for " + lv);
            }
            if (!(value instanceof Number)) {
                throw new HyracksException("Unexpected type of value bound to " + lv + ": " + value.getClass() + "("
                        + value + ")");
            }
            int nParts = ((Number) value).intValue();
            if (nParts <= 0) {
                throw new HyracksException("Unsatisfiable number of partitions for " + lv + ": " + nParts);
            }
            // Safe cast: lValues only ever contains PartitionCountExpressions (built above).
            nPartMap.put(((PartitionCountExpression) lv).getOperatorDescriptorId(), Integer.valueOf(nParts));
        }
        Map<ActivityId, ActivityPartitionDetails> activityPartsMap = new HashMap<ActivityId, ActivityPartitionDetails>();
        for (ActivityId anId : ac.getActivities()) {
            int nParts = nPartMap.get(anId.getOperatorDescriptorId());
            int[] nInputPartitions = null;
            List<IConnectorDescriptor> inputs = jag.getActivityInputConnectorDescriptors(anId);
            if (inputs != null) {
                nInputPartitions = new int[inputs.size()];
                for (int i = 0; i < nInputPartitions.length; ++i) {
                    nInputPartitions[i] = nPartMap.get(jag.getProducerActivity(inputs.get(i).getConnectorId())
                            .getOperatorDescriptorId());
                }
            }
            int[] nOutputPartitions = null;
            List<IConnectorDescriptor> outputs = jag.getActivityOutputConnectorDescriptors(anId);
            if (outputs != null) {
                nOutputPartitions = new int[outputs.size()];
                for (int i = 0; i < nOutputPartitions.length; ++i) {
                    nOutputPartitions[i] = nPartMap.get(jag.getConsumerActivity(outputs.get(i).getConnectorId())
                            .getOperatorDescriptorId());
                }
            }
            ActivityPartitionDetails apd = new ActivityPartitionDetails(nParts, nInputPartitions, nOutputPartitions);
            activityPartsMap.put(anId, apd);
        }
        return activityPartsMap;
    }

    /**
     * Returns the live map from every cross-task-cluster partition planned so far to the task cluster that
     * produces it. The map accumulates across all activity clusters planned by this instance.
     */
    public Map<? extends PartitionId, ? extends TaskCluster> getPartitionProducingTaskClusterMap() {
        return partitionProducingTaskClusterMap;
    }
}