blob: 01d9a1c560b171c10c4e1aaa22021306b7bb0c90 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.plan.logical;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.*;
import java.lang.reflect.*;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import javax.validation.*;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Sets;
import com.datatorrent.api.*;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
import com.datatorrent.api.Module.ProxyInputPort;
import com.datatorrent.api.Module.ProxyOutputPort;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.Operator.OutputPort;
import com.datatorrent.api.Operator.Unifier;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.experimental.AppData;
import com.datatorrent.common.metric.MetricsAggregator;
import com.datatorrent.common.metric.SingleMetricAggregator;
import com.datatorrent.common.metric.sum.DoubleSumAggregator;
import com.datatorrent.common.metric.sum.LongSumAggregator;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.stram.engine.DefaultUnifier;
import com.datatorrent.stram.engine.Slider;
import static com.datatorrent.api.Context.PortContext.STREAM_CODEC;
/**
* DAG contains the logical declarations of operators and streams.
* <p>
* Operators have ports that are connected through streams. Ports can be
* mandatory or optional with respect to their need to connect a stream to it.
* Each port can be connected to a single stream only. A stream has to be
* connected to one output port and can go to multiple input ports.
* <p>
* The DAG will be serialized and deployed to the cluster, where it is translated
* into the physical plan.
*
* @since 0.3.2
*/
public class LogicalPlan implements Serializable, DAG
{
/**
 * Attribute of input port.
 * This is a read-only attribute to query whether the input port is connected to a DelayOperator.
 * This is for iterative processing. It is constructed with {@code false}.
 */
public static final Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
@SuppressWarnings("FieldNameHidesFieldInSuperclass")
private static final long serialVersionUID = -2099729915606048704L;
// Logger shared by the plan and its nested meta classes.
private static final Logger LOG = LoggerFactory.getLogger(LogicalPlan.class);
// The name under which the application master expects its configuration.
public static final String SER_FILE_NAME = "dt-conf.ser";
// NOTE(review): presumably the file name of the launch-time configuration; its use is not visible here.
public static final String LAUNCH_CONFIG_FILE_NAME = "dt-launch-config.xml";
// Source of logical operator ids. The OperatorMeta constructor calls decrementAndGet() on it, so the
// ids handed out are negative and shared by every plan in the class loader (the field is static, which
// also makes the transient modifier redundant).
private static final transient AtomicInteger logicalOperatorSequencer = new AtomicInteger();
// NOTE(review): presumably separates a module name from the names of the elements it contains; confirm.
public static final String MODULE_NAMESPACE_SEPARATOR = "$";
/**
 * Name of the checkpoints sub-directory:
 * <code>SUBDIR_CHECKPOINTS="checkpoints"</code>
 * NOTE(review): documented as a constant but declared non-final, so it can be reassigned.
 */
public static String SUBDIR_CHECKPOINTS = "checkpoints";
/**
 * Name of the stats sub-directory:
 * <code>SUBDIR_STATS="stats"</code>
 * NOTE(review): documented as a constant but declared non-final, so it can be reassigned.
 */
public static String SUBDIR_STATS = "stats";
/**
 * Name of the events sub-directory:
 * <code>SUBDIR_EVENTS="events"</code>
 * NOTE(review): documented as a constant but declared non-final, so it can be reassigned.
 */
public static String SUBDIR_EVENTS = "events";
/**
 * A flag to specify whether to use the fast publisher or not. This attribute was moved
 * from DAGContext. It can stay here until the fast publisher is fully tested and working as desired;
 * then it can be moved back to DAGContext.
 */
public static Attribute<Boolean> FAST_PUBLISHER_SUBSCRIBER = new Attribute<Boolean>(false);
// NOTE(review): 604800000 looks like 7 days expressed in milliseconds; confirm the unit with the consumers.
public static Attribute<Long> HDFS_TOKEN_LIFE_TIME = new Attribute<Long>(604800000l);
// Initialized from YARN's DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT.
public static Attribute<Long> RM_TOKEN_LIFE_TIME = new Attribute<Long>(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
// Constructed with a null value and a String2String codec.
// NOTE(review): presumably the path of a Kerberos keytab file; confirm.
public static Attribute<String> KEY_TAB_FILE = new Attribute<String>((String) null, new StringCodec.String2String());
// NOTE(review): presumably the fraction of a token's life time after which it is refreshed; confirm.
public static Attribute<Double> TOKEN_REFRESH_ANTICIPATORY_FACTOR = new Attribute<Double>(0.7);
/**
 * Comma separated list of jar file dependencies to be deployed with the application.
 * The launcher will combine the list with built-in dependencies and those specified
 * that are made available through the distributed file system to application master
 * and child containers.
 */
public static Attribute<String> LIBRARY_JARS = new Attribute<String>(new StringCodec.String2String());
/**
 * Comma separated list of archives to be deployed with the application.
 * The launcher will include the archives into the final set of resources
 * that are made available through the distributed file system to application master
 * and child containers.
 */
public static Attribute<String> ARCHIVES = new Attribute<String>(new StringCodec.String2String());
/**
 * Comma separated list of files to be deployed with the application. The launcher will include the files into the
 * final set of resources that are made available through the distributed file system to application master and child
 * containers.
 */
public static Attribute<String> FILES = new Attribute<String>(new StringCodec.String2String());
/**
 * The maximum number of containers (excluding the application master) that the application is allowed to request.
 * If the DAG plan requires fewer containers, the remaining count won't be allocated from the resource manager.
 * Example: a DAG with several operators and all streams container local would require one container,
 * so only one container will be requested from the resource manager.
 */
public static Attribute<Integer> CONTAINERS_MAX_COUNT = new Attribute<Integer>(Integer.MAX_VALUE);
/**
 * The application attempt ID from YARN
 */
public static Attribute<Integer> APPLICATION_ATTEMPT_ID = new Attribute<>(1);
// Hands this class to the attribute initializer once the attribute fields above exist.
// NOTE(review): presumably this registers/names the Attribute fields declared here; the initializer is not visible.
static {
Attribute.AttributeMap.AttributeInitializer.initialize(LogicalPlan.class);
}
// Streams of this plan, keyed by stream id (StreamMeta.remove() deletes its entry by id).
private final Map<String, StreamMeta> streams = new HashMap<String, StreamMeta>();
// Operators of this plan, keyed by operator name.
private final Map<String, OperatorMeta> operators = new HashMap<String, OperatorMeta>();
// NOTE(review): presumably modules keyed by module name, kept in insertion order; publicly mutable.
public final Map<String, ModuleMeta> modules = new LinkedHashMap<>();
// Operators without connected input streams: an operator is removed when one of its input ports
// becomes a stream sink and added back when its last input stream is removed.
private final List<OperatorMeta> rootOperators = new ArrayList<OperatorMeta>();
// Attributes set on the plan itself.
private final Attribute.AttributeMap attributes = new DefaultAttributeMap();
// Transient, so not part of the serialized plan.
// NOTE(review): presumably stream id -> (output port -> input ports) links; usage is not visible here.
private transient Map<String, ArrayListMultimap<OutputPort<?>, InputPort<?>>> streamLinks = new HashMap<>();
/**
 * Returns the attributes set on this plan.
 *
 * @return the plan's own attribute map (the live instance, not a copy)
 */
@Override
public Attribute.AttributeMap getAttributes()
{
  return this.attributes;
}
/**
 * Looks up an attribute on the plan, falling back to the attribute's default.
 *
 * @param key the attribute to resolve
 * @return the value stored in the plan's attribute map, or {@code key.defaultValue} when none is stored
 */
@Override
public <T> T getValue(Attribute<T> key)
{
  final T configured = attributes.get(key);
  return configured != null ? configured : key.defaultValue;
}
/**
 * Creates a plan whose stream, operator and module maps start out empty.
 */
public LogicalPlan()
{
  super();
}
/**
 * Not supported on the logical plan.
 *
 * @param counters ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void setCounters(Object counters)
{
  final String reason = "Not supported yet.";
  throw new UnsupportedOperationException(reason);
}
/**
 * Not supported on the logical plan.
 *
 * @param metricNames ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void sendMetrics(Collection<String> metricNames)
{
  final String reason = "Not supported yet.";
  throw new UnsupportedOperationException(reason);
}
/**
 * Logical-plan metadata of a single operator input port: the owning operator, the name of the
 * field declaring the port, the port annotations and the attributes set on the port.
 */
public final class InputPortMeta implements DAG.InputPortMeta, Serializable
{
  @SuppressWarnings("FieldNameHidesFieldInSuperclass")
  private static final long serialVersionUID = 1L;
  private OperatorMeta operatorMeta;
  private String fieldName;
  private InputPortFieldAnnotation portAnnotation;
  private AppData.QueryPort adqAnnotation;
  private final Attribute.AttributeMap attributes = new DefaultAttributeMap();
  //This is null when port is not hidden
  private Class<?> classDeclaringHiddenPort;

  /**
   * @return the meta object of the operator that owns this port
   */
  public OperatorMeta getOperatorWrapper()
  {
    return this.operatorMeta;
  }

  /**
   * @return the name of the field that declares this port
   */
  public String getPortName()
  {
    return this.fieldName;
  }

  /**
   * Finds the port instance this meta object is registered for by scanning the owning
   * operator's input port mapping for an entry whose value is this very object.
   *
   * @return the port object mapped to this meta object
   * @throws AssertionError when the mapping has no entry for this meta object
   */
  public InputPort<?> getPortObject()
  {
    for (Map.Entry<InputPort<?>, InputPortMeta> mapping : operatorMeta.getPortMapping().inPortMap.entrySet()) {
      if (mapping.getValue() != this) {
        continue;
      }
      return mapping.getKey();
    }
    throw new AssertionError("Cannot find the port object for " + this);
  }

  /**
   * @return {@code true} when the port carries an {@code AppData.QueryPort} annotation
   */
  public boolean isAppDataQueryPort()
  {
    return this.adqAnnotation != null;
  }

  @Override
  public String toString()
  {
    final ToStringBuilder description = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
    description.append("operator", this.operatorMeta);
    description.append("portAnnotation", this.portAnnotation);
    description.append("adqAnnotation", this.adqAnnotation);
    description.append("field", this.fieldName);
    return description.toString();
  }

  @Override
  public Attribute.AttributeMap getAttributes()
  {
    return this.attributes;
  }

  /**
   * Resolves an attribute on this port.
   *
   * @param key the attribute to resolve
   * @return the value set on the port, or {@code key.defaultValue} when the port has none
   */
  @Override
  public <T> T getValue(Attribute<T> key)
  {
    final T configured = attributes.get(key);
    return configured != null ? configured : key.defaultValue;
  }

  /**
   * Not supported on port meta objects.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void setCounters(Object counters)
  {
    throw new UnsupportedOperationException("Not supported yet.");
  }

  /**
   * Not supported on port meta objects.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void sendMetrics(Collection<String> metricNames)
  {
    throw new UnsupportedOperationException("Not supported yet.");
  }

  /**
   * @return the codec stored under {@code STREAM_CODEC} in the port attributes, or {@code null} if none is stored
   */
  public StreamCodec<?> getStreamCodec()
  {
    return attributes.get(STREAM_CODEC);
  }

  /**
   * Stores the given codec under {@code STREAM_CODEC}; a {@code null} argument is ignored.
   * A warning is logged when a different codec instance was stored before.
   *
   * @param streamCodec the codec to store, may be {@code null}
   */
  void setStreamCodec(StreamCodec<?> streamCodec)
  {
    if (streamCodec == null) {
      return;
    }
    final StreamCodec<?> previous = attributes.put(STREAM_CODEC, streamCodec);
    if (previous == null || previous == streamCodec) {
      return;
    }
    LOG.warn("Input port {} stream codec was changed from {} to {}", this.getPortName(), previous, streamCodec);
  }
}
/**
 * Logical-plan metadata of a single operator output port. Besides the port description it lazily
 * creates and caches the operator meta objects of the port's unifier and sliding unifier.
 */
public final class OutputPortMeta implements DAG.OutputPortMeta, Serializable
{
  @SuppressWarnings("FieldNameHidesFieldInSuperclass")
  private static final long serialVersionUID = 201412091633L;
  private OperatorMeta operatorMeta;
  private OperatorMeta unifierMeta;
  private OperatorMeta sliderMeta;
  private String fieldName;
  private OutputPortFieldAnnotation portAnnotation;
  private AppData.ResultPort adrAnnotation;
  private final DefaultAttributeMap attributes;
  //This is null when port is not hidden
  private Class<?> classDeclaringHiddenPort;

  public OutputPortMeta()
  {
    this.attributes = new DefaultAttributeMap();
  }

  /**
   * @return the meta object of the operator that owns this port
   */
  public OperatorMeta getOperatorMeta()
  {
    return this.operatorMeta;
  }

  /**
   * Returns the operator meta of this port's unifier, creating it on first use. The meta is
   * named {@code <operator>.<port>#unifier} and wraps the result of {@link #getUnifier()}.
   *
   * @return the cached unifier meta
   */
  @Override
  public OperatorMeta getUnifierMeta()
  {
    if (unifierMeta != null) {
      return unifierMeta;
    }
    unifierMeta = new OperatorMeta(operatorMeta.getName() + '.' + fieldName + "#unifier", getUnifier());
    return unifierMeta;
  }

  /**
   * Returns the operator meta of this port's sliding unifier, creating it on first use. The
   * slider starts from a clone of the unifier meta's attributes and gets its
   * {@code APPLICATION_WINDOW_COUNT} set to {@code slidingApplicationWindowCount}. Once created,
   * the cached instance is returned regardless of the arguments passed.
   *
   * @param numberOfBuckets passed to the {@code Slider} constructor
   * @param slidingApplicationWindowCount application window count set on the slider meta
   * @param numberOfSlidingWindows passed to the {@code Slider} constructor
   * @return the cached slider meta
   */
  public OperatorMeta getSlidingUnifier(int numberOfBuckets, int slidingApplicationWindowCount, int numberOfSlidingWindows)
  {
    if (sliderMeta != null) {
      return sliderMeta;
    }
    @SuppressWarnings("unchecked")
    Slider slider = new Slider((Unifier<Object>) getUnifier(), numberOfBuckets, numberOfSlidingWindows);
    final String sliderName = operatorMeta.getName() + '.' + fieldName + "#slider";
    try {
      sliderMeta = new OperatorMeta(sliderName, slider, getUnifierMeta().attributes.clone());
    } catch (CloneNotSupportedException ex) {
      throw new RuntimeException(ex);
    }
    sliderMeta.getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, slidingApplicationWindowCount);
    return sliderMeta;
  }

  /**
   * @return the name of the field that declares this port
   */
  public String getPortName()
  {
    return this.fieldName;
  }

  /**
   * Finds the port instance this meta object is registered for by scanning the owning
   * operator's output port mapping for an entry whose value is this very object.
   *
   * @return the port object mapped to this meta object
   * @throws AssertionError when the mapping has no entry for this meta object
   */
  public OutputPort<?> getPortObject()
  {
    for (Map.Entry<OutputPort<?>, OutputPortMeta> mapping : operatorMeta.getPortMapping().outPortMap.entrySet()) {
      if (mapping.getValue() != this) {
        continue;
      }
      return mapping.getKey();
    }
    throw new AssertionError("Cannot find the port object for " + this);
  }

  /**
   * Returns the unifier supplied by the port object, or a new {@code DefaultUnifier} when the
   * port supplies none or no port object is mapped to this meta object.
   *
   * @return the unifier to use for this port, never {@code null}
   */
  public Operator.Unifier<?> getUnifier()
  {
    for (Map.Entry<OutputPort<?>, OutputPortMeta> mapping : operatorMeta.getPortMapping().outPortMap.entrySet()) {
      if (mapping.getValue() != this) {
        continue;
      }
      final Unifier<?> supplied = mapping.getKey().getUnifier();
      if (supplied != null) {
        LOG.debug("User supplied unifier is {}", supplied);
        return supplied;
      }
      // the port exists but does not bring its own unifier
      break;
    }
    LOG.debug("Using default unifier for {}", this);
    return new DefaultUnifier();
  }

  @Override
  public Attribute.AttributeMap getAttributes()
  {
    return this.attributes;
  }

  /**
   * Resolves an attribute on this port.
   *
   * @param key the attribute to resolve
   * @return the value set on the port, or {@code key.defaultValue} when the port has none
   */
  @Override
  public <T> T getValue(Attribute<T> key)
  {
    final T configured = attributes.get(key);
    return configured != null ? configured : key.defaultValue;
  }

  /**
   * @return {@code true} when the port carries an {@code AppData.ResultPort} annotation
   */
  public boolean isAppDataResultPort()
  {
    return this.adrAnnotation != null;
  }

  @Override
  public String toString()
  {
    final ToStringBuilder description = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
    description.append("operator", this.operatorMeta);
    description.append("portAnnotation", this.portAnnotation);
    description.append("adrAnnotation", this.adrAnnotation);
    description.append("field", this.fieldName);
    return description.toString();
  }

  /**
   * Not supported on port meta objects.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void setCounters(Object counters)
  {
    throw new UnsupportedOperationException("Not supported yet.");
  }

  /**
   * Not supported on port meta objects.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void sendMetrics(Collection<String> metricNames)
  {
    throw new UnsupportedOperationException("Not supported yet.");
  }
}
/**
 * Representation of streams in the logical layer. Instances are created through {@link LogicalPlan#addStream}.
 * <p>
 * A stream has one source output port and a list of sink input ports. Optionally a persist
 * operator can be attached either to the stream as a whole or to individual sinks; persist
 * operators are added to the plan as regular operators and connected as additional sinks.
 */
public final class StreamMeta implements DAG.StreamMeta, Serializable
{
  private static final long serialVersionUID = 1L;
  private Locality locality;
  private final List<InputPortMeta> sinks = new ArrayList<InputPortMeta>();
  private OutputPortMeta source;
  private final String id;
  // Stream level persist operator and the input port of it that is connected to this stream.
  private OperatorMeta persistOperatorForStream;
  private InputPortMeta persistOperatorInputPort;
  // Sinks that were connected at the time stream level persisting was requested.
  private Set<InputPortMeta> enableSinksForPersisting;
  private String persistOperatorName;
  // Persist operators / persist operator input ports registered for one specific sink.
  public Map<InputPortMeta, OperatorMeta> sinkSpecificPersistOperatorMap;
  public Map<InputPortMeta, InputPortMeta> sinkSpecificPersistInputPortMap;

  private StreamMeta(String id)
  {
    this.id = id;
    enableSinksForPersisting = new HashSet<InputPortMeta>();
    sinkSpecificPersistOperatorMap = new HashMap<LogicalPlan.InputPortMeta, OperatorMeta>();
    sinkSpecificPersistInputPortMap = new HashMap<LogicalPlan.InputPortMeta, InputPortMeta>();
  }

  @Override
  public String getName()
  {
    return id;
  }

  @Override
  public Locality getLocality()
  {
    return this.locality;
  }

  @Override
  public StreamMeta setLocality(Locality locality)
  {
    this.locality = locality;
    return this;
  }

  public OutputPortMeta getSource()
  {
    return source;
  }

  /**
   * Makes the given output port the source of this stream and registers the stream with the
   * operator owning the port.
   *
   * @param port output port to read from
   * @return this stream
   * @throws IllegalArgumentException when the port already feeds a stream
   */
  @Override
  public StreamMeta setSource(Operator.OutputPort<?> port)
  {
    OutputPortMeta portMeta = assertGetPortMeta(port);
    OperatorMeta om = portMeta.getOperatorMeta();
    if (om.outputStreams.containsKey(portMeta)) {
      String msg = String.format("Operator %s already connected to %s", om.name, om.outputStreams.get(portMeta).id);
      throw new IllegalArgumentException(msg);
    }
    this.source = portMeta;
    om.outputStreams.put(portMeta, this);
    return this;
  }

  public List<InputPortMeta> getSinks()
  {
    return sinks;
  }

  /**
   * Connects the given input port to this stream. The operator owning the port stops being a
   * root operator of the plan.
   *
   * @param port input port to deliver to
   * @return this stream
   * @throws IllegalArgumentException when the port is already connected to a stream
   */
  @Override
  public StreamMeta addSink(Operator.InputPort<?> port)
  {
    InputPortMeta portMeta = assertGetPortMeta(port);
    OperatorMeta om = portMeta.getOperatorWrapper();
    String portName = portMeta.getPortName();
    if (om.inputStreams.containsKey(portMeta)) {
      throw new IllegalArgumentException(String.format("Port %s already connected to stream %s", portName, om.inputStreams.get(portMeta)));
    }
    sinks.add(portMeta);
    om.inputStreams.put(portMeta, this);
    rootOperators.remove(portMeta.operatorMeta);
    return this;
  }

  /**
   * Disconnects the stream from all of its ports, removes the persist operators attached to it
   * and deletes the stream from the plan. Operators left without any input stream become root
   * operators again.
   */
  public void remove()
  {
    for (InputPortMeta ipm : this.sinks) {
      OperatorMeta sinkOperator = ipm.getOperatorWrapper();
      sinkOperator.inputStreams.remove(ipm);
      if (sinkOperator.inputStreams.isEmpty()) {
        rootOperators.add(sinkOperator);
      }
    }
    // Remove the stream level persist operator, if present:
    if (getPersistOperator() != null) {
      removeOperator(getPersistOperator().getOperator());
    }
    // Remove persist operators added for specific sinks:
    if (!sinkSpecificPersistOperatorMap.isEmpty()) {
      for (Entry<InputPortMeta, OperatorMeta> entry : sinkSpecificPersistOperatorMap.entrySet()) {
        removeOperator(entry.getValue().getOperator());
      }
      sinkSpecificPersistOperatorMap.clear();
      sinkSpecificPersistInputPortMap.clear();
    }
    this.sinks.clear();
    if (this.source != null) {
      this.source.getOperatorMeta().outputStreams.remove(this.source);
    }
    this.source = null;
    streams.remove(this.id);
  }

  @Override
  public String toString()
  {
    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).
        append("id", this.id).
        toString();
  }

  /**
   * Hash over locality, source and id. The sinks take part in {@link #equals(Object)} but not
   * in the hash, which still satisfies the equals/hashCode contract.
   */
  @Override
  public int hashCode()
  {
    int hash = 7;
    hash = 31 * hash + (this.locality != null ? this.locality.hashCode() : 0);
    hash = 31 * hash + (this.source != null ? this.source.hashCode() : 0);
    hash = 31 * hash + (this.id != null ? this.id.hashCode() : 0);
    return hash;
  }

  @Override
  public boolean equals(Object obj)
  {
    if (this == obj) {
      return true;
    }
    if (obj == null) {
      return false;
    }
    if (getClass() != obj.getClass()) {
      return false;
    }
    final StreamMeta other = (StreamMeta)obj;
    if (this.locality != other.locality) {
      return false;
    }
    if (this.sinks != other.sinks && (this.sinks == null || !this.sinks.equals(other.sinks))) {
      return false;
    }
    if (this.source != other.source && (this.source == null || !this.source.equals(other.source))) {
      return false;
    }
    return !((this.id == null) ? (other.id != null) : !this.id.equals(other.id));
  }

  /**
   * Persists the stream using the given operator, connected through the given input port.
   * Persisting is enabled for all sinks connected so far.
   *
   * @param name name under which the persist operator is added to the plan
   * @param persistOperator operator that persists the stream
   * @param port input port of {@code persistOperator} to connect to this stream
   * @return this stream
   * @throws IllegalArgumentException when {@code port} is not an input port of {@code persistOperator}
   *         or the operator does not qualify as a persist operator
   */
  @Override
  public StreamMeta persistUsing(String name, Operator persistOperator, InputPort<?> port)
  {
    persistOperatorName = name;
    enablePersistingForSinksAddedSoFar(persistOperator);
    OperatorMeta persistOpMeta = createPersistOperatorMeta(persistOperator);
    if (!persistOpMeta.getPortMapping().inPortMap.containsKey(port)) {
      String msg = String.format("Port argument %s does not belong to persist operator passed %s", port, persistOperator);
      throw new IllegalArgumentException(msg);
    }
    setPersistOperatorInputPort(persistOpMeta.getPortMapping().inPortMap.get(port));
    return this;
  }

  /**
   * Persists the stream using the given operator. The stream is connected to the operator's
   * mandatory input port when it has one, otherwise to the first input port found.
   *
   * @param name name under which the persist operator is added to the plan
   * @param persistOperator operator that persists the stream
   * @return this stream
   * @throws IllegalArgumentException when the operator does not qualify as a persist operator
   */
  @Override
  public StreamMeta persistUsing(String name, Operator persistOperator)
  {
    persistOperatorName = name;
    enablePersistingForSinksAddedSoFar(persistOperator);
    OperatorMeta persistOpMeta = createPersistOperatorMeta(persistOperator);
    setPersistOperatorInputPort(selectPersistInputPort(persistOpMeta));
    return this;
  }

  /**
   * Chooses the input port of a validated persist operator that the stream gets connected to.
   * The port map is a hash map without a defined order, so taking "the first" port could
   * pick an optional port and leave the mandatory one unconnected; the mandatory port (there
   * is at most one after validation) is therefore preferred.
   */
  private InputPortMeta selectPersistInputPort(OperatorMeta persistOpMeta)
  {
    Collection<InputPortMeta> candidates = persistOpMeta.getPortMapping().inPortMap.values();
    for (InputPortMeta candidate : candidates) {
      if (candidate.portAnnotation == null || !candidate.portAnnotation.optional()) {
        return candidate;
      }
    }
    // all ports are optional, keep the previous behavior
    return candidates.iterator().next();
  }

  private void enablePersistingForSinksAddedSoFar(Operator persistOperator)
  {
    enableSinksForPersisting.addAll(getSinks());
  }

  /**
   * Adds the persist operator to the plan under {@code persistOperatorName}, records it as the
   * stream level persist operator and validates its ports: it needs at least one input port,
   * at most one mandatory input port and no mandatory output port.
   */
  private OperatorMeta createPersistOperatorMeta(Operator persistOperator)
  {
    addOperator(persistOperatorName, persistOperator);
    OperatorMeta persistOpMeta = getOperatorMeta(persistOperatorName);
    setPersistOperator(persistOpMeta);
    if (persistOpMeta.getPortMapping().inPortMap.isEmpty()) {
      String msg = String.format("Persist operator passed %s has no input ports to connect", persistOperator);
      throw new IllegalArgumentException(msg);
    }
    Map<InputPort<?>, InputPortMeta> inputPortMap = persistOpMeta.getPortMapping().inPortMap;
    int nonOptionalInputPortCount = 0;
    for (InputPortMeta inputPort : inputPortMap.values()) {
      if (inputPort.portAnnotation == null || !inputPort.portAnnotation.optional()) {
        // By default input port is non-optional unless specified
        nonOptionalInputPortCount++;
      }
    }
    if (nonOptionalInputPortCount > 1) {
      String msg = String.format("Persist operator %s has more than 1 non optional input port", persistOperator);
      throw new IllegalArgumentException(msg);
    }
    Map<OutputPort<?>, OutputPortMeta> outputPortMap = persistOpMeta.getPortMapping().outPortMap;
    for (OutputPortMeta outPort : outputPortMap.values()) {
      if (outPort.portAnnotation != null && !outPort.portAnnotation.optional()) {
        // By default output port is optional unless specified
        String msg = String.format("Persist operator %s has non optional output port %s", persistOperator, outPort.fieldName);
        throw new IllegalArgumentException(msg);
      }
    }
    return persistOpMeta;
  }

  public OperatorMeta getPersistOperator()
  {
    return persistOperatorForStream;
  }

  private void setPersistOperator(OperatorMeta persistOperator)
  {
    this.persistOperatorForStream = persistOperator;
  }

  public InputPortMeta getPersistOperatorInputPort()
  {
    return persistOperatorInputPort;
  }

  // Connects the persist operator's port as a regular sink of this stream before recording it.
  private void setPersistOperatorInputPort(InputPortMeta inport)
  {
    this.addSink(inport.getPortObject());
    this.persistOperatorInputPort = inport;
  }

  public Set<InputPortMeta> getSinksToPersist()
  {
    return enableSinksForPersisting;
  }

  private String getPersistOperatorName(Operator operator)
  {
    return id + "_persister";
  }

  private String getPersistOperatorName(InputPort<?> sinkToPersist)
  {
    InputPortMeta portMeta = assertGetPortMeta(sinkToPersist);
    OperatorMeta operatorMeta = portMeta.getOperatorWrapper();
    return id + "_" + operatorMeta.getName() + "_persister";
  }

  /**
   * Persists the data delivered to one specific sink. The persist operator is added to the
   * plan right away, connected to the stream through {@code port} and registered for the sink.
   *
   * @param name name under which the persist operator is added to the plan
   * @param persistOperator operator that persists the data
   * @param port input port of {@code persistOperator} to connect to this stream
   * @param sinkToPersist the sink whose input is to be persisted
   * @return this stream
   */
  @Override
  public StreamMeta persistUsing(String name, Operator persistOperator, InputPort<?> port, InputPort<?> sinkToPersist)
  {
    // When persist Stream is invoked for a specific sink, persist operator can directly be added
    addOperator(name, persistOperator);
    addSink(port);
    InputPortMeta sinkPortMeta = assertGetPortMeta(sinkToPersist);
    addStreamCodec(sinkPortMeta, port);
    updateSinkSpecificPersistOperatorMap(sinkPortMeta, name, port);
    return this;
  }

  /**
   * When the sink to persist has a stream codec (attribute first, port object second), wraps it
   * together with the codec of the persist operator port in a
   * {@code StreamCodecWrapperForPersistance} and sets that as the persist port's codec.
   */
  @SuppressWarnings("unchecked")
  private void addStreamCodec(InputPortMeta sinkToPersistPortMeta, InputPort<?> port)
  {
    StreamCodec<Object> inputStreamCodec = (StreamCodec<Object>)sinkToPersistPortMeta.getValue(STREAM_CODEC);
    if (inputStreamCodec == null) {
      inputStreamCodec = (StreamCodec<Object>)sinkToPersistPortMeta.getPortObject().getStreamCodec();
    }
    if (inputStreamCodec == null) {
      return;
    }
    Map<InputPortMeta, StreamCodec<Object>> codecs = new HashMap<InputPortMeta, StreamCodec<Object>>();
    codecs.put(sinkToPersistPortMeta, inputStreamCodec);
    InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port);
    StreamCodec<Object> specifiedCodecForPersistOperator = (StreamCodec<Object>)persistOperatorPortMeta.getValue(STREAM_CODEC);
    if (specifiedCodecForPersistOperator == null) {
      specifiedCodecForPersistOperator = (StreamCodec<Object>)port.getStreamCodec();
    }
    StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator);
    setInputPortAttribute(port, STREAM_CODEC, codec);
  }

  private void updateSinkSpecificPersistOperatorMap(InputPortMeta sinkToPersistPortMeta, String persistOperatorName, InputPort<?> persistOperatorInPort)
  {
    OperatorMeta persistOpMeta = operators.get(persistOperatorName);
    this.sinkSpecificPersistOperatorMap.put(sinkToPersistPortMeta, persistOpMeta);
    this.sinkSpecificPersistInputPortMap.put(sinkToPersistPortMeta, persistOpMeta.getMeta(persistOperatorInPort));
  }

  /**
   * Cleans up persist operators that only existed because of the sink being removed.
   *
   * @param sinkBeingRemoved the sink that is disconnected from this stream
   */
  public void resetStreamPersistanceOnSinkRemoval(InputPortMeta sinkBeingRemoved)
  {
    /*
     * If persistStream was enabled for the entire stream and the operator
     * to be removed was the only one enabled for persisting, remove the persist operator.
     */
    if (enableSinksForPersisting.remove(sinkBeingRemoved) && enableSinksForPersisting.isEmpty()) {
      // The persist operator can be missing when an earlier persistUsing call failed after the
      // sinks had been enabled; there is nothing to remove in that case.
      OperatorMeta streamLevelPersister = getPersistOperator();
      if (streamLevelPersister != null) {
        removeOperator(streamLevelPersister.getOperator());
        setPersistOperator(null);
      }
    }
    // If persisting was added specific to this sink, remove the persist operator
    sinkSpecificPersistInputPortMap.remove(sinkBeingRemoved);
    OperatorMeta sinkLevelPersister = sinkSpecificPersistOperatorMap.remove(sinkBeingRemoved);
    if (sinkLevelPersister != null) {
      removeOperator(sinkLevelPersister.getOperator());
    }
  }
}
/**
* Operator meta object.
*/
public final class OperatorMeta implements DAG.OperatorMeta, Serializable
{
// Streams connected to this operator's input ports, in the order they were connected.
private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new LinkedHashMap<InputPortMeta, StreamMeta>();
// Streams fed by this operator's output ports, in the order they were connected.
private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = new LinkedHashMap<OutputPortMeta, StreamMeta>();
// Attributes set on the operator; supplied to the constructor.
private final Attribute.AttributeMap attributes;
// Taken from the static logicalOperatorSequencer in the constructor.
@SuppressWarnings("unused")
private final int id;
@NotNull
private final String name;
// OperatorAnnotation found on the operator class, or null when the class is not annotated.
private final OperatorAnnotation operatorAnnotation;
private final LogicalOperatorStatus status;
private transient Integer nindex; // for cycle detection
private transient Integer lowlink; // for cycle detection
// Transient: writeObject/readObject store and restore the operator explicitly through FSStorageAgent.
private transient Operator operator;
// Set by populateAggregatorMeta().
private MetricAggregatorMeta metricAggregatorMeta;
private String moduleName; // Name of the module which has this operator. null if this is a top level operator.
/*
 * Used for OIO validation,
 * value null => node not visited yet
 * other value => represents the root oio node for this node
 */
private transient Integer oioRoot = null;
/**
 * Creates the meta object with a fresh, empty attribute map.
 *
 * @param name name of the operator
 * @param operator the operator instance described by this meta object
 */
private OperatorMeta(String name, Operator operator)
{
  this(name, operator, new DefaultAttributeMap());
}
private OperatorMeta(String name, Operator operator, Attribute.AttributeMap attributeMap)
{
LOG.debug("Initializing {} as {}", name, operator.getClass().getName());
this.operatorAnnotation = operator.getClass().getAnnotation(OperatorAnnotation.class);
this.name = name;
this.operator = operator;
this.id = logicalOperatorSequencer.decrementAndGet();
this.status = new LogicalOperatorStatus(name);
this.attributes = attributeMap;
}
/**
 * @return the name this operator was given at construction
 */
@Override
public String getName()
{
  return this.name;
}
/**
 * @return the operator's own attribute map (the live instance, not a copy)
 */
@Override
public Attribute.AttributeMap getAttributes()
{
  return this.attributes;
}
/**
 * Resolves an attribute for this operator: the operator's own attributes win, then the value
 * resolved by the enclosing plan, then the attribute's default.
 *
 * @param key the attribute to resolve
 * @return the resolved value, possibly {@code null} when the default is {@code null}
 */
@Override
public <T> T getValue(Attribute<T> key)
{
  final T ownValue = attributes.get(key);
  if (ownValue != null) {
    return ownValue;
  }
  final T planValue = LogicalPlan.this.getValue(key);
  return planValue != null ? planValue : key.defaultValue;
}
/**
 * @return the status object created for this operator at construction
 */
public LogicalOperatorStatus getStatus()
{
  return this.status;
}
/**
 * Serialization hook: writes the default serializable state first and then appends the
 * operator through {@code FSStorageAgent.store}. {@link #readObject} reads the two parts back
 * in the same order.
 *
 * @param out stream to write to
 * @throws IOException declared for failures while writing to the stream
 */
private void writeObject(ObjectOutputStream out) throws IOException
{
//getValue2(OperatorContext.STORAGE_AGENT).save(operator, id, Checkpoint.STATELESS_CHECKPOINT_WINDOW_ID);
out.defaultWriteObject();
// The operator field is transient, so it has to be written explicitly.
FSStorageAgent.store(out, operator);
}
/**
 * Deserialization hook: reads the default serializable state and then restores the transient
 * operator through {@code FSStorageAgent.retrieve}, mirroring {@link #writeObject}.
 *
 * @param input stream to read from
 * @throws IOException declared for failures while reading from the stream
 * @throws ClassNotFoundException declared for classes of the stored state that cannot be found
 */
private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException
{
input.defaultReadObject();
// TODO: not working because we don't have the storage agent in the parent attribute map
//operator = (Operator)getValue2(OperatorContext.STORAGE_AGENT).load(id, Checkpoint.STATELESS_CHECKPOINT_WINDOW_ID);
operator = (Operator)FSStorageAgent.retrieve(input);
}
/**
 * Not supported on operator meta objects.
 *
 * @param counters ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void setCounters(Object counters)
{
  final String reason = "Not supported yet.";
  throw new UnsupportedOperationException(reason);
}
/**
 * Not supported on operator meta objects.
 *
 * @param metricNames ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void sendMetrics(Collection<String> metricNames)
{
  final String reason = "Not supported yet.";
  throw new UnsupportedOperationException(reason);
}
/**
 * @return the aggregator meta built by {@code populateAggregatorMeta()}, or {@code null} when
 *         that method has not assigned one
 */
public MetricAggregatorMeta getMetricAggregatorMeta()
{
  return this.metricAggregatorMeta;
}
/**
 * @return name of the module which has this operator, {@code null} for a top level operator
 */
public String getModuleName()
{
  return this.moduleName;
}
/**
 * @param moduleName name of the module which has this operator, {@code null} for a top level operator
 */
public void setModuleName(final String moduleName)
{
  this.moduleName = moduleName;
}
/**
 * Determines the metrics aggregator of this operator and stores it, together with the
 * {@code METRICS_DIMENSIONS_SCHEME} attribute value, in a new {@code MetricAggregatorMeta}.
 * <p>
 * The aggregator is chosen in this order: the {@code METRICS_AGGREGATOR} attribute, a proxy
 * when the operator itself is an {@code AutoMetric.Aggregator}, and finally a default sum
 * aggregator derived from the operator's {@code AutoMetric} members. It stays {@code null}
 * when none of these applies.
 */
protected void populateAggregatorMeta()
{
  AutoMetric.Aggregator aggregator = getValue(OperatorContext.METRICS_AGGREGATOR);
  if (aggregator == null && operator instanceof AutoMetric.Aggregator) {
    aggregator = new MetricAggregatorMeta.MetricsAggregatorProxy(this);
  }
  if (aggregator == null) {
    aggregator = createDefaultSumAggregator();
  }
  this.metricAggregatorMeta = new MetricAggregatorMeta(aggregator,
      getValue(OperatorContext.METRICS_DIMENSIONS_SCHEME));
}

/**
 * Builds the default aggregator from the numeric members annotated with {@code AutoMetric}:
 * first the fields (declared and inherited), then the bean getters whose property name was
 * not already seen as an annotated field.
 *
 * @return the aggregator, or {@code null} when no numeric {@code AutoMetric} member exists
 */
private MetricsAggregator createDefaultSumAggregator()
{
  MetricsAggregator defAggregator = null;
  Set<String> metricNames = Sets.newHashSet();
  for (Field field : ReflectionUtils.getDeclaredFieldsIncludingInherited(operator.getClass())) {
    if (field.isAnnotationPresent(AutoMetric.class)) {
      // remembered even for non-numeric fields so that a getter of the same name is skipped
      metricNames.add(field.getName());
      defAggregator = registerSumAggregator(defAggregator, field.getName(), field.getType());
    }
  }
  try {
    for (PropertyDescriptor pd : Introspector.getBeanInfo(operator.getClass()).getPropertyDescriptors()) {
      Method readMethod = pd.getReadMethod();
      if (readMethod == null || readMethod.getAnnotation(AutoMetric.class) == null) {
        continue;
      }
      String propName = pd.getName();
      if (metricNames.contains(propName)) {
        continue;
      }
      defAggregator = registerSumAggregator(defAggregator, propName, readMethod.getReturnType());
    }
  } catch (IntrospectionException e) {
    throw new RuntimeException("finding methods", e);
  }
  return defAggregator;
}

/**
 * Registers a sum aggregator for one metric when its type is numeric: a long sum for
 * int/long (primitive or boxed), a double sum for float/double (primitive or boxed).
 * Other types are ignored.
 *
 * @param target aggregator collected so far, {@code null} when none was needed yet
 * @param metricName name of the metric
 * @param valueType declared type of the metric
 * @return {@code target}, or a newly created aggregator when {@code target} was {@code null}
 *         and the metric is numeric
 */
private MetricsAggregator registerSumAggregator(MetricsAggregator target, String metricName, Class<?> valueType)
{
  final SingleMetricAggregator sum;
  if (valueType == int.class || valueType == Integer.class ||
      valueType == long.class || valueType == Long.class) {
    sum = new LongSumAggregator();
  } else if (valueType == float.class || valueType == Float.class ||
      valueType == double.class || valueType == Double.class) {
    sum = new DoubleSumAggregator();
  } else {
    return target;
  }
  MetricsAggregator result = (target != null) ? target : new MetricsAggregator();
  result.addAggregators(metricName, new SingleMetricAggregator[]{sum});
  return result;
}
/**
 * Puts every entry of the source attribute map into the destination attribute map.
 *
 * @param dest destination attribute map.
 * @param source source attribute map.
 */
@SuppressWarnings("unchecked")
private void copyAttributes(AttributeMap dest, AttributeMap source)
{
  for (Entry<Attribute<?>, ?> entry : source.entrySet()) {
    final Attribute<Object> attribute = (Attribute<Object>)entry.getKey();
    final Object value = entry.getValue();
    dest.put(attribute, value);
  }
}
/**
 * Copies the operator attributes and the attributes of every input and output port from the
 * given meta object into this one. Both meta objects must describe the same operator instance.
 *
 * @param operatorMeta copy attributes from this OperatorMeta to the object.
 * @throws IllegalArgumentException when {@code operatorMeta} wraps a different operator instance
 */
private void copyAttributesFrom(OperatorMeta operatorMeta)
{
  final boolean sameOperator = (operator == operatorMeta.getOperator());
  if (!sameOperator) {
    throw new IllegalArgumentException("Operator meta is not for the same operator ");
  }

  // operator level attributes
  copyAttributes(attributes, operatorMeta.getAttributes());

  // input ports: look up our own meta for the same port object
  for (Map.Entry<InputPort<?>, InputPortMeta> theirs : operatorMeta.getPortMapping().inPortMap.entrySet()) {
    final InputPortMeta ours = getPortMapping().inPortMap.get(theirs.getKey());
    copyAttributes(ours.attributes, theirs.getValue().attributes);
  }

  // output ports: look up our own meta for the same port object
  for (Map.Entry<OutputPort<?>, OutputPortMeta> theirs : operatorMeta.getPortMapping().outPortMap.entrySet()) {
    final OutputPortMeta ours = getPortMapping().outPortMap.get(theirs.getKey());
    copyAttributes(ours.attributes, theirs.getValue().attributes);
  }
}
/**
 * Collects the ports reported for the enclosing operator and maps each port object to its
 * meta object. Meta objects of ports that are already connected to a stream are reused
 * instead of being created anew.
 */
private class PortMapping implements Operators.OperatorDescriptor
{
  private final Map<Operator.InputPort<?>, InputPortMeta> inPortMap = new HashMap<Operator.InputPort<?>, InputPortMeta>();
  private final Map<Operator.OutputPort<?>, OutputPortMeta> outPortMap = new HashMap<Operator.OutputPort<?>, OutputPortMeta>();
  // First meta object registered per port name; used to detect ports reported again under the same name.
  private final Map<String, Object> portNameMap = new HashMap<String, Object>();

  /**
   * Registers an input port. When a port of the same field name is already connected to one of
   * the operator's input streams, its meta object is reused; otherwise a new one is created and
   * initialized with the port's stream codec.
   */
  @Override
  public void addInputPort(InputPort<?> portObject, Field field, InputPortFieldAnnotation portAnnotation, AppData.QueryPort adqAnnotation)
  {
    final String portFieldName = field.getName();
    for (LogicalPlan.InputPortMeta connected : OperatorMeta.this.inputStreams.keySet()) {
      if (connected.operatorMeta == OperatorMeta.this && connected.fieldName.equals(portFieldName)) {
        inPortMap.put(portObject, connected);
        markInputPortIfHidden(connected.getPortName(), connected, field.getDeclaringClass());
        return;
      }
    }

    final InputPortMeta created = new InputPortMeta();
    created.operatorMeta = OperatorMeta.this;
    created.fieldName = portFieldName;
    created.portAnnotation = portAnnotation;
    created.adqAnnotation = adqAnnotation;
    inPortMap.put(portObject, created);
    markInputPortIfHidden(created.getPortName(), created, field.getDeclaringClass());

    if (created.getStreamCodec() == null) {
      created.setStreamCodec(portObject.getStreamCodec());
    } else if (portObject.getStreamCodec() != null) {
      LOG.info("Input port {} attribute {} overrides codec {} with {} codec", created.getPortName(), STREAM_CODEC.getSimpleName(),
          portObject.getStreamCodec(), created.getStreamCodec());
    }
  }

  /**
   * Registers an output port. When a port of the same field name already feeds one of the
   * operator's output streams, its meta object is reused; otherwise a new one is created.
   */
  @Override
  public void addOutputPort(OutputPort<?> portObject, Field field, OutputPortFieldAnnotation portAnnotation, AppData.ResultPort adrAnnotation)
  {
    final String portFieldName = field.getName();
    for (LogicalPlan.OutputPortMeta connected : OperatorMeta.this.outputStreams.keySet()) {
      if (connected.operatorMeta == OperatorMeta.this && connected.fieldName.equals(portFieldName)) {
        outPortMap.put(portObject, connected);
        markOutputPortIfHidden(connected.getPortName(), connected, field.getDeclaringClass());
        return;
      }
    }

    final OutputPortMeta created = new OutputPortMeta();
    created.operatorMeta = OperatorMeta.this;
    created.fieldName = portFieldName;
    created.portAnnotation = portAnnotation;
    created.adrAnnotation = adrAnnotation;
    outPortMap.put(portObject, created);
    markOutputPortIfHidden(created.getPortName(), created, field.getDeclaringClass());
  }

  /**
   * Records the port under its name if the name is new; otherwise marks the port as hidden by
   * remembering the class that declares it.
   */
  private void markOutputPortIfHidden(String portName, OutputPortMeta portMeta, Class<?> declaringClass)
  {
    if (portNameMap.containsKey(portName)) {
      // name already taken: this port is a hidden one
      portMeta.classDeclaringHiddenPort = declaringClass;
      return;
    }
    portNameMap.put(portName, portMeta);
  }

  /**
   * Records the port under its name if the name is new; otherwise marks the port as hidden by
   * remembering the class that declares it.
   */
  private void markInputPortIfHidden(String portName, InputPortMeta portMeta, Class<?> declaringClass)
  {
    if (portNameMap.containsKey(portName)) {
      // name already taken: this port is a hidden one
      portMeta.classDeclaringHiddenPort = declaringClass;
      return;
    }
    portNameMap.put(portName, portMeta);
  }
}
/**
* Port objects are transient, so the mapping from port objects to their meta objects is kept in a
* transient field and built lazily on first access.
*/
private transient PortMapping portMapping = null;
/**
 * Returns the port mapping of this operator, creating it and populating it through
 * {@code Operators.describe} on first use.
 */
private PortMapping getPortMapping()
{
  if (this.portMapping != null) {
    return portMapping;
  }
  // assign the field before describing so the descriptor callbacks fill the cached instance
  this.portMapping = new PortMapping();
  Operators.describe(this.getOperator(), portMapping);
  return portMapping;
}
/**
 * Looks up the meta object registered for the given output port object.
 *
 * @param port output port object of this operator
 * @return the associated meta object, or null when the port is not in the mapping
 */
@Override
public OutputPortMeta getMeta(Operator.OutputPort<?> port)
{
  final PortMapping mapping = getPortMapping();
  return mapping.outPortMap.get(port);
}
/**
 * Looks up the meta object registered for the given input port object.
 *
 * @param port input port object of this operator
 * @return the associated meta object, or null when the port is not in the mapping
 */
@Override
public InputPortMeta getMeta(Operator.InputPort<?> port)
{
  final PortMapping mapping = getPortMapping();
  return mapping.inPortMap.get(port);
}
/**
 * @return the live map from this operator's input port metas to the streams attached to them
 */
public Map<InputPortMeta, StreamMeta> getInputStreams()
{
  return inputStreams;
}
/**
 * @return the live map from this operator's output port metas to the streams attached to them
 */
public Map<OutputPortMeta, StreamMeta> getOutputStreams()
{
  return outputStreams;
}
/**
 * @return the operator instance this meta object describes
 */
@Override
public Operator getOperator()
{
  return this.operator;
}
/**
 * @return the enclosing logical plan instance
 */
public LogicalPlan getDAG()
{
  final LogicalPlan enclosingPlan = LogicalPlan.this;
  return enclosingPlan;
}
/**
 * @return a description listing the operator's name, instance and attributes
 */
@Override
public String toString()
{
  final StringBuilder text = new StringBuilder("OperatorMeta{");
  text.append("name=").append(name);
  text.append(", operator=").append(operator);
  text.append(", attributes=").append(attributes);
  text.append('}');
  return text.toString();
}
/**
 * Two operator metas are equal when their attributes, name, operator annotation and operator
 * are all equal (null-tolerant for everything except the name).
 */
@Override
public boolean equals(Object o)
{
  if (this == o) {
    return true;
  }
  if (!(o instanceof OperatorMeta)) {
    return false;
  }
  final OperatorMeta other = (OperatorMeta)o;

  final boolean sameAttributes = (attributes == null) ? other.attributes == null : attributes.equals(other.attributes);
  if (!sameAttributes) {
    return false;
  }
  if (!name.equals(other.name)) {
    return false;
  }
  final boolean sameAnnotation = (operatorAnnotation == null)
      ? other.operatorAnnotation == null
      : operatorAnnotation.equals(other.operatorAnnotation);
  if (!sameAnnotation) {
    return false;
  }
  if (operator == null) {
    return other.operator == null;
  }
  return operator.equals(other.operator);
}
/**
 * @return the hash code of the operator name
 */
@Override
public int hashCode()
{
  final String key = name;
  return key.hashCode();
}
@SuppressWarnings("FieldNameHidesFieldInSuperclass")
private static final long serialVersionUID = 201401091635L;
}
/**
 * Instantiates {@code clazz} through its no-argument constructor and adds the instance to the
 * plan under {@code name}.
 *
 * @param name  name to register the operator under
 * @param clazz operator class to instantiate
 * @return the newly created operator instance
 * @throws IllegalArgumentException wrapping any exception raised during instantiation
 */
@Override
public <T extends Operator> T addOperator(String name, Class<T> clazz)
{
  final T created;
  try {
    created = clazz.newInstance();
  } catch (Exception cause) {
    throw new IllegalArgumentException(cause);
  }
  addOperator(name, created);
  return created;
}
/**
 * Adds an operator instance to the plan under the given name. Adding the same instance again
 * under the same name is a no-op.
 *
 * @param name     name to register the operator under
 * @param operator operator instance
 * @return the operator passed in
 * @throws IllegalArgumentException when the name is already used by a different operator or by a
 *         module
 */
@Override
public <T extends Operator> T addOperator(String name, T operator)
{
  if (operators.containsKey(name)) {
    if (operators.get(name).operator == operator) {
      return operator;
    }
    throw new IllegalArgumentException("duplicate operator id: " + operators.get(name));
  }
  // Avoid name conflict with module. Report the conflicting module: there is no operator under
  // this name at this point, so operators.get(name) would only ever print "null".
  if (modules.containsKey(name)) {
    throw new IllegalArgumentException("duplicate operator id: " + modules.get(name));
  }
  OperatorMeta decl = new OperatorMeta(name, operator);
  rootOperators.add(decl); // will be removed when a sink is added to an input port for this operator
  operators.put(name, decl);
  return operator;
}
/**
 * Meta object describing a module added to the plan. Each module meta owns a private
 * {@link LogicalPlan} that the module populates when it is flattened.
 */
public final class ModuleMeta implements DAG.ModuleMeta, Serializable
{
  private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new LinkedHashMap<>();
  private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = new LinkedHashMap<>();
  private final Attribute.AttributeMap attributes;
  @NotNull
  private String name;
  // transient: written and read explicitly in writeObject/readObject
  private transient Module module;
  private ModuleMeta parent;
  private LogicalPlan dag = null;
  // lazily computed by getFullName()
  private transient String fullName;

  private ModuleMeta(String name, Module module)
  {
    LOG.debug("Initializing {} as {}", name, module.getClass().getName());
    this.name = name;
    this.module = module;
    this.attributes = new DefaultAttributeMap();
    this.dag = new LogicalPlan();
  }

  /** @return the name this module was added under */
  @Override
  public String getName()
  {
    return this.name;
  }

  /** @return the module instance */
  @Override
  public Module getModule()
  {
    return this.module;
  }

  /** @return the attribute map of this module */
  @Override
  public Attribute.AttributeMap getAttributes()
  {
    return this.attributes;
  }

  /** @return the value stored for {@code key} in this module's attribute map */
  @Override
  public <T> T getValue(Attribute<T> key)
  {
    return this.attributes.get(key);
  }

  /** No-op for modules. */
  @Override
  public void setCounters(Object counters)
  {
  }

  /** No-op for modules. */
  @Override
  public void sendMetrics(Collection<String> metricNames)
  {
  }

  /** @return the live map of input streams held by this module meta */
  public LinkedHashMap<InputPortMeta, StreamMeta> getInputStreams()
  {
    return this.inputStreams;
  }

  /** @return the live map of output streams held by this module meta */
  public LinkedHashMap<OutputPortMeta, StreamMeta> getOutputStreams()
  {
    return this.outputStreams;
  }

  /** @return the plan owned by this module */
  public LogicalPlan getDag()
  {
    return this.dag;
  }

  /** Writes the default fields, then stores the transient module through FSStorageAgent. */
  private void writeObject(ObjectOutputStream out) throws IOException
  {
    out.defaultWriteObject();
    FSStorageAgent.store(out, module);
  }

  /** Reads the default fields, then retrieves the transient module through FSStorageAgent. */
  private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException
  {
    input.defaultReadObject();
    module = (Module)FSStorageAgent.retrieve(input);
  }

  /**
   * Expand the module and add its operators to the parentDAG. After this method finishes the module is expanded
   * fully, with all its submodules also expanded, and the parentDAG contains the operators added by all of them.
   *
   * @param parentDAG parent dag to populate with operators from this and inner modules.
   * @param conf configuration object.
   */
  public void flattenModule(LogicalPlan parentDAG, Configuration conf)
  {
    module.populateDAG(dag, conf);
    // expand inner modules into this module's own plan first
    for (ModuleMeta inner : dag.getAllModules()) {
      inner.setParent(this);
      inner.flattenModule(dag, conf);
    }
    dag.applyStreamLinks();
    parentDAG.addDAGToCurrentDAG(this);
  }

  /**
   * Return the full name of the module. For an inner module, i.e. a module inside a module, the name is built by
   * walking up to the top level module and concatenating the module names from the outermost to this one,
   * separated by MODULE_NAMESPACE_SEPARATOR. The result is cached.
   *
   * For example, if module M1 adds another module M2 to its DAG, the full name of M2 is
   * ("M1" + MODULE_NAMESPACE_SEPARATOR + "M2").
   *
   * @return full name of the module.
   */
  public String getFullName()
  {
    if (fullName == null) {
      if (parent != null) {
        fullName = parent.getFullName() + MODULE_NAMESPACE_SEPARATOR + name;
      } else {
        fullName = name;
      }
    }
    return fullName;
  }

  private void setParent(ModuleMeta meta)
  {
    this.parent = meta;
  }

  private static final long serialVersionUID = 7562277769188329223L;
}
/**
 * Adds a module instance to the plan under the given name. Adding the same instance again under
 * the same name is a no-op.
 *
 * @param name   name to register the module under
 * @param module module instance
 * @return the module passed in
 * @throws IllegalArgumentException when the name is already used by a different module or by an
 *         operator
 */
@Override
public <T extends Module> T addModule(String name, T module)
{
  if (modules.containsKey(name)) {
    if (modules.get(name).module == module) {
      return module;
    }
    throw new IllegalArgumentException("duplicate module is: " + modules.get(name));
  }
  // Avoid name conflict with operator. Report the conflicting operator: there is no module under
  // this name at this point, so modules.get(name) would only ever print "null".
  if (operators.containsKey(name)) {
    throw new IllegalArgumentException("duplicate module is: " + operators.get(name));
  }
  ModuleMeta meta = new ModuleMeta(name, module);
  modules.put(name, meta);
  return module;
}
/**
 * Instantiates {@code clazz} through its no-argument constructor and adds the instance to the
 * plan under {@code name}.
 *
 * @param name  name to register the module under
 * @param clazz module class to instantiate
 * @return the newly created module instance
 * @throws IllegalArgumentException wrapping any exception raised during instantiation
 */
@Override
public <T extends Module> T addModule(String name, Class<T> clazz)
{
  final T created;
  try {
    created = clazz.newInstance();
  } catch (Exception cause) {
    throw new IllegalArgumentException(cause);
  }
  addModule(name, created);
  return created;
}
/**
 * Removes an operator from the plan: detaches it as a sink from each of its input streams,
 * then drops it from the operator map and from the root operators.
 *
 * @param operator operator instance to remove
 */
public void removeOperator(Operator operator)
{
  final OperatorMeta meta = getMeta(operator);
  // NOTE(review): getMeta(Operator) as defined in this class throws for an unknown operator
  // rather than returning null - confirm whether this guard is still reachable
  if (meta == null) {
    return;
  }
  for (Map.Entry<InputPortMeta, StreamMeta> attached : meta.getInputStreams().entrySet()) {
    final StreamMeta feeding = attached.getValue();
    final InputPortMeta sinkPort = attached.getKey();
    if (sinkPort.getOperatorWrapper() == meta) {
      feeding.sinks.remove(sinkPort);
    }
    // If persistStream was enabled for stream, reset stream when sink removed
    feeding.resetStreamPersistanceOnSinkRemoval(sinkPort);
  }
  this.operators.remove(meta.getName());
  rootOperators.remove(meta);
}
/**
 * Creates a new, unconnected stream and registers it under the given id.
 *
 * @param id stream id, must not already be in use
 * @return the newly created stream meta
 * @throws IllegalArgumentException when a stream with this id already exists; the existing
 *         stream stays registered
 */
@Override
public StreamMeta addStream(String id)
{
  // Check before inserting: put-then-check would replace the registered stream with the new
  // empty one even though the call is rejected.
  StreamMeta existing = streams.get(id);
  if (existing != null) {
    throw new IllegalArgumentException("duplicate stream id: " + existing);
  }
  StreamMeta s = new StreamMeta(id);
  streams.put(id, s);
  return s;
}
/**
 * Creates a stream and connects it to the given source and sinks. Connections that involve a
 * proxy port (proxy source, or proxy sink) are not applied directly; they are recorded in
 * {@code streamLinks} under the stream id instead.
 *
 * @param id     stream id
 * @param source output port feeding the stream
 * @param sinks  input ports consuming the stream
 * @return the newly created stream meta
 */
@Override
@SuppressWarnings("unchecked")
public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks)
{
  final StreamMeta stream = addStream(id);
  id = stream.id;
  final ArrayListMultimap<OutputPort<?>, InputPort<?>> deferred = ArrayListMultimap.create();
  final boolean proxySource = source instanceof ProxyOutputPort;
  if (!proxySource) {
    stream.setSource(source);
  }
  for (Operator.InputPort<?> sink : sinks) {
    if (proxySource || sink instanceof ProxyInputPort) {
      // remember the pair; it is connected later from the recorded links
      deferred.put(source, sink);
      streamLinks.put(id, deferred);
      continue;
    }
    if (stream.getSource() == null) {
      stream.setSource(source);
    }
    stream.addSink(sink);
  }
  return stream;
}
/**
 * Connects the streams whose source or sinks were recorded in {@code streamLinks} instead of
 * being attached in the addStream call. Proxy ports are followed through {@code get()} until a
 * non-proxy port is reached; that port becomes the stream source (when the stream has none yet)
 * or is added as a sink. Meant to be called once the logical DAG is expanded and the proxy ports
 * refer to actual ports.
 */
public void applyStreamLinks()
{
  for (String streamId : streamLinks.keySet()) {
    final StreamMeta stream = getStream(streamId);
    for (Map.Entry<Operator.OutputPort<?>, Operator.InputPort<?>> link : streamLinks.get(streamId).entries()) {
      if (stream.getSource() == null) {
        Operator.OutputPort<?> resolvedSource = link.getKey();
        while (resolvedSource instanceof ProxyOutputPort) {
          resolvedSource = ((ProxyOutputPort<?>)resolvedSource).get();
        }
        stream.setSource(resolvedSource);
      }

      Operator.InputPort<?> resolvedSink = link.getValue();
      while (resolvedSink instanceof ProxyInputPort) {
        resolvedSink = ((ProxyInputPort<?>)resolvedSink).get();
      }
      stream.addSink(resolvedSink);
    }
  }
}
/**
 * Copies the operators and streams of a module's plan into this plan. Every copied operator and
 * stream is registered under the module name, MODULE_NAMESPACE_SEPARATOR and its original name.
 *
 * @param moduleMeta module whose plan is merged into this plan
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
private void addDAGToCurrentDAG(ModuleMeta moduleMeta)
{
  final LogicalPlan inner = moduleMeta.getDag();
  final String moduleName = moduleMeta.getName();
  final String prefix = moduleName + MODULE_NAMESPACE_SEPARATOR;

  for (OperatorMeta original : inner.getAllOperators()) {
    final Operator added = this.addOperator(prefix + original.getName(), original.getOperator());
    final OperatorMeta copy = this.getMeta(added);
    copy.copyAttributesFrom(original);
    // nest the module name of the operator under this module's name
    if (original.getModuleName() == null) {
      copy.setModuleName(moduleName);
    } else {
      copy.setModuleName(prefix + original.getModuleName());
    }
  }

  for (StreamMeta original : inner.getAllStreams()) {
    final OutputPortMeta sourceMeta = original.getSource();
    final List<InputPort<?>> sinkPorts = new ArrayList<>();
    for (InputPortMeta sinkMeta : original.getSinks()) {
      sinkPorts.add(sinkMeta.getPortObject());
    }
    final InputPort[] sinkArray = sinkPorts.toArray(new InputPort[sinkPorts.size()]);
    final String streamName = prefix + original.getName();
    final StreamMeta copy = this.addStream(streamName, sourceMeta.getPortObject(), sinkArray);
    copy.setLocality(original.getLocality());
  }
}
/**
 * Single-sink convenience overload; wraps the sink in an array and delegates to the varargs
 * variant.
 */
@Override
@SuppressWarnings("unchecked")
public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1)
{
  @SuppressWarnings("rawtypes")
  final InputPort[] singleSink = new Operator.InputPort[1];
  singleSink[0] = sink1;
  return addStream(id, source, singleSink);
}
/**
 * Two-sink convenience overload; wraps both sinks in an array and delegates to the varargs
 * variant.
 */
@Override
@SuppressWarnings("unchecked")
public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1, Operator.InputPort<? super T> sink2)
{
  @SuppressWarnings("rawtypes")
  final InputPort[] bothSinks = new Operator.InputPort[2];
  bothSinks[0] = sink1;
  bothSinks[1] = sink2;
  return addStream(id, source, bothSinks);
}
/**
 * @param id stream id
 * @return the stream registered under {@code id}, as returned by the stream map lookup
 */
public StreamMeta getStream(String id)
{
  final StreamMeta found = streams.get(id);
  return found;
}
/**
 * Get the attribute map of the given operator.
 *
 * @param operator operator that is part of this DAG
 * @return AttributeMap held by the operator's meta object
 */
public Attribute.AttributeMap getContextAttributes(Operator operator)
{
  final OperatorMeta meta = getMeta(operator);
  return meta.attributes;
}
/**
 * Stores {@code value} under {@code key} in the attribute map of this plan.
 */
@Override
public <T> void setAttribute(Attribute<T> key, T value)
{
  final Attribute.AttributeMap planAttributes = this.getAttributes();
  planAttributes.put(key, value);
}
/**
 * Stores {@code value} under {@code key} in the attribute map of the given operator's meta.
 */
@Override
public <T> void setAttribute(Operator operator, Attribute<T> key, T value)
{
  final OperatorMeta meta = this.getMeta(operator);
  meta.attributes.put(key, value);
}
/**
 * Searches all operators of the plan for the meta object of the given output port.
 *
 * @param port output port object
 * @return the port's meta object
 * @throws IllegalArgumentException when no operator in the plan maps this port
 */
private OutputPortMeta assertGetPortMeta(Operator.OutputPort<?> port)
{
  for (OperatorMeta candidate : getAllOperators()) {
    final OutputPortMeta found = candidate.getPortMapping().outPortMap.get(port);
    if (found == null) {
      continue;
    }
    return found;
  }
  throw new IllegalArgumentException("Port is not associated to any operator in the DAG: " + port);
}
/**
 * Searches all operators of the plan for the meta object of the given input port.
 *
 * @param port input port object
 * @return the port's meta object
 * @throws IllegalArgumentException when no operator in the plan maps this port
 */
private InputPortMeta assertGetPortMeta(Operator.InputPort<?> port)
{
  for (OperatorMeta candidate : getAllOperators()) {
    final InputPortMeta found = candidate.getPortMapping().inPortMap.get(port);
    if (found == null) {
      continue;
    }
    return found;
  }
  throw new IllegalArgumentException("Port is not associated to any operator in the DAG: " + port);
}
/**
 * Stores {@code value} under {@code key} in the attribute map of the given output port's meta.
 *
 * @throws IllegalArgumentException when the port does not belong to an operator in the plan
 */
@Override
public <T> void setOutputPortAttribute(Operator.OutputPort<?> port, Attribute<T> key, T value)
{
  final OutputPortMeta portMeta = assertGetPortMeta(port);
  portMeta.attributes.put(key, value);
}
/**
 * Stores {@code value} under {@code key} in the attribute map of the unifier meta that belongs
 * to the given output port.
 *
 * @throws IllegalArgumentException when the port does not belong to an operator in the plan
 */
@Override
public <T> void setUnifierAttribute(Operator.OutputPort<?> port, Attribute<T> key, T value)
{
  final OutputPortMeta portMeta = assertGetPortMeta(port);
  portMeta.getUnifierMeta().attributes.put(key, value);
}
/**
 * Stores {@code value} under {@code key} in the attribute map of the given input port's meta.
 *
 * @throws IllegalArgumentException when the port does not belong to an operator in the plan
 */
@Override
public <T> void setInputPortAttribute(Operator.InputPort<?> port, Attribute<T> key, T value)
{
  final InputPortMeta portMeta = assertGetPortMeta(port);
  portMeta.attributes.put(key, value);
}
/**
 * @return a read-only view of the root operators
 */
public List<OperatorMeta> getRootOperators()
{
  return Collections.unmodifiableList(rootOperators);
}
/**
 * @return a read-only view of the meta objects of all operators in the plan
 */
public Collection<OperatorMeta> getAllOperators()
{
  return Collections.unmodifiableCollection(operators.values());
}
/**
 * @return a read-only view of the meta objects of all modules in the plan
 */
public Collection<ModuleMeta> getAllModules()
{
  return Collections.unmodifiableCollection(modules.values());
}
/**
 * @return a read-only view of the meta objects of all streams in the plan
 */
public Collection<StreamMeta> getAllStreams()
{
  return Collections.unmodifiableCollection(streams.values());
}
/**
 * @param operatorName name the operator was added under
 * @return the operator meta registered under that name, as returned by the operator map lookup
 */
@Override
public OperatorMeta getOperatorMeta(String operatorName)
{
  final OperatorMeta found = operators.get(operatorName);
  return found;
}
/**
 * @param moduleName name the module was added under
 * @return the module meta registered under that name, as returned by the module map lookup
 */
@Override
public ModuleMeta getModuleMeta(String moduleName)
{
  final ModuleMeta found = modules.get(moduleName);
  return found;
}
/**
 * Finds the meta object of an operator by scanning all operators for the same instance
 * (identity comparison).
 *
 * @param operator operator instance
 * @return the operator's meta object
 * @throws IllegalArgumentException when the operator is not part of this plan
 */
@Override
public OperatorMeta getMeta(Operator operator)
{
  // TODO: cache mapping
  for (OperatorMeta candidate : getAllOperators()) {
    if (candidate.operator != operator) {
      continue;
    }
    return candidate;
  }
  throw new IllegalArgumentException("Operator not associated with the DAG: " + operator);
}
/**
 * Finds the meta object of a module by scanning all modules for the same instance (identity
 * comparison).
 *
 * @param module module instance
 * @return the module's meta object
 * @throws IllegalArgumentException when the module is not part of this plan
 */
@Override
public ModuleMeta getMeta(Module module)
{
  for (ModuleMeta candidate : getAllModules()) {
    if (candidate.module != module) {
      continue;
    }
    return candidate;
  }
  throw new IllegalArgumentException("Module not associated with the DAG: " + module);
}
/**
 * @return the value of the CONTAINERS_MAX_COUNT attribute of this plan
 */
public int getMaxContainerCount()
{
  return getValue(CONTAINERS_MAX_COUNT);
}
/**
 * @return the value of the DEBUG attribute of this plan
 */
public boolean isDebug()
{
  return getValue(DEBUG);
}
/**
 * @return the value of the MASTER_MEMORY_MB attribute of this plan
 */
public int getMasterMemoryMB()
{
  return getValue(MASTER_MEMORY_MB);
}
/**
 * @return the value of the CONTAINER_JVM_OPTIONS attribute of this plan (the master options
 *         are read from the container attribute)
 */
public String getMasterJVMOptions()
{
  return getValue(CONTAINER_JVM_OPTIONS);
}
/**
 * Returns the application path stored in the plan's attribute map.
 *
 * @return the non-null value of the APPLICATION_PATH attribute
 * @throws AssertionError when the attribute is not present in the map
 */
public String assertAppPath()
{
  final String appPath = getAttributes().get(LogicalPlan.APPLICATION_PATH);
  if (appPath != null) {
    return appPath;
  }
  throw new AssertionError("Missing " + LogicalPlan.APPLICATION_PATH);
}
/**
 * Class dependencies for the topology. Used to determine jar file dependencies.
 * Collects the class name of every operator and, for every stream sink, the class name of its
 * stream codec: the STREAM_CODEC attribute value when set, otherwise the codec supplied by the
 * port object, if any.
 *
 * @return Set<String>
 */
public Set<String> getClassNames()
{
  final Set<String> names = new HashSet<String>();
  for (OperatorMeta operatorMeta : this.operators.values()) {
    // Class.getName() never returns null, so the name can be added directly
    names.add(operatorMeta.getOperator().getClass().getName());
  }
  for (StreamMeta stream : this.streams.values()) {
    for (InputPortMeta sink : stream.getSinks()) {
      StreamCodec<?> codec = sink.getValue(STREAM_CODEC);
      if (codec == null) {
        // no codec attribute on the port, fall back to the port object's own codec
        codec = sink.getPortObject().getStreamCodec();
      }
      if (codec != null) {
        names.add(codec.getClass().getName());
      }
    }
  }
  return names;
}
/**
 * Mutable state carried through the strongly-connected-components search performed by
 * {@code findStronglyConnected}.
 */
public static class ValidationContext
{
  /** Index assigned to the next operator visited for the first time. */
  public int nodeIndex = 0;
  /** Operators visited but not yet assigned to a component. */
  public Stack<OperatorMeta> stack = new Stack<>();
  /** Operators on the current depth-first path. */
  public Stack<OperatorMeta> path = new Stack<>();
  /** Components with more than one operator found so far. */
  public List<Set<OperatorMeta>> stronglyConnected = new ArrayList<>();
  /** Last operator found to close a loop that contains no delay operator. */
  public OperatorMeta invalidLoopAt;
  /** Self loops and components containing {@link #invalidLoopAt}. */
  public List<Set<OperatorMeta>> invalidCycles = new ArrayList<>();
}
/**
 * Clears the {@code nindex} and {@code lowlink} markers on every operator of the plan.
 */
public void resetNIndex()
{
  for (OperatorMeta operatorMeta : getAllOperators()) {
    operatorMeta.nindex = null;
    operatorMeta.lowlink = null;
  }
}
/**
* Validate the plan. Includes checks that required ports are connected,
* required configuration parameters specified, graph free of cycles etc.
*
* The checks run in this order: attribute values are serializable, bean-validation constraints
* on each operator, operator annotation rules (partitioning, checkpoint window), input and
* output port connections, cycles, delay operators, stream end points, root operators and
* processing modes. The first failing check throws.
*
* @throws ConstraintViolationException when an operator violates its bean-validation constraints
* @throws ValidationException when any of the other checks fails
*/
public void validate() throws ConstraintViolationException
{
ValidatorFactory factory =
Validation.buildDefaultValidatorFactory();
Validator validator = factory.getValidator();
// plan-level attributes must be serializable
checkAttributeValueSerializable(this.getAttributes(), DAG.class.getName());
// clear oioRoot values in all operators
for (OperatorMeta n: operators.values()) {
n.oioRoot = null;
}
// clear visited on all operators
for (OperatorMeta n: operators.values()) {
n.nindex = null;
n.lowlink = null;
// validate configuration
Set<ConstraintViolation<Operator>> constraintViolations = validator.validate(n.getOperator());
if (!constraintViolations.isEmpty()) {
Set<ConstraintViolation<?>> copySet = new HashSet<ConstraintViolation<?>>(constraintViolations.size());
// workaround bug in ConstraintViolationException constructor
// (should be public <T> ConstraintViolationException(String message, Set<ConstraintViolation<T>> constraintViolations) { ... })
for (ConstraintViolation<Operator> cv: constraintViolations) {
copySet.add(cv);
}
throw new ConstraintViolationException("Operator " + n.getName() + " violates constraints " + copySet, copySet);
}
OperatorMeta.PortMapping portMapping = n.getPortMapping();
checkAttributeValueSerializable(n.getAttributes(), n.getName());
// Check operator annotation
if (n.operatorAnnotation != null) {
// Check if partition property of the operator is being honored
if (!n.operatorAnnotation.partitionable()) {
// Check if any of the input ports have partition attributes set
for (InputPortMeta pm: portMapping.inPortMap.values()) {
// NOTE(review): unboxed below; assumes PARTITION_PARALLEL always resolves to a non-null value - confirm
Boolean paralellPartition = pm.getValue(PortContext.PARTITION_PARALLEL);
if (paralellPartition) {
throw new ValidationException("Operator " + n.getName() + " is not partitionable but PARTITION_PARALLEL attribute is set");
}
}
// Check if the operator implements Partitioner
// (either a PARTITIONER attribute value is present, or the attribute map does not contain the
// attribute and the operator class itself is a Partitioner)
if (n.getValue(OperatorContext.PARTITIONER) != null
|| n.attributes != null && !n.attributes.contains(OperatorContext.PARTITIONER) && Partitioner.class.isAssignableFrom(n.getOperator().getClass())) {
throw new ValidationException("Operator " + n.getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!");
}
}
//If operator can not be check-pointed in middle of application window then the checkpoint window count should be
// a multiple of application window count
if (!n.operatorAnnotation.checkpointableWithinAppWindow()) {
if (n.getValue(OperatorContext.CHECKPOINT_WINDOW_COUNT) % n.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) != 0) {
throw new ValidationException("Operator " + n.getName() + " cannot be check-pointed between an application window " +
"but the checkpoint-window-count " + n.getValue(OperatorContext.CHECKPOINT_WINDOW_COUNT) +
" is not a multiple application-window-count " + n.getValue(OperatorContext.APPLICATION_WINDOW_COUNT));
}
}
}
// check that non-optional ports are connected
for (InputPortMeta pm: portMapping.inPortMap.values()) {
checkAttributeValueSerializable(pm.getAttributes(), n.getName() + "." + pm.getPortName());
StreamMeta sm = n.inputStreams.get(pm);
if (sm == null) {
// unconnected input: required unless annotated optional or marked as a hidden port
if ((pm.portAnnotation == null || !pm.portAnnotation.optional()) && pm.classDeclaringHiddenPort == null) {
throw new ValidationException("Input port connection required: " + n.name + "." + pm.getPortName());
}
} else {
// connected input: hidden ports must not be connected
if (pm.classDeclaringHiddenPort != null) {
throw new ValidationException(String.format("Invalid port connected: %s.%s is hidden by %s.%s", pm.classDeclaringHiddenPort.getName(),
pm.getPortName(), pm.operatorMeta.getOperator().getClass().getName(), pm.getPortName()));
}
// check locality constraints
DAG.Locality locality = sm.getLocality();
if (locality == DAG.Locality.THREAD_LOCAL) {
if (n.inputStreams.size() > 1) {
validateThreadLocal(n);
}
}
if (pm.portAnnotation != null && pm.portAnnotation.schemaRequired()) {
//since schema is required, the port attribute TUPLE_CLASS should be present
if (pm.attributes.get(PortContext.TUPLE_CLASS) == null) {
throw new ValidationException("Attribute " + PortContext.TUPLE_CLASS.getName() + " missing on port : " + n.name + "." + pm.getPortName());
}
}
}
}
// stays true only while every output port carries an annotation that declares it optional
boolean allPortsOptional = true;
for (OutputPortMeta pm: portMapping.outPortMap.values()) {
checkAttributeValueSerializable(pm.getAttributes(), n.getName() + "." + pm.getPortName());
if (!n.outputStreams.containsKey(pm)) {
// unconnected output: only an explicit non-optional annotation makes the connection required
if ((pm.portAnnotation != null && !pm.portAnnotation.optional()) && pm.classDeclaringHiddenPort == null) {
throw new ValidationException("Output port connection required: " + n.name + "." + pm.getPortName());
}
} else {
//port is connected
if (pm.classDeclaringHiddenPort != null) {
throw new ValidationException(String.format("Invalid port connected: %s.%s is hidden by %s.%s", pm.classDeclaringHiddenPort.getName(),
pm.getPortName(), pm.operatorMeta.getOperator().getClass().getName(), pm.getPortName()));
}
if (pm.portAnnotation != null && pm.portAnnotation.schemaRequired()) {
//since schema is required, the port attribute TUPLE_CLASS should be present
if (pm.attributes.get(PortContext.TUPLE_CLASS) == null) {
throw new ValidationException("Attribute " + PortContext.TUPLE_CLASS.getName() + " missing on port : " + n.name + "." + pm.getPortName());
}
}
}
allPortsOptional &= (pm.portAnnotation != null && pm.portAnnotation.optional());
}
if (!allPortsOptional && n.outputStreams.isEmpty()) {
throw new ValidationException("At least one output port must be connected: " + n.name);
}
}
// cycle detection: run the strongly connected components search from every unvisited operator
ValidationContext validatonContext = new ValidationContext();
for (OperatorMeta n: operators.values()) {
if (n.nindex == null) {
findStronglyConnected(n, validatonContext);
}
}
if (!validatonContext.invalidCycles.isEmpty()) {
throw new ValidationException("Loops in graph: " + validatonContext.invalidCycles);
}
// delay operator checks, walking downstream from the root operators
List<List<String>> invalidDelays = new ArrayList<>();
for (OperatorMeta n : rootOperators) {
findInvalidDelays(n, invalidDelays, new Stack<OperatorMeta>());
}
if (!invalidDelays.isEmpty()) {
throw new ValidationException("Invalid delays in graph: " + invalidDelays);
}
// every stream needs a source and at least one sink
for (StreamMeta s: streams.values()) {
if (s.source == null) {
throw new ValidationException("Stream source not connected: " + s.getName());
}
if (s.sinks.isEmpty()) {
throw new ValidationException("Stream sink not connected: " + s.getName());
}
}
// Validate root operators are input operators
for (OperatorMeta om : this.rootOperators) {
if (!(om.getOperator() instanceof InputOperator)) {
throw new ValidationException(String.format("Root operator: %s is not a Input operator",
om.getName()));
}
}
// processing mode
Set<OperatorMeta> visited = Sets.newHashSet();
for (OperatorMeta om : this.rootOperators) {
validateProcessingMode(om, visited);
}
}
/**
 * Verifies that every non-null attribute value in the map implements {@link Serializable}.
 *
 * @param attributes attribute map to inspect
 * @param context    label used in the error message to say where the attributes belong
 * @throws ValidationException naming all attributes whose values are not serializable
 */
private void checkAttributeValueSerializable(AttributeMap attributes, String context)
{
  final StringBuilder offending = new StringBuilder();
  boolean first = true;
  for (Entry<Attribute<?>, Object> entry : attributes.entrySet()) {
    final Object value = entry.getValue();
    if (value == null || value instanceof Serializable) {
      continue;
    }
    if (!first) {
      offending.append(", ");
    }
    offending.append(entry.getKey().getSimpleName());
    first = false;
  }
  if (offending.length() > 0) {
    throw new ValidationException("Attribute value(s) for " + offending.toString() + " in " + context + " are not serializable");
  }
}
/*
* Validates OIO constraints for operators with more than one input streams
* For a node to be OIO,
* 1. all its input streams should be OIO
* 2. all its input streams should have OIO from single source node
*
* On success the common root id is stored in om.oioRoot; an operator whose oioRoot is already
* set is skipped. Throws ValidationException when the operator, or the source of one of its
* input streams, is a delay operator, when an input stream is not THREAD_LOCAL, or when the
* input streams resolve to different roots.
*/
private void validateThreadLocal(OperatorMeta om) {
Integer oioRoot = null;
// already visited and validated
if (om.oioRoot != null) {
return;
}
if (om.getOperator() instanceof Operator.DelayOperator) {
String msg = String.format("Locality %s invalid for delay operator %s", Locality.THREAD_LOCAL, om);
throw new ValidationException(msg);
}
for (StreamMeta sm: om.inputStreams.values()){
// validation fail as each input stream should be OIO
if (sm.locality != Locality.THREAD_LOCAL){
String msg = String.format("Locality %s invalid for operator %s with multiple input streams as at least one of the input streams is not %s",
Locality.THREAD_LOCAL, om, Locality.THREAD_LOCAL);
throw new ValidationException(msg);
}
if (sm.source.operatorMeta.getOperator() instanceof Operator.DelayOperator) {
String msg = String.format("Locality %s invalid for delay operator %s", Locality.THREAD_LOCAL, sm.source.operatorMeta);
throw new ValidationException(msg);
}
// gets oio root for input operator for the stream
Integer oioStreamRoot = getOioRoot(sm.source.operatorMeta);
// validation fail as each input stream should have a common OIO root
// NOTE(review): this compares boxed Integers by reference, and om.oioRoot was null on entry and
// is only assigned after the loop - confirm whether this branch is reachable / intended
if (om.oioRoot != null && oioStreamRoot != om.oioRoot){
// NOTE(review): the format string has two placeholders but three arguments are passed
String msg = String.format("Locality %s invalid for operator %s with multiple input streams as at least one of the input streams is not originating from common OIO owner node",
Locality.THREAD_LOCAL, om, Locality.THREAD_LOCAL);
throw new ValidationException(msg);
}
// populate oioRoot with root OIO node id for first stream, then validate for subsequent streams to have same root OIO node
if (oioRoot == null) {
oioRoot = oioStreamRoot;
} else if (oioRoot.intValue() != oioStreamRoot.intValue()) {
String msg = String.format("Locality %s invalid for operator %s with multiple input streams as they origin from different owner OIO operators", sm.locality, om);
throw new ValidationException(msg);
}
}
// remember the common root so later visits return early
om.oioRoot = oioRoot;
}
/**
 * Helper for validateThreadLocal, runs recursively.
 * Resolves the OIO root id of an operator and caches it in {@code om.oioRoot}:
 * with no input stream, or a single input stream that is not THREAD_LOCAL, the operator's own
 * hash code is used; with a single THREAD_LOCAL input stream the root of the upstream operator
 * is used; with more than one input stream the value is computed by validateThreadLocal.
 *
 * @param om operator to resolve
 * @return the cached or newly computed root id
 */
private Integer getOioRoot(OperatorMeta om) {
  // operators which were already marked as visited
  if (om.oioRoot != null) {
    return om.oioRoot;
  }
  // operators which were not visited before
  final int inputCount = om.inputStreams.size();
  if (inputCount == 0) {
    om.oioRoot = om.hashCode();
  } else if (inputCount == 1) {
    final StreamMeta onlyInput = om.inputStreams.values().iterator().next();
    if (onlyInput.locality == Locality.THREAD_LOCAL) {
      om.oioRoot = getOioRoot(onlyInput.source.operatorMeta);
    } else {
      om.oioRoot = om.hashCode();
    }
  } else {
    validateThreadLocal(om);
  }
  return om.oioRoot;
}
/**
* Check for cycles in the graph reachable from start node n. This is done by
* attempting to find strongly connected components.
*
* Components with more than one operator are added to ctx.stronglyConnected. A self loop, or a
* component that contains the operator recorded in ctx.invalidLoopAt (a loop found with no delay
* operator on the inspected stack section), is added to ctx.invalidCycles.
*
* @see <a href="http://en.wikipedia.org/wiki/Tarjan%E2%80%99s_strongly_connected_components_algorithm">http://en.wikipedia.org/wiki/Tarjan%E2%80%99s_strongly_connected_components_algorithm</a>
*
* @param om operator to start the depth first traversal from
* @param ctx traversal state shared across the recursive calls
*/
public void findStronglyConnected(OperatorMeta om, ValidationContext ctx)
{
// assign the next visit index; lowlink starts out equal to it
om.nindex = ctx.nodeIndex;
om.lowlink = ctx.nodeIndex;
ctx.nodeIndex++;
ctx.stack.push(om);
ctx.path.push(om);
// depth first successors traversal
for (StreamMeta downStream: om.outputStreams.values()) {
for (InputPortMeta sink: downStream.sinks) {
OperatorMeta successor = sink.getOperatorWrapper();
if (successor == null) {
continue;
}
// check for self referencing node
if (om == successor) {
ctx.invalidCycles.add(Collections.singleton(om));
}
if (successor.nindex == null) {
// not visited yet
findStronglyConnected(successor, ctx);
om.lowlink = Math.min(om.lowlink, successor.lowlink);
}
else if (ctx.stack.contains(successor)) {
// successor is still on the stack, so this edge closes a loop
om.lowlink = Math.min(om.lowlink, successor.nindex);
boolean isDelayLoop = false;
// scan the stack from the top down to the successor looking for a delay operator
for (int i=ctx.stack.size(); i>0; i--) {
OperatorMeta om2 = ctx.stack.get(i-1);
if (om2.getOperator() instanceof Operator.DelayOperator) {
isDelayLoop = true;
}
if (om2 == successor) {
break;
}
}
if (!isDelayLoop) {
ctx.invalidLoopAt = successor;
}
}
}
}
// pop stack for all root operators
if (om.lowlink.equals(om.nindex)) {
Set<OperatorMeta> connectedSet = new LinkedHashSet<>(ctx.stack.size());
while (!ctx.stack.isEmpty()) {
OperatorMeta n2 = ctx.stack.pop();
connectedSet.add(n2);
if (n2 == om) {
break; // collected all connected operators
}
}
// strongly connected (cycle) if more than one node in stack
if (connectedSet.size() > 1) {
ctx.stronglyConnected.add(connectedSet);
if (connectedSet.contains(ctx.invalidLoopAt)) {
ctx.invalidCycles.add(connectedSet);
}
}
}
ctx.path.pop();
}
/**
 * Walks the graph downstream from {@code om} and records invalid uses of delay operators.
 * A delay operator is reported when its APPLICATION_WINDOW_COUNT is not 1, or when one of its
 * sinks belongs to an operator that is not on the current traversal path. Every sink of a delay
 * operator is also tagged with the IS_CONNECTED_TO_DELAY_OPERATOR attribute. The traversal does
 * not continue past a delay operator.
 *
 * @param om            operator to start from
 * @param invalidDelays receives one entry per problem: the delay operator name alone, or the
 *                      delay operator name followed by the offending successor name
 * @param stack         operators on the current traversal path; restored before returning
 */
public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays, Stack<OperatorMeta> stack)
{
  stack.push(om);

  // depth first successors traversal
  boolean isDelayOperator = om.getOperator() instanceof Operator.DelayOperator;
  if (isDelayOperator) {
    if (om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) != 1) {
      LOG.debug("detected DelayOperator having APPLICATION_WINDOW_COUNT not equal to 1");
      invalidDelays.add(Collections.singletonList(om.getName()));
    }
  }

  for (StreamMeta downStream: om.outputStreams.values()) {
    for (InputPortMeta sink : downStream.sinks) {
      OperatorMeta successor = sink.getOperatorWrapper();
      if (isDelayOperator) {
        sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true);
        // Check whether all downstream operators are already visited in the path
        if (successor != null && !stack.contains(successor)) {
          LOG.debug("detected DelayOperator does not immediately output to a visited operator {}.{}->{}.{}",
              om.getName(), downStream.getSource().getPortName(), successor.getName(), sink.getPortName());
          invalidDelays.add(Arrays.asList(om.getName(), successor.getName()));
        }
      } else if (successor != null) {
        // a sink without an operator is skipped, matching the null handling in the delay branch
        findInvalidDelays(successor, invalidDelays, stack);
      }
    }
  }
  stack.pop();
}
/**
* Validates the PROCESSING_MODE of each sink operator against the mode of the operator feeding
* it, walking downstream recursively. An operator is only processed once the sources of all its
* input streams are in the visited set. A sink without an explicit mode inherits AT_MOST_ONCE
* from an AT_MOST_ONCE source.
*
* @param om operator whose downstream sinks are checked
* @param visited operators that have already been processed; om is added to it
* @throws ValidationException when a sink's processing mode is not valid for its source's mode
*/
private void validateProcessingMode(OperatorMeta om, Set<OperatorMeta> visited)
{
for (StreamMeta is : om.getInputStreams().values()) {
if (!visited.contains(is.getSource().getOperatorMeta())) {
// process all inputs first
return;
}
}
visited.add(om);
Operator.ProcessingMode pm = om.getValue(OperatorContext.PROCESSING_MODE);
for (StreamMeta os : om.outputStreams.values()) {
for (InputPortMeta sink: os.sinks) {
OperatorMeta sinkOm = sink.getOperatorWrapper();
// read the sink's mode straight from its attribute map, so an unset attribute yields null
Operator.ProcessingMode sinkPm = sinkOm.attributes == null? null: sinkOm.attributes.get(OperatorContext.PROCESSING_MODE);
if (sinkPm == null) {
// If the source processing mode is AT_MOST_ONCE and a processing mode is not specified for the sink then set it to AT_MOST_ONCE as well
if (Operator.ProcessingMode.AT_MOST_ONCE.equals(pm)) {
LOG.warn("Setting processing mode for operator {} to {}", sinkOm.getName(), pm);
sinkOm.getAttributes().put(OperatorContext.PROCESSING_MODE, pm);
} else if (Operator.ProcessingMode.EXACTLY_ONCE.equals(pm)) {
// If the source processing mode is EXACTLY_ONCE and a processing mode is not specified for the sink then throw a validation error
String msg = String.format("Processing mode for %s should be AT_MOST_ONCE for source %s/%s", sinkOm.getName(), om.getName(), pm);
throw new ValidationException(msg);
}
} else {
/*
* If the source processing mode is AT_MOST_ONCE and the processing mode for the sink is not AT_MOST_ONCE throw a validation error
* If the source processing mode is EXACTLY_ONCE and the processing mode for the sink is not AT_MOST_ONCE throw a validation error
*/
if ((Operator.ProcessingMode.AT_MOST_ONCE.equals(pm) && (sinkPm != pm))
|| (Operator.ProcessingMode.EXACTLY_ONCE.equals(pm) && !Operator.ProcessingMode.AT_MOST_ONCE.equals(sinkPm))) {
String msg = String.format("Processing mode %s/%s not valid for source %s/%s", sinkOm.getName(), sinkPm, om.getName(), pm);
throw new ValidationException(msg);
}
}
// continue downstream; the call returns immediately while the sink still has unvisited inputs
validateProcessingMode(sinkOm, visited);
}
}
}
/**
 * Serializes the given DAG to the supplied stream using Java object serialization.
 *
 * The object stream is flushed before returning so that the serialized bytes are pushed
 * through to {@code os} (and through any buffering it performs). The stream is deliberately
 * not closed: {@code os} is owned by the caller.
 *
 * @param dag the DAG to serialize
 * @param os destination stream; left open on return
 * @throws IOException if writing to or flushing the stream fails
 */
public static void write(DAG dag, OutputStream os) throws IOException
{
  ObjectOutputStream oos = new ObjectOutputStream(os);
  oos.writeObject(dag);
  // flush rather than close: closing would also close the caller's stream
  oos.flush();
}
/**
 * Reads a {@link LogicalPlan} from the supplied stream using Java object deserialization.
 *
 * The stream is not closed by this method.
 *
 * NOTE(review): this uses native Java deserialization; confirm that callers only pass
 * streams from trusted sources.
 *
 * @param is stream positioned at a serialized object
 * @return the deserialized plan
 * @throws IOException if reading from the stream fails
 * @throws ClassNotFoundException if a class of the serialized object graph cannot be found
 */
public static LogicalPlan read(InputStream is) throws IOException, ClassNotFoundException
{
  ObjectInputStream objectStream = new ObjectInputStream(is);
  Object deserialized = objectStream.readObject();
  return (LogicalPlan)deserialized;
}
/**
 * Determines the type carried by a port field from the first type argument of the
 * field's generic type.
 *
 * <ul>
 * <li>Class, generic array, wildcard and parameterized type arguments are returned as is.</li>
 * <li>For a type variable, its first bound is returned.</li>
 * </ul>
 *
 * @param f the field to inspect
 * @return the resolved type, or {@code null} (after logging an error) when the field is not
 *         parameterized or the type argument is of an unrecognized kind
 */
public static Type getPortType(Field f)
{
  Type genericType = f.getGenericType();
  if (!(genericType instanceof ParameterizedType)) {
    // ports are always parameterized
    // report the type parameters of the field's declared type (not of java.lang.reflect.Field itself)
    LOG.error("No type variable: {}, typeParameters: {}", f.getType(), Arrays.asList(f.getType().getTypeParameters()));
    return null;
  }

  Type typeArgument = ((ParameterizedType)genericType).getActualTypeArguments()[0];
  if (typeArgument instanceof Class) {
    return typeArgument;
  }
  if (typeArgument instanceof TypeVariable) {
    Type[] bounds = ((TypeVariable<?>)typeArgument).getBounds();
    LOG.debug("bounds: {}", Arrays.asList(bounds));
    // variable may contain other variables, java.util.Map<java.lang.String, ? extends T2>
    return bounds[0];
  }
  if (typeArgument instanceof GenericArrayType) {
    LOG.debug("type {} is of GenericArrayType", typeArgument);
    return typeArgument;
  }
  if (typeArgument instanceof WildcardType) {
    LOG.debug("type {} is of WildcardType", typeArgument);
    return typeArgument;
  }
  if (typeArgument instanceof ParameterizedType) {
    return typeArgument;
  }

  LOG.error("Type argument is of unexpected type {}", typeArgument);
  return null;
}
/**
 * Renders this plan in short-prefix style, listing its operators, streams and attributes
 * (the latter under the label "properties").
 */
@Override
public String toString()
{
  ToStringBuilder description = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
  description.append("operators", this.operators);
  description.append("streams", this.streams);
  description.append("properties", this.attributes);
  return description.toString();
}
/**
 * Two plans are equal when both their attributes and their streams are equal
 * (each compared null-safely). Attributes are compared first.
 *
 * @param o object to compare with
 * @return {@code true} if {@code o} is a {@code LogicalPlan} with equal attributes and streams
 */
@Override
public boolean equals(Object o)
{
  if (o == this) {
    return true;
  }
  if (!(o instanceof LogicalPlan)) {
    return false;
  }
  final LogicalPlan other = (LogicalPlan) o;

  final boolean sameAttributes;
  if (attributes == null) {
    sameAttributes = (other.attributes == null);
  } else {
    sameAttributes = attributes.equals(other.attributes);
  }
  if (!sameAttributes) {
    return false;
  }

  if (streams == null) {
    return other.streams == null;
  }
  return streams.equals(other.streams);
}
/**
 * Hash derived from the streams and the attributes, the same two fields that
 * {@code equals} compares; a {@code null} field contributes 0.
 */
@Override
public int hashCode()
{
  final int streamsHash = (streams == null) ? 0 : streams.hashCode();
  final int attributesHash = (attributes == null) ? 0 : attributes.hashCode();
  return 31 * streamsHash + attributesHash;
}
}