| /** |
| * Copyright (C) 2015 DataTorrent, Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.datatorrent.api; |
| |
| import java.io.Serializable; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| |
| import com.datatorrent.api.Attribute.AttributeMap; |
| import com.datatorrent.api.Operator.ProcessingMode; |
| import com.datatorrent.api.StringCodec.*; |
| import com.datatorrent.api.annotation.Stateless; |
| |
| /** |
| * The base interface for context for all of the streaming platform objects<p> |
| * <br> |
| * |
| * @since 0.3.2 |
| */ |
| public interface Context |
| { |
| /** |
| * Get the attributes associated with this context. |
| * The returned map does not contain any attributes that may have been defined in the parent context of this context. |
| * |
| * @return attributes defined for the current context. |
| */ |
| public AttributeMap getAttributes(); |
| |
| /** |
| * Get the value of the attribute associated with the current key by recursively traversing the contexts upwards to |
| * the application level. If the attribute is not found, then return the defaultValue. |
| * |
| * @param <T> - Type of the value stored against the attribute |
| * @param key - Attribute to identify the attribute. |
| * @return The value for the attribute if found or the defaultValue passed in as argument. |
| */ |
| public <T> T getValue(Attribute<T> key); |
| |
| /** |
| * Custom stats provided by the operator implementation. Reported as part of operator stats in the context of the |
| * current window, reset at window boundary. |
| * |
| * @param counters |
| * @deprecated use {@link AutoMetric} |
| */ |
| @Deprecated |
| void setCounters(Object counters); |
| |
| /** |
| * This can be used to report only a subset of metrics in the context of the current application window. It |
| * will reset at the application window boundary. |
| * |
| * @param metricNames the name of all the metrics that will be reported to application master. |
| */ |
| void sendMetrics(Collection<String> metricNames); |
| |
| /** |
| * Aggregates counters of physical instances. |
| * |
| * @deprecated use {@link AutoMetric.Aggregator} |
| */ |
| @Deprecated |
| interface CountersAggregator |
| { |
| Object aggregate(Collection<?> countersList); |
| } |
| |
| /** |
| * The interface to control the container JVM Opts based on the operator(s) configuration |
| */ |
| public interface ContainerOptConfigurator extends Serializable |
| { |
| /** |
| * Get the container JVM opts based on the operator(s) configuration. |
| * @param operatorMetaList The list of operators that are assigned to the container |
| * @return The JVM options for the container |
| */ |
| String getJVMOptions(List<DAG.OperatorMeta> operatorMetaList); |
| } |
| |
| public interface PortContext extends Context |
| { |
| /** |
| * Number of tuples the poll buffer can cache without blocking the input stream to the port. |
| */ |
| Attribute<Integer> QUEUE_CAPACITY = new Attribute<Integer>(1024); |
| /** |
| * The amount of buffer memory this port requires. There is a buffer server in each container. This is used to calculate total buffer server memory for container. |
| * Also due to the nature of the application, if buffer server needs to use more RAM, from time to time, this number may |
| * not be adhered to. |
| */ |
| Attribute<Integer> BUFFER_MEMORY_MB = new Attribute<Integer>(8 * 64); |
| /** |
| * Poll period in milliseconds when the port buffer reaches its limits. |
| */ |
| Attribute<Integer> SPIN_MILLIS = new Attribute<Integer>(10); |
| /** |
| * Input port attribute. Extend partitioning of an upstream operator w/o intermediate merge. |
| * Can be used to form parallel partitions that span a group of operators. |
| * Defined on input port to allow for stream to be shared with non-partitioned sinks. |
| * If multiple ports of an operator have the setting, incoming streams must track back to |
| * a common root partition, i.e. the operator join forks of the same origin. |
| */ |
| Attribute<Boolean> PARTITION_PARALLEL = new Attribute<Boolean>(false); |
| /** |
| * Attribute of output port to specify how many partitions should be merged by a single unifier instance. If the |
| * number of partitions exceeds the limit set, a cascading unifier plan will be created. For example, 4 partitions |
| * with the limit set to 2 will result in 3 unifiers arranged in 2 levels. The setting can be used to cap the |
| * network I/O or other resource requirement for each unifier container (depends on the specific functionality of |
| * the unifier), enabling horizontal scale by overcoming the single unifier bottleneck. |
| */ |
| Attribute<Integer> UNIFIER_LIMIT = new Attribute<Integer>(Integer.MAX_VALUE); |
| |
| /** |
| * Attribute to specify that the final unifier be always a single unifier. This is useful when in MxN partitioning |
| * case there is a need to unify all the outputs of the M stage into a single unifier before sending the results to |
| * the N stage. The attribute can be specified either on the output port or the input port, the output port being |
| * the usual. The specification on the input port overrides that specified on the output port. This is useful in |
| * cases when an output port is connected to multiple input ports and different unifier behavior is desired for |
| * the inputs. In this case the default unifier behavior can be specified on the output port and individual |
| * exceptions can be specified on the corresponding input ports. |
| */ |
| Attribute<Boolean> UNIFIER_SINGLE_FINAL = new Attribute<Boolean>(Boolean.FALSE); |
| /** |
| * Whether or not to auto record the tuples |
| */ |
| Attribute<Boolean> AUTO_RECORD = new Attribute<Boolean>(false); |
| /** |
| * Whether the output is unified. |
| * This is a read-only attribute to query that whether the output of the operator from multiple instances is being unified. |
| */ |
| Attribute<Boolean> IS_OUTPUT_UNIFIED = new Attribute<Boolean>(false); |
| /** |
| * Provide the codec which can be used to serialize or deserialize the data |
| * that can be received on the port. If it is unspecified the engine may use |
| * a generic codec. |
| */ |
| Attribute<StreamCodec<?>> STREAM_CODEC = new Attribute<StreamCodec<?>>(new Object2String<StreamCodec<?>>()); |
| |
| /** |
| * Provides the tuple class which the port receives or emits. While this attribute is null by default, |
| * whether it is needed or not is controlled through the port annotation. |
| */ |
| Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>()); |
| |
| @SuppressWarnings("FieldNameHidesFieldInSuperclass") |
| long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class); |
| } |
| |
| public interface OperatorContext extends Context |
| { |
| /** |
| * The windowId at which the operator's current run got activated. |
| * When the operator is deployed the first time during it's activation, this value is the default value |
| * of the operator. On subsequent run, it's the windowId of the checkpoint from which the operator state |
| * is recovered. |
| */ |
| Attribute<Long> ACTIVATION_WINDOW_ID = new Attribute<Long>(Stateless.WINDOW_ID); |
| /** |
| * Poll period in milliseconds when there are no tuples available on any of the input ports of the operator. |
| * Default value is 10 milliseconds. |
| */ |
| Attribute<Integer> SPIN_MILLIS = new Attribute<Integer>(10); |
| /** |
| * The maximum number of attempts to restart a failing operator before shutting down the application. |
| * Until this number is reached, when an operator fails to start it is re-spawned in a new container. Once all the |
| * attempts are exhausted, the application is shutdown. The default value for this attribute is null or unset and |
| * is equivalent to infinity; The operator hence will be attempted to be recovered indefinitely unless this value |
| * is set to anything else. |
| */ |
| Attribute<Integer> RECOVERY_ATTEMPTS = new Attribute<Integer>(new Integer2String()); |
| /** |
| * Specify a listener to process and optionally react to operator status updates. |
| * The handler will be called for each physical operator as statistics are updated during heartbeat processing. |
| */ |
| Attribute<Collection<StatsListener>> STATS_LISTENERS = new Attribute<Collection<StatsListener>>(new Collection2String<StatsListener>(",", new Object2String<StatsListener>(":"))); |
| /** |
| * Conveys whether the Operator is stateful or stateless. If the operator is stateless, no checkpointing is required |
| * by the engine. The attribute is ignored when the operator was already declared stateless through the |
| * {@link Stateless} annotation. |
| */ |
| Attribute<Boolean> STATELESS = new Attribute<Boolean>(false); |
| /** |
| * Memory resource that the operator requires for optimal functioning. Used to calculate total memory requirement for containers. |
| */ |
| Attribute<Integer> MEMORY_MB = new Attribute<Integer>(1024); |
| /** |
| * CPU Cores that the operator requires for optimal functioning. Used to calculate total CPU Cores requirement for containers. |
| */ |
| Attribute<Integer> VCORES = new Attribute<Integer>(0); |
| |
| /** |
| * The options to be pass to JVM when launching the operator. Options such as java maximum heap size can be specified here. |
| */ |
| Attribute<String> JVM_OPTIONS = new Attribute<String>(new String2String()); |
| /** |
| * Attribute of the operator that tells the platform how many streaming windows make 1 application window. |
| */ |
| Attribute<Integer> APPLICATION_WINDOW_COUNT = new Attribute<Integer>(1); |
| /** |
| * When set it changes the computation to sliding window computation where duration is determined using {@link #APPLICATION_WINDOW_COUNT} that is |
| * slided by duration determined using value of this attribute. Default value is null which is equivalent to that of {@link #APPLICATION_WINDOW_COUNT}. |
| * The value should range between (0 - {@link #APPLICATION_WINDOW_COUNT}) |
| */ |
| Attribute<Integer> SLIDE_BY_WINDOW_COUNT = new Attribute<Integer>(new Integer2String()); |
| |
| /** |
| * Attribute of the operator that hints at the optimal checkpoint boundary. |
| * By default checkpointing happens after every predetermined streaming windows. Application developer can override |
| * this behavior by defining the following attribute. When this attribute is defined, checkpointing will be done after |
| * completion of later of regular checkpointing window and the window whose serial number is divisible by the attribute |
| * value. Typically user would define this value to be the same as that of APPLICATION_WINDOW_COUNT so checkpointing |
| * will be done at application window boundary. |
| */ |
| Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<Integer>(1); |
| /** |
| * Name of host to directly control locality of an operator. Complementary to stream locality (NODE_LOCAL affinity). |
| * For example, the user may wish to specify a locality constraint for an input operator relative to its data source. |
| * The attribute can then be set to the host name that is specified in the operator specific connect string property. |
| */ |
| Attribute<String> LOCALITY_HOST = new Attribute<String>(new String2String()); |
| /** |
| * Name of rack to directly control locality of an operator. Complementary to stream locality (RACK_LOCAL affinity). |
| */ |
| Attribute<String> LOCALITY_RACK = new Attribute<String>(new String2String()); |
| /** |
| * The agent which can be used to checkpoint the windows. |
| */ |
| Attribute<StorageAgent> STORAGE_AGENT = new Attribute<StorageAgent>(new Object2String<StorageAgent>()); |
| /** |
| * The payload processing mode for this operator - at most once, exactly once, or default at least once. |
| * If the processing mode for an operator is specified as AT_MOST_ONCE and no processing mode is specified for the downstream |
| * operators if any, the processing mode of the downstream operators is automatically set to AT_MOST_ONCE. If a different processing |
| * mode is specified for the downstream operators it will result in an error. |
| * If the processing mode for an operator is specified as EXACTLY_ONCE then the processing mode for all downstream operators |
| * should be specified as AT_MOST_ONCE otherwise it will result in an error. |
| */ |
| Attribute<Operator.ProcessingMode> PROCESSING_MODE = new Attribute<Operator.ProcessingMode>(ProcessingMode.AT_LEAST_ONCE); |
| /** |
| * Timeout to identify stalled processing, specified as count of streaming windows. If the last processed |
| * window does not advance within the specified timeout count, the operator will be considered stuck and the |
| * container restart. There are multiple reasons this could happen: clock drift, hardware issue, networking issue, |
| * blocking operator logic, etc. |
| */ |
| Attribute<Integer> TIMEOUT_WINDOW_COUNT = new Attribute<Integer>(120); |
| /** |
| * Whether or not to auto record the tuples |
| */ |
| Attribute<Boolean> AUTO_RECORD = new Attribute<Boolean>(false); |
| /** |
| * How the operator distributes its state and share the input can be influenced by setting the Partitioner attribute. |
| * If this attribute is set to non null value, the instance of the partitioner is used to partition and merge the |
| * state of the operator and the inputs. If this attribute is set to null then default partitioning is used. |
| * If the attribute is not set and the operator implements Partitioner interface, then the instance of the operator |
| * is used otherwise default default partitioning is used. |
| */ |
| Attribute<Partitioner<? extends Operator>> PARTITIONER = new Attribute<Partitioner<? extends Operator>>(new Object2String<Partitioner<? extends Operator>>()); |
| |
| /** |
| * Aggregates physical counters to a logical counter. |
| * @deprecated use {@link #METRICS_AGGREGATOR} |
| */ |
| @Deprecated |
| Attribute<CountersAggregator> COUNTERS_AGGREGATOR = new Attribute<CountersAggregator>(new Object2String<CountersAggregator>()); |
| |
| /** |
| * Aggregates metrics of physical instances of an operator. This handler is called with the metrics data of a |
| * particular window from all the physical instances so that it can be aggregated into a logical view. |
| */ |
| Attribute<AutoMetric.Aggregator> METRICS_AGGREGATOR = new Attribute<AutoMetric.Aggregator>(new Object2String<AutoMetric.Aggregator>()); |
| |
| /** |
| * Provides dimension aggregations and time buckets information for logical metrics. The information provided |
| * by this construct is conveyed to tracker application and influences the aggregations done on it by the tracker. |
| */ |
| Attribute<AutoMetric.DimensionsScheme> METRICS_DIMENSIONS_SCHEME = new Attribute<AutoMetric.DimensionsScheme>(new |
| Object2String<AutoMetric.DimensionsScheme>()); |
| |
| /** |
| * Return the operator runtime id. |
| * |
| * @return The id |
| */ |
| int getId(); |
| |
| @SuppressWarnings("FieldNameHidesFieldInSuperclass") |
| long serialVersionUID = AttributeMap.AttributeInitializer.initialize(OperatorContext.class); |
| } |
| |
| /** |
| * <p> |
| * DAGContext interface.</p> |
| * |
| * @since 0.3.2 |
| */ |
| interface DAGContext extends Context |
| { |
| /** |
| * Name under which the application will be shown in the resource manager. |
| * If not set, the default is the configuration Java class or property file name. |
| */ |
| Attribute<String> APPLICATION_NAME = new Attribute<String>("unknown-application-name"); |
| /** |
| * URL to the application's documentation. |
| */ |
| Attribute<String> APPLICATION_DOC_LINK = new Attribute<String>(new String2String()); |
| |
| /** |
| * URL to the application's app data, if any. If not set, an empty string is the default. |
| * <p> |
| * Please note that if the string <code>"{appId}"</code> is present in this atttribute value, the |
| * DataTorrent UI Console will replace it with the full application ID. For example, if it is set |
| * to <code>"http://mynetwork.net/my/appdata/dashboard?appId={appId}"</code>, it will be converted to |
| * <code>"http://mynetwork.net/my/appdata/dashboard?appId=application_1355713111917_0002"</code>. |
| * </p> |
| */ |
| Attribute<String> APPLICATION_DATA_LINK = new Attribute<String>(new String2String()); |
| /** |
| * Transport to push the stats and the metrics, "builtin:{topic}" if STRAM should push the data directly |
| * using websocket with the given topic |
| */ |
| Attribute<String> METRICS_TRANSPORT = new Attribute<String>(new String2String()); |
| /** |
| * Application instance identifier. An application with the same name can run in multiple instances, each with a |
| * unique identifier. The identifier is set by the client that submits the application and can be used in operators |
| * along with the operator ID to segregate output etc. |
| * <p> |
| * When running in distributed mode, the value is the YARN application id as shown in the resource manager (example: |
| * <code>application_1355713111917_0002</code>). Note that only the full id string uniquely identifies an application, |
| * the integer offset will reset on RM restart. |
| */ |
| Attribute<String> APPLICATION_ID = new Attribute<String>(new String2String()); |
| /** |
| * Application package source. If the application is launched using an app package, this attribute contains the |
| * information of the app package. It is in the format of {user}|{appPackageName}|{appPackageVersion} |
| */ |
| Attribute<String> APP_PACKAGE_SOURCE = new Attribute<String>(new String2String()); |
| /** |
| * Dump extra debug information in launcher, master and containers. |
| */ |
| Attribute<Boolean> DEBUG = new Attribute<Boolean>(false); |
| /** |
| * The options to be pass to JVM when launching the containers. Options such as java maximum heap size can be specified here. |
| */ |
| Attribute<String> CONTAINER_JVM_OPTIONS = new Attribute<String>(new String2String()); |
| /** |
| * The amount of memory to be requested for the application master. Not used in local mode. |
| * Default value is 1GB. |
| */ |
| Attribute<Integer> MASTER_MEMORY_MB = new Attribute<Integer>(1024); |
| /** |
| * Where to spool the data once the buffer server capacity is reached. |
| */ |
| Attribute<Boolean> BUFFER_SPOOLING = new Attribute<Boolean>(true); |
| /** |
| * The streaming window size to use for the application. It is specified in milliseconds. Default value is 500ms. |
| */ |
| Attribute<Integer> STREAMING_WINDOW_SIZE_MILLIS = new Attribute<Integer>(500); |
| /** |
| * The time interval for saving the operator state. It is specified as a multiple of streaming windows. The operator |
| * state is saved periodically with interval equal to the checkpoint interval. Default value is 60 streaming windows. |
| */ |
| Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<Integer>(60); |
| /** |
| * The path to store application dependencies, recording and other generated files for application master and containers. |
| */ |
| Attribute<String> APPLICATION_PATH = new Attribute<String>(new String2String()); |
| /** |
| * The size limit for a file where tuple recordings are stored. When tuples are being recorded they are stored |
| * in files. When a file size reaches this limit a new file is created and tuples start getting stored in the new file. Default value is 128k. |
| */ |
| Attribute<Integer> TUPLE_RECORDING_PART_FILE_SIZE = new Attribute<Integer>(128 * 1024); |
| /** |
| * The time limit for a file where tuple recordings are stored. When tuples are being recorded they are stored |
| * in files. When a tuple recording file creation time falls beyond the time limit window from the current time a new file |
| * is created and the tuples start getting stored in the new file. Default value is 30hrs. |
| */ |
| Attribute<Integer> TUPLE_RECORDING_PART_FILE_TIME_MILLIS = new Attribute<Integer>(30 * 60 * 60 * 1000); |
| /** |
| * Address to which the application side connects to DT Gateway, in the form of host:port. This will override "dt.gateway.listenAddress" in the configuration. |
| */ |
| Attribute<String> GATEWAY_CONNECT_ADDRESS = new Attribute<String>(new String2String()); |
| /** |
| * Whether or not gateway is expecting SSL connection. |
| */ |
| Attribute<Boolean> GATEWAY_USE_SSL = new Attribute<Boolean>(false); |
| /** |
| * The username for logging in to the gateway, if authentication is enabled. |
| */ |
| Attribute<String> GATEWAY_USER_NAME = new Attribute<String>(new String2String()); |
| /** |
| * The password for logging in to the gateway, if authentication is enabled. |
| */ |
| Attribute<String> GATEWAY_PASSWORD = new Attribute<String>(new String2String()); |
| /** |
| * Maximum number of simultaneous heartbeat connections to process. Default value is 30. |
| */ |
| Attribute<Integer> HEARTBEAT_LISTENER_THREAD_COUNT = new Attribute<Integer>(30); |
| /** |
| * How frequently should operators heartbeat to stram. Recommended setting is |
| * 1000ms. Value 0 will disable heartbeat (for unit testing). Default value is 1000ms. |
| */ |
| Attribute<Integer> HEARTBEAT_INTERVAL_MILLIS = new Attribute<Integer>(1000); |
| /** |
| * Timeout for master to identify a hung container (full GC etc.). Timeout will result in container restart. |
| * Default value is 30s. |
| */ |
| Attribute<Integer> HEARTBEAT_TIMEOUT_MILLIS = new Attribute<Integer>(30 * 1000); |
| /** |
| * Timeout for allocating container resources. Default value is 60s. |
| */ |
| Attribute<Integer> RESOURCE_ALLOCATION_TIMEOUT_MILLIS = new Attribute<Integer>(Integer.MAX_VALUE); |
| /** |
| * Maximum number of windows that can be pending for statistics calculation. Statistics are computed when |
| * the metrics are available from all operators for a window. If the information is not available from all operators then |
| * the window is pending. When the number of pending windows reaches this limit the information for the oldest window |
| * is purged. Default value is 1000 windows. |
| */ |
| Attribute<Integer> STATS_MAX_ALLOWABLE_WINDOWS_LAG = new Attribute<Integer>(1000); |
| /** |
| * Whether or not we record statistics. The statistics are recorded for each heartbeat if enabled. The default value is false. |
| */ |
| Attribute<Boolean> ENABLE_STATS_RECORDING = new Attribute<Boolean>(false); |
| /** |
| * The time interval for throughput calculation. The throughput is periodically calculated with interval greater than or |
| * equal to the throughput calculation interval. The default value is 10s. |
| */ |
| Attribute<Integer> THROUGHPUT_CALCULATION_INTERVAL = new Attribute<Integer>(10000); |
| /** |
| * The maximum number of samples to use when calculating throughput. In practice fewer samples may be used |
| * if the THROUGHPUT_CALCULATION_INTERVAL is exceeded. Default value is 1000 samples. |
| */ |
| Attribute<Integer> THROUGHPUT_CALCULATION_MAX_SAMPLES = new Attribute<Integer>(1000); |
| /** |
| * The number of samples to use when using RPC latency to compensate for clock skews and network latency when |
| * calculating stats. Specify 0 if RPC latency should not be used at all to calculate stats. Default value is 100 |
| * samples. |
| */ |
| Attribute<Integer> RPC_LATENCY_COMPENSATION_SAMPLES = new Attribute<Integer>(100); |
| /** |
| * The agent which can be used to find the jvm options for the container. |
| */ |
| Attribute<ContainerOptConfigurator> CONTAINER_OPTS_CONFIGURATOR = new Attribute<ContainerOptConfigurator>(new Object2String<ContainerOptConfigurator>()); |
| /** |
| * The string codec map for classes that are to be set or get through properties as strings. |
| * Only supports string codecs that have a constructor with no arguments |
| */ |
| Attribute<Map<Class<?>, Class<? extends StringCodec<?>>>> STRING_CODECS = new Attribute<Map<Class<?>, Class<? extends StringCodec<?>>>>(new Map2String<Class<?>, Class<? extends StringCodec<?>>>(",", "=", new Class2String<Object>(), new Class2String<StringCodec<?>>())); |
| @SuppressWarnings(value = "FieldNameHidesFieldInSuperclass") |
| long serialVersionUID = AttributeMap.AttributeInitializer.initialize(DAGContext.class); |
| } |
| |
| long serialVersionUID = AttributeMap.AttributeInitializer.initialize(Context.class); |
| } |