blob: c6761e936bb28099ae2a43e6f31fec2cbd8110a9 [file] [log] [blame]
/*
* Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.api.rewriter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.ActivityCluster;
import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
import edu.uci.ics.hyracks.api.job.ActivityClusterId;
import edu.uci.ics.hyracks.api.rewriter.runtime.SuperActivity;
/**
* This class rewrites the ActivityClusterGraph to eliminate
* all one-to-one connections and merge one-to-one connected
* DAGs into super activities.
* <p>
* Each super activity internally maintains a DAG and executes it at runtime.
*
* @author yingyib
*/
public class ActivityClusterGraphRewriter {
    /**
     * Fully-qualified class name used to recognize one-to-one connectors.
     * Connectors are matched by class name rather than by type (see {@code isOneToOneConnector}).
     */
    private static final String ONE_TO_ONE_CONNECTOR = "edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor";

    /**
     * Rewrites an activity cluster graph in place to eliminate all one-to-one
     * connections, merging one-to-one connected DAGs into super activities.
     * <p>
     * The graph-level activity and connector maps are cleared up front and are
     * repopulated with super-activity entries while each cluster is rewritten.
     * The rewrite runs in two passes: first every cluster is rewritten
     * internally, then the blocking relationships of every (new) cluster are
     * translated to super-activity ids.
     *
     * @param acg
     *            the activity cluster graph
     */
    public void rewrite(ActivityClusterGraph acg) {
        acg.getActivityMap().clear();
        acg.getConnectorMap().clear();
        Map<IActivity, SuperActivity> invertedActivitySuperActivityMap = new HashMap<IActivity, SuperActivity>();

        // Pass 1: rewrite each cluster internally. The callee replaces the value stored under the
        // cluster's (already present) id in the map being iterated here; no key is added or removed.
        // NOTE(review): this relies on the map tolerating value replacement during iteration
        // (true for java.util.HashMap) -- confirm the concrete map type.
        for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
            rewriteIntraActivityCluster(entry.getValue(), invertedActivitySuperActivityMap);
        }

        // The original-id -> super-activity-id mapping is the same for every cluster, so build it once.
        Map<ActivityId, ActivityId> invertedAid2SuperAidMap = new HashMap<ActivityId, ActivityId>();
        for (Entry<IActivity, SuperActivity> entry : invertedActivitySuperActivityMap.entrySet()) {
            invertedAid2SuperAidMap.put(entry.getKey().getActivityId(), entry.getValue().getActivityId());
        }

        // Pass 2: translate blocking relationships; the map now holds the new clusters.
        for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
            rewriteInterActivityCluster(entry.getValue(), invertedAid2SuperAidMap);
        }
        invertedActivitySuperActivityMap.clear();
    }

    /**
     * Tells whether a connector is a one-to-one connector, judged by whether
     * its runtime class name contains {@link #ONE_TO_ONE_CONNECTOR}.
     *
     * @param conn
     *            the connector to test
     * @return true if the connector's class name contains the one-to-one connector class name
     */
    private static boolean isOneToOneConnector(IConnectorDescriptor conn) {
        return conn.getClass().getName().contains(ONE_TO_ONE_CONNECTOR);
    }

    /**
     * Rewrites the blocking relationships of an activity cluster so that both
     * the blocked activities and their blockers are expressed as super-activity
     * ids, and registers the clusters hosting the blockers as dependencies of
     * the given cluster.
     *
     * @param ac
     *            the activity cluster to be rewritten
     * @param invertedAid2SuperAidMap
     *            the map from original activity ids to the ids of their hosting super activities
     */
    private void rewriteInterActivityCluster(ActivityCluster ac, Map<ActivityId, ActivityId> invertedAid2SuperAidMap) {
        Map<ActivityId, Set<ActivityId>> blocked2BlockerMap = ac.getBlocked2BlockerMap();
        Map<ActivityId, Set<ActivityId>> replacedBlocked2BlockerMap = new HashMap<ActivityId, Set<ActivityId>>();
        for (Entry<ActivityId, Set<ActivityId>> entry : blocked2BlockerMap.entrySet()) {
            Set<ActivityId> blockers = entry.getValue();
            if (blockers == null) {
                // an entry without a blocker set is not carried over
                continue;
            }
            ActivityId replacedBlocked = invertedAid2SuperAidMap.get(entry.getKey());
            Set<ActivityId> replacedBlockers = new HashSet<ActivityId>();
            for (ActivityId blocker : blockers) {
                ActivityId replacedBlocker = invertedAid2SuperAidMap.get(blocker);
                replacedBlockers.add(replacedBlocker);
                // the cluster hosting the blocker must complete first: record it as a dependency
                ActivityCluster dependingAc = ac.getActivityClusterGraph().getActivityMap().get(replacedBlocker);
                if (!ac.getDependencies().contains(dependingAc)) {
                    ac.getDependencies().add(dependingAc);
                }
            }
            // several original activities may map to the same super activity: merge their blockers
            Set<ActivityId> existingBlockers = replacedBlocked2BlockerMap.get(replacedBlocked);
            if (existingBlockers == null) {
                replacedBlocked2BlockerMap.put(replacedBlocked, replacedBlockers);
            } else {
                existingBlockers.addAll(replacedBlockers);
            }
        }
        blocked2BlockerMap.clear();
        blocked2BlockerMap.putAll(replacedBlocked2BlockerMap);
    }

    /**
     * Rewrites an activity cluster internally: partitions its activities into
     * super activities, builds a new activity cluster made of those super
     * activities, and installs the new cluster in the graph under the id of
     * the old one.
     *
     * @param ac
     *            the activity cluster to be rewritten
     * @param invertedActivitySuperActivityMap
     *            the map from the original activities to their hosting super activities;
     *            it is extended with the activities of this cluster
     */
    private void rewriteIntraActivityCluster(ActivityCluster ac,
            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap) {
        ActivityClusterGraph acg = ac.getActivityClusterGraph();
        Map<ActivityId, SuperActivity> superActivities = new HashMap<ActivityId, SuperActivity>();
        Map<ActivityId, Queue<IActivity>> toBeExpendedMap = new HashMap<ActivityId, Queue<IActivity>>();

        createInitialSuperActivities(ac, superActivities, toBeExpendedMap, invertedActivitySuperActivityMap);
        expandSuperActivities(ac, superActivities, toBeExpendedMap, invertedActivitySuperActivityMap);

        // create a new activity cluster to replace the old activity cluster
        ActivityCluster newActivityCluster = new ActivityCluster(acg, ac.getId());
        newActivityCluster.setConnectorPolicyAssignmentPolicy(ac.getConnectorPolicyAssignmentPolicy());
        for (Entry<ActivityId, SuperActivity> entry : superActivities.entrySet()) {
            newActivityCluster.addActivity(entry.getValue());
            acg.getActivityMap().put(entry.getKey(), newActivityCluster);
        }

        // set up connectors: either inside a super activity or among super activities
        setupConnectors(ac, newActivityCluster, superActivities, invertedActivitySuperActivityMap);

        // set up the roots of the new activity cluster: super activities without cluster-level outputs
        for (Entry<ActivityId, SuperActivity> entry : superActivities.entrySet()) {
            List<IConnectorDescriptor> outputConnectors = newActivityCluster.getActivityOutputMap()
                    .get(entry.getKey());
            if (outputConnectors == null || outputConnectors.isEmpty()) {
                newActivityCluster.addRoot(entry.getValue());
            }
        }

        // copy the blocked2Blocker mapping, which will be updated in the rewriteInterActivityCluster call
        newActivityCluster.getBlocked2BlockerMap().putAll(ac.getBlocked2BlockerMap());

        // replace the old activity cluster with the new activity cluster
        acg.getActivityClusterMap().put(ac.getId(), newActivityCluster);
    }

    /**
     * Builds the initial super activities: one per activity of the cluster
     * that has no entry in the cluster's activity input map. The start
     * activity's id is used as the id of its super activity.
     *
     * @param ac
     *            the activity cluster being rewritten
     * @param superActivities
     *            the map from activity id to current super activities
     * @param toBeExpendedMap
     *            the map from a super activity id to its BFS expansion queue of the original activities
     * @param invertedActivitySuperActivityMap
     *            the map from the original activities to their hosting super activities
     */
    private void createInitialSuperActivities(ActivityCluster ac, Map<ActivityId, SuperActivity> superActivities,
            Map<ActivityId, Queue<IActivity>> toBeExpendedMap,
            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap) {
        Map<ActivityId, List<IConnectorDescriptor>> activityInputMap = ac.getActivityInputMap();
        for (Entry<ActivityId, IActivity> entry : ac.getActivityMap().entrySet()) {
            ActivityId activityId = entry.getKey();
            if (activityInputMap.get(activityId) == null) {
                createNewSuperActivity(ac, superActivities, toBeExpendedMap, invertedActivitySuperActivityMap,
                        activityId, entry.getValue());
            }
        }
    }

    /**
     * Expands the super activities along one-to-one connections in BFS order.
     * In each round every live super activity with a pending queue expands one
     * original activity: consumers reached through a one-to-one connector are
     * absorbed (or their super activity is swallowed), while consumers reached
     * through any other connector start a new super activity if they do not
     * belong to one yet. The loop ends when no expansion queue is left.
     *
     * @param ac
     *            the activity cluster being rewritten
     * @param superActivities
     *            the map from activity id to current super activities
     * @param toBeExpendedMap
     *            the map from a super activity id to its BFS expansion queue of the original activities
     * @param invertedActivitySuperActivityMap
     *            the map from the original activities to their hosting super activities
     */
    private void expandSuperActivities(ActivityCluster ac, Map<ActivityId, SuperActivity> superActivities,
            Map<ActivityId, Queue<IActivity>> toBeExpendedMap,
            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap) {
        Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap = ac.getActivityOutputMap();
        Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap = ac
                .getConnectorActivityMap();
        Map<ActivityId, SuperActivity> clonedSuperActivities = new HashMap<ActivityId, SuperActivity>();
        while (!toBeExpendedMap.isEmpty()) {
            // iterate over a snapshot, because super activities are created and removed during a round
            clonedSuperActivities.clear();
            clonedSuperActivities.putAll(superActivities);
            for (Entry<ActivityId, SuperActivity> entry : clonedSuperActivities.entrySet()) {
                ActivityId superActivityId = entry.getKey();
                SuperActivity superActivity = entry.getValue();
                if (superActivities.get(superActivityId) == null) {
                    // the super activity has already been swallowed in this round
                    continue;
                }
                Queue<IActivity> toBeExpended = toBeExpendedMap.get(superActivityId);
                if (toBeExpended == null) {
                    // nothing to expand
                    continue;
                }
                IActivity expendingActivity = toBeExpended.poll();
                List<IConnectorDescriptor> outputConnectors = activityOutputMap
                        .get(expendingActivity.getActivityId());
                if (outputConnectors != null) {
                    for (IConnectorDescriptor outputConn : outputConnectors) {
                        Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> endPoints = connectorActivityMap
                                .get(outputConn.getConnectorId());
                        IActivity newActivity = endPoints.getRight().getLeft();
                        SuperActivity existingSuperActivity = invertedActivitySuperActivityMap.get(newActivity);
                        if (isOneToOneConnector(outputConn)) {
                            if (existingSuperActivity == null) {
                                // expand the super activity over a one-to-one out-bound connection
                                superActivity.addActivity(newActivity);
                                toBeExpended.add(newActivity);
                                invertedActivitySuperActivityMap.put(newActivity, superActivity);
                            } else if (existingSuperActivity != superActivity) {
                                // the consumer lives in another super activity: swallow that one
                                swallowExistingSuperActivity(superActivities, toBeExpendedMap,
                                        invertedActivitySuperActivityMap, superActivity, superActivityId,
                                        existingSuperActivity);
                            }
                            // otherwise the two activities are already in the same super activity
                        } else if (existingSuperActivity == null) {
                            // a non one-to-one connection starts a new super activity at the consumer
                            createNewSuperActivity(ac, superActivities, toBeExpendedMap,
                                    invertedActivitySuperActivityMap, newActivity.getActivityId(), newActivity);
                        }
                    }
                }
                // remove the to-be-expanded queue if it is empty
                if (toBeExpended.isEmpty()) {
                    toBeExpendedMap.remove(superActivityId);
                }
            }
        }
    }

    /**
     * Re-creates the connections of the old activity cluster. One-to-one
     * connections become edges inside the producer's super activity; every
     * other connection becomes a connector of the new activity cluster between
     * two super activities, using freshly numbered super-activity ports that
     * are bridged to the original activity ports.
     *
     * @param ac
     *            the old activity cluster whose connections are read
     * @param newActivityCluster
     *            the new activity cluster that receives the connectors among super activities
     * @param superActivities
     *            the map from activity id to the final super activities
     * @param invertedActivitySuperActivityMap
     *            the map from the original activities to their hosting super activities
     */
    private void setupConnectors(ActivityCluster ac, ActivityCluster newActivityCluster,
            Map<ActivityId, SuperActivity> superActivities,
            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap) {
        ActivityClusterGraph acg = ac.getActivityClusterGraph();
        Map<ConnectorDescriptorId, IConnectorDescriptor> connMap = ac.getConnectorMap();
        Map<ConnectorDescriptorId, RecordDescriptor> connRecordDesc = ac.getConnectorRecordDescriptorMap();

        // next free output/input port of each super activity, starting from 0
        Map<SuperActivity, Integer> superActivityProducerPort = new HashMap<SuperActivity, Integer>();
        Map<SuperActivity, Integer> superActivityConsumerPort = new HashMap<SuperActivity, Integer>();
        for (SuperActivity superActivity : superActivities.values()) {
            superActivityProducerPort.put(superActivity, 0);
            superActivityConsumerPort.put(superActivity, 0);
        }

        for (Entry<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> entry : ac
                .getConnectorActivityMap().entrySet()) {
            ConnectorDescriptorId connectorId = entry.getKey();
            Pair<IActivity, Integer> producerEnd = entry.getValue().getLeft();
            Pair<IActivity, Integer> consumerEnd = entry.getValue().getRight();
            IActivity producerActivity = producerEnd.getLeft();
            IActivity consumerActivity = consumerEnd.getLeft();
            int producerPort = producerEnd.getRight();
            int consumerPort = consumerEnd.getRight();
            RecordDescriptor recordDescriptor = connRecordDesc.get(connectorId);
            IConnectorDescriptor conn = connMap.get(connectorId);
            if (isOneToOneConnector(conn)) {
                // connection edge between inner activities of one super activity
                SuperActivity residingSuperActivity = invertedActivitySuperActivityMap.get(producerActivity);
                residingSuperActivity.connect(conn, producerActivity, producerPort, consumerActivity, consumerPort,
                        recordDescriptor);
            } else {
                // connection edge between super activities
                SuperActivity producerSuperActivity = invertedActivitySuperActivityMap.get(producerActivity);
                SuperActivity consumerSuperActivity = invertedActivitySuperActivityMap.get(consumerActivity);
                int producerSAPort = superActivityProducerPort.get(producerSuperActivity);
                int consumerSAPort = superActivityConsumerPort.get(consumerSuperActivity);
                newActivityCluster.addConnector(conn);
                newActivityCluster.connect(conn, producerSuperActivity, producerSAPort, consumerSuperActivity,
                        consumerSAPort, recordDescriptor);

                // bridge the super-activity ports to the ports of the original activities
                producerSuperActivity.setClusterOutputIndex(producerSAPort, producerActivity.getActivityId(),
                        producerPort);
                consumerSuperActivity.setClusterInputIndex(consumerSAPort, consumerActivity.getActivityId(),
                        consumerPort);
                acg.getConnectorMap().put(connectorId, newActivityCluster);

                // advance the next free port number of the producer and the consumer
                superActivityProducerPort.put(producerSuperActivity, producerSAPort + 1);
                superActivityConsumerPort.put(consumerSuperActivity, consumerSAPort + 1);
            }
        }
    }

    /**
     * Creates a new super activity holding a single original activity and
     * registers it, together with a BFS expansion queue seeded with that
     * activity, in the bookkeeping maps.
     *
     * @param ac
     *            the activity cluster the super activity belongs to
     * @param superActivities
     *            the map from activity id to current super activities
     * @param toBeExpendedMap
     *            the map from a super activity id to its BFS expansion queue of the original activities
     * @param invertedActivitySuperActivityMap
     *            the map from the original activities to their hosting super activities
     * @param activityId
     *            the activity id for the new super activity, which is the id of the first activity added to it
     * @param activity
     *            the first activity added to the new super activity
     */
    private void createNewSuperActivity(ActivityCluster ac, Map<ActivityId, SuperActivity> superActivities,
            Map<ActivityId, Queue<IActivity>> toBeExpendedMap,
            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap, ActivityId activityId,
            IActivity activity) {
        SuperActivity superActivity = new SuperActivity(ac.getActivityClusterGraph(), ac.getId(), activityId);
        superActivities.put(activityId, superActivity);
        superActivity.addActivity(activity);
        Queue<IActivity> toBeExpended = new LinkedList<IActivity>();
        toBeExpended.add(activity);
        toBeExpendedMap.put(activityId, toBeExpended);
        invertedActivitySuperActivityMap.put(activity, superActivity);
    }

    /**
     * Lets one super activity swallow another existing super activity: the
     * swallowed one is unregistered, all of its activities are moved into the
     * swallowing one, and its pending expansion queue (if any) is appended to
     * the swallowing super activity's queue.
     *
     * @param superActivities
     *            the map from activity id to current super activities
     * @param toBeExpendedMap
     *            the map from a super activity id to its BFS expansion queue of the original activities
     * @param invertedActivitySuperActivityMap
     *            the map from the original activities to their hosting super activities
     * @param superActivity
     *            the "swallowing" super activity
     * @param superActivityId
     *            the activity id of the "swallowing" super activity, which is also the id of the first
     *            activity added to it
     * @param existingSuperActivity
     *            an existing super activity which is to be swallowed by the "swallowing" super activity
     */
    private void swallowExistingSuperActivity(Map<ActivityId, SuperActivity> superActivities,
            Map<ActivityId, Queue<IActivity>> toBeExpendedMap,
            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap, SuperActivity superActivity,
            ActivityId superActivityId, SuperActivity existingSuperActivity) {
        ActivityId existingSuperActivityId = existingSuperActivity.getActivityId();
        superActivities.remove(existingSuperActivityId);
        for (IActivity existingActivity : existingSuperActivity.getActivityMap().values()) {
            superActivity.addActivity(existingActivity);
            invertedActivitySuperActivityMap.put(existingActivity, superActivity);
        }
        Queue<IActivity> tbeQueue = toBeExpendedMap.get(superActivityId);
        Queue<IActivity> existingTbeQueue = toBeExpendedMap.remove(existingSuperActivityId);
        if (existingTbeQueue != null) {
            tbeQueue.addAll(existingTbeQueue);
        }
    }
}