blob: 9fb2b081d88cec3545972764eba59c1133dc14b5 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.api.job;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
/**
 * A group of activities belonging to an {@link ActivityClusterGraph}, together with the connectors
 * registered on it, the per-activity input/output port assignments, the blocked-to-blocker activity
 * map and the list of dependency clusters.
 * <p>
 * All collection getters return the internal, mutable collections directly (no defensive copies).
 * This class itself never writes to the blocked-to-blocker map or the dependency list, so those two
 * are populated by callers through their getters.
 */
public class ActivityCluster implements Serializable {
    private static final long serialVersionUID = 1L;

    private final ActivityClusterGraph acg;

    private final ActivityClusterId id;

    private final List<IActivity> roots;

    private final Map<ActivityId, IActivity> activities;

    private final Map<ConnectorDescriptorId, IConnectorDescriptor> connectors;

    private final Map<ConnectorDescriptorId, RecordDescriptor> connectorRecordDescriptorMap;

    // Per activity: connectors indexed by input port. Slots of unconnected ports hold null.
    private final Map<ActivityId, List<IConnectorDescriptor>> activityInputMap;

    // Per activity: connectors indexed by output port. Slots of unconnected ports hold null.
    private final Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap;

    // Per connector: ((producer activity, producer port), (consumer activity, consumer port)).
    private final Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap;

    private final Map<ActivityId, Set<ActivityId>> blocked2blockerMap;

    private final List<ActivityCluster> dependencies;

    private IConnectorPolicyAssignmentPolicy cpap;

    /**
     * Creates an empty cluster.
     *
     * @param acg
     *            the graph this cluster is associated with
     * @param id
     *            the identifier of this cluster
     */
    public ActivityCluster(ActivityClusterGraph acg, ActivityClusterId id) {
        this.acg = acg;
        this.id = id;
        roots = new ArrayList<IActivity>();
        activities = new HashMap<ActivityId, IActivity>();
        connectors = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
        connectorRecordDescriptorMap = new HashMap<ConnectorDescriptorId, RecordDescriptor>();
        activityInputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
        activityOutputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
        connectorActivityMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
        blocked2blockerMap = new HashMap<ActivityId, Set<ActivityId>>();
        dependencies = new ArrayList<ActivityCluster>();
    }

    /**
     * @return the graph passed to the constructor
     */
    public ActivityClusterGraph getActivityClusterGraph() {
        return acg;
    }

    /**
     * @return the identifier passed to the constructor
     */
    public ActivityClusterId getId() {
        return id;
    }

    /**
     * Appends an activity to the list of roots. The activity is not registered in the activity map
     * by this call; use {@link #addActivity(IActivity)} for that.
     *
     * @param activity
     *            the activity to record as a root
     */
    public void addRoot(IActivity activity) {
        roots.add(activity);
    }

    /**
     * Registers an activity under its {@link ActivityId}, replacing any activity previously
     * registered under the same id.
     *
     * @param activity
     *            the activity to register
     */
    public void addActivity(IActivity activity) {
        activities.put(activity.getActivityId(), activity);
    }

    /**
     * Registers a connector under its {@link ConnectorDescriptorId}, replacing any connector
     * previously registered under the same id.
     *
     * @param connector
     *            the connector to register
     */
    public void addConnector(IConnectorDescriptor connector) {
        connectors.put(connector.getConnectorId(), connector);
    }

    /**
     * Records an edge from an output port of the producer activity to an input port of the consumer
     * activity through the given connector, and associates the record descriptor with the
     * connector. The connector is not added to the connector map by this call; use
     * {@link #addConnector(IConnectorDescriptor)} for that.
     *
     * @param connector
     *            the connector carrying the edge
     * @param producerActivity
     *            the activity on the producing side
     * @param producerPort
     *            the output port index on the producer
     * @param consumerActivity
     *            the activity on the consuming side
     * @param consumerPort
     *            the input port index on the consumer
     * @param recordDescriptor
     *            the record descriptor to associate with the connector
     * @throws IllegalStateException
     *             if either activity has not been registered in this cluster
     */
    public void connect(IConnectorDescriptor connector, IActivity producerActivity, int producerPort,
            IActivity consumerActivity, int consumerPort, RecordDescriptor recordDescriptor) {
        if (!activities.containsKey(producerActivity.getActivityId())
                || !activities.containsKey(consumerActivity.getActivityId())) {
            throw new IllegalStateException("Connected Activities belong to different Activity Clusters: "
                    + producerActivity.getActivityId() + " and " + consumerActivity.getActivityId());
        }
        insertIntoIndexedMap(activityInputMap, consumerActivity.getActivityId(), consumerPort, connector);
        insertIntoIndexedMap(activityOutputMap, producerActivity.getActivityId(), producerPort, connector);
        connectorActivityMap.put(
                connector.getConnectorId(),
                Pair.<Pair<IActivity, Integer>, Pair<IActivity, Integer>> of(
                        Pair.<IActivity, Integer> of(producerActivity, producerPort),
                        Pair.<IActivity, Integer> of(consumerActivity, consumerPort)));
        connectorRecordDescriptorMap.put(connector.getConnectorId(), recordDescriptor);
    }

    /**
     * @return the internal list of root activities
     */
    public List<IActivity> getRoots() {
        return roots;
    }

    /**
     * @return the internal map from activity id to activity
     */
    public Map<ActivityId, IActivity> getActivityMap() {
        return activities;
    }

    /**
     * @return the internal map from connector id to connector
     */
    public Map<ConnectorDescriptorId, IConnectorDescriptor> getConnectorMap() {
        return connectors;
    }

    /**
     * @return the internal map from connector id to the record descriptor given in
     *         {@link #connect}
     */
    public Map<ConnectorDescriptorId, RecordDescriptor> getConnectorRecordDescriptorMap() {
        return connectorRecordDescriptorMap;
    }

    /**
     * @return the internal map from connector id to its edge, stored as
     *         ((producer activity, producer port), (consumer activity, consumer port))
     */
    public Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> getConnectorActivityMap() {
        return connectorActivityMap;
    }

    /**
     * @return the internal map from activity id to its connectors indexed by input port; a list may
     *         contain {@code null} at ports that have not been connected
     */
    public Map<ActivityId, List<IConnectorDescriptor>> getActivityInputMap() {
        return activityInputMap;
    }

    /**
     * @return the internal map from activity id to its connectors indexed by output port; a list
     *         may contain {@code null} at ports that have not been connected
     */
    public Map<ActivityId, List<IConnectorDescriptor>> getActivityOutputMap() {
        return activityOutputMap;
    }

    /**
     * Looks up the consumer side of the edge recorded for a connector.
     *
     * @param cdId
     *            the connector id
     * @return the id of the consuming activity
     * @throws NullPointerException
     *             if no edge has been recorded for {@code cdId}
     */
    public ActivityId getConsumerActivity(ConnectorDescriptorId cdId) {
        Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId);
        return connEdge.getRight().getLeft().getActivityId();
    }

    /**
     * Looks up the producer side of the edge recorded for a connector.
     *
     * @param cdId
     *            the connector id
     * @return the id of the producing activity
     * @throws NullPointerException
     *             if no edge has been recorded for {@code cdId}
     */
    public ActivityId getProducerActivity(ConnectorDescriptorId cdId) {
        Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId);
        return connEdge.getLeft().getLeft().getActivityId();
    }

    /**
     * @return the internal map from a blocked activity id to the ids of the activities blocking it
     */
    public Map<ActivityId, Set<ActivityId>> getBlocked2BlockerMap() {
        return blocked2blockerMap;
    }

    /**
     * @return the internal list of dependency clusters
     */
    public List<ActivityCluster> getDependencies() {
        return dependencies;
    }

    /**
     * @return the connector policy assignment policy, or {@code null} if none has been set
     */
    public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
        return cpap;
    }

    /**
     * @param cpap
     *            the connector policy assignment policy to store
     */
    public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy cpap) {
        this.cpap = cpap;
    }

    /**
     * Pads the list with {@code null} entries until {@code index} is a valid position. Does nothing
     * if the list is already long enough.
     */
    private <T> void extend(List<T> list, int index) {
        int n = list.size();
        for (int i = n; i <= index; ++i) {
            list.add(null);
        }
    }

    /**
     * Stores {@code value} at position {@code index} of the list mapped to {@code key}, creating
     * the list if absent and padding it with {@code null} up to {@code index} if it is too short.
     */
    private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
        List<V> vList = map.get(key);
        if (vList == null) {
            vList = new ArrayList<V>();
            map.put(key, vList);
        }
        extend(vList, index);
        vList.set(index, value);
    }

    /**
     * Serializes the activities of this cluster to JSON. The result has a single key
     * {@code "activities"} holding one object per registered activity with its {@code "id"},
     * {@code "java-class"} and, when present, {@code "inputs"}, {@code "outputs"} and
     * {@code "depends-on"} arrays.
     * <p>
     * Ports that have no connector assigned are omitted from the {@code "inputs"} and
     * {@code "outputs"} arrays; the emitted port numbers are the real port indexes.
     *
     * @return the JSON representation of this cluster's activities
     * @throws JSONException
     *             if building the JSON structure fails
     */
    public JSONObject toJSON() throws JSONException {
        JSONObject jac = new JSONObject();
        JSONArray jans = new JSONArray();
        for (IActivity an : activities.values()) {
            ActivityId aid = an.getActivityId();
            JSONObject jan = new JSONObject();
            jan.put("id", aid.toString());
            jan.put("java-class", an.getClass().getName());

            List<IConnectorDescriptor> inputs = activityInputMap.get(aid);
            if (inputs != null) {
                jan.put("inputs", portsToJSON(inputs, "input-port"));
            }

            List<IConnectorDescriptor> outputs = activityOutputMap.get(aid);
            if (outputs != null) {
                jan.put("outputs", portsToJSON(outputs, "output-port"));
            }

            Set<ActivityId> blockers = getBlocked2BlockerMap().get(aid);
            if (blockers != null) {
                JSONArray jDeps = new JSONArray();
                for (ActivityId blocker : blockers) {
                    jDeps.put(blocker.toString());
                }
                jan.put("depends-on", jDeps);
            }
            jans.put(jan);
        }
        jac.put("activities", jans);
        return jac;
    }

    /**
     * Serializes a port-indexed connector list as an array of objects, each holding the port index
     * under {@code portKey} and the connector id under {@code "connector-id"}.
     */
    private static JSONArray portsToJSON(List<IConnectorDescriptor> ports, String portKey) throws JSONException {
        JSONArray jPorts = new JSONArray();
        for (int i = 0; i < ports.size(); ++i) {
            IConnectorDescriptor connector = ports.get(i);
            if (connector == null) {
                // Slot was only padded by extend(); no connector is attached to this port.
                continue;
            }
            JSONObject jPort = new JSONObject();
            jPort.put(portKey, i);
            jPort.put("connector-id", connector.getConnectorId().toString());
            jPorts.put(jPort);
        }
        return jPorts;
    }
}