| /* |
| * Copyright 2009-2013 by The Regents of the University of California |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * you may obtain a copy of the License from |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package edu.uci.ics.hyracks.api.rewriter.runtime; |
| |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| |
| import org.apache.commons.lang3.tuple.Pair; |
| |
| import edu.uci.ics.hyracks.api.context.IHyracksTaskContext; |
| import edu.uci.ics.hyracks.api.dataflow.ActivityId; |
| import edu.uci.ics.hyracks.api.dataflow.IActivity; |
| import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor; |
| import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable; |
| import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider; |
| import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; |
| import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; |
| import edu.uci.ics.hyracks.api.job.ActivityCluster; |
| import edu.uci.ics.hyracks.api.job.ActivityClusterGraph; |
| import edu.uci.ics.hyracks.api.job.ActivityClusterId; |
| import edu.uci.ics.hyracks.api.rewriter.OneToOneConnectedActivityCluster; |
| |
| /** |
| * This class can be used to execute a DAG of activities inside which |
| * there are only one-to-one connectors. |
| * |
| * @author yingyib |
| */ |
| public class SuperActivity extends OneToOneConnectedActivityCluster implements IActivity { |
| private static final long serialVersionUID = 1L; |
| private final ActivityId activityId; |
| |
| public SuperActivity(ActivityClusterGraph acg, ActivityClusterId id, ActivityId activityId) { |
| super(acg, id); |
| this.activityId = activityId; |
| } |
| |
| @Override |
| public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, |
| final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) |
| throws HyracksDataException { |
| final Map<ActivityId, IActivity> startActivities = new HashMap<ActivityId, IActivity>(); |
| Map<ActivityId, IActivity> activities = getActivityMap(); |
| for (Entry<ActivityId, IActivity> entry : activities.entrySet()) { |
| /** |
| * extract start activities |
| */ |
| List<IConnectorDescriptor> conns = getActivityInputMap().get(entry.getKey()); |
| if (conns == null || conns.size() == 0) { |
| startActivities.put(entry.getKey(), entry.getValue()); |
| } |
| } |
| |
| /** |
| * wrap a RecordDescriptorProvider for the super activity |
| */ |
| IRecordDescriptorProvider wrappedRecDescProvider = new IRecordDescriptorProvider() { |
| |
| @Override |
| public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex) { |
| if (startActivities.get(aid) != null) { |
| /** |
| * if the activity is a start (input boundary) activity |
| */ |
| int superActivityInputChannel = SuperActivity.this.getClusterInputIndex(Pair.of(aid, inputIndex)); |
| if (superActivityInputChannel >= 0) { |
| return recordDescProvider.getInputRecordDescriptor(activityId, superActivityInputChannel); |
| } |
| } |
| if (SuperActivity.this.getActivityMap().get(aid) != null) { |
| /** |
| * if the activity is an internal activity of the super activity |
| */ |
| IConnectorDescriptor conn = getActivityInputMap().get(aid).get(inputIndex); |
| return getConnectorRecordDescriptorMap().get(conn.getConnectorId()); |
| } |
| |
| /** |
| * the following is for the case where the activity is in other SuperActivities |
| */ |
| ActivityClusterGraph acg = SuperActivity.this.getActivityClusterGraph(); |
| for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) { |
| ActivityCluster ac = entry.getValue(); |
| for (Entry<ActivityId, IActivity> saEntry : ac.getActivityMap().entrySet()) { |
| SuperActivity sa = (SuperActivity) saEntry.getValue(); |
| if (sa.getActivityMap().get(aid) != null) { |
| List<IConnectorDescriptor> conns = sa.getActivityInputMap().get(aid); |
| if (conns != null && conns.size() >= inputIndex) { |
| IConnectorDescriptor conn = conns.get(inputIndex); |
| return sa.getConnectorRecordDescriptorMap().get(conn.getConnectorId()); |
| } else { |
| int superActivityInputChannel = sa.getClusterInputIndex(Pair.of(aid, inputIndex)); |
| if (superActivityInputChannel >= 0) { |
| return recordDescProvider.getInputRecordDescriptor(sa.getActivityId(), |
| superActivityInputChannel); |
| } |
| } |
| } |
| } |
| } |
| return null; |
| } |
| |
| @Override |
| public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) { |
| /** |
| * if the activity is an output-boundary activity |
| */ |
| int superActivityOutputChannel = SuperActivity.this.getClusterOutputIndex(Pair.of(aid, outputIndex)); |
| if (superActivityOutputChannel >= 0) { |
| return recordDescProvider.getOutputRecordDescriptor(activityId, superActivityOutputChannel); |
| } |
| |
| if (SuperActivity.this.getActivityMap().get(aid) != null) { |
| /** |
| * if the activity is an internal activity of the super activity |
| */ |
| IConnectorDescriptor conn = getActivityOutputMap().get(aid).get(outputIndex); |
| return getConnectorRecordDescriptorMap().get(conn.getConnectorId()); |
| } |
| |
| /** |
| * the following is for the case where the activity is in other SuperActivities |
| */ |
| ActivityClusterGraph acg = SuperActivity.this.getActivityClusterGraph(); |
| for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) { |
| ActivityCluster ac = entry.getValue(); |
| for (Entry<ActivityId, IActivity> saEntry : ac.getActivityMap().entrySet()) { |
| SuperActivity sa = (SuperActivity) saEntry.getValue(); |
| if (sa.getActivityMap().get(aid) != null) { |
| List<IConnectorDescriptor> conns = sa.getActivityOutputMap().get(aid); |
| if (conns != null && conns.size() >= outputIndex) { |
| IConnectorDescriptor conn = conns.get(outputIndex); |
| return sa.getConnectorRecordDescriptorMap().get(conn.getConnectorId()); |
| } else { |
| superActivityOutputChannel = sa.getClusterOutputIndex(Pair.of(aid, outputIndex)); |
| if (superActivityOutputChannel >= 0) { |
| return recordDescProvider.getOutputRecordDescriptor(sa.getActivityId(), |
| superActivityOutputChannel); |
| } |
| } |
| } |
| } |
| } |
| return null; |
| } |
| |
| }; |
| return new SuperActivityOperatorNodePushable(this, startActivities, ctx, wrappedRecDescProvider, partition, |
| nPartitions); |
| } |
| |
| @Override |
| public ActivityId getActivityId() { |
| return activityId; |
| } |
| |
| @Override |
| public String toString() { |
| return getActivityMap().values().toString(); |
| } |
| } |