/*
 * Copyright 2009-2010 by The Regents of the University of California
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * you may obtain a copy of the License from
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| package edu.uci.ics.hyracks.api.job; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.json.JSONArray; |
| import org.json.JSONException; |
| import org.json.JSONObject; |
| |
| import edu.uci.ics.hyracks.api.dataflow.ActivityId; |
| import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId; |
| import edu.uci.ics.hyracks.api.dataflow.IActivity; |
| import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor; |
| import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy; |
| import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; |
| |
| public class ActivityCluster implements Serializable { |
| private static final long serialVersionUID = 1L; |
| |
| private final ActivityClusterGraph acg; |
| |
| private final ActivityClusterId id; |
| |
| private final List<IActivity> roots; |
| |
| private final Map<ActivityId, IActivity> activities; |
| |
| private final Map<ConnectorDescriptorId, IConnectorDescriptor> connectors; |
| |
| private final Map<ConnectorDescriptorId, RecordDescriptor> connectorRecordDescriptorMap; |
| |
| private final Map<ActivityId, List<IConnectorDescriptor>> activityInputMap; |
| |
| private final Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap; |
| |
| private final Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap; |
| |
| private final Map<ActivityId, Set<ActivityId>> blocked2blockerMap; |
| |
| private final List<ActivityCluster> dependencies; |
| |
| private IConnectorPolicyAssignmentPolicy cpap; |
| |
| public ActivityCluster(ActivityClusterGraph acg, ActivityClusterId id) { |
| this.acg = acg; |
| this.id = id; |
| roots = new ArrayList<IActivity>(); |
| activities = new HashMap<ActivityId, IActivity>(); |
| connectors = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>(); |
| connectorRecordDescriptorMap = new HashMap<ConnectorDescriptorId, RecordDescriptor>(); |
| activityInputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>(); |
| activityOutputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>(); |
| connectorActivityMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>(); |
| blocked2blockerMap = new HashMap<ActivityId, Set<ActivityId>>(); |
| dependencies = new ArrayList<ActivityCluster>(); |
| } |
| |
| public ActivityClusterGraph getActivityClusterGraph() { |
| return acg; |
| } |
| |
| public ActivityClusterId getId() { |
| return id; |
| } |
| |
| public void addRoot(IActivity activity) { |
| roots.add(activity); |
| } |
| |
| public void addActivity(IActivity activity) { |
| activities.put(activity.getActivityId(), activity); |
| } |
| |
| public void addConnector(IConnectorDescriptor connector) { |
| connectors.put(connector.getConnectorId(), connector); |
| } |
| |
| public void connect(IConnectorDescriptor connector, IActivity producerActivity, int producerPort, |
| IActivity consumerActivity, int consumerPort, RecordDescriptor recordDescriptor) { |
| if (!activities.containsKey(producerActivity.getActivityId()) |
| || !activities.containsKey(consumerActivity.getActivityId())) { |
| throw new IllegalStateException("Connected Activities belong to different Activity Clusters: " |
| + producerActivity.getActivityId() + " and " + consumerActivity.getActivityId()); |
| } |
| insertIntoIndexedMap(activityInputMap, consumerActivity.getActivityId(), consumerPort, connector); |
| insertIntoIndexedMap(activityOutputMap, producerActivity.getActivityId(), producerPort, connector); |
| connectorActivityMap.put( |
| connector.getConnectorId(), |
| Pair.<Pair<IActivity, Integer>, Pair<IActivity, Integer>> of( |
| Pair.<IActivity, Integer> of(producerActivity, producerPort), |
| Pair.<IActivity, Integer> of(consumerActivity, consumerPort))); |
| connectorRecordDescriptorMap.put(connector.getConnectorId(), recordDescriptor); |
| } |
| |
| public List<IActivity> getRoots() { |
| return roots; |
| } |
| |
| public Map<ActivityId, IActivity> getActivityMap() { |
| return activities; |
| } |
| |
| public Map<ConnectorDescriptorId, IConnectorDescriptor> getConnectorMap() { |
| return connectors; |
| } |
| |
| public Map<ConnectorDescriptorId, RecordDescriptor> getConnectorRecordDescriptorMap() { |
| return connectorRecordDescriptorMap; |
| } |
| |
| public Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> getConnectorActivityMap() { |
| return connectorActivityMap; |
| } |
| |
| public Map<ActivityId, List<IConnectorDescriptor>> getActivityInputMap() { |
| return activityInputMap; |
| } |
| |
| public Map<ActivityId, List<IConnectorDescriptor>> getActivityOutputMap() { |
| return activityOutputMap; |
| } |
| |
| public ActivityId getConsumerActivity(ConnectorDescriptorId cdId) { |
| Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId); |
| return connEdge.getRight().getLeft().getActivityId(); |
| } |
| |
| public ActivityId getProducerActivity(ConnectorDescriptorId cdId) { |
| Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId); |
| return connEdge.getLeft().getLeft().getActivityId(); |
| } |
| |
| public Map<ActivityId, Set<ActivityId>> getBlocked2BlockerMap() { |
| return blocked2blockerMap; |
| } |
| |
| public List<ActivityCluster> getDependencies() { |
| return dependencies; |
| } |
| |
| public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() { |
| return cpap; |
| } |
| |
| public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy cpap) { |
| this.cpap = cpap; |
| } |
| |
| private <T> void extend(List<T> list, int index) { |
| int n = list.size(); |
| for (int i = n; i <= index; ++i) { |
| list.add(null); |
| } |
| } |
| |
| private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) { |
| List<V> vList = map.get(key); |
| if (vList == null) { |
| vList = new ArrayList<V>(); |
| map.put(key, vList); |
| } |
| extend(vList, index); |
| vList.set(index, value); |
| } |
| |
| public JSONObject toJSON() throws JSONException { |
| JSONObject jac = new JSONObject(); |
| |
| JSONArray jans = new JSONArray(); |
| for (IActivity an : activities.values()) { |
| JSONObject jan = new JSONObject(); |
| jan.put("id", an.getActivityId().toString()); |
| jan.put("java-class", an.getClass().getName()); |
| |
| List<IConnectorDescriptor> inputs = activityInputMap.get(an.getActivityId()); |
| if (inputs != null) { |
| JSONArray jInputs = new JSONArray(); |
| for (int i = 0; i < inputs.size(); ++i) { |
| JSONObject jInput = new JSONObject(); |
| jInput.put("input-port", i); |
| jInput.put("connector-id", inputs.get(i).getConnectorId().toString()); |
| jInputs.put(jInput); |
| } |
| jan.put("inputs", jInputs); |
| } |
| |
| List<IConnectorDescriptor> outputs = activityOutputMap.get(an.getActivityId()); |
| if (outputs != null) { |
| JSONArray jOutputs = new JSONArray(); |
| for (int i = 0; i < outputs.size(); ++i) { |
| JSONObject jOutput = new JSONObject(); |
| jOutput.put("output-port", i); |
| jOutput.put("connector-id", outputs.get(i).getConnectorId().toString()); |
| jOutputs.put(jOutput); |
| } |
| jan.put("outputs", jOutputs); |
| } |
| |
| Set<ActivityId> blockers = getBlocked2BlockerMap().get(an.getActivityId()); |
| if (blockers != null) { |
| JSONArray jDeps = new JSONArray(); |
| for (ActivityId blocker : blockers) { |
| jDeps.put(blocker.toString()); |
| } |
| jan.put("depends-on", jDeps); |
| } |
| jans.put(jan); |
| } |
| jac.put("activities", jans); |
| |
| return jac; |
| } |
| } |