blob: 471950b5a07dea5e23febf9ba572523ac1933bc9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.api;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.hadoop.classification.InterfaceStability;
import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.annotation.OperatorAnnotation;
/**
* DAG contains the logical declarations of operators and streams.
* <p>
* Operators have ports that are connected through streams. Ports can be
* mandatory or optional with respect to their need to connect a stream to it.
* Each port can be connected to a single stream only. A stream has to be
* connected to one output port and can go to multiple input ports.
* <p>
* The DAG will be serialized and deployed to the cluster, where it is translated
* into the physical plan.
*
* @since 0.3.2
*/
public interface DAG extends DAGContext, Serializable
{
interface InputPortMeta extends Serializable, PortContext
{
/**
* Return port object represented by this InputPortMeta
* @return
*/
Operator.InputPort<?> getPort();
<T extends OperatorMeta> T getOperatorMeta();
}
interface OutputPortMeta extends Serializable, PortContext
{
OperatorMeta getUnifierMeta();
/**
* Return port object represented by this OutputPortMeta
* @return
*/
Operator.OutputPort<?> getPort();
<T extends OperatorMeta> T getOperatorMeta();
}
/**
* Locality setting affects how operators are scheduled for deployment by
* the platform. The setting serves as hint to the planner and can yield
* significant performance gains. Optimizations are subject to resource
* availability.
*/
enum Locality
{
/**
* Adjacent operators should be deployed into the same executing thread,
* effectively serializing the computation. This setting is beneficial
* where the cost of intermediate queuing exceeds the benefit of parallel
* processing. An example could be chaining of multiple operators with low
* compute requirements in a parallel partition setup.
*/
THREAD_LOCAL,
/**
* Adjacent operators should be deployed into the same process, executing
* in different threads. Useful when interprocess communication is a
* limiting factor and sufficient resources can be provisioned in a single
* container. Eliminates data serialization and networking stack overhead.
*/
CONTAINER_LOCAL,
/**
* Adjacent operators should be deployed into processes on the same machine.
* Eliminates network as bottleneck, as the loop back interface can be used
* instead.
*/
NODE_LOCAL,
/**
* Adjacent operators should be deployed into processes on nodes in the same
* rack. Best effort to not have allocation on same node.
* Not implemented yet.
*/
RACK_LOCAL
}
/**
* Representation of streams in the logical layer. Instances are created through {@link DAG#addStream}.
*/
interface StreamMeta extends Serializable
{
String getName();
/**
* Returns the locality for this stream.
* @return locality for this stream, default is null.
*/
Locality getLocality();
/**
* Set locality for the stream. The setting is best-effort, engine can
* override due to other settings or constraints.
*
* @param locality
* @return Object that describes the meta for the stream.
*/
StreamMeta setLocality(Locality locality);
StreamMeta setSource(Operator.OutputPort<?> port);
StreamMeta addSink(Operator.InputPort<?> port);
/**
* Persist entire stream using operator passed.
*
* @param Persist Operator name
* @param Operator to use for persisting
* @param Input port to use for persisting
* @return Object that describes the meta for the stream.
*/
StreamMeta persistUsing(String name, Operator persistOperator, Operator.InputPort<?> persistOperatorInputPort);
/**
* Set locality for the stream. The setting is best-effort, engine can
* override due to other settings or constraints.
*
* @param Persist Operator name
* @param Operator to use for persisting
* @return Object that describes the meta for the stream.
*/
StreamMeta persistUsing(String name, Operator persistOperator);
/**
* Set locality for the stream. The setting is best-effort, engine can
* override due to other settings or constraints.
*
* @param Persist Operator name
* @param Operator to use for persisting
* @param Input port to use for persisting
* @param Sink to persist
* @return Object that describes the meta for the stream.
*/
StreamMeta persistUsing(String name, Operator persistOperator, Operator.InputPort<?> persistOperatorInputPort, Operator.InputPort<?> sinkToPersist);
/**
* Return source of the stream.
* @param <T>
* @return
*/
<T extends OutputPortMeta> T getSource();
/**
* Return all sinks connected to this stream.
* @param <T>
* @return
*/
<T extends InputPortMeta> Collection<T> getSinks();
}
/**
* Operator meta object.
*/
interface OperatorMeta extends Serializable, Context
{
String getName();
Operator getOperator();
InputPortMeta getMeta(Operator.InputPort<?> port);
OutputPortMeta getMeta(Operator.OutputPort<?> port);
OperatorAnnotation getOperatorAnnotation();
/**
* Return collection of stream which are connected to this operator's
* input ports.
* @param <T>
* @return
*/
<K extends InputPortMeta, V extends StreamMeta> Map<K, V> getInputStreams();
/**
* Return collection of stream which are connected to this operator's
* output ports.
* @param <T>
* @return
*/
<K extends OutputPortMeta, V extends StreamMeta> Map<K, V> getOutputStreams();
}
/**
* Add new instance of operator under given name to the DAG.
* The operator class must have a default constructor.
* If the class extends {@link BaseOperator}, the name is passed on to the instance.
* Throws exception if the name is already linked to another operator instance.
*
* @param <T> Concrete type of the operator
* @param name Logical name of the operator used to identify the operator in the DAG
* @param clazz Concrete class with default constructor so that instance of it can be initialized and added to the DAG.
* @return Instance of the operator that has been added to the DAG.
*/
<T extends Operator> T addOperator(@Nonnull String name, Class<T> clazz);
/**
* <p>addOperator.</p>
* @param <T> Concrete type of the operator
* @param name Logical name of the operator used to identify the operator in the DAG
* @param operator Instance of the operator that needs to be added to the DAG
* @return Instance of the operator that has been added to the DAG.
*/
<T extends Operator> T addOperator(@Nonnull String name, T operator);
@InterfaceStability.Evolving
<T extends Module> T addModule(@Nonnull String name, Class<T> moduleClass);
@InterfaceStability.Evolving
<T extends Module> T addModule(@Nonnull String name, T module);
/**
* <p>addStream.</p>
* @param id Identifier of the stream that will be used to identify stream in DAG
* @return
*/
StreamMeta addStream(@Nonnull String id);
/**
* Add identified stream for given source and sinks. Multiple sinks can be
* connected to a stream, but each port can only be connected to a single
* stream. Attempt to add stream to an already connected port will throw an
* error.
* <p>
* This method allows to connect all interested ports to a stream at
* once. Alternatively, use the returned {@link StreamMeta} builder object to
* add more sinks and set other stream properties.
*
* @param <T>
* @param id
* @param source
* @param sinks
* @return StreamMeta
*/
<T> StreamMeta addStream(@Nonnull String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks);
/**
* Overload varargs version to avoid generic array type safety warnings in calling code.
* "Type safety: A generic array of Operator.InputPort<> is created for a varargs parameter"
*
* @param <T>
* @link <a href=http://www.angelikalanger.com/GenericsFAQ/FAQSections/ProgrammingIdioms.html#FAQ300>Programming Idioms</a>
* @param id
* @param source
* @param sink1
* @return StreamMeta
*/
<T> StreamMeta addStream(@Nonnull String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1);
/**
* <p>addStream.</p>
*/
<T> StreamMeta addStream(@Nonnull String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1, Operator.InputPort<? super T> sink2);
/**
* <p>setAttribute.</p>
*/
<T> void setAttribute(Attribute<T> key, T value);
/**
* @Deprecated
* Use {@link #setOperatorAttribute} instead
*/
@Deprecated
<T> void setAttribute(Operator operator, Attribute<T> key, T value);
/**
* Set an attribute for an operator.
* @param <T> Value type of the attribute.
* @param operator The Operator for which the attribute is being set.
* @param key The attribute which needs to be tuned.
* @param value The new value of the attribute.
*/
<T> void setOperatorAttribute(Operator operator, Attribute<T> key, T value);
/**
* <p>setOutputPortAttribute.</p>
*/
<T> void setOutputPortAttribute(Operator.OutputPort<?> port, Attribute<T> key, T value);
/**
* Set an attribute on the unifier for an output of an operator.
* @param <T> Object type of the attribute
* @param port The port for which the unifier is needed.
* @param key The attribute which needs to be tuned.
* @param value The new value of the attribute.
*/
<T> void setUnifierAttribute(Operator.OutputPort<?> port, Attribute<T> key, T value);
/**
* <p>setInputPortAttribute.</p>
*/
<T> void setInputPortAttribute(Operator.InputPort<?> port, Attribute<T> key, T value);
/**
* <p>getOperatorMeta.</p>
*/
OperatorMeta getOperatorMeta(String operatorId);
/**
* <p>getMeta.</p>
*/
OperatorMeta getMeta(Operator operator);
/**
* Return all operators present in the DAG.
* @param <T>
* @return
*/
<T extends OperatorMeta> Collection<T> getAllOperatorsMeta();
/**
* Get all input operators in the DAG. This method returns operators which are
* not connected to any upstream operator. i.e the operators which do not have
* any input ports or operators which is not connected through any input ports
* in the DAG.
*
* @param <T>
* @return list of {@see OperatorMeta} for root operators in the DAG.
*/
<T extends OperatorMeta> Collection<T> getRootOperatorsMeta();
/**
* Returns all Streams present in the DAG.
* @param <T>
* @return
*/
<T extends StreamMeta> Collection<T> getAllStreamsMeta();
/**
* Marker interface for the Node in the DAG. Any object which can be added as a Node in the DAG
* needs to implement this interface.
*/
interface GenericOperator
{
}
}