blob: 6effd84d9c87e91aab607aa760c4f4244124323c [file] [log] [blame]
package edu.uci.ics.hyracks.control.cc.job;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobSpecification;
/**
 * {@link IActivityGraphBuilder} implementation that records activities and their edges into a
 * {@link JobActivityGraph}.
 *
 * <p>
 * The target graph is created by {@link #init(String, JobSpecification, EnumSet)}. Until that
 * method has been called the internal graph reference is {@code null}, so the {@code add*}
 * callbacks will fail with a {@link NullPointerException} and {@link #getActivityGraph()} returns
 * {@code null}.
 */
public class JobActivityGraphBuilder implements IActivityGraphBuilder {
    private static final Logger LOGGER = Logger.getLogger(JobActivityGraphBuilder.class.getName());

    /** Graph being populated; assigned by {@link #init(String, JobSpecification, EnumSet)}. */
    private JobActivityGraph jag;

    /**
     * Creates a fresh {@link JobActivityGraph} from the given arguments and makes it the graph
     * that subsequent callbacks write into. Any previously held graph reference is replaced.
     *
     * @param appName
     *            passed through to the {@link JobActivityGraph} constructor
     * @param jobSpec
     *            passed through to the {@link JobActivityGraph} constructor
     * @param jobFlags
     *            passed through to the {@link JobActivityGraph} constructor
     */
    public void init(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
        jag = new JobActivityGraph(appName, jobSpec, jobFlags);
    }

    /**
     * @return the graph created by the most recent {@code init} call, or {@code null} if
     *         {@code init} has not been called
     */
    public JobActivityGraph getActivityGraph() {
        return jag;
    }

    /**
     * Registers {@code task} in the activity-node map under its activity id and adds that id to
     * the set of activities kept for the owning operator descriptor id.
     */
    @Override
    public void addActivity(IActivity task) {
        ActivityId id = task.getActivityId();
        jag.getActivityNodeMap().put(id, task);
        addToValueSet(jag.getOperatorActivityMap(), id.getOperatorDescriptorId(), id);
    }

    /**
     * Records a blocking relationship in both directions: {@code blocked} is added to the set
     * keyed by {@code blocker} in the blocker-to-blocked map, and {@code blocker} is added to the
     * set keyed by {@code blocked} in the blocked-to-blocker map.
     */
    @Override
    public void addBlockingEdge(IActivity blocker, IActivity blocked) {
        addToValueSet(jag.getBlocker2BlockedMap(), blocker.getActivityId(), blocked.getActivityId());
        addToValueSet(jag.getBlocked2BlockerMap(), blocked.getActivityId(), blocker.getActivityId());
    }

    /**
     * Associates input {@code taskInputIndex} of {@code task} with input
     * {@code operatorInputIndex} of the task's operator.
     *
     * <p>
     * Two entries are written: the activity-input list for the task receives the operator input
     * index at position {@code taskInputIndex}, and the operator-input list for the operator
     * receives the task's activity id at position {@code operatorInputIndex}.
     */
    @Override
    public void addSourceEdge(int operatorInputIndex, IActivity task, int taskInputIndex) {
        // The log call stays inline so the logging framework attributes it to this method.
        if (LOGGER.isLoggable(Level.FINEST)) {
            StringBuilder msg = new StringBuilder("Adding source edge: ");
            msg.append(task.getActivityId().getOperatorDescriptorId()).append(":").append(operatorInputIndex);
            msg.append(" -> ").append(task.getActivityId()).append(":").append(taskInputIndex);
            LOGGER.finest(msg.toString());
        }
        ActivityId id = task.getActivityId();
        insertIntoIndexedMap(jag.getActivityInputMap(), id, taskInputIndex, operatorInputIndex);
        insertIntoIndexedMap(jag.getOperatorInputMap(), id.getOperatorDescriptorId(), operatorInputIndex, id);
    }

    /**
     * Associates output {@code taskOutputIndex} of {@code task} with output
     * {@code operatorOutputIndex} of the task's operator.
     *
     * <p>
     * Two entries are written: the activity-output list for the task receives the operator output
     * index at position {@code taskOutputIndex}, and the operator-output list for the operator
     * receives the task's activity id at position {@code operatorOutputIndex}.
     */
    @Override
    public void addTargetEdge(int operatorOutputIndex, IActivity task, int taskOutputIndex) {
        // The log call stays inline so the logging framework attributes it to this method.
        if (LOGGER.isLoggable(Level.FINEST)) {
            StringBuilder msg = new StringBuilder("Adding target edge: ");
            msg.append(task.getActivityId().getOperatorDescriptorId()).append(":").append(operatorOutputIndex);
            msg.append(" -> ").append(task.getActivityId()).append(":").append(taskOutputIndex);
            LOGGER.finest(msg.toString());
        }
        ActivityId id = task.getActivityId();
        insertIntoIndexedMap(jag.getActivityOutputMap(), id, taskOutputIndex, operatorOutputIndex);
        insertIntoIndexedMap(jag.getOperatorOutputMap(), id.getOperatorDescriptorId(), operatorOutputIndex, id);
    }

    /**
     * Adds {@code n2} to the set stored under {@code n1}, first creating and registering an empty
     * {@link HashSet} when the map has no (non-null) set for that key.
     */
    private <K, V> void addToValueSet(Map<K, Set<V>> map, K n1, V n2) {
        Set<V> bucket = map.get(n1);
        if (bucket != null) {
            bucket.add(n2);
            return;
        }
        bucket = new HashSet<V>();
        map.put(n1, bucket);
        bucket.add(n2);
    }

    /**
     * Stores {@code value} at position {@code index} of the list kept under {@code key}.
     *
     * <p>
     * A missing list is created and registered first. If the list is too short it is padded with
     * {@code null} entries until {@code index} is a valid position; existing entries at other
     * positions are left untouched, and an existing entry at {@code index} is overwritten.
     */
    private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
        List<V> slots = map.get(key);
        if (slots == null) {
            slots = new ArrayList<V>();
            map.put(key, slots);
        }
        // Grow with null placeholders so that set(index, ...) below is in range.
        while (slots.size() <= index) {
            slots.add(null);
        }
        slots.set(index, value);
    }
}