blob: f407ae302d485b655001ef70edb3c87b89bef178 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.control.cc.scheduler;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang3.tuple.Pair;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
import edu.uci.ics.hyracks.control.cc.job.ActivityClusterId;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
/**
 * Groups the activities of a job into {@link ActivityCluster}s and wires blocking dependencies between the clusters.
 * <p>
 * Clustering starts with one singleton cluster per activity. Clusters are then merged pairwise for as long as some
 * connector links an activity inside a cluster to an activity outside it. The final clusters are therefore the
 * groups of activities connected, directly or transitively, by connectors. Afterwards every blocker-to-blocked
 * activity relation reported by the {@link JobActivityGraph} is lifted to a dependency edge between the clusters
 * that contain those activities.
 */
public class ActivityClusterGraphBuilder {
    private static final Logger LOGGER = Logger.getLogger(ActivityClusterGraphBuilder.class.getName());

    /** Job run that every created cluster is bound to and that receives the inferred clusters. */
    private final JobRun jobRun;

    /**
     * @param jobRun
     *            the job run passed to each new {@link ActivityCluster} and populated by
     *            {@link #inferActivityClusters(JobActivityGraph)}
     */
    public ActivityClusterGraphBuilder(JobRun jobRun) {
        this.jobRun = jobRun;
    }

    /**
     * Looks for one connector edge that leaves a cluster.
     * <p>
     * Clusters are scanned in iteration order. For each member activity, its inputs are checked first and then its
     * outputs.
     *
     * @return {@code (member, neighbour)} for the first edge found whose neighbour is not in the member's cluster,
     *         or {@code null} when every connector edge is internal to a single cluster
     */
    private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, JobSpecification spec,
            Set<ActivityCluster> eqSets) {
        Map<ActivityId, IActivity> activityNodeMap = jag.getActivityNodeMap();
        for (ActivityCluster cluster : eqSets) {
            Set<ActivityId> members = cluster.getActivities();
            for (ActivityId member : members) {
                IActivity activity = activityNodeMap.get(member);
                Pair<ActivityId, ActivityId> crossing = findCrossingInput(jag, spec, activity, member, members);
                if (crossing == null) {
                    crossing = findCrossingOutput(jag, spec, activity, member, members);
                }
                if (crossing != null) {
                    return crossing;
                }
            }
        }
        return null;
    }

    /**
     * Checks the inputs of {@code member}, in list order, for one fed by an activity outside {@code members}.
     *
     * @return {@code (member, producerActivity)} for the first such input, or {@code null} if there is none (or if
     *         {@code member} has no entry in the activity input map)
     */
    private static Pair<ActivityId, ActivityId> findCrossingInput(JobActivityGraph jag, JobSpecification spec,
            IActivity activity, ActivityId member, Set<ActivityId> members) {
        List<Integer> inputIndexes = jag.getActivityInputMap().get(member);
        if (inputIndexes == null) {
            return null;
        }
        for (Integer inputIndex : inputIndexes) {
            OperatorDescriptorId ownerOperator = activity.getActivityId().getOperatorDescriptorId();
            IConnectorDescriptor connector = spec.getInputConnectorDescriptor(ownerOperator, inputIndex);
            OperatorDescriptorId producerOperator = spec.getProducer(connector).getOperatorId();
            int producerPort = spec.getProducerOutputIndex(connector);
            // The activity of the producing operator that drives this particular output port.
            ActivityId producerActivity = jag.getOperatorOutputMap().get(producerOperator).get(producerPort);
            if (!members.contains(producerActivity)) {
                return Pair.<ActivityId, ActivityId> of(member, producerActivity);
            }
        }
        return null;
    }

    /**
     * Checks the outputs of {@code member}, in list order, for one consumed by an activity outside
     * {@code members}.
     *
     * @return {@code (member, consumerActivity)} for the first such output, or {@code null} if there is none (or if
     *         {@code member} has no entry in the activity output map)
     */
    private static Pair<ActivityId, ActivityId> findCrossingOutput(JobActivityGraph jag, JobSpecification spec,
            IActivity activity, ActivityId member, Set<ActivityId> members) {
        List<Integer> outputIndexes = jag.getActivityOutputMap().get(member);
        if (outputIndexes == null) {
            return null;
        }
        for (Integer outputIndex : outputIndexes) {
            OperatorDescriptorId ownerOperator = activity.getActivityId().getOperatorDescriptorId();
            IConnectorDescriptor connector = spec.getOutputConnectorDescriptor(ownerOperator, outputIndex);
            OperatorDescriptorId consumerOperator = spec.getConsumer(connector).getOperatorId();
            int consumerPort = spec.getConsumerInputIndex(connector);
            // The activity of the consuming operator that reads this particular input port.
            ActivityId consumerActivity = jag.getOperatorInputMap().get(consumerOperator).get(consumerPort);
            if (!members.contains(consumerActivity)) {
                return Pair.<ActivityId, ActivityId> of(member, consumerActivity);
            }
        }
        return null;
    }

    /**
     * Infers the activity clusters of {@code jag}, links their blocking dependencies, assigns cluster ids, and
     * registers the result with the job run.
     * <p>
     * Side effects on the job run: it calls {@code jobRun.setActivityClusters(...)} with all inferred clusters and
     * copies the activity-to-cluster mapping into {@code jobRun.getActivityClusterMap()}. When INFO logging is
     * enabled, it also logs the cluster count and each cluster.
     *
     * @param jag
     *            the activity graph to partition
     * @return the clusters whose dependent set is empty after wiring, i.e. those that block no other cluster
     */
    public Set<ActivityCluster> inferActivityClusters(JobActivityGraph jag) {
        JobSpecification spec = jag.getJobSpecification();
        Map<ActivityId, ActivityCluster> stageMap = new HashMap<ActivityId, ActivityCluster>();
        Set<ActivityCluster> stages = new HashSet<ActivityCluster>();

        seedSingletonClusters(jag, stageMap, stages);

        // Keep fusing clusters across boundary-crossing connectors until none remain.
        Pair<ActivityId, ActivityId> crossing;
        while ((crossing = findMergePair(jag, spec, stages)) != null) {
            merge(stageMap, stages, crossing.getLeft(), crossing.getRight());
        }

        linkBlockingDependencies(jag.getBlocker2BlockedMap(), stageMap, stages);
        Set<ActivityCluster> roots = numberClustersAndCollectRoots(stages);

        jobRun.setActivityClusters(stages);
        jobRun.getActivityClusterMap().putAll(stageMap);
        logClusters(stages);
        return roots;
    }

    /** Creates one single-activity cluster for each activity of each operator and records it in both collections. */
    private void seedSingletonClusters(JobActivityGraph jag, Map<ActivityId, ActivityCluster> clusterOf,
            Set<ActivityCluster> clusters) {
        for (Set<ActivityId> operatorActivities : jag.getOperatorActivityMap().values()) {
            for (ActivityId activityId : operatorActivities) {
                Set<ActivityId> singleton = new HashSet<ActivityId>();
                singleton.add(activityId);
                ActivityCluster cluster = new ActivityCluster(jobRun, singleton);
                clusterOf.put(activityId, cluster);
                clusters.add(cluster);
            }
        }
    }

    /**
     * Lifts activity-level blocking to cluster level. If an activity in cluster A blocks an activity in cluster B,
     * then B gains A as a dependency and A gains B as a dependent.
     */
    private static void linkBlockingDependencies(Map<ActivityId, Set<ActivityId>> blocker2BlockedMap,
            Map<ActivityId, ActivityCluster> clusterOf, Set<ActivityCluster> clusters) {
        for (ActivityCluster blocker : clusters) {
            Set<ActivityCluster> blockedClusters = new HashSet<ActivityCluster>();
            for (ActivityId activityId : blocker.getActivities()) {
                Set<ActivityId> blockedActivities = blocker2BlockedMap.get(activityId);
                if (blockedActivities == null) {
                    continue;
                }
                for (ActivityId blockedActivity : blockedActivities) {
                    blockedClusters.add(clusterOf.get(blockedActivity));
                }
            }
            for (ActivityCluster blocked : blockedClusters) {
                blocked.addDependency(blocker);
                blocker.addDependent(blocked);
            }
        }
    }

    /**
     * Gives every cluster a sequential id, starting at 0, in the iteration order of {@code clusters}.
     *
     * @return the clusters that have no dependents
     */
    private static Set<ActivityCluster> numberClustersAndCollectRoots(Set<ActivityCluster> clusters) {
        Set<ActivityCluster> roots = new HashSet<ActivityCluster>();
        int nextId = 0;
        for (ActivityCluster cluster : clusters) {
            cluster.setActivityClusterId(new ActivityClusterId(nextId));
            nextId++;
            if (cluster.getDependents().isEmpty()) {
                roots.add(cluster);
            }
        }
        return roots;
    }

    /** Logs the number of inferred clusters and then each cluster, but only when INFO logging is enabled. */
    private static void logClusters(Set<ActivityCluster> clusters) {
        if (!LOGGER.isLoggable(Level.INFO)) {
            return;
        }
        LOGGER.info("Inferred " + clusters.size() + " stages");
        for (ActivityCluster cluster : clusters) {
            LOGGER.info(cluster.toString());
        }
    }

    /**
     * Replaces the clusters containing {@code t1} and {@code t2} with one new cluster holding the union of their
     * activities. It then re-points every member activity at the new cluster.
     */
    private void merge(Map<ActivityId, ActivityCluster> eqSetMap, Set<ActivityCluster> eqSets, ActivityId t1,
            ActivityId t2) {
        ActivityCluster first = eqSetMap.get(t1);
        ActivityCluster second = eqSetMap.get(t2);
        Set<ActivityId> union = new HashSet<ActivityId>();
        union.addAll(first.getActivities());
        union.addAll(second.getActivities());
        eqSets.remove(first);
        eqSets.remove(second);
        ActivityCluster combined = new ActivityCluster(jobRun, union);
        eqSets.add(combined);
        for (ActivityId member : union) {
            eqSetMap.put(member, combined);
        }
    }
}