blob: 1a9b602adaf65d36a85ea98296be9aa61caa8a61 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samoa.apex.topology.impl;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.samoa.apex.topology.impl.ApexStreamUtils.JavaSerializationStreamCodec;
import org.apache.samoa.core.ContentEvent;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.Operator.OutputPort;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
public class ApexTask implements StreamingApplication {
LogicalPlan dag;
String appName;
List<OperatorMeta> visited = Lists.newArrayList();
Set<StreamMeta> loopStreams = Sets.newHashSet();
Map<String, Integer> operatorNames = Maps.newHashMap();
public ApexTask(ApexTopology apexTopo) {
this.dag = (LogicalPlan) apexTopo.getDAG();
appName = apexTopo.getTopologyName();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void populateDAG(DAG dag, Configuration conf) {
LogicalPlan dag2 = new LogicalPlan();
for (OperatorMeta o : this.dag.getAllOperators()) {
dag2.addOperator(o.getName(), o.getOperator());
}
for (StreamMeta s : this.dag.getAllStreams()) {
for (InputPortMeta i : s.getSinks()) {
Operator.OutputPort<Object> op = (OutputPort<Object>) s.getSource().getPortObject();
Operator.InputPort<Object> ip = (InputPort<Object>) i.getPortObject();
dag2.addStream(s.getName(), op, ip);
}
}
detectLoops(dag2, conf);
// Reconstruct Dag
for (OperatorMeta o : this.dag.getAllOperators()) {
dag.addOperator(o.getName(), o.getOperator());
for (Entry<Attribute<?>, Object> attr : o.getAttributes().entrySet()) {
dag.setAttribute(o.getOperator(), (Attribute) attr.getKey(), attr.getValue());
}
for(InputPortMeta meta: o.getInputStreams().keySet()) {
AttributeMap map = meta.getAttributes();
for(Entry<Attribute<?>, Object> entry: map.entrySet()) {
dag.setInputPortAttribute(meta.getPortObject(), (Attribute) entry.getKey(), entry.getValue());
}
}
dag.setAttribute(Context.OperatorContext.TIMEOUT_WINDOW_COUNT, 300);
}
for (StreamMeta s : this.dag.getAllStreams()) {
if (loopStreams.contains(s)) {
// Add delay Operator
DelayOperatorSerializable<ContentEvent> d = dag.addOperator("Delay_" + s.getName(),
new DelayOperatorSerializable<ContentEvent>());
dag.addStream("Delay" + s.getName() + "toDelay",
(DefaultOutputPort<ContentEvent>) s.getSource().getPortObject(), d.input);
dag.addStream("Delay" + s.getName() + "fromDelay", d.output,
(DefaultInputPort<ContentEvent>) s.getSinks().get(0).getPortObject());
dag.setInputPortAttribute(d.input, Context.PortContext.STREAM_CODEC, new JavaSerializationStreamCodec<ContentEvent>());
dag.setInputPortAttribute(s.getSinks().get(0).getPortObject(), Context.PortContext.STREAM_CODEC, new JavaSerializationStreamCodec<ContentEvent>());
continue;
}
for (InputPortMeta i : s.getSinks()) {
DefaultOutputPort<Object> op = (DefaultOutputPort<Object>) s.getSource().getPortObject();
DefaultInputPort<Object> ip = (DefaultInputPort<Object>) i.getPortObject();
Preconditions.checkArgument(op != null && ip != null);
dag.addStream(s.getName(), op, ip);
}
}
dag.setAttribute(Context.DAGContext.APPLICATION_NAME, appName);
}
public void detectLoops(DAG dag, Configuration conf) {
List<OperatorMeta> inputOperators = Lists.newArrayList();
for (OperatorMeta om : this.dag.getAllOperators()) {
if (om.getOperator() instanceof InputOperator) {
inputOperators.add(om);
}
}
for (OperatorMeta o : inputOperators) {
visited.clear();
List<OperatorMeta> visited = Lists.newArrayList();
dfs(o, visited);
}
}
public void dfs(OperatorMeta o, List<OperatorMeta> visited) {
visited.add(o);
for (Entry<OutputPortMeta, StreamMeta> opm : o.getOutputStreams().entrySet()) {
// Samoa won't allow one output port to multiple input port kind of streams
OperatorMeta downStreamOp = opm.getValue().getSinks().get(0).getOperatorWrapper();
if (visited.contains(downStreamOp)) {
loopStreams.add(opm.getValue());
} else {
List<OperatorMeta> v2 = Lists.newArrayList();
v2.addAll(visited);
dfs(downStreamOp, v2);
}
}
}
}