blob: 930d299a76667def2ef0793fe74fda58992e6024 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.api.job;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.util.Pair;
public class JobActivityGraph implements Serializable {
private static final long serialVersionUID = 1L;
private final String appName;
private final JobSpecification jobSpec;
private final EnumSet<JobFlag> jobFlags;
private final Map<ActivityId, IActivity> activityNodes;
private final Map<ActivityId, Set<ActivityId>> blocker2blockedMap;
private final Map<ActivityId, Set<ActivityId>> blocked2blockerMap;
private final Map<OperatorDescriptorId, Set<ActivityId>> operatorActivityMap;
private final Map<ActivityId, List<Integer>> activityInputMap;
private final Map<ActivityId, List<Integer>> activityOutputMap;
private final Map<OperatorDescriptorId, List<ActivityId>> operatorInputMap;
private final Map<OperatorDescriptorId, List<ActivityId>> operatorOutputMap;
public JobActivityGraph(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
this.appName = appName;
this.jobSpec = jobSpec;
this.jobFlags = jobFlags;
activityNodes = new HashMap<ActivityId, IActivity>();
blocker2blockedMap = new HashMap<ActivityId, Set<ActivityId>>();
blocked2blockerMap = new HashMap<ActivityId, Set<ActivityId>>();
operatorActivityMap = new HashMap<OperatorDescriptorId, Set<ActivityId>>();
activityInputMap = new HashMap<ActivityId, List<Integer>>();
activityOutputMap = new HashMap<ActivityId, List<Integer>>();
operatorInputMap = new HashMap<OperatorDescriptorId, List<ActivityId>>();
operatorOutputMap = new HashMap<OperatorDescriptorId, List<ActivityId>>();
}
public String getApplicationName() {
return appName;
}
public JobSpecification getJobSpecification() {
return jobSpec;
}
public EnumSet<JobFlag> getJobFlags() {
return jobFlags;
}
public Map<ActivityId, IActivity> getActivityNodeMap() {
return activityNodes;
}
public Map<ActivityId, Set<ActivityId>> getBlocker2BlockedMap() {
return blocker2blockedMap;
}
public Map<ActivityId, Set<ActivityId>> getBlocked2BlockerMap() {
return blocked2blockerMap;
}
public Map<OperatorDescriptorId, Set<ActivityId>> getOperatorActivityMap() {
return operatorActivityMap;
}
public Map<ActivityId, List<Integer>> getActivityInputMap() {
return activityInputMap;
}
public Map<ActivityId, List<Integer>> getActivityOutputMap() {
return activityOutputMap;
}
public Map<OperatorDescriptorId, List<ActivityId>> getOperatorInputMap() {
return operatorInputMap;
}
public Map<OperatorDescriptorId, List<ActivityId>> getOperatorOutputMap() {
return operatorOutputMap;
}
public List<IConnectorDescriptor> getActivityInputConnectorDescriptors(ActivityId hanId) {
List<Integer> inputIndexes = activityInputMap.get(hanId);
if (inputIndexes == null) {
return null;
}
OperatorDescriptorId ownerId = hanId.getOperatorDescriptorId();
List<IConnectorDescriptor> inputs = new ArrayList<IConnectorDescriptor>();
for (Integer i : inputIndexes) {
inputs.add(jobSpec.getInputConnectorDescriptor(ownerId, i));
}
return inputs;
}
public List<IConnectorDescriptor> getActivityOutputConnectorDescriptors(ActivityId hanId) {
List<Integer> outputIndexes = activityOutputMap.get(hanId);
if (outputIndexes == null) {
return null;
}
OperatorDescriptorId ownerId = hanId.getOperatorDescriptorId();
List<IConnectorDescriptor> outputs = new ArrayList<IConnectorDescriptor>();
for (Integer i : outputIndexes) {
outputs.add(jobSpec.getOutputConnectorDescriptor(ownerId, i));
}
return outputs;
}
public ActivityId getConsumerActivity(ConnectorDescriptorId cdId) {
Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connEdge = jobSpec
.getConnectorOperatorMap().get(cdId);
OperatorDescriptorId consumerOpId = connEdge.second.first.getOperatorId();
int consumerInputIdx = connEdge.second.second;
for (ActivityId anId : operatorActivityMap.get(consumerOpId)) {
List<Integer> anInputs = activityInputMap.get(anId);
if (anInputs != null) {
for (Integer idx : anInputs) {
if (idx.intValue() == consumerInputIdx) {
return anId;
}
}
}
}
return null;
}
public ActivityId getProducerActivity(ConnectorDescriptorId cdId) {
Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connEdge = jobSpec
.getConnectorOperatorMap().get(cdId);
OperatorDescriptorId producerOpId = connEdge.first.first.getOperatorId();
int producerInputIdx = connEdge.first.second;
for (ActivityId anId : operatorActivityMap.get(producerOpId)) {
List<Integer> anOutputs = activityOutputMap.get(anId);
if (anOutputs != null) {
for (Integer idx : anOutputs) {
if (idx.intValue() == producerInputIdx) {
return anId;
}
}
}
}
return null;
}
public RecordDescriptor getActivityInputRecordDescriptor(ActivityId hanId, int inputIndex) {
int opInputIndex = getActivityInputMap().get(hanId).get(inputIndex);
return jobSpec.getOperatorInputRecordDescriptor(hanId.getOperatorDescriptorId(), opInputIndex);
}
public RecordDescriptor getActivityOutputRecordDescriptor(ActivityId hanId, int outputIndex) {
int opOutputIndex = getActivityOutputMap().get(hanId).get(outputIndex);
return jobSpec.getOperatorOutputRecordDescriptor(hanId.getOperatorDescriptorId(), opOutputIndex);
}
@Override
public String toString() {
StringBuilder buffer = new StringBuilder();
buffer.append("ActivityNodes: " + activityNodes);
buffer.append('\n');
buffer.append("Blocker->Blocked: " + blocker2blockedMap);
buffer.append('\n');
buffer.append("Blocked->Blocker: " + blocked2blockerMap);
buffer.append('\n');
return buffer.toString();
}
public JSONObject toJSON() throws JSONException {
JSONObject jplan = new JSONObject();
jplan.put("type", "plan");
jplan.put("flags", jobFlags.toString());
JSONArray jans = new JSONArray();
for (IActivity an : activityNodes.values()) {
JSONObject jan = new JSONObject();
jan.put("type", "activity");
jan.put("id", an.getActivityId().toString());
jan.put("java-class", an.getClass().getName());
jan.put("operator-id", an.getActivityId().getOperatorDescriptorId().toString());
List<IConnectorDescriptor> inputs = getActivityInputConnectorDescriptors(an.getActivityId());
if (inputs != null) {
JSONArray jInputs = new JSONArray();
for (int i = 0; i < inputs.size(); ++i) {
JSONObject jInput = new JSONObject();
jInput.put("type", "activity-input");
jInput.put("input-port", i);
jInput.put("connector-id", inputs.get(i).getConnectorId().toString());
jInputs.put(jInput);
}
jan.put("inputs", jInputs);
}
List<IConnectorDescriptor> outputs = getActivityOutputConnectorDescriptors(an.getActivityId());
if (outputs != null) {
JSONArray jOutputs = new JSONArray();
for (int i = 0; i < outputs.size(); ++i) {
JSONObject jOutput = new JSONObject();
jOutput.put("type", "activity-output");
jOutput.put("output-port", i);
jOutput.put("connector-id", outputs.get(i).getConnectorId().toString());
jOutputs.put(jOutput);
}
jan.put("outputs", jOutputs);
}
Set<ActivityId> blockers = getBlocked2BlockerMap().get(an.getActivityId());
if (blockers != null) {
JSONArray jDeps = new JSONArray();
for (ActivityId blocker : blockers) {
jDeps.put(blocker.toString());
}
jan.put("depends-on", jDeps);
}
jans.put(jan);
}
jplan.put("activities", jans);
return jplan;
}
}