blob: b4b9b3cd8c7b3993c97dcb1ec0cad3c14088e603 [file] [log] [blame]
package edu.uci.ics.hyracks.api.client.impl;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang3.tuple.Pair;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobSpecification;
public class JobActivityGraphBuilder implements IActivityGraphBuilder {
private static final Logger LOGGER = Logger.getLogger(JobActivityGraphBuilder.class.getName());
private final Map<ActivityId, IOperatorDescriptor> activityOperatorMap;
private final JobActivityGraph jag;
private final JobSpecification jobSpec;
private final Map<ConnectorDescriptorId, Pair<IActivity, Integer>> connectorProducerMap;
private final Map<ConnectorDescriptorId, Pair<IActivity, Integer>> connectorConsumerMap;
public JobActivityGraphBuilder(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
activityOperatorMap = new HashMap<ActivityId, IOperatorDescriptor>();
jag = new JobActivityGraph();
this.jobSpec = jobSpec;
connectorProducerMap = new HashMap<ConnectorDescriptorId, Pair<IActivity, Integer>>();
connectorConsumerMap = new HashMap<ConnectorDescriptorId, Pair<IActivity, Integer>>();
}
public void addConnector(IConnectorDescriptor conn) {
jag.getConnectorMap().put(conn.getConnectorId(), conn);
jag.getConnectorRecordDescriptorMap().put(conn.getConnectorId(), jobSpec.getConnectorRecordDescriptor(conn));
}
@Override
public void addBlockingEdge(IActivity blocker, IActivity blocked) {
addToValueSet(jag.getBlocked2BlockerMap(), blocked.getActivityId(), blocker.getActivityId());
}
@Override
public void addSourceEdge(int operatorInputIndex, IActivity task, int taskInputIndex) {
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Adding source edge: " + task.getActivityId() + ":" + operatorInputIndex + " -> "
+ task.getActivityId() + ":" + taskInputIndex);
}
IOperatorDescriptor op = activityOperatorMap.get(task.getActivityId());
IConnectorDescriptor conn = jobSpec.getInputConnectorDescriptor(op, operatorInputIndex);
insertIntoIndexedMap(jag.getActivityInputMap(), task.getActivityId(), taskInputIndex, conn);
connectorConsumerMap.put(conn.getConnectorId(), Pair.of(task, taskInputIndex));
}
@Override
public void addTargetEdge(int operatorOutputIndex, IActivity task, int taskOutputIndex) {
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Adding target edge: " + task.getActivityId() + ":" + operatorOutputIndex + " -> "
+ task.getActivityId() + ":" + taskOutputIndex);
}
IOperatorDescriptor op = activityOperatorMap.get(task.getActivityId());
IConnectorDescriptor conn = jobSpec.getOutputConnectorDescriptor(op, operatorOutputIndex);
insertIntoIndexedMap(jag.getActivityOutputMap(), task.getActivityId(), taskOutputIndex, conn);
connectorProducerMap.put(conn.getConnectorId(), Pair.of(task, taskOutputIndex));
}
@Override
public void addActivity(IOperatorDescriptor op, IActivity task) {
activityOperatorMap.put(task.getActivityId(), op);
ActivityId activityId = task.getActivityId();
jag.getActivityMap().put(activityId, task);
}
public void finish() {
Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> caMap = jag
.getConnectorActivityMap();
for (Map.Entry<ConnectorDescriptorId, Pair<IActivity, Integer>> e : connectorProducerMap.entrySet()) {
ConnectorDescriptorId cdId = e.getKey();
Pair<IActivity, Integer> producer = e.getValue();
Pair<IActivity, Integer> consumer = connectorConsumerMap.get(cdId);
caMap.put(cdId, Pair.of(producer, consumer));
}
}
private <K, V> void addToValueSet(Map<K, Set<V>> map, K n1, V n2) {
Set<V> targets = map.get(n1);
if (targets == null) {
targets = new HashSet<V>();
map.put(n1, targets);
}
targets.add(n2);
}
private <T> void extend(List<T> list, int index) {
int n = list.size();
for (int i = n; i <= index; ++i) {
list.add(null);
}
}
private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
List<V> vList = map.get(key);
if (vList == null) {
vList = new ArrayList<V>();
map.put(key, vList);
}
extend(vList, index);
vList.set(index, value);
}
public JobActivityGraph getActivityGraph() {
return jag;
}
}