Merge branch 'APEXCORE-686' of github.com:vrozov/apex-core
diff --git a/apex-app-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml b/apex-app-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml
index 876c39a..34679b6 100644
--- a/apex-app-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml
+++ b/apex-app-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml
@@ -2,22 +2,22 @@
 <configuration>
   <!-- 
   <property>
-    <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+    <name>apex.application.{appName}.operator.{opName}.prop.{propName}</name>
     <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
   </property>
   -->
   <!-- memory assigned to app master
   <property>
-    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <name>apex.attr.MASTER_MEMORY_MB</name>
     <value>1024</value>
   </property>
   -->
   <property>
-    <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
+    <name>apex.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
     <value>1000</value>
   </property>
   <property>
-    <name>dt.application.MyFirstApplication.operator.console.prop.stringFormat</name>
+    <name>apex.application.MyFirstApplication.operator.console.prop.stringFormat</name>
     <value>hello world: %s</value>
   </property>
 </configuration>
diff --git a/apex-app-archetype/src/main/resources/archetype-resources/src/site/conf/my-app-conf1.xml b/apex-app-archetype/src/main/resources/archetype-resources/src/site/conf/my-app-conf1.xml
index ccb2b66..7ceba7c 100644
--- a/apex-app-archetype/src/main/resources/archetype-resources/src/site/conf/my-app-conf1.xml
+++ b/apex-app-archetype/src/main/resources/archetype-resources/src/site/conf/my-app-conf1.xml
@@ -1,11 +1,11 @@
 <?xml version="1.0" encoding="UTF-8" standalone="no"?>
 <configuration>
   <property>
-    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <name>apex.attr.MASTER_MEMORY_MB</name>
     <value>1024</value>
   </property>
   <property>
-    <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
+    <name>apex.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
     <value>1000</value>
   </property>
 </configuration>
diff --git a/apex-conf-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml b/apex-conf-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml
index 9044325..0ee7dc2 100644
--- a/apex-conf-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml
+++ b/apex-conf-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml
@@ -2,24 +2,24 @@
 <configuration>
   <!-- 
   <property>
-    <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+    <name>apex.application.{appName}.operator.{opName}.prop.{propName}</name>
     <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
   </property>
   -->
   <property>
-    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <name>apex.attr.MASTER_MEMORY_MB</name>
     <value>1024</value>
   </property>
   <property>
-    <name>dt.application.MyFirstApplication.operator.seedGen.prop.seedStart</name>
+    <name>apex.application.MyFirstApplication.operator.seedGen.prop.seedStart</name>
     <value>1</value>
   </property>
   <property>
-    <name>dt.application.MyFirstApplication.operator.seedGen.prop.seedEnd</name>
+    <name>apex.application.MyFirstApplication.operator.seedGen.prop.seedEnd</name>
     <value>10</value>
   </property>
   <property>
-    <name>dt.application.MyFirstApplication.operator.console.prop.stringFormat</name>
+    <name>apex.application.MyFirstApplication.operator.console.prop.stringFormat</name>
     <value>hello world: %s</value>
   </property>
 </configuration>
diff --git a/api/src/main/java/com/datatorrent/api/AffinityRule.java b/api/src/main/java/com/datatorrent/api/AffinityRule.java
index 5e10ccd..304c086 100644
--- a/api/src/main/java/com/datatorrent/api/AffinityRule.java
+++ b/api/src/main/java/com/datatorrent/api/AffinityRule.java
@@ -92,7 +92,7 @@
   public AffinityRule(Type type, Locality locality, boolean relaxLocality, String firstOperator, String... otherOperators)
   {
     this(type, locality, relaxLocality);
-    LinkedList<String> operators = new LinkedList<String>();
+    LinkedList<String> operators = new LinkedList<>();
     if (firstOperator != null && otherOperators.length >= 1) {
       operators.add(firstOperator);
 
diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java
index 2efc84f..a3b2f97 100644
--- a/api/src/main/java/com/datatorrent/api/Attribute.java
+++ b/api/src/main/java/com/datatorrent/api/Attribute.java
@@ -107,7 +107,7 @@
    *
    * @since 0.3.2
    */
-  public static interface AttributeMap extends Cloneable
+  public interface AttributeMap extends Cloneable
   {
     /**
      * Return the attribute value for the given key. If the map does not have an
@@ -150,7 +150,7 @@
     /**
      * DefaultAttributeMap is the default implementation of AttributeMap. It's backed by a map internally.
      */
-    public static class DefaultAttributeMap implements AttributeMap, Serializable
+    class DefaultAttributeMap implements AttributeMap, Serializable
     {
       private HashMap<Attribute<?>, Object> map;
 
@@ -234,13 +234,13 @@
      *
      * Engine uses it internally to initialize the Interfaces that may have Attributes defined in them.
      */
-    public static class AttributeInitializer
+    class AttributeInitializer
     {
-      static final HashMap<Class<?>, Set<Attribute<Object>>> map = new HashMap<Class<?>, Set<Attribute<Object>>>();
+      static final HashMap<Class<?>, Set<Attribute<Object>>> map = new HashMap<>();
 
       public static Map<Attribute<Object>, Object> getAllAttributes(Context context, Class<?> clazz)
       {
-        Map<Attribute<Object>, Object> result = new HashMap<Attribute<Object>, Object>();
+        Map<Attribute<Object>, Object> result = new HashMap<>();
         try {
           for (Field f: clazz.getDeclaredFields()) {
             if (Modifier.isStatic(f.getModifiers()) && Attribute.class.isAssignableFrom(f.getType())) {
@@ -273,7 +273,7 @@
         if (map.containsKey(clazz)) {
           return 0;
         }
-        Set<Attribute<Object>> set = new HashSet<Attribute<Object>>();
+        Set<Attribute<Object>> set = new HashSet<>();
         try {
           for (Field f: clazz.getDeclaredFields()) {
             if (Modifier.isStatic(f.getModifiers()) && Attribute.class.isAssignableFrom(f.getType())) {
diff --git a/api/src/main/java/com/datatorrent/api/AutoMetric.java b/api/src/main/java/com/datatorrent/api/AutoMetric.java
index b487e04..8369b87 100644
--- a/api/src/main/java/com/datatorrent/api/AutoMetric.java
+++ b/api/src/main/java/com/datatorrent/api/AutoMetric.java
@@ -42,7 +42,7 @@
   /**
    * Represents collection of physical metrics.
    */
-  public static interface PhysicalMetricsContext
+  interface PhysicalMetricsContext
   {
     /**
      * @return map of metric name to value
@@ -60,7 +60,7 @@
    * An aggregator is provided as operator attribute. By default, when there isn't any aggregator set explicitly,
    * the application master sums up all the number metrics.
    */
-  public static interface Aggregator
+  interface Aggregator
   {
     /**
      * Aggregates values of a specific metric.
@@ -77,7 +77,7 @@
    * Application data tracker by default does certain aggregations for 1m, 1h,& 1d time buckets unless it overridden by
    * the app developer by providing a dimension scheme as operator attribute.
    */
-  public static interface DimensionsScheme
+  interface DimensionsScheme
   {
     /**
      * Time buckets for eg. {1m, 1h}. Application data tracker by default does 1m, 1h & 1d aggregations but this
diff --git a/api/src/main/java/com/datatorrent/api/Component.java b/api/src/main/java/com/datatorrent/api/Component.java
index 05c1a30..e98ade4 100644
--- a/api/src/main/java/com/datatorrent/api/Component.java
+++ b/api/src/main/java/com/datatorrent/api/Component.java
@@ -32,13 +32,13 @@
    * It's recommended to use this separator to create scoped names for the components.
    * e.g. Port p on Operator o can be identified as o.concat(CONCAT_SEPARATOR).concat(p).
    */
-  public static final String CONCAT_SEPARATOR = ".";
+  String CONCAT_SEPARATOR = ".";
   /**
    * It's recommended to use this separator to split the scoped names into individual components.
    * e.g. o.concat(CONCAT_SEPARATOR).concat(p).split(SPLIT_SEPARATOR) will return String[]{o, p}.
    *
    */
-  public static final String SPLIT_SEPARATOR = "\\.";
+  String SPLIT_SEPARATOR = "\\.";
 
   /**
    * Callback to give the component a chance to perform tasks required as part of setting itself up.
@@ -47,13 +47,13 @@
    *
    * @param context - context in which the operator executues.
    */
-  public void setup(CONTEXT context);
+  void setup(CONTEXT context);
 
   /**
    * Callback to give the component a chance to perform tasks required as part of tearing itself down.
    * A recommended practice is to reciprocate the tasks in setup by doing exactly opposite.
    */
-  public void teardown();
+  void teardown();
 
   /**
    * A utility class to club component along with the entity such as context or configuration.
@@ -65,7 +65,7 @@
    * @param <COMPLEMENT>
    * @since 0.3.2
    */
-  public abstract static class ComponentComplementPair<COMPONENT extends Component<?>, COMPLEMENT>
+  abstract class ComponentComplementPair<COMPONENT extends Component<?>, COMPLEMENT>
   {
     public final COMPONENT component;
 
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 3d3cffe..eb241d6 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -48,7 +48,7 @@
    *
    * @return attributes defined for the current context.
    */
-  public AttributeMap getAttributes();
+  AttributeMap getAttributes();
 
   /**
    * Get the value of the attribute associated with the current key by recursively traversing the contexts upwards to
@@ -58,7 +58,7 @@
    * @param key - Attribute to identify the attribute.
    * @return The value for the attribute if found or the defaultValue passed in as argument.
    */
-  public <T> T getValue(Attribute<T> key);
+  <T> T getValue(Attribute<T> key);
 
   /**
    * Custom stats provided by the operator implementation. Reported as part of operator stats in the context of the
@@ -92,7 +92,7 @@
   /**
    * The interface to control the container JVM Opts based on the operator(s) configuration
    */
-  public interface ContainerOptConfigurator extends Serializable
+  interface ContainerOptConfigurator extends Serializable
   {
     /**
      * Get the container JVM opts based on the operator(s) configuration.
@@ -116,22 +116,22 @@
     ENABLE, FOLLOW_HADOOP_AUTH, FOLLOW_HADOOP_HTTP_AUTH, DISABLE
   }
 
-  public interface PortContext extends Context
+  interface PortContext extends Context
   {
     /**
      * Number of tuples the poll buffer can cache without blocking the input stream to the port.
      */
-    Attribute<Integer> QUEUE_CAPACITY = new Attribute<Integer>(1024);
+    Attribute<Integer> QUEUE_CAPACITY = new Attribute<>(1024);
     /**
      * The amount of buffer memory this port requires. There is a buffer server in each container. This is used to calculate total buffer server memory for container.
      * Also due to the nature of the application, if buffer server needs to use more RAM, from time to time, this number may
      * not be adhered to.
      */
-    Attribute<Integer> BUFFER_MEMORY_MB = new Attribute<Integer>(8 * 64);
+    Attribute<Integer> BUFFER_MEMORY_MB = new Attribute<>(8 * 64);
     /**
      * Poll period in milliseconds when the port buffer reaches its limits.
      */
-    Attribute<Integer> SPIN_MILLIS = new Attribute<Integer>(10);
+    Attribute<Integer> SPIN_MILLIS = new Attribute<>(10);
     /**
      * Input port attribute. Extend partitioning of an upstream operator w/o intermediate merge.
      * Can be used to form parallel partitions that span a group of operators.
@@ -139,7 +139,7 @@
      * If multiple ports of an operator have the setting, incoming streams must track back to
      * a common root partition, i.e. the operator join forks of the same origin.
      */
-    Attribute<Boolean> PARTITION_PARALLEL = new Attribute<Boolean>(false);
+    Attribute<Boolean> PARTITION_PARALLEL = new Attribute<>(false);
     /**
      * Attribute of output port to specify how many partitions should be merged by a single unifier instance. If the
      * number of partitions exceeds the limit set, a cascading unifier plan will be created. For example, 4 partitions
@@ -147,7 +147,7 @@
      * network I/O or other resource requirement for each unifier container (depends on the specific functionality of
      * the unifier), enabling horizontal scale by overcoming the single unifier bottleneck.
      */
-    Attribute<Integer> UNIFIER_LIMIT = new Attribute<Integer>(Integer.MAX_VALUE);
+    Attribute<Integer> UNIFIER_LIMIT = new Attribute<>(Integer.MAX_VALUE);
 
     /**
      * Attribute to specify that the final unifier be always a single unifier. This is useful when in MxN partitioning
@@ -158,16 +158,16 @@
      * the inputs. In this case the default unifier behavior can be specified on the output port and individual
      * exceptions can be specified on the corresponding input ports.
      */
-    Attribute<Boolean> UNIFIER_SINGLE_FINAL = new Attribute<Boolean>(Boolean.FALSE);
+    Attribute<Boolean> UNIFIER_SINGLE_FINAL = new Attribute<>(Boolean.FALSE);
     /**
      * Whether or not to auto record the tuples
      */
-    Attribute<Boolean> AUTO_RECORD = new Attribute<Boolean>(false);
+    Attribute<Boolean> AUTO_RECORD = new Attribute<>(false);
     /**
      * Whether the output is unified.
      * This is a read-only attribute to query that whether the output of the operator from multiple instances is being unified.
      */
-    Attribute<Boolean> IS_OUTPUT_UNIFIED = new Attribute<Boolean>(false);
+    Attribute<Boolean> IS_OUTPUT_UNIFIED = new Attribute<>(false);
     /**
      * Provide the codec which can be used to serialize or deserialize the data
      * that can be received on the port. If it is unspecified the engine may use
@@ -185,7 +185,7 @@
     long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class);
   }
 
-  public interface OperatorContext extends Context
+  interface OperatorContext extends Context
   {
     /**
      * The windowId at which the operator's current run got activated.
@@ -193,13 +193,13 @@
      * of the operator. On subsequent run, it's the windowId of the checkpoint from which the operator state
      * is recovered.
      */
-    Attribute<Long> ACTIVATION_WINDOW_ID = new Attribute<Long>(Stateless.WINDOW_ID);
+    Attribute<Long> ACTIVATION_WINDOW_ID = new Attribute<>(Stateless.WINDOW_ID);
     /**
      * It is a maximum poll period in milliseconds when there are no tuples available on any of the input ports of the
      * operator. Platform uses the heuristic to change poll period from 0 to SPIN_MILLIS seconds.
      * Default value is 10 milliseconds.
      */
-    Attribute<Integer> SPIN_MILLIS = new Attribute<Integer>(10);
+    Attribute<Integer> SPIN_MILLIS = new Attribute<>(10);
     /**
      * The maximum number of attempts to restart a failing operator before shutting down the application.
      * Until this number is reached, when an operator fails to start it is re-spawned in a new container. Once all the
@@ -218,15 +218,15 @@
      * by the engine. The attribute is ignored when the operator was already declared stateless through the
      * {@link Stateless} annotation.
      */
-    Attribute<Boolean> STATELESS = new Attribute<Boolean>(false);
+    Attribute<Boolean> STATELESS = new Attribute<>(false);
     /**
      * Memory resource that the operator requires for optimal functioning. Used to calculate total memory requirement for containers.
      */
-    Attribute<Integer> MEMORY_MB = new Attribute<Integer>(1024);
+    Attribute<Integer> MEMORY_MB = new Attribute<>(1024);
     /**
      * CPU Cores that the operator requires for optimal functioning. Used to calculate total CPU Cores requirement for containers.
      */
-    Attribute<Integer> VCORES = new Attribute<Integer>(0);
+    Attribute<Integer> VCORES = new Attribute<>(0);
 
     /**
      * The options to be pass to JVM when launching the operator. Options such as java maximum heap size can be specified here.
@@ -235,7 +235,7 @@
     /**
      * Attribute of the operator that tells the platform how many streaming windows make 1 application window.
      */
-    Attribute<Integer> APPLICATION_WINDOW_COUNT = new Attribute<Integer>(1);
+    Attribute<Integer> APPLICATION_WINDOW_COUNT = new Attribute<>(1);
     /**
      * When set it changes the computation to sliding window computation where duration is determined using {@link #APPLICATION_WINDOW_COUNT} that is
      * slided by duration determined using value of this attribute. Default value is null which is equivalent to that of {@link #APPLICATION_WINDOW_COUNT}.
@@ -251,7 +251,7 @@
      * value. Typically user would define this value to be the same as that of APPLICATION_WINDOW_COUNT so checkpointing
      * will be done at application window boundary.
      */
-    Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<Integer>(1);
+    Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<>(1);
     /**
      * Name of host to directly control locality of an operator. Complementary to stream locality (NODE_LOCAL affinity).
      * For example, the user may wish to specify a locality constraint for an input operator relative to its data source.
@@ -274,18 +274,18 @@
      * If the processing mode for an operator is specified as EXACTLY_ONCE then the processing mode for all downstream operators
      * should be specified as AT_MOST_ONCE otherwise it will result in an error.
      */
-    Attribute<Operator.ProcessingMode> PROCESSING_MODE = new Attribute<Operator.ProcessingMode>(ProcessingMode.AT_LEAST_ONCE);
+    Attribute<Operator.ProcessingMode> PROCESSING_MODE = new Attribute<>(ProcessingMode.AT_LEAST_ONCE);
     /**
      * Timeout to identify stalled processing, specified as count of streaming windows. If the last processed
      * window does not advance within the specified timeout count, the operator will be considered stuck and the
      * container restart. There are multiple reasons this could happen: clock drift, hardware issue, networking issue,
      * blocking operator logic, etc.
      */
-    Attribute<Integer> TIMEOUT_WINDOW_COUNT = new Attribute<Integer>(120);
+    Attribute<Integer> TIMEOUT_WINDOW_COUNT = new Attribute<>(120);
     /**
      * Whether or not to auto record the tuples
      */
-    Attribute<Boolean> AUTO_RECORD = new Attribute<Boolean>(false);
+    Attribute<Boolean> AUTO_RECORD = new Attribute<>(false);
     /**
      * How the operator distributes its state and share the input can be influenced by setting the Partitioner attribute.
      * If this attribute is set to non null value, the instance of the partitioner is used to partition and merge the
@@ -348,7 +348,7 @@
      * Name under which the application will be shown in the resource manager.
      * If not set, the default is the configuration Java class or property file name.
      */
-    Attribute<String> APPLICATION_NAME = new Attribute<String>("unknown-application-name");
+    Attribute<String> APPLICATION_NAME = new Attribute<>("unknown-application-name");
     /**
      * URL to the application's documentation.
      */
@@ -387,7 +387,7 @@
     /**
      * Dump extra debug information in launcher, master and containers.
      */
-    Attribute<Boolean> DEBUG = new Attribute<Boolean>(false);
+    Attribute<Boolean> DEBUG = new Attribute<>(false);
     /**
      * The options to be pass to JVM when launching the containers. Options such as java maximum heap size can be specified here.
      */
@@ -396,20 +396,20 @@
      * The amount of memory to be requested for the application master. Not used in local mode.
      * Default value is 1GB.
      */
-    Attribute<Integer> MASTER_MEMORY_MB = new Attribute<Integer>(1024);
+    Attribute<Integer> MASTER_MEMORY_MB = new Attribute<>(1024);
     /**
      * Where to spool the data once the buffer server capacity is reached.
      */
-    Attribute<Boolean> BUFFER_SPOOLING = new Attribute<Boolean>(true);
+    Attribute<Boolean> BUFFER_SPOOLING = new Attribute<>(true);
     /**
      * The streaming window size to use for the application. It is specified in milliseconds. Default value is 500ms.
      */
-    Attribute<Integer> STREAMING_WINDOW_SIZE_MILLIS = new Attribute<Integer>(500);
+    Attribute<Integer> STREAMING_WINDOW_SIZE_MILLIS = new Attribute<>(500);
     /**
      * The time interval for saving the operator state. It is specified as a multiple of streaming windows. The operator
      * state is saved periodically with interval equal to the checkpoint interval. Default value is 60 streaming windows.
      */
-    Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<Integer>(60);
+    Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<>(60);
     /**
      * The path to store application dependencies, recording and other generated files for application master and containers.
      */
@@ -418,21 +418,21 @@
      * The size limit for a file where tuple recordings are stored. When tuples are being recorded they are stored
      * in files. When a file size reaches this limit a new file is created and tuples start getting stored in the new file. Default value is 128k.
      */
-    Attribute<Integer> TUPLE_RECORDING_PART_FILE_SIZE = new Attribute<Integer>(128 * 1024);
+    Attribute<Integer> TUPLE_RECORDING_PART_FILE_SIZE = new Attribute<>(128 * 1024);
     /**
      * The time limit for a file where tuple recordings are stored. When tuples are being recorded they are stored
      * in files. When a tuple recording file creation time falls beyond the time limit window from the current time a new file
      * is created and the tuples start getting stored in the new file. Default value is 30hrs.
      */
-    Attribute<Integer> TUPLE_RECORDING_PART_FILE_TIME_MILLIS = new Attribute<Integer>(30 * 60 * 60 * 1000);
+    Attribute<Integer> TUPLE_RECORDING_PART_FILE_TIME_MILLIS = new Attribute<>(30 * 60 * 60 * 1000);
     /**
-     * Address to which the application side connects to DT Gateway, in the form of host:port. This will override "dt.gateway.listenAddress" in the configuration.
+     * Address of optional web-socket pub-sub gateway to emit application events, in the form of host:port.
      */
     Attribute<String> GATEWAY_CONNECT_ADDRESS = new Attribute<>(String2String.getInstance());
     /**
      * Whether or not gateway is expecting SSL connection.
      */
-    Attribute<Boolean> GATEWAY_USE_SSL = new Attribute<Boolean>(false);
+    Attribute<Boolean> GATEWAY_USE_SSL = new Attribute<>(false);
     /**
      * The username for logging in to the gateway, if authentication is enabled.
      */
@@ -448,48 +448,48 @@
     /**
      * Maximum number of simultaneous heartbeat connections to process. Default value is 30.
      */
-    Attribute<Integer> HEARTBEAT_LISTENER_THREAD_COUNT = new Attribute<Integer>(30);
+    Attribute<Integer> HEARTBEAT_LISTENER_THREAD_COUNT = new Attribute<>(30);
     /**
      * How frequently should operators heartbeat to stram. Recommended setting is
      * 1000ms. Value 0 will disable heartbeat (for unit testing). Default value is 1000ms.
      */
-    Attribute<Integer> HEARTBEAT_INTERVAL_MILLIS = new Attribute<Integer>(1000);
+    Attribute<Integer> HEARTBEAT_INTERVAL_MILLIS = new Attribute<>(1000);
     /**
      * Timeout for master to identify a hung container (full GC etc.). Timeout will result in container restart.
      * Default value is 30s.
      */
-    Attribute<Integer> HEARTBEAT_TIMEOUT_MILLIS = new Attribute<Integer>(30 * 1000);
+    Attribute<Integer> HEARTBEAT_TIMEOUT_MILLIS = new Attribute<>(30 * 1000);
     /**
      * Timeout for allocating container resources. Default value is 60s.
      */
-    Attribute<Integer> RESOURCE_ALLOCATION_TIMEOUT_MILLIS = new Attribute<Integer>(Integer.MAX_VALUE);
+    Attribute<Integer> RESOURCE_ALLOCATION_TIMEOUT_MILLIS = new Attribute<>(Integer.MAX_VALUE);
     /**
      * Maximum number of windows that can be pending for statistics calculation. Statistics are computed when
      * the metrics are available from all operators for a window. If the information is not available from all operators then
      * the window is pending. When the number of pending windows reaches this limit the information for the oldest window
      * is purged. Default value is 1000 windows.
      */
-    Attribute<Integer> STATS_MAX_ALLOWABLE_WINDOWS_LAG = new Attribute<Integer>(1000);
+    Attribute<Integer> STATS_MAX_ALLOWABLE_WINDOWS_LAG = new Attribute<>(1000);
     /**
      * Whether or not we record statistics. The statistics are recorded for each heartbeat if enabled. The default value is false.
      */
-    Attribute<Boolean> ENABLE_STATS_RECORDING = new Attribute<Boolean>(false);
+    Attribute<Boolean> ENABLE_STATS_RECORDING = new Attribute<>(false);
     /**
      * The time interval for throughput calculation. The throughput is periodically calculated with interval greater than or
      * equal to the throughput calculation interval. The default value is 10s.
      */
-    Attribute<Integer> THROUGHPUT_CALCULATION_INTERVAL = new Attribute<Integer>(10000);
+    Attribute<Integer> THROUGHPUT_CALCULATION_INTERVAL = new Attribute<>(10000);
     /**
      * The maximum number of samples to use when calculating throughput. In practice fewer samples may be used
      * if the THROUGHPUT_CALCULATION_INTERVAL is exceeded. Default value is 1000 samples.
      */
-    Attribute<Integer> THROUGHPUT_CALCULATION_MAX_SAMPLES = new Attribute<Integer>(1000);
+    Attribute<Integer> THROUGHPUT_CALCULATION_MAX_SAMPLES = new Attribute<>(1000);
     /**
      * The number of samples to use when using RPC latency to compensate for clock skews and network latency when
      * calculating stats. Specify 0 if RPC latency should not be used at all to calculate stats. Default value is 100
      * samples.
      */
-    Attribute<Integer> RPC_LATENCY_COMPENSATION_SAMPLES = new Attribute<Integer>(100);
+    Attribute<Integer> RPC_LATENCY_COMPENSATION_SAMPLES = new Attribute<>(100);
     /**
      * The agent which can be used to find the jvm options for the container.
      */
@@ -511,12 +511,12 @@
      * blacklisting of nodes by application master
      * Blacklisting for nodes is disabled for the default value
      */
-    Attribute<Integer> MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST = new Attribute<Integer>(Integer.MAX_VALUE);
+    Attribute<Integer> MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST = new Attribute<>(Integer.MAX_VALUE);
 
     /**
      * The amount of time to wait before removing failed nodes from blacklist
      */
-    Attribute<Long> BLACKLISTED_NODE_REMOVAL_TIME_MILLIS = new Attribute<Long>(new Long(60 * 60 * 1000));
+    Attribute<Long> BLACKLISTED_NODE_REMOVAL_TIME_MILLIS = new Attribute<>(new Long(60 * 60 * 1000));
 
     /**
      * Affinity rules for specifying affinity and anti-affinity between logical operators
diff --git a/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java b/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java
index eeb952a..40b7436 100644
--- a/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java
+++ b/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java
@@ -27,7 +27,7 @@
 @InterfaceStability.Evolving
 public interface ControlTupleEnabledSink<T> extends Sink<T>
 {
-  public static final ControlTupleEnabledSink<Object> BLACKHOLE = new ControlTupleEnabledSink<Object>()
+  ControlTupleEnabledSink<Object> BLACKHOLE = new ControlTupleEnabledSink<Object>()
   {
     @Override
     public void put(Object tuple)
@@ -52,5 +52,5 @@
    *
    * @param payload the control tuple payload
    */
-  public boolean putControl(ControlTuple payload);
+  boolean putControl(ControlTuple payload);
 }
diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java
index 532ff72..96420a3 100644
--- a/api/src/main/java/com/datatorrent/api/DAG.java
+++ b/api/src/main/java/com/datatorrent/api/DAG.java
@@ -47,9 +47,9 @@
      * Return port object represented by this InputPortMeta
      * @return
      */
-    public Operator.InputPort<?> getPort();
+    Operator.InputPort<?> getPort();
 
-    public <T extends OperatorMeta> T getOperatorMeta();
+    <T extends OperatorMeta> T getOperatorMeta();
   }
 
   interface OutputPortMeta extends Serializable, PortContext
@@ -60,9 +60,9 @@
      * Return port object represented by this OutputPortMeta
      * @return
      */
-    public Operator.OutputPort<?> getPort();
+    Operator.OutputPort<?> getPort();
 
-    public <T extends OperatorMeta> T getOperatorMeta();
+    <T extends OperatorMeta> T getOperatorMeta();
   }
 
   /**
@@ -105,15 +105,15 @@
   /**
    * Representation of streams in the logical layer. Instances are created through {@link DAG#addStream}.
    */
-  public interface StreamMeta extends Serializable
+  interface StreamMeta extends Serializable
   {
-    public String getName();
+    String getName();
 
     /**
      * Returns the locality for this stream.
      * @return locality for this stream, default is null.
      */
-    public Locality getLocality();
+    Locality getLocality();
 
     /**
      * Set locality for the stream. The setting is best-effort, engine can
@@ -122,11 +122,11 @@
      * @param locality
      * @return Object that describes the meta for the stream.
      */
-    public StreamMeta setLocality(Locality locality);
+    StreamMeta setLocality(Locality locality);
 
-    public StreamMeta setSource(Operator.OutputPort<?> port);
+    StreamMeta setSource(Operator.OutputPort<?> port);
 
-    public StreamMeta addSink(Operator.InputPort<?> port);
+    StreamMeta addSink(Operator.InputPort<?> port);
 
     /**
      * Persist entire stream using operator passed.
@@ -136,7 +136,7 @@
      * @param Input port to use for persisting
      * @return Object that describes the meta for the stream.
      */
-    public StreamMeta persistUsing(String name, Operator persistOperator, Operator.InputPort<?> persistOperatorInputPort);
+    StreamMeta persistUsing(String name, Operator persistOperator, Operator.InputPort<?> persistOperatorInputPort);
 
     /**
      * Set locality for the stream. The setting is best-effort, engine can
@@ -146,7 +146,7 @@
      * @param Operator to use for persisting
      * @return Object that describes the meta for the stream.
      */
-    public StreamMeta persistUsing(String name, Operator persistOperator);
+    StreamMeta persistUsing(String name, Operator persistOperator);
 
     /**
      * Set locality for the stream. The setting is best-effort, engine can
@@ -158,35 +158,35 @@
      * @param Sink to persist
      * @return Object that describes the meta for the stream.
      */
-    public StreamMeta persistUsing(String name, Operator persistOperator, Operator.InputPort<?> persistOperatorInputPort, Operator.InputPort<?> sinkToPersist);
+    StreamMeta persistUsing(String name, Operator persistOperator, Operator.InputPort<?> persistOperatorInputPort, Operator.InputPort<?> sinkToPersist);
 
     /**
      * Return source of the stream.
      * @param <T>
      * @return
      */
-    public <T extends OutputPortMeta> T getSource();
+    <T extends OutputPortMeta> T getSource();
 
     /**
      * Return all sinks connected to this stream.
      * @param <T>
      * @return
      */
-    public <T extends InputPortMeta> Collection<T> getSinks();
+    <T extends InputPortMeta> Collection<T> getSinks();
   }
 
   /**
    * Operator meta object.
    */
-  public interface OperatorMeta extends Serializable, Context
+  interface OperatorMeta extends Serializable, Context
   {
-    public String getName();
+    String getName();
 
-    public Operator getOperator();
+    Operator getOperator();
 
-    public InputPortMeta getMeta(Operator.InputPort<?> port);
+    InputPortMeta getMeta(Operator.InputPort<?> port);
 
-    public OutputPortMeta getMeta(Operator.OutputPort<?> port);
+    OutputPortMeta getMeta(Operator.OutputPort<?> port);
 
     /**
      * Return collection of stream which are connected to this operator's
@@ -194,7 +194,7 @@
      * @param <T>
      * @return
      */
-    public <K extends InputPortMeta, V extends StreamMeta> Map<K, V> getInputStreams();
+    <K extends InputPortMeta, V extends StreamMeta> Map<K, V> getInputStreams();
 
     /**
      * Return collection of stream which are connected to this operator's
@@ -202,7 +202,7 @@
      * @param <T>
      * @return
      */
-    public <K extends OutputPortMeta, V extends StreamMeta> Map<K, V> getOutputStreams();
+    <K extends OutputPortMeta, V extends StreamMeta> Map<K, V> getOutputStreams();
   }
 
   /**
@@ -216,7 +216,7 @@
    * @param clazz Concrete class with default constructor so that instance of it can be initialized and added to the DAG.
    * @return Instance of the operator that has been added to the DAG.
    */
-  public abstract <T extends Operator> T addOperator(String name, Class<T> clazz);
+  <T extends Operator> T addOperator(String name, Class<T> clazz);
 
   /**
    * <p>addOperator.</p>
@@ -225,7 +225,7 @@
    * @param operator Instance of the operator that needs to be added to the DAG
    * @return Instance of the operator that has been added to the DAG.
    */
-  public abstract <T extends Operator> T addOperator(String name, T operator);
+  <T extends Operator> T addOperator(String name, T operator);
 
   @InterfaceStability.Evolving
   <T extends Module> T addModule(String name, Class<T> moduleClass);
@@ -238,7 +238,7 @@
    * @param id Identifier of the stream that will be used to identify stream in DAG
    * @return
    */
-  public abstract StreamMeta addStream(String id);
+  StreamMeta addStream(String id);
 
   /**
    * Add identified stream for given source and sinks. Multiple sinks can be
@@ -256,7 +256,7 @@
    * @param sinks
    * @return StreamMeta
    */
-  public abstract <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks);
+  <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks);
 
   /**
    * Overload varargs version to avoid generic array type safety warnings in calling code.
@@ -269,24 +269,24 @@
    * @param sink1
    * @return StreamMeta
    */
-  public abstract <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1);
+  <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1);
 
   /**
    * <p>addStream.</p>
    */
-  public abstract <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1, Operator.InputPort<? super T> sink2);
+  <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1, Operator.InputPort<? super T> sink2);
 
   /**
    * <p>setAttribute.</p>
    */
-  public abstract <T> void setAttribute(Attribute<T> key, T value);
+  <T> void setAttribute(Attribute<T> key, T value);
 
   /**
    * @Deprecated
    * Use {@link #setOperatorAttribute} instead
    */
   @Deprecated
-  public abstract <T> void setAttribute(Operator operator, Attribute<T> key, T value);
+  <T> void setAttribute(Operator operator, Attribute<T> key, T value);
 
   /**
    * Set an attribute for an operator.
@@ -295,12 +295,12 @@
    * @param key The attribute which needs to be tuned.
    * @param value The new value of the attribute.
    */
-  public abstract <T> void setOperatorAttribute(Operator operator, Attribute<T> key, T value);
+  <T> void setOperatorAttribute(Operator operator, Attribute<T> key, T value);
 
   /**
    * <p>setOutputPortAttribute.</p>
    */
-  public abstract <T> void setOutputPortAttribute(Operator.OutputPort<?> port, Attribute<T> key, T value);
+  <T> void setOutputPortAttribute(Operator.OutputPort<?> port, Attribute<T> key, T value);
 
   /**
    * Set an attribute on the unifier for an output of an operator.
@@ -309,29 +309,29 @@
    * @param key The attribute which needs to be tuned.
    * @param value The new value of the attribute.
    */
-  public abstract <T> void setUnifierAttribute(Operator.OutputPort<?> port, Attribute<T> key, T value);
+  <T> void setUnifierAttribute(Operator.OutputPort<?> port, Attribute<T> key, T value);
 
   /**
    * <p>setInputPortAttribute.</p>
    */
-  public abstract <T> void setInputPortAttribute(Operator.InputPort<?> port, Attribute<T> key, T value);
+  <T> void setInputPortAttribute(Operator.InputPort<?> port, Attribute<T> key, T value);
 
   /**
    * <p>getOperatorMeta.</p>
    */
-  public abstract OperatorMeta getOperatorMeta(String operatorId);
+  OperatorMeta getOperatorMeta(String operatorId);
 
   /**
    * <p>getMeta.</p>
    */
-  public abstract OperatorMeta getMeta(Operator operator);
+  OperatorMeta getMeta(Operator operator);
 
   /**
    * Return all operators present in the DAG.
    * @param <T>
    * @return
    */
-  public <T extends OperatorMeta> Collection<T> getAllOperatorsMeta();
+  <T extends OperatorMeta> Collection<T> getAllOperatorsMeta();
 
   /**
    * Get all input operators in the DAG. This method returns operators which are
@@ -342,14 +342,14 @@
    * @param <T>
    * @return list of {@see OperatorMeta} for root operators in the DAG.
    */
-  public <T extends OperatorMeta> Collection<T> getRootOperatorsMeta();
+  <T extends OperatorMeta> Collection<T> getRootOperatorsMeta();
 
   /**
    * Returns all Streams present in the DAG.
    * @param <T>
    * @return
    */
-  public <T extends StreamMeta> Collection<T> getAllStreamsMeta();
+  <T extends StreamMeta> Collection<T> getAllStreamsMeta();
 
   /**
    * Marker interface for the Node in the DAG. Any object which can be added as a Node in the DAG
diff --git a/api/src/main/java/com/datatorrent/api/InputOperator.java b/api/src/main/java/com/datatorrent/api/InputOperator.java
index e6a0a65..d9847cd 100644
--- a/api/src/main/java/com/datatorrent/api/InputOperator.java
+++ b/api/src/main/java/com/datatorrent/api/InputOperator.java
@@ -40,6 +40,6 @@
    * engine will make sure to call emitTuples multiple times within a giving streaming
    * window if it can. When it cannot, it will call endWindow.
    */
-  public void emitTuples();
+  void emitTuples();
 
 }
diff --git a/api/src/main/java/com/datatorrent/api/LocalMode.java b/api/src/main/java/com/datatorrent/api/LocalMode.java
index 0387506..9669fdd 100644
--- a/api/src/main/java/com/datatorrent/api/LocalMode.java
+++ b/api/src/main/java/com/datatorrent/api/LocalMode.java
@@ -39,15 +39,15 @@
 
   public interface Controller
   {
-    public void run();
+    void run();
 
-    public void run(long runMillis);
+    void run(long runMillis);
 
-    public void runAsync();
+    void runAsync();
 
-    public void shutdown();
+    void shutdown();
 
-    public void setHeartbeatMonitoringEnabled(boolean enabled);
+    void setHeartbeatMonitoringEnabled(boolean enabled);
 
   }
 
diff --git a/api/src/main/java/com/datatorrent/api/Operator.java b/api/src/main/java/com/datatorrent/api/Operator.java
index c016799..dd694d0 100644
--- a/api/src/main/java/com/datatorrent/api/Operator.java
+++ b/api/src/main/java/com/datatorrent/api/Operator.java
@@ -199,7 +199,7 @@
    * window and then call deactivate method on it if present.
    *
    */
-  static class ShutdownException extends RuntimeException
+  class ShutdownException extends RuntimeException
   {
     private static final long serialVersionUID = 201401081529L;
 
@@ -223,20 +223,20 @@
    * @param <CONTEXT> Context for the current run during which the operator is getting de/activated.
    * @since 0.3.2
    */
-  public static interface ActivationListener<CONTEXT extends Context>
+  interface ActivationListener<CONTEXT extends Context>
   {
     /**
      * Do the operations just before the operator starts processing tasks within the windows.
      * e.g. establish a network connection.
      * @param context - the context in which the operator is executing.
      */
-    public void activate(CONTEXT context);
+    void activate(CONTEXT context);
 
     /**
      * Do the opposite of the operations the operator did during activate.
      * e.g. close the network connection.
      */
-    public void deactivate();
+    void deactivate();
 
   }
 
@@ -247,21 +247,21 @@
    * @deprecated Use {@link CheckpointNotificationListener} instead
    * @since 0.3.2
    */
-  public static interface CheckpointListener
+  interface CheckpointListener
   {
     /**
      * Inform the operator that it's checkpointed.
      *
      * @param windowId Id of the window after which the operator was checkpointed.
      */
-    public void checkpointed(long windowId);
+    void checkpointed(long windowId);
 
     /**
      * Inform the operator that a particular windowId is processed successfully by all the operators in the DAG.
      *
      * @param windowId Id of the window which is processed by each operator.
      */
-    public void committed(long windowId);
+    void committed(long windowId);
 
   }
 
@@ -280,14 +280,14 @@
    *
    * @since 0.3.2
    */
-  public static interface IdleTimeHandler
+  interface IdleTimeHandler
   {
     /**
      * Callback for operators to implement if they are interested in using the idle cycles to do auxiliary processing.
      * If this method detects that it does not have any work to do, it should block the call for a short duration
      * to prevent busy loop. handleIdleTime is called over and over until operator has tuples to process.
      */
-    public void handleIdleTime();
+    void handleIdleTime();
 
   }
 
diff --git a/api/src/main/java/com/datatorrent/api/Partitioner.java b/api/src/main/java/com/datatorrent/api/Partitioner.java
index d743821..42b6dbb 100644
--- a/api/src/main/java/com/datatorrent/api/Partitioner.java
+++ b/api/src/main/java/com/datatorrent/api/Partitioner.java
@@ -52,7 +52,7 @@
    * @return New partitioning. Partitions from input list which should not be
    * changed can be returned as they are.
    */
-  public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, PartitioningContext context);
+  Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, PartitioningContext context);
 
   /**
    * The engine calls this method to notify partitioner of the changes to partitioning.
@@ -64,7 +64,7 @@
    */
   void partitioned(Map<Integer, Partition<T>> partitions);
 
-  public class PartitionKeys implements java.io.Serializable
+  class PartitionKeys implements java.io.Serializable
   {
     private static final long serialVersionUID = 201312271835L;
     public final int mask;
@@ -109,7 +109,7 @@
 
   }
 
-  public interface Partition<T>
+  interface Partition<T>
   {
     /**
      * Return the partition keys for this partition.
@@ -117,7 +117,7 @@
      *
      * @return Map<InputPort<?>, PartitionKeys>
      */
-    public Map<InputPort<?>, PartitionKeys> getPartitionKeys();
+    Map<InputPort<?>, PartitionKeys> getPartitionKeys();
 
     /**
      * Get an indication of the load handled by this partition. The indicator
@@ -129,7 +129,7 @@
      *
      * @return Integer indicative of the load handled by the partition.
      */
-    public int getLoad();
+    int getLoad();
 
     /**
      * Get the latest statistics for this partition. Null when no stats have been collected yet.
@@ -140,14 +140,14 @@
      *
      * @return
      */
-    public BatchedOperatorStats getStats();
+    BatchedOperatorStats getStats();
 
     /**
      * Get the frozen state of the operator which is currently handling the partition.
      *
      * @return frozen operator instance
      */
-    public T getPartitionedInstance();
+    T getPartitionedInstance();
 
     /**
      * Get the attributes associated with this partition.
@@ -155,14 +155,14 @@
      *
      * @return attributes defined for the current context.
      */
-    public com.datatorrent.api.Attribute.AttributeMap getAttributes();
+    com.datatorrent.api.Attribute.AttributeMap getAttributes();
 
   }
 
   /**
    * Contextual information presented to the partitioner.
    */
-  public interface PartitioningContext
+  interface PartitioningContext
   {
     /**
      * Number of partitions required for an operator that was configured to be parallel partitioned.
diff --git a/api/src/main/java/com/datatorrent/api/Sink.java b/api/src/main/java/com/datatorrent/api/Sink.java
index 1e7b1b3..e0c37c3 100644
--- a/api/src/main/java/com/datatorrent/api/Sink.java
+++ b/api/src/main/java/com/datatorrent/api/Sink.java
@@ -38,7 +38,7 @@
    * pass null otherwise.
    */
   @SuppressWarnings("unchecked")
-  public static final Sink<Object>[] NO_SINKS = (Sink<Object>[])Array.newInstance(Sink.class, 0);
+  Sink<Object>[] NO_SINKS = (Sink<Object>[])Array.newInstance(Sink.class, 0);
   /**
    * Constant
    * <code>BLACKHOLE</code>
@@ -46,7 +46,7 @@
    * This sink discards anything that's put into it silently. Use this sink if you need a sink that
    * discards everything with super low cost.
    */
-  public static final Sink<Object> BLACKHOLE = new Sink<Object>()
+  Sink<Object> BLACKHOLE = new Sink<Object>()
   {
     @Override
     public void put(Object tuple)
@@ -66,7 +66,7 @@
    *
    * @param tuple payload to be processed by this sink.
    */
-  public void put(T tuple);
+  void put(T tuple);
 
   /**
    * Give the count of the tuples processed since the last reset.
@@ -74,6 +74,6 @@
    * @param reset reset the count if true.
    * @return the count of tuples processed since the last reset.
    */
-  public int getCount(boolean reset);
+  int getCount(boolean reset);
 
 }
diff --git a/api/src/main/java/com/datatorrent/api/Stats.java b/api/src/main/java/com/datatorrent/api/Stats.java
index 0e4377d..b50c3d7 100644
--- a/api/src/main/java/com/datatorrent/api/Stats.java
+++ b/api/src/main/java/com/datatorrent/api/Stats.java
@@ -29,7 +29,7 @@
  */
 public interface Stats extends Serializable
 {
-  public static final long INVALID_TIME_MILLIS = -1;
+  long INVALID_TIME_MILLIS = -1;
 
   interface Checkpoint extends Serializable
   {
@@ -48,7 +48,7 @@
     }
   }
 
-  public static class OperatorStats implements Stats
+  class OperatorStats implements Stats
   {
     public long windowId;
     public Checkpoint checkpoint;
diff --git a/api/src/main/java/com/datatorrent/api/StatsListener.java b/api/src/main/java/com/datatorrent/api/StatsListener.java
index 624a095..ec4d5a0 100644
--- a/api/src/main/java/com/datatorrent/api/StatsListener.java
+++ b/api/src/main/java/com/datatorrent/api/StatsListener.java
@@ -44,7 +44,7 @@
    * Command to be executed at subsequent end of window on the operator instance that is deployed in the container.
    * Provides the opportunity to define operator specific actions such as method invocation or property set.
    */
-  public interface OperatorRequest
+  interface OperatorRequest
   {
     /**
      * Execute the command.
@@ -54,14 +54,14 @@
      * @param windowId
      * @throws IOException
      */
-    public OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException;
+    OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException;
   }
 
   /**
    * Use {@link OperatorRequest}
    */
   @Deprecated
-  public interface OperatorCommand
+  interface OperatorCommand
   {
     /**
      * Execute the command.
@@ -71,28 +71,28 @@
      * @param windowId
      * @throws IOException
      */
-    public void execute(Operator operator, int operatorId, long windowId) throws IOException;
+    void execute(Operator operator, int operatorId, long windowId) throws IOException;
   }
 
-  public interface OperatorResponse
+  interface OperatorResponse
   {
 
     /*
      * The Object to identify the response
      */
-    public Object getResponseId();
+    Object getResponseId();
 
     /*
      * The data payload that needs to be sent back
      */
-    public Object getResponse();
+    Object getResponse();
 
   }
 
   /**
    * List of recent, per window operator stats and moving averages.
    */
-  public interface BatchedOperatorStats
+  interface BatchedOperatorStats
   {
     /**
       Stats list will typically contain multiple entries, depending on streaming window size and heartbeat interval.
@@ -115,7 +115,7 @@
     List<OperatorResponse> getOperatorResponse();
   }
 
-  public class Response implements Serializable
+  class Response implements Serializable
   {
     /**
      * Set true to request repartition of the logical operator.
@@ -160,7 +160,7 @@
    */
   @Target(ElementType.TYPE)
   @Retention(RetentionPolicy.RUNTIME)
-  public @interface DataQueueSize
+  @interface DataQueueSize
   {
   }
 }
diff --git a/api/src/main/java/com/datatorrent/api/StorageAgent.java b/api/src/main/java/com/datatorrent/api/StorageAgent.java
index ed3681f..e2cc3b9 100644
--- a/api/src/main/java/com/datatorrent/api/StorageAgent.java
+++ b/api/src/main/java/com/datatorrent/api/StorageAgent.java
@@ -42,7 +42,7 @@
    * @param windowId - Identifier for the specific state of the operator.
    * @throws IOException
    */
-  public void save(Object object, int operatorId, long windowId) throws IOException;
+  void save(Object object, int operatorId, long windowId) throws IOException;
 
   /**
    * Get the input stream from which can be used to retrieve the stored objects back.
@@ -52,7 +52,7 @@
    * @return object (or a copy of it) which was saved earlier using the save call.
    * @throws IOException
    */
-  public Object load(int operatorId, long windowId) throws IOException;
+  Object load(int operatorId, long windowId) throws IOException;
 
   /**
    * Delete the artifacts related to store call of the operatorId and the windowId.
@@ -64,7 +64,7 @@
    * @param windowId
    * @throws IOException
    */
-  public void delete(int operatorId, long windowId) throws IOException;
+  void delete(int operatorId, long windowId) throws IOException;
 
   /**
    * Return an array windowId for which the object was saved but not deleted.
@@ -77,13 +77,13 @@
    * @return Collection of windowIds for available states that can be retrieved through load.
    * @throws IOException
    */
-  public long[] getWindowIds(int operatorId) throws IOException;
+  long[] getWindowIds(int operatorId) throws IOException;
 
   /**
    * Interface to pass application attributes to storage agent
    *
    */
-  public interface ApplicationAwareStorageAgent extends StorageAgent
+  interface ApplicationAwareStorageAgent extends StorageAgent
   {
 
     /**
@@ -91,7 +91,7 @@
      *
      * @param map attributes of application
      */
-    public void setApplicationAttributes(AttributeMap map);
+    void setApplicationAttributes(AttributeMap map);
   }
 
 }
diff --git a/api/src/main/java/com/datatorrent/api/StreamingApplication.java b/api/src/main/java/com/datatorrent/api/StreamingApplication.java
index cf1d6ec..854168a 100644
--- a/api/src/main/java/com/datatorrent/api/StreamingApplication.java
+++ b/api/src/main/java/com/datatorrent/api/StreamingApplication.java
@@ -39,6 +39,16 @@
  */
 public interface StreamingApplication
 {
+  /**
+   * Prefix used in configuration keys.
+   */
+  String APEX_PREFIX = "apex.";
+
+  /**
+   * Legacy prefix, to be removed in future release,
+   * when all code dependencies are upgraded.
+   */
+  @Deprecated
   String DT_PREFIX = "dt.";
   /**
    * Launch mode for the application.
diff --git a/api/src/main/java/com/datatorrent/api/StringCodec.java b/api/src/main/java/com/datatorrent/api/StringCodec.java
index d4a0a41..fa8ab23 100644
--- a/api/src/main/java/com/datatorrent/api/StringCodec.java
+++ b/api/src/main/java/com/datatorrent/api/StringCodec.java
@@ -302,7 +302,7 @@
           return clazz.getConstructor(String.class).newInstance(parts[1]);
         } else {
           T object = clazz.getConstructor(String.class).newInstance(parts[1]);
-          HashMap<String, String> hashMap = new HashMap<String, String>();
+          HashMap<String, String> hashMap = new HashMap<>();
           for (int i = 2; i < parts.length; i++) {
             String[] keyValPair = parts[i].split(propertySeparator, 2);
             hashMap.put(keyValPair[0], keyValPair[1]);
@@ -365,11 +365,11 @@
       }
 
       if (string.isEmpty()) {
-        return new HashMap<K, V>();
+        return new HashMap<>();
       }
 
       String[] parts = string.split(separator);
-      HashMap<K, V> map = new HashMap<K, V>();
+      HashMap<K, V> map = new HashMap<>();
       for (String part : parts) {
         String[] kvpair = part.split(equal, 2);
         map.put(keyCodec.fromString(kvpair[0]), valueCodec.fromString(kvpair[1]));
@@ -433,7 +433,7 @@
       }
 
       String[] parts = string.split(separator);
-      ArrayList<T> arrayList = new ArrayList<T>(parts.length);
+      ArrayList<T> arrayList = new ArrayList<>(parts.length);
       for (String part : parts) {
         arrayList.add(codec.fromString(part));
       }
diff --git a/api/src/main/java/com/datatorrent/api/annotation/ApplicationAnnotation.java b/api/src/main/java/com/datatorrent/api/annotation/ApplicationAnnotation.java
index 7393cd5..cafe4b7 100644
--- a/api/src/main/java/com/datatorrent/api/annotation/ApplicationAnnotation.java
+++ b/api/src/main/java/com/datatorrent/api/annotation/ApplicationAnnotation.java
@@ -44,6 +44,6 @@
    * <li>Runtime application alias -- specified in application code</li>
    *
    */
-  public String name();
+  String name();
 
 }
diff --git a/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java b/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java
index 76fe497..3c6da18 100644
--- a/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java
+++ b/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java
@@ -39,7 +39,7 @@
    *
    * @return - true if port is optional, false otherwise.
    */
-  public boolean optional() default false;
+  boolean optional() default false;
 
   /**
    * Whether this port needs to know the tuple class. When true, application will have to set
@@ -47,5 +47,5 @@
    *
    * @return true if schema is required; false otherwise.
    */
-  public boolean schemaRequired() default false;
+  boolean schemaRequired() default false;
 }
diff --git a/api/src/main/java/com/datatorrent/api/annotation/OperatorAnnotation.java b/api/src/main/java/com/datatorrent/api/annotation/OperatorAnnotation.java
index 8c708f3..16fd370 100644
--- a/api/src/main/java/com/datatorrent/api/annotation/OperatorAnnotation.java
+++ b/api/src/main/java/com/datatorrent/api/annotation/OperatorAnnotation.java
@@ -39,7 +39,7 @@
    * Default value is true indicating operator can be partitioned.
    * @return Whether operator can be partitioned or not
    */
-  public boolean partitionable() default true;
+  boolean partitionable() default true;
 
   /**
    * Element specifying whether an operator can be check-pointed in the middle of an application window.
@@ -48,5 +48,5 @@
    *
    * @return whether operator can be checkpointed in middle of an application window.
    */
-  public boolean checkpointableWithinAppWindow() default true;
+  boolean checkpointableWithinAppWindow() default true;
 }
diff --git a/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java b/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java
index 62b43c6..749c59f 100644
--- a/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java
+++ b/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java
@@ -38,11 +38,11 @@
   /**
    * <p>optional.</p>
    */
-  public boolean optional() default true;
+  boolean optional() default true;
   /**
    * <p>error.</p>
    */
-  public boolean error() default false;
+  boolean error() default false;
 
   /**
    * Whether this port needs to know the tuple class. When true, application will have to set
@@ -50,6 +50,6 @@
    *
    * @return  true if schema is required; false otherwise.
    */
-  public boolean schemaRequired() default false;
+  boolean schemaRequired() default false;
 }
 
diff --git a/api/src/main/java/com/datatorrent/api/annotation/RecordField.java b/api/src/main/java/com/datatorrent/api/annotation/RecordField.java
index 1675c54..4d6c126 100644
--- a/api/src/main/java/com/datatorrent/api/annotation/RecordField.java
+++ b/api/src/main/java/com/datatorrent/api/annotation/RecordField.java
@@ -37,7 +37,7 @@
    /**
     * <p>type.</p>
     */
-   public String type();
+   String type();
 
-   public boolean publish() default true;
+   boolean publish() default true;
 }
diff --git a/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java b/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java
index 6c54bed..3fc46bc 100644
--- a/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java
+++ b/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java
@@ -87,7 +87,7 @@
    */
   void postValidateDAG();
 
-  public static class DAGSetupPluginContext implements ApexPluginContext
+  class DAGSetupPluginContext implements ApexPluginContext
   {
     private final DAG dag;
     private final Configuration conf;
diff --git a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
index 82cf50e..8ff0205 100644
--- a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
+++ b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
@@ -36,17 +36,17 @@
   /**
    * Parameter to specify extra jars for launch.
    */
-  public static final Attribute<String> LIB_JARS = new Attribute<String>(new StringCodec.String2String());
+  public static final Attribute<String> LIB_JARS = new Attribute<>(new StringCodec.String2String());
 
   /**
    * Parameter to specify the previous application id to use to resume launch from.
    */
-  public static final Attribute<String> ORIGINAL_APP_ID = new Attribute<String>(new StringCodec.String2String());
+  public static final Attribute<String> ORIGINAL_APP_ID = new Attribute<>(new StringCodec.String2String());
 
   /**
    * Parameter to specify the queue name to use for launch.
    */
-  public static final Attribute<String> QUEUE_NAME = new Attribute<String>(new StringCodec.String2String());
+  public static final Attribute<String> QUEUE_NAME = new Attribute<>(new StringCodec.String2String());
 
   static {
     Attribute.AttributeMap.AttributeInitializer.initialize(YarnAppLauncher.class);
diff --git a/api/src/test/java/com/datatorrent/api/AttributeMapTest.java b/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
index fcb1809..b463619 100644
--- a/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
+++ b/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
@@ -51,7 +51,7 @@
 
   interface iface
   {
-    Attribute<Greeting> greeting = new Attribute<Greeting>(Greeting.hello);
+    Attribute<Greeting> greeting = new Attribute<>(Greeting.hello);
   }
 
   @Test
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 84999fa..d08b9fc 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -336,14 +336,14 @@
   {
     all_listeners.add(dl);
     //logger.debug("total {} listeners {} -> {}", all_listeners.size(), dl, this);
-    ArrayList<BitVector> partitions = new ArrayList<BitVector>();
+    ArrayList<BitVector> partitions = new ArrayList<>();
     if (dl.getPartitions(partitions) > 0) {
       for (BitVector partition : partitions) {
         HashSet<DataListener> set;
         if (listeners.containsKey(partition)) {
           set = listeners.get(partition);
         } else {
-          set = new HashSet<DataListener>();
+          set = new HashSet<>();
           listeners.put(partition, set);
         }
         set.add(dl);
@@ -353,7 +353,7 @@
       if (listeners.containsKey(DataListener.NULL_PARTITION)) {
         set = listeners.get(DataListener.NULL_PARTITION);
       } else {
-        set = new HashSet<DataListener>();
+        set = new HashSet<>();
         listeners.put(DataListener.NULL_PARTITION, set);
       }
 
@@ -363,7 +363,7 @@
 
   public void removeDataListener(DataListener dl)
   {
-    ArrayList<BitVector> partitions = new ArrayList<BitVector>();
+    ArrayList<BitVector> partitions = new ArrayList<>();
     if (dl.getPartitions(partitions) > 0) {
       for (BitVector partition : partitions) {
         if (listeners.containsKey(partition)) {
@@ -459,7 +459,7 @@
 
     // When the number of subscribers becomes high or the number of blocks becomes high, consider optimize it.
     Block b = first;
-    Map<Block, Integer> indices = new HashMap<Block, Integer>();
+    Map<Block, Integer> indices = new HashMap<>();
     int i = 0;
     while (b != null) {
       indices.put(b, i++);
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index 08a483a..b06e60a 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -71,8 +71,8 @@
     this.identifier = identifier;
     this.upstream = upstream;
     this.group = group;
-    this.physicalNodes = new HashSet<PhysicalNode>();
-    this.partitions = new HashSet<BitVector>();
+    this.physicalNodes = new HashSet<>();
+    this.partitions = new HashSet<>();
     this.iterator = iterator;
     this.skipWindowId = skipWindowId;
     this.eventloop = eventloop;
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 7ac518b..8a56b51 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -260,7 +260,7 @@
   }
 
   private final ConcurrentHashMap<String, DataList> publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1);
-  private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<String, LogicalNode>();
+  private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<>();
   private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<>();
   private final int blockSize;
   private final int numberOfCacheBlocks;
@@ -883,7 +883,7 @@
         }
       }
 
-      ArrayList<LogicalNode> list = new ArrayList<LogicalNode>();
+      ArrayList<LogicalNode> list = new ArrayList<>();
       String publisherIdentifier = datalist.getIdentifier();
       Iterator<LogicalNode> iterator = subscriberGroups.values().iterator();
       while (iterator.hasNext()) {
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/Storage.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/Storage.java
index 1a3a7fb..19ef681 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/Storage.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/Storage.java
@@ -43,7 +43,7 @@
    * @return instance of storage.
    * @throws IOException
    */
-  public Storage getInstance() throws IOException;
+  Storage getInstance() throws IOException;
 
   /**
    * Store memory block represented by block in non memory storage.
@@ -60,7 +60,7 @@
    * @param end - the offset of the last byte in the array.
    * @return unique identifier for the stored block.
    */
-  public int store(String Identifier, byte[] bytes, int start, int end);
+  int store(String Identifier, byte[] bytes, int start, int end);
 
   /**
    *
@@ -68,7 +68,7 @@
    * @param uniqueIdentifier secondary and unique identifier of the block which needs to be retrived.
    * @return memory block which was stored with the passed parameters as identifying information.
    */
-  public byte[] retrieve(String identifier, int uniqueIdentifier);
+  byte[] retrieve(String identifier, int uniqueIdentifier);
 
   /**
    * Discard the block stored from the secondary storage.
@@ -76,5 +76,5 @@
    * @param identifier
    * @param uniqueIdentifier
    */
-  public void discard(String identifier, int uniqueIdentifier);
+  void discard(String identifier, int uniqueIdentifier);
 }
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
index 124cc5f..000ce00 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
@@ -31,7 +31,7 @@
  */
 public class System
 {
-  private static final HashMap<String, DefaultEventLoop> eventloops = new HashMap<String, DefaultEventLoop>();
+  private static final HashMap<String, DefaultEventLoop> eventloops = new HashMap<>();
 
   public static void startup(String identifier)
   {
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
index f7b8829..371f98a 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
@@ -42,7 +42,7 @@
     String down_type = "SubscriberId/StreamType";
     String upstream_id = "PublisherId";
     int mask = 7;
-    ArrayList<Integer> partitions = new ArrayList<Integer>();
+    ArrayList<Integer> partitions = new ArrayList<>();
     partitions.add(5);
     long startingWindowId = 0xcafebabe00000078L;
 
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
index 3c0cb0e..f44ef1a 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
@@ -32,7 +32,7 @@
  */
 public class Subscriber extends com.datatorrent.bufferserver.client.Subscriber
 {
-  public final ArrayList<Object> resetPayloads = new ArrayList<Object>();
+  public final ArrayList<Object> resetPayloads = new ArrayList<>();
   public AtomicInteger tupleCount = new AtomicInteger(0);
   public WindowIdHolder firstPayload;
   public WindowIdHolder lastPayload;
@@ -134,15 +134,15 @@
 
   public interface WindowIdHolder
   {
-    public int getWindowId();
+    int getWindowId();
 
   }
 
   public interface ResetHolder
   {
-    public int getBaseSeconds();
+    int getBaseSeconds();
 
-    public int getWindowWidth();
+    int getWindowWidth();
 
   }
 
diff --git a/common/src/main/java/com/datatorrent/common/experimental/AppData.java b/common/src/main/java/com/datatorrent/common/experimental/AppData.java
index bbf9753..dd1b7c3 100644
--- a/common/src/main/java/com/datatorrent/common/experimental/AppData.java
+++ b/common/src/main/java/com/datatorrent/common/experimental/AppData.java
@@ -50,7 +50,7 @@
      * null then this Store should have a separate query operator connected to it.
      * @return The query connector which is used by the store operator to receive queries.
      */
-    public EmbeddableQueryInfoProvider<QUERY_TYPE> getEmbeddableQueryInfoProvider();
+    EmbeddableQueryInfoProvider<QUERY_TYPE> getEmbeddableQueryInfoProvider();
 
     /**
      * Sets the query connector which is used by the store operator to receive queries. The store operator will call
@@ -58,7 +58,7 @@
      * its {@link Operator#setup} method is called.
      * @param embeddableQueryInfoProvider The query connector which is used by the store operator to receive queries.
      */
-    public void setEmbeddableQueryInfoProvider(EmbeddableQueryInfoProvider<QUERY_TYPE> embeddableQueryInfoProvider);
+    void setEmbeddableQueryInfoProvider(EmbeddableQueryInfoProvider<QUERY_TYPE> embeddableQueryInfoProvider);
   }
 
   /**
@@ -77,7 +77,7 @@
      * Gets the output port for queries.
      * @return The output port for queries.
      */
-    public DefaultOutputPort<QUERY_TYPE> getOutputPort();
+    DefaultOutputPort<QUERY_TYPE> getOutputPort();
 
     /**
      * If this method is called at least once then this operator will work as if it were embedded in an {@link AppData.Store}.
@@ -85,7 +85,7 @@
      * is set on an {@link AppData.Store} then the {@link AppData.Store} will call the {@link EmbeddableQueryInfoProvider#enableEmbeddedMode}
      * method once before the {@link Operator.setup} is called.
      */
-    public void enableEmbeddedMode();
+    void enableEmbeddedMode();
   }
 
   /**
@@ -97,13 +97,13 @@
      * Returns the connection url used by the appdata Query or Result operator.
      * @return The connection url used by the AppData Query or Result operator.
      */
-    public String getAppDataURL();
+    String getAppDataURL();
 
     /**
      * Returns the topic that the appdata Query or Result operator sends data to.
      * @return The topic that the appdata Query or Result operator sends data to.
      */
-    public String getTopic();
+    String getTopic();
   }
 
   /**
@@ -113,7 +113,7 @@
   @Target(ElementType.TYPE)
   @Retention(RetentionPolicy.RUNTIME)
   @Inherited
-  public @interface AppendQueryIdToTopic
+  @interface AppendQueryIdToTopic
   {
     boolean value() default false;
   }
@@ -124,7 +124,7 @@
   @Documented
   @Target(ElementType.FIELD)
   @Retention(RetentionPolicy.RUNTIME)
-  public @interface QueryPort
+  @interface QueryPort
   {
   }
 
@@ -134,7 +134,7 @@
   @Documented
   @Target(ElementType.FIELD)
   @Retention(RetentionPolicy.RUNTIME)
-  public @interface ResultPort
+  @interface ResultPort
   {
   }
 }
diff --git a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
index 165d8cf..d77b1ae 100644
--- a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
+++ b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
@@ -114,7 +114,7 @@
       newPartitions = Lists.newArrayList();
 
       for (int partitionCounter = 0; partitionCounter < newPartitionCount; partitionCounter++) {
-        newPartitions.add(new DefaultPartition<T>(partition.getPartitionedInstance()));
+        newPartitions.add(new DefaultPartition<>(partition.getPartitionedInstance()));
       }
 
       // partition the stream that was first connected in the DAG and send full data to remaining input ports
@@ -156,8 +156,8 @@
    */
   public static <T extends Operator> Collection<Partition<T>> repartition(Collection<Partition<T>> partitions)
   {
-    List<Partition<T>> newPartitions = new ArrayList<Partition<T>>();
-    HashMap<Integer, Partition<T>> lowLoadPartitions = new HashMap<Integer, Partition<T>>();
+    List<Partition<T>> newPartitions = new ArrayList<>();
+    HashMap<Integer, Partition<T>> lowLoadPartitions = new HashMap<>();
     for (Partition<T> p: partitions) {
       int load = p.getLoad();
       if (load < 0) {
@@ -201,7 +201,7 @@
         }
 
         for (int key: newKeys) {
-          Partition<T> newPartition = new DefaultPartition<T>(p.getPartitionedInstance());
+          Partition<T> newPartition = new DefaultPartition<>(p.getPartitionedInstance());
           newPartition.getPartitionKeys().put(e.getKey(), new PartitionKeys(newMask, Sets.newHashSet(key)));
           newPartitions.add(newPartition);
         }
@@ -224,8 +224,8 @@
    */
   public static <T extends Operator> Collection<Partition<T>> repartitionInputOperator(Collection<Partition<T>> partitions)
   {
-    List<Partition<T>> newPartitions = new ArrayList<Partition<T>>();
-    List<Partition<T>> lowLoadPartitions = new ArrayList<Partition<T>>();
+    List<Partition<T>> newPartitions = new ArrayList<>();
+    List<Partition<T>> lowLoadPartitions = new ArrayList<>();
     for (Partition<T> p: partitions) {
       int load = p.getLoad();
       if (load < 0) {
@@ -235,8 +235,8 @@
           lowLoadPartitions.add(p);
         }
       } else if (load > 0) {
-        newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance()));
-        newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance()));
+        newPartitions.add(new DefaultPartition<>(p.getPartitionedInstance()));
+        newPartitions.add(new DefaultPartition<>(p.getPartitionedInstance()));
       } else {
         newPartitions.add(p);
       }
@@ -274,7 +274,7 @@
       T anOperator = newPartitions.iterator().next().getPartitionedInstance();
 
       while (morePartitionsToCreate-- > 0) {
-        DefaultPartition<T> partition = new DefaultPartition<T>(anOperator);
+        DefaultPartition<T> partition = new DefaultPartition<>(anOperator);
         newPartitions.add(partition);
       }
     }
diff --git a/common/src/main/java/com/datatorrent/common/security/SecurityContext.java b/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
index dccd7b7..3dc4dda 100644
--- a/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
+++ b/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
@@ -32,17 +32,17 @@
   /**
    * Attribute for the user name for login.
    */
-  Attribute<String> USER_NAME = new Attribute<String>((String)null);
+  Attribute<String> USER_NAME = new Attribute<>((String)null);
 
   /**
    * Attribute for the password for login.
    */
 
-  Attribute<char[]> PASSWORD = new Attribute<char[]>((char[])null);
+  Attribute<char[]> PASSWORD = new Attribute<>((char[])null);
 
   /**
    * Attribute for the realm for login.
    */
-  Attribute<String> REALM = new Attribute<String>((String)null);
+  Attribute<String> REALM = new Attribute<>((String)null);
 
 }
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index 24d850e..0c389a4 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -30,6 +30,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.AsyncStorageAgent;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -45,7 +46,7 @@
  *
  * @since 3.1.0
  */
-public class AsyncFSStorageAgent extends FSStorageAgent
+public class AsyncFSStorageAgent extends FSStorageAgent implements AsyncStorageAgent
 {
   private final transient Configuration conf;
   private transient volatile String localBasePath;
@@ -146,6 +147,16 @@
   }
 
   @Override
+  public void finalize(int operatorId, long windowId) throws IOException
+  {
+    // Checkpoint already present in HDFS during save, when syncCheckpoint is true.
+    if (isSyncCheckpoint()) {
+      return;
+    }
+    copyToHDFS(operatorId, windowId);
+  }
+
+  @Override
   public Object readResolve() throws ObjectStreamException
   {
     AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(this.path, null);
@@ -153,6 +164,7 @@
     return asyncFSStorageAgent;
   }
 
+  @Override
   public boolean isSyncCheckpoint()
   {
     return syncCheckpoint;
diff --git a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
index ca7490d..f90a888 100644
--- a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
+++ b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
@@ -48,7 +48,7 @@
     }
   };
 
-  public transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+  public transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
 
   protected List<T> lastWindowTuples = new ArrayList<>();
 
diff --git a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
index fe90b86..b5a43fe 100644
--- a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
@@ -18,6 +18,7 @@
  */
 package com.datatorrent.common.util;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectStreamException;
@@ -153,6 +154,16 @@
   public long[] getWindowIds(int operatorId) throws IOException
   {
     Path lPath = new Path(path + Path.SEPARATOR + String.valueOf(operatorId));
+    try {
+      FileStatus status = fileContext.getFileStatus(lPath);
+      if (!status.isDirectory()) {
+        throw new IOException("Checkpoint location is not a directory ");
+      }
+    } catch (FileNotFoundException ex) {
+      // During initialization this directory may not exist,
+      // so return an empty array.
+      return new long[0];
+    }
 
     RemoteIterator<FileStatus> fileStatusRemoteIterator = fileContext.listStatus(lPath);
     if (!fileStatusRemoteIterator.hasNext()) {
diff --git a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
index 7723fed..ef837a8 100644
--- a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
+++ b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
@@ -55,9 +55,9 @@
     this.objectMapper = new ObjectMapper();
     objectMapper.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, true);
     objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
-    module.addSerializer(ObjectMapperString.class, new RawSerializer<Object>(Object.class));
-    module.addSerializer(JSONObject.class, new RawSerializer<Object>(Object.class));
-    module.addSerializer(JSONArray.class, new RawSerializer<Object>(Object.class));
+    module.addSerializer(ObjectMapperString.class, new RawSerializer<>(Object.class));
+    module.addSerializer(JSONObject.class, new RawSerializer<>(Object.class));
+    module.addSerializer(JSONArray.class, new RawSerializer<>(Object.class));
     objectMapper.registerModule(module);
   }
 
diff --git a/common/src/main/java/com/datatorrent/common/util/NumberAggregate.java b/common/src/main/java/com/datatorrent/common/util/NumberAggregate.java
index 9a89e83..6c4c31e 100644
--- a/common/src/main/java/com/datatorrent/common/util/NumberAggregate.java
+++ b/common/src/main/java/com/datatorrent/common/util/NumberAggregate.java
@@ -30,40 +30,40 @@
    *
    * @return The min
    */
-  public Number getMin();
+  Number getMin();
 
   /**
    * Gets the maximum of the given numbers
    *
    * @return The max
    */
-  public Number getMax();
+  Number getMax();
 
   /**
    * Gets the sum of the given numbers
    *
    * @return The sum
    */
-  public Number getSum();
+  Number getSum();
 
   /**
    * Gets the average of the given numbers
    *
    * @return The avg
    */
-  public Number getAvg();
+  Number getAvg();
 
   /**
    * Add a long to the number set
    *
    * @param num the number
    */
-  public void addNumber(Number num);
+  void addNumber(Number num);
 
   /**
    * This is the aggregate class for Long.
    */
-  public static class LongAggregate implements NumberAggregate
+  class LongAggregate implements NumberAggregate
   {
     private int count = 0;
     private long min = Long.MAX_VALUE;
@@ -134,7 +134,7 @@
   /**
    * This is the aggregate class for Double.
    */
-  public static class DoubleAggregate implements NumberAggregate
+  class DoubleAggregate implements NumberAggregate
   {
     private int count = 0;
     private double min = Double.MAX_VALUE;
diff --git a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
index 63d1646..af5e10e 100644
--- a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
+++ b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
@@ -53,7 +53,7 @@
    */
   public static <T> String constructPublishMessage(String topic, T data, PubSubMessageCodec<T> codec) throws IOException
   {
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.PUBLISH);
     pubSubMessage.setTopic(topic);
     pubSubMessage.setData(data);
@@ -72,7 +72,7 @@
    */
   public static <T> String constructSubscribeMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
   {
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.SUBSCRIBE);
     pubSubMessage.setTopic(topic);
 
@@ -90,7 +90,7 @@
    */
   public static <T> String constructUnsubscribeMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
   {
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.UNSUBSCRIBE);
     pubSubMessage.setTopic(topic);
 
@@ -108,7 +108,7 @@
    */
   public static <T> String constructSubscribeNumSubscribersMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
   {
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS);
     pubSubMessage.setTopic(topic);
 
@@ -126,7 +126,7 @@
    */
   public static <T> String constructUnsubscribeNumSubscribersMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
   {
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS);
     pubSubMessage.setTopic(topic);
 
@@ -135,7 +135,7 @@
 
   public String formatMessage(PubSubMessage<T> pubSubMessage) throws IOException
   {
-    HashMap<String, Object> map = new HashMap<String, Object>();
+    HashMap<String, Object> map = new HashMap<>();
     map.put(PubSubMessage.TYPE_KEY, pubSubMessage.getType().getIdentifier());
     map.put(PubSubMessage.TOPIC_KEY, pubSubMessage.getTopic());
     T data = pubSubMessage.getData();
@@ -156,7 +156,7 @@
   public PubSubMessage<T> parseMessage(String message) throws IOException
   {
     HashMap<String, Object> map = mapper.readValue(message, HashMap.class);
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.getPubSubMessageType((String)map.get(PubSubMessage.TYPE_KEY)));
     pubSubMessage.setTopic((String)map.get(PubSubMessage.TOPIC_KEY));
     pubSubMessage.setData((T)map.get(PubSubMessage.DATA_KEY));
diff --git a/common/src/main/java/com/datatorrent/common/util/ScheduledExecutorService.java b/common/src/main/java/com/datatorrent/common/util/ScheduledExecutorService.java
index 961dbed..3ab24e9 100644
--- a/common/src/main/java/com/datatorrent/common/util/ScheduledExecutorService.java
+++ b/common/src/main/java/com/datatorrent/common/util/ScheduledExecutorService.java
@@ -29,5 +29,5 @@
    *
    * @return long
    */
-  public long getCurrentTimeMillis();
+  long getCurrentTimeMillis();
 }
diff --git a/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java b/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
new file mode 100644
index 0000000..337ccdd
--- /dev/null
+++ b/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.common.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.StorageAgent;
+
+/**
+ * Storage agent which can take checkpoints asynchronously.
+ * An AsyncStorageAgent enables quick checkpoints by taking local snapshot of an operator
+ * and unblocking the operator to process more data, while storage engine is pushing local snapshot to
+ * the distributed or globally accessible location for recovery.
+ */
+@InterfaceStability.Evolving
+public interface AsyncStorageAgent extends StorageAgent
+{
+  /**
+   * Make the checkpoint for the given windowId final, i.e. after this method returns,
+   * the checkpoint is accessible for recovery.
+   *
+   * @param operatorId
+   * @param windowId
+   * @throws IOException
+   */
+  void finalize(int operatorId, long windowId) throws IOException;
+
+  /**
+   * Check if StorageAgent is configured to take synchronous checkpoints.
+   *
+   * @return true if StorageAgent is configured to take synchronous checkpoints,
+   *         false otherwise.
+   */
+  boolean isSyncCheckpoint();
+
+}
diff --git a/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java b/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
new file mode 100644
index 0000000..d6fec8e
--- /dev/null
+++ b/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.common.util;
+
+import java.io.IOException;
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.StorageAgent;
+
+/**
+ * A StorageAgent which chains two StorageAgents. It uses the current storage agent to store
+ * the checkpoint, and uses the parent agent to read old checkpoints. For an application having
+ * a large number of physical operators, the size and number of files to be copied could be
+ * large, impacting application restart time. This storage agent is used during application
+ * restart to avoid copying checkpoints from the old application directory, improving
+ * application restart time.
+ */
+public class CascadeStorageAgent implements StorageAgent, AsyncStorageAgent, Serializable
+{
+  private static final Logger logger = LoggerFactory.getLogger(CascadeStorageAgent.class);
+  private final StorageAgent parent;
+  private final StorageAgent current;
+  private final transient Map<Integer, long[]> oldOperatorToWindowIdsMap;
+
+  public CascadeStorageAgent(StorageAgent parent, StorageAgent current)
+  {
+    this.parent = parent;
+    this.current = current;
+    oldOperatorToWindowIdsMap = Maps.newConcurrentMap();
+  }
+
+  /**
+   * does the checkpoint belong to parent
+   */
+  private boolean isCheckpointFromParent(int operatorId, long wid) throws IOException
+  {
+    long[] wids = getParentWindowIds(operatorId);
+    if (wids.length != 0) {
+      return (wid <= wids[wids.length - 1]);
+    }
+    return false;
+  }
+
+  /**
+   * Return window-ids of checkpoints available in the old storage agent. This function
+   * will call getWindowIds of the old storage agent only the first time, and
+   * return cached data on subsequent calls for the same operator.
+   *
+   * @param operatorId
+   * @return
+   * @throws IOException
+   */
+  private long[] getParentWindowIds(int operatorId) throws IOException
+  {
+    long[] oldWindowIds = oldOperatorToWindowIdsMap.get(operatorId);
+    if (oldWindowIds == null) {
+      oldWindowIds = parent.getWindowIds(operatorId);
+      if (oldWindowIds == null) {
+        oldWindowIds = new long[0];
+      }
+      Arrays.sort(oldWindowIds);
+      oldOperatorToWindowIdsMap.put(operatorId, oldWindowIds);
+      logger.debug("CascadeStorageAgent window ids from old storage agent op {} wids {}", operatorId, Arrays.toString(oldWindowIds));
+    }
+    return oldWindowIds;
+  }
+
+  /**
+   * Save object in current storage agent. This should not modify old storage agent
+   * in any way.
+   *
+   * @param object - The operator whose state needs to be saved.
+   * @param operatorId - Identifier of the operator.
+   * @param windowId - Identifier for the specific state of the operator.
+   * @throws IOException
+   */
+  @Override
+  public void save(Object object, int operatorId, long windowId) throws IOException
+  {
+    current.save(object, operatorId, windowId);
+  }
+
+  /**
+   * Delete old checkpoints from the storage agent.
+   *
+   * The checkpoint is deleted from the current directory if it is present in the current
+   * storage agent, and the cached state for the old storage agent is removed.
+   *
+   * @param operatorId
+   * @param windowId
+   * @throws IOException
+   */
+  @Override
+  public void delete(int operatorId, long windowId) throws IOException
+  {
+    if (!isCheckpointFromParent(operatorId, windowId)) {
+      current.delete(operatorId, windowId);
+    }
+  }
+
+  /**
+   * Load a checkpoint from the storage agents. Do a basic comparison of windowIds
+   * to determine which storage agent has the checkpoint.
+   *
+   * @param operatorId Id for which the object was previously saved
+   * @param windowId WindowId for which the object was previously saved
+   * @return
+   * @throws IOException
+   */
+  @Override
+  public Object load(int operatorId, long windowId) throws IOException
+  {
+    long[] oldWindowIds = getParentWindowIds(operatorId);
+    if (oldWindowIds.length >= 1 && windowId <= oldWindowIds[oldWindowIds.length - 1]) {
+      return parent.load(operatorId, windowId);
+    }
+    return current.load(operatorId, windowId);
+  }
+
+  @Override
+  public long[] getWindowIds(int operatorId) throws IOException
+  {
+    long[] currentIds = current.getWindowIds(operatorId);
+    long[] oldWindowIds = getParentWindowIds(operatorId);
+    return merge(currentIds, oldWindowIds);
+  }
+
+  private static final long[] EMPTY_LONG_ARRAY = new long[0];
+  private long[] merge(long[] currentIds, long[] oldWindowIds)
+  {
+    if (currentIds == null && oldWindowIds == null) {
+      return EMPTY_LONG_ARRAY;
+    }
+    if (currentIds == null) {
+      return oldWindowIds;
+    }
+    if (oldWindowIds == null) {
+      return currentIds;
+    }
+    long[] mergedArray = new long[currentIds.length + oldWindowIds.length];
+    System.arraycopy(currentIds, 0, mergedArray, 0, currentIds.length);
+    System.arraycopy(oldWindowIds, 0, mergedArray, currentIds.length, oldWindowIds.length);
+    Arrays.sort(mergedArray);
+    return mergedArray;
+  }
+
+  @Override
+  public void finalize(int operatorId, long windowId) throws IOException
+  {
+    if (current instanceof AsyncStorageAgent) {
+      ((AsyncStorageAgent)current).finalize(operatorId, windowId);
+    }
+  }
+
+  @Override
+  public boolean isSyncCheckpoint()
+  {
+    if (parent instanceof AsyncStorageAgent) {
+      return ((AsyncStorageAgent)parent).isSyncCheckpoint();
+    }
+    return true;
+  }
+
+  public Object readResolve() throws ObjectStreamException
+  {
+    return new CascadeStorageAgent(parent, current);
+  }
+
+  public StorageAgent getCurrentStorageAgent()
+  {
+    return current;
+  }
+
+  public StorageAgent getParentStorageAgent()
+  {
+    return parent;
+  }
+}
diff --git a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
index e7c4887..2e48f54 100644
--- a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
+++ b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
@@ -41,7 +41,7 @@
 
   public static class DummyOperator implements Operator
   {
-    public final DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+    public final DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
 
     private Integer value;
 
@@ -93,10 +93,10 @@
   public void partition1Test()
   {
     DummyOperator dummyOperator = new DummyOperator(5);
-    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>();
+    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>();
 
     Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
-    DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<DummyOperator>(dummyOperator);
+    DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<>(dummyOperator);
     partitions.add(defaultPartition);
 
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
@@ -111,10 +111,10 @@
   public void partition5Test()
   {
     DummyOperator dummyOperator = new DummyOperator(5);
-    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>(5);
+    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>(5);
 
     Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
-    DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<DummyOperator>(dummyOperator);
+    DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<>(dummyOperator);
     partitions.add(defaultPartition);
 
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
@@ -137,10 +137,10 @@
   public void testParallelPartitionScaleUP()
   {
     DummyOperator dummyOperator = new DummyOperator(5);
-    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>();
+    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>();
 
     Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
-    partitions.add(new DefaultPartition<DummyOperator>(dummyOperator));
+    partitions.add(new DefaultPartition<>(dummyOperator));
 
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
         new PartitioningContextImpl(null, 5));
@@ -151,12 +151,12 @@
   public void testParallelPartitionScaleDown()
   {
     DummyOperator dummyOperator = new DummyOperator(5);
-    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>();
+    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>();
 
     Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
 
     for (int i = 5; i-- > 0; ) {
-      partitions.add(new DefaultPartition<DummyOperator>(dummyOperator));
+      partitions.add(new DefaultPartition<>(dummyOperator));
     }
 
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
diff --git a/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java b/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
new file mode 100644
index 0000000..40f24f0
--- /dev/null
+++ b/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.common.util;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.apex.common.util.CascadeStorageAgent;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.DAG;
+
+public class CascadeStorageAgentTest
+{
+
+  static class TestMeta extends TestWatcher
+  {
+    String applicationPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      try {
+        FileUtils.forceMkdir(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSingleIndirection() throws IOException
+  {
+    String oldAppPath = testMeta.applicationPath;
+    FSStorageAgent storageAgent = new FSStorageAgent(oldAppPath, null);
+    storageAgent.save("1", 1, 1);
+    storageAgent.save("2", 1, 2);
+    storageAgent.save("3", 2, 1);
+
+    String newAppPath = oldAppPath + ".new";
+    CascadeStorageAgent cascade = new CascadeStorageAgent(storageAgent, new FSStorageAgent(newAppPath, null));
+    long[] operatorIds = cascade.getWindowIds(1);
+    Assert.assertArrayEquals("Returned window ids ", new long[]{1L, 2L}, operatorIds);
+
+    operatorIds = cascade.getWindowIds(2);
+    Assert.assertArrayEquals("Returned window ids ", new long[]{1L}, operatorIds);
+
+    /* save should happen to new location */
+    cascade.save("4", 1, 4);
+    FileContext fileContext = FileContext.getFileContext();
+    Assert.assertFalse("operator 1 window 4 file does not exist in old directory", fileContext.util().exists(new Path(oldAppPath + "/" + 1 + "/" + 4)));
+    Assert.assertTrue("operator 1 window 4 file exists in new directory", fileContext.util().exists(new Path(newAppPath + "/" + 1 + "/" + 4)));
+
+    // check delete behavior:
+    // a delete of a checkpoint that lives only in the old (parent) location should be ignored
+    cascade.save("5", 1, 5);
+    cascade.delete(1, 2L);
+    Assert.assertTrue("operator 1 window 2 file exists in old directory", fileContext.util().exists(new Path(oldAppPath + "/" + 1 + "/" + 2)));
+    cascade.delete(1, 4L);
+    Assert.assertFalse("operator 1 window 4 file does not exist in new directory", fileContext.util().exists(new Path(newAppPath + "/" + 1 + "/" + 4)));
+
+    /* chaining of storage agent */
+    String latestAppPath = oldAppPath + ".latest";
+    cascade = new CascadeStorageAgent(storageAgent, new FSStorageAgent(newAppPath, null));
+    CascadeStorageAgent latest = new CascadeStorageAgent(cascade, new FSStorageAgent(latestAppPath, null));
+    operatorIds = latest.getWindowIds(1);
+    Assert.assertArrayEquals("Window ids ", new long[]{1L, 2L, 5L}, operatorIds);
+
+    latest.save("6", 1, 6);
+    Assert.assertFalse("operator 1 window 6 file does not exist in old directory", fileContext.util().exists(new Path(oldAppPath + "/" + 1 + "/" + 6)));
+    Assert.assertFalse("operator 1 window 6 file does not exist in new directory", fileContext.util().exists(new Path(newAppPath + "/" + 1 + "/" + 6)));
+    Assert.assertTrue("operator 1 window 6 file exists in latest directory", fileContext.util().exists(new Path(latestAppPath + "/" + 1 + "/" + 6)));
+  }
+}
diff --git a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
index 97debe3..79fdac7 100644
--- a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
+++ b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
@@ -50,7 +50,7 @@
       }
 
     };
-    public final transient OutputPort<T> output = new DefaultOutputPort<T>();
+    public final transient OutputPort<T> output = new DefaultOutputPort<>();
     private int i;
 
     public void setI(int i)
@@ -109,7 +109,7 @@
   @Test
   public void testReadResolve() throws Exception
   {
-    SerializableOperator<Object> pre = new SerializableOperator<Object>();
+    SerializableOperator<Object> pre = new SerializableOperator<>();
     pre.setI(10);
 
     FileOutputStream fos = new FileOutputStream(filename);
diff --git a/docs/application_development.md b/docs/application_development.md
index 6fe1d4f..07cfd81 100644
--- a/docs/application_development.md
+++ b/docs/application_development.md
@@ -1255,15 +1255,15 @@
 
 ```
 # input operator that reads from a file
-dt.operator.inputOp.classname=com.acme.SampleInputOperator
-dt.operator.inputOp.fileName=somefile.txt
+apex.operator.inputOp.classname=com.acme.SampleInputOperator
+apex.operator.inputOp.fileName=somefile.txt
 
 # output operator that writes to the console
-dt.operator.outputOp.classname=com.acme.ConsoleOutputOperator
+apex.operator.outputOp.classname=com.acme.ConsoleOutputOperator
 
 # stream connecting both operators
-dt.stream.inputStream.source=inputOp.outputPort
-dt.stream.inputStream.sinks=outputOp.inputPort
+apex.stream.inputStream.source=inputOp.outputPort
+apex.stream.inputStream.sinks=outputOp.inputPort
 ```
 
 
@@ -1776,7 +1776,7 @@
 The same set of rules can also be added from properties.xml by setting value for attribute DAGContext.AFFINITY_RULES_SET as JSON string.  For example:
 ```xml
 <property>
-    <name>dt.application.AffinityRulesSampleApplication.attr.AFFINITY_RULES_SET</name>
+    <name>apex.application.AffinityRulesSampleApplication.attr.AFFINITY_RULES_SET</name>
     <value>
     {
       "affinityRules": [
@@ -2952,4 +2952,4 @@
 [Apache Apex-Malhar repository](https://github.com/apache/apex-malhar).
 All of these do computations in real-time. Developers are encouraged to
 review them as they use various features of the platform and provide an
-opportunity for quick learning.
\ No newline at end of file
+opportunity for quick learning.
diff --git a/docs/application_packages.md b/docs/application_packages.md
index 891ecee..74886fc 100644
--- a/docs/application_packages.md
+++ b/docs/application_packages.md
@@ -23,26 +23,26 @@
 
 First, change to the directory where you put your projects, and create
 an Apex application project using Maven by running the following
-command.  Replace "com.example", "mydtapp" and "1.0-SNAPSHOT" with the
+command.  Replace "com.example", "myapp" and "1.0-SNAPSHOT" with the
 appropriate values (make sure this is all on one line):
 
     $ mvn archetype:generate \
      -DarchetypeGroupId=org.apache.apex \
      -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.4.0 \
-     -DgroupId=com.example -Dpackage=com.example.mydtapp -DartifactId=mydtapp \
+     -DgroupId=com.example -Dpackage=com.example.myapp -DartifactId=myapp \
      -Dversion=1.0-SNAPSHOT
 
-This creates a Maven project named "mydtapp". Open it with your favorite
+This creates a Maven project named "myapp". Open it with your favorite
 IDE (e.g. NetBeans, Eclipse, IntelliJ IDEA). In the project, there is a
 sample DAG that generates a number of tuples with a random number and
 prints out "hello world" and the random number in the tuples.  The code
 that builds the DAG is in
-src/main/java/com/example/mydtapp/Application.java, and the code that
+src/main/java/com/example/myapp/Application.java, and the code that
 runs the unit test for the DAG is in
-src/test/java/com/example/mydtapp/ApplicationTest.java. Try it out by
+src/test/java/com/example/myapp/ApplicationTest.java. Try it out by
 running the following command:
 
-    $cd mydtapp; mvn package
+    $cd myapp; mvn package
 
 This builds the App Package runs the unit test of the DAG.  You should
 be getting test output similar to this:
@@ -52,7 +52,7 @@
   TESTS
  -------------------------------------------------------
 
- Running com.example.mydtapp.ApplicationTest
+ Running com.example.myapp.ApplicationTest
  hello world: 0.8015370953286478
  hello world: 0.9785359225545481
  hello world: 0.6322611586644047
@@ -76,7 +76,7 @@
 ```
 
 The "mvn package" command creates the App Package file in target
-directory as target/mydtapp-1.0-SNAPSHOT.apa. You will be able to use
+directory as target/myapp-1.0-SNAPSHOT.apa. You will be able to use
 that App Package file to launch this sample application in your actual
 Apex installation.
 
@@ -132,7 +132,7 @@
 ```
 
 By default, as shown above, the default dependencies include
-malhar-library in compile scope, dt-engine in provided scope, and junit
+malhar-library in compile scope, apex-engine in provided scope, and junit
 in test scope.  Do not remove these three dependencies since they are
 necessary for any Apex application.  You can, however, exclude
 transitive dependencies from malhar-library to reduce the size of your
@@ -191,14 +191,14 @@
 
 Application attributes are used to specify the platform behavior for the
 application. They can be specified using the parameter
-```dt.attr.<attribute>```. The prefix “dt” is a constant, “attr” is a
+```apex.attr.<attribute>```. The prefix “apex” is a constant, “attr” is a
 constant denoting an attribute is being specified and ```<attribute>```
 specifies the name of the attribute. Below is an example snippet setting
 the streaming windows size of the application to be 1000 milliseconds.
 
 ```
   <property>
-     <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
+     <name>apex.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
      <value>1000</value>
   </property>
 ```
@@ -213,7 +213,7 @@
 
 Operator attributes are used to specify the platform behavior for the
 operator. They can be specified using the parameter
-```dt.operator.<operator-name>.attr.<attribute>```. The prefix “dt” is a
+```apex.operator.<operator-name>.attr.<attribute>```. The prefix “apex” is a
 constant, “operator” is a constant denoting that an operator is being
 specified, ```<operator-name>``` denotes the name of the operator, “attr” is
 the constant denoting that an attribute is being specified and
@@ -225,7 +225,7 @@
 
 ```
 <property>
-  <name>dt.operator.input.attr.APPLICATION_WINDOW_COUNT</name>
+  <name>apex.operator.input.attr.APPLICATION_WINDOW_COUNT</name>
   <value>10</value>
 </property>
 ```
@@ -240,7 +240,7 @@
 
 Operators can be configured using operator specific properties. The
 properties can be specified using the parameter
-```dt.operator.<operator-name>.prop.<property-name>```. The difference
+```apex.operator.<operator-name>.prop.<property-name>```. The difference
 between this and the operator attribute specification described above is
 that the keyword “prop” is used to denote that it is a property and
 ```<property-name>``` specifies the property name.  An example illustrating
@@ -249,7 +249,7 @@
 
 ```
   <property>
-    <name>dt.operator.redis.prop.host</name>
+    <name>apex.operator.redis.prop.host</name>
     <value>127.0.0.1</value>
   </property>
 ```
@@ -265,8 +265,8 @@
 
 ### Port attributes
 Port attributes are used to specify the platform behavior for input and
-output ports. They can be specified using the parameter ```dt.operator.<operator-name>.inputport.<port-name>.attr.<attribute>```
-for input port and ```dt.operator.<operator-name>.outputport.<port-name>.attr.<attribute>```
+output ports. They can be specified using the parameter ```apex.operator.<operator-name>.inputport.<port-name>.attr.<attribute>```
+for input port and ```apex.operator.<operator-name>.outputport.<port-name>.attr.<attribute>```
 for output port. The keyword “inputport” is used to denote an input port
 and “outputport” to denote an output port. The rest of the specification
 follows the conventions described in other specifications above. An
@@ -276,7 +276,7 @@
 
 ```
 <property>
-  <name>dt.operator.range.inputport.input.attr.QUEUE_CAPACITY</name>
+  <name>apex.operator.range.inputport.input.attr.QUEUE_CAPACITY</name>
   <value>4000</value>
 </property>
 ```
@@ -297,7 +297,7 @@
 
 Streams can be configured using stream properties. The properties can be
 specified using the parameter
-```dt.stream.<stream-name>.prop.<property-name>```  The constant “stream”
+```apex.stream.<stream-name>.prop.<property-name>```  The constant “stream”
 specifies that it is a stream, ```<stream-name>``` specifies the name of the
 stream and ```<property-name>``` the name of the property. The name of the
 stream is the same name that is passed when the stream is added to the
@@ -308,7 +308,7 @@
 
 ```
   <property>
-    <name>dt.stream.stream1.prop.locality</name>
+    <name>apex.stream.stream1.prop.locality</name>
     <value>CONTAINER_LOCAL</value>
   </property>
 ```
@@ -339,7 +339,7 @@
 done as follows
 ```
 <property>
-  <name>dt.operator.range.port.*.attr.QUEUE_CAPACITY</name>
+  <name>apex.operator.range.port.*.attr.QUEUE_CAPACITY</name>
   <value>4000</value>
 </property>
 ```
@@ -372,13 +372,13 @@
 
 The name of an application-specific property takes the form of:
 
-```dt.operator.{opName}.prop.{propName} ```
+```apex.operator.{opName}.prop.{propName} ```
 
 The first represents the property with name propName of operator opName.
  Or you can set the application name at run time by setting this
 property:
 
-        dt.attr.APPLICATION_NAME
+        apex.attr.APPLICATION_NAME
 
 
 In this example, property some_name_1 is a required property which
@@ -531,12 +531,12 @@
 ## Creating Configuration Packages
 
 Creating Configuration Packages is similar to creating Application Packages. You can create a configuration 
-package project using Maven by running the following command. Replace "com.example", "mydtconfig" and "1.0-SNAPSHOT" with the appropriate values:
+package project using Maven by running the following command. Replace "com.example", "myconfig" and "1.0-SNAPSHOT" with the appropriate values:
 
 ```
 $ mvn archetype:generate -DarchetypeGroupId=org.apache.apex \
   -DarchetypeArtifactId=apex-conf-archetype -DarchetypeVersion=3.4.0 \
-  -DgroupId=com.example -Dpackage=com.example.mydtconfig -DartifactId=mydtconfig \
+  -DgroupId=com.example -Dpackage=com.example.myconfig -DartifactId=myconfig \
   -Dversion=1.0-SNAPSHOT
 ```
 
@@ -547,7 +547,7 @@
 ```
 
 The "mvn package" command creates the Config Package file in target
-directory as target/mydtconfig.apc. You will be able to use that
+directory as target/myconfig.apc. You will be able to use that
 Configuration Package file to launch an Apache Apex application.
 
 ## Assembling your own configuration package
@@ -568,7 +568,7 @@
 ```xml
   <groupId>com.example</groupId>
   <version>1.0.0</version>
-  <artifactId>mydtconf</artifactId>
+  <artifactId>myconfig</artifactId>
   <packaging>jar</packaging>
   <!-- change these to the appropriate values -->
   <name>My Apex Application Configuration</name>
@@ -661,7 +661,7 @@
 
 `-conf` option of the launch command in CLI supports specifying configuration package in the local filesystem.  Example:
 
-    dt\> launch mydtapp-1.0.0.apa -conf mydtconfig.apc
+    apex\> launch myapp-1.0.0.apa -conf myconfig.apc
 
 This command expects both the application package and the configuration package to be in the local file system.
 
diff --git a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
index 53d91a5..80314c7 100644
--- a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
+++ b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
@@ -73,7 +73,7 @@
       for (ContainerRequest cr : requests) {
         ContainerStartRequest csr = hostSpecificRequests.get(cr);
         ContainerRequest newCr = new ContainerRequest(cr.getCapability(), null, null, cr.getPriority());
-        MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, newCr);
+        MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, newCr);
         requestedResources.put(csr, pair);
         containerRequests.add(newCr);
         hostSpecificRequests.remove(cr);
@@ -91,7 +91,7 @@
         for (Entry<ContainerRequest, ContainerStartRequest> entry : otherContainerRequests.entrySet()) {
           ContainerRequest cr = entry.getKey();
           ContainerStartRequest csr = entry.getValue();
-          MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, cr);
+          MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, cr);
           requestedResources.put(csr, pair);
           containerRequests.add(cr);
         }
diff --git a/engine/src/main/java/com/datatorrent/stram/EventRecorder.java b/engine/src/main/java/com/datatorrent/stram/EventRecorder.java
index d313693..2994af1 100644
--- a/engine/src/main/java/com/datatorrent/stram/EventRecorder.java
+++ b/engine/src/main/java/com/datatorrent/stram/EventRecorder.java
@@ -27,6 +27,6 @@
  */
 public interface EventRecorder
 {
-  public void recordEventAsync(StramEvent event);
+  void recordEventAsync(StramEvent event);
 
 }
diff --git a/engine/src/main/java/com/datatorrent/stram/LicensingProtocol.java b/engine/src/main/java/com/datatorrent/stram/LicensingProtocol.java
index 5ff255d..4853414 100644
--- a/engine/src/main/java/com/datatorrent/stram/LicensingProtocol.java
+++ b/engine/src/main/java/com/datatorrent/stram/LicensingProtocol.java
@@ -27,7 +27,7 @@
  */
 public interface LicensingProtocol extends VersionedProtocol
 {
-  public static final long versionID = 201401310447L;
+  long versionID = 201401310447L;
 
-  public byte[] processRequest(byte[] request);
+  byte[] processRequest(byte[] request);
 }
diff --git a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
index e7f9672..45206bc 100644
--- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
+++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
@@ -102,7 +102,7 @@
    */
   public void addContainerRequest(Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, List<ContainerRequest> containerRequests, StreamingContainerAgent.ContainerStartRequest csr, ContainerRequest cr)
   {
-    MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, cr);
+    MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, cr);
     requestedResources.put(csr, pair);
     containerRequests.add(cr);
   }
@@ -164,7 +164,7 @@
 
   public List<String> getNodesExceptHost(List<String> hostNames)
   {
-    List<String> nodesList = new ArrayList<String>();
+    List<String> nodesList = new ArrayList<>();
     Set<String> hostNameSet = Sets.newHashSet();
     hostNameSet.addAll(hostNames);
     for (String host : nodeReportMap.keySet()) {
diff --git a/engine/src/main/java/com/datatorrent/stram/StatsRecorder.java b/engine/src/main/java/com/datatorrent/stram/StatsRecorder.java
index d293ca8..7263529 100644
--- a/engine/src/main/java/com/datatorrent/stram/StatsRecorder.java
+++ b/engine/src/main/java/com/datatorrent/stram/StatsRecorder.java
@@ -31,8 +31,8 @@
  */
 public interface StatsRecorder
 {
-  public void recordContainers(Map<String, StreamingContainerAgent> containerMap, long timestamp) throws IOException;
+  void recordContainers(Map<String, StreamingContainerAgent> containerMap, long timestamp) throws IOException;
 
-  public void recordOperators(List<OperatorInfo> operatorList, long timestamp) throws IOException;
+  void recordOperators(List<OperatorInfo> operatorList, long timestamp) throws IOException;
 
 }
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index dad42e3..b280aad 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -257,6 +257,7 @@
   public void copyInitialState(Path origAppDir) throws IOException
   {
     // locate previous snapshot
+    long copyStart = System.currentTimeMillis();
     String newAppDir = this.dag.assertAppPath();
 
     FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(origAppDir.toString(), conf);
@@ -284,6 +285,7 @@
     logOs.close();
     logIs.close();
 
+    List<String> excludeDirs = Arrays.asList(LogicalPlan.SUBDIR_CHECKPOINTS, LogicalPlan.SUBDIR_EVENTS, LogicalPlan.SUBDIR_STATS);
     // copy sub directories that are not present in target
     FileStatus[] lFiles = fs.listStatus(origAppDir);
 
@@ -298,19 +300,19 @@
     String newAppDirPath = Path.getPathWithoutSchemeAndAuthority(new Path(newAppDir)).toString();
 
     for (FileStatus f : lFiles) {
-      if (f.isDirectory()) {
+      if (f.isDirectory() && !excludeDirs.contains(f.getPath().getName())) {
         String targetPath = f.getPath().toString().replace(origAppDirPath, newAppDirPath);
         if (!fs.exists(new Path(targetPath))) {
-          LOG.debug("Copying {} to {}", f.getPath(), targetPath);
+          LOG.debug("Copying {} size {} to {}", f.getPath(), f.getLen(), targetPath);
+          long start = System.currentTimeMillis();
           FileUtil.copy(fs, f.getPath(), fs, new Path(targetPath), false, conf);
-          //FSUtil.copy(fs, f, fs, new Path(targetPath), false, false, conf);
+          LOG.debug("Copying {} size {} to {} took {} ms", f.getPath(), f.getLen(), targetPath, System.currentTimeMillis() - start);
         } else {
           LOG.debug("Ignoring {} as it already exists under {}", f.getPath(), targetPath);
-          //FSUtil.setPermission(fs, new Path(targetPath), new FsPermission((short)0777));
         }
       }
     }
-
+    LOG.info("Copying initial state took {} ms", System.currentTimeMillis() - copyStart);
   }
 
   /**
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index c68df14..51e85f7 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -65,6 +65,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.CascadeStorageAgent;
 import org.apache.apex.engine.plugin.ApexPluginDispatcher;
 import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
 import org.apache.commons.io.IOUtils;
@@ -3238,23 +3239,43 @@
 
       this.finals = new FinalVars(finals, lp);
       StorageAgent sa = lp.getValue(OperatorContext.STORAGE_AGENT);
-      if (sa instanceof AsyncFSStorageAgent) {
-        // replace the default storage agent, if present
-        AsyncFSStorageAgent fssa = (AsyncFSStorageAgent)sa;
-        if (fssa.path.contains(oldAppId)) {
-          fssa = new AsyncFSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
-          lp.setAttribute(OperatorContext.STORAGE_AGENT, fssa);
-        }
-      } else if (sa instanceof FSStorageAgent) {
-        // replace the default storage agent, if present
-        FSStorageAgent fssa = (FSStorageAgent)sa;
-        if (fssa.path.contains(oldAppId)) {
-          fssa = new FSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
-          lp.setAttribute(OperatorContext.STORAGE_AGENT, fssa);
-        }
+      lp.setAttribute(OperatorContext.STORAGE_AGENT, updateStorageAgent(sa, oldAppId, appId, conf));
+    }
+  }
+
+  private static StorageAgent updateStorageAgent(StorageAgent sa, String oldAppId, String appId, Configuration conf)
+  {
+    if (sa instanceof AsyncFSStorageAgent || sa instanceof FSStorageAgent) {
+      FSStorageAgent newAgent = (FSStorageAgent)updateFSStorageAgent(sa, oldAppId, appId, conf);
+      if (newAgent != sa) {
+        return new CascadeStorageAgent(sa, newAgent);
+      }
+    } else if (sa instanceof CascadeStorageAgent) {
+      CascadeStorageAgent csa = (CascadeStorageAgent)sa;
+      StorageAgent currentStorageAgent = csa.getCurrentStorageAgent();
+      return new CascadeStorageAgent(csa, updateFSStorageAgent(currentStorageAgent, oldAppId, appId, conf));
+    }
+    return sa;
+  }
+
+  /**
+   * Return updated FileSystem based storage agent. Storage agent is updated only when
+   * they use application directory to store the checkpoints.
+   */
+  private static StorageAgent updateFSStorageAgent(StorageAgent sa, String oldAppId, String appId, Configuration conf)
+  {
+    if (sa instanceof AsyncFSStorageAgent) {
+      AsyncFSStorageAgent fssa = (AsyncFSStorageAgent)sa;
+      if (fssa.path.contains(oldAppId)) {
+        return new AsyncFSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
+      }
+    } else if (sa instanceof FSStorageAgent) {
+      FSStorageAgent fssa = (FSStorageAgent)sa;
+      if (fssa.path.contains(oldAppId)) {
+        return new FSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
       }
     }
-
+    return sa;
   }
 
   public interface RecoveryHandler
diff --git a/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java b/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java
index 605944e..bc3a187 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java
@@ -29,10 +29,10 @@
  */
 public interface ContainerContext extends Context
 {
-  public static final Attribute<String> IDENTIFIER = new Attribute<>("unknown_container_id");
-  public static final Attribute<Integer> BUFFER_SERVER_MB = new Attribute<>(8 * 64);
-  public static final Attribute<byte[]> BUFFER_SERVER_TOKEN = new Attribute<>(null, null);
-  public static final Attribute<RequestFactory> REQUEST_FACTORY = new Attribute<>(null, null);
+  Attribute<String> IDENTIFIER = new Attribute<>("unknown_container_id");
+  Attribute<Integer> BUFFER_SERVER_MB = new Attribute<>(8 * 64);
+  Attribute<byte[]> BUFFER_SERVER_TOKEN = new Attribute<>(null, null);
+  Attribute<RequestFactory> REQUEST_FACTORY = new Attribute<>(null, null);
   @SuppressWarnings("FieldNameHidesFieldInSuperclass")
   long serialVersionUID = AttributeInitializer.initialize(ContainerContext.class);
 }
diff --git a/engine/src/main/java/com/datatorrent/stram/api/NodeActivationListener.java b/engine/src/main/java/com/datatorrent/stram/api/NodeActivationListener.java
index dd9ac30..f5b2576 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/NodeActivationListener.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/NodeActivationListener.java
@@ -32,13 +32,13 @@
    *
    * @param node node which got activated.
    */
-  public void activated(Node<?> node);
+  void activated(Node<?> node);
 
   /**
    * Callback to notify the listner that the node has been activated.
    *
    * @param node node which got deactivated.
    */
-  public void deactivated(Node<?> node);
+  void deactivated(Node<?> node);
 
 }
diff --git a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
index b78e8f2..eba10db 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
@@ -54,7 +54,7 @@
 @InterfaceStability.Stable
 public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol
 {
-  public static final long versionID = 201208081755L;
+  long versionID = 201208081755L;
 
   /**
    * Initialization parameters for StramChild container. Container
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 77959ab..9a7b128 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -92,7 +92,6 @@
 import com.google.common.base.Preconditions;
 import com.sun.jersey.api.client.WebResource;
 
-import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG.GenericOperator;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.StreamingApplication;
@@ -112,6 +111,7 @@
 import com.datatorrent.stram.client.StramClientUtils.ClientRMHelper;
 import com.datatorrent.stram.codec.LogicalPlanSerializer;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
 import com.datatorrent.stram.plan.logical.requests.AddStreamSinkRequest;
 import com.datatorrent.stram.plan.logical.requests.CreateOperatorRequest;
 import com.datatorrent.stram.plan.logical.requests.CreateStreamRequest;
@@ -2108,13 +2108,11 @@
 
         if (appFactory != null) {
           if (!commandLineInfo.localMode) {
-
             // see whether there is an app with the same name and user name running
-            String appNameAttributeName = StreamingApplication.DT_PREFIX + Context.DAGContext.APPLICATION_NAME.getName();
-            String appName = config.get(appNameAttributeName, appFactory.getName());
+            String appName = config.get(LogicalPlanConfiguration.KEY_APPLICATION_NAME, appFactory.getName());
             ApplicationReport duplicateApp = StramClientUtils.getStartedAppInstanceByName(yarnClient, appName, UserGroupInformation.getLoginUser().getUserName(), null);
             if (duplicateApp != null) {
-              throw new CliException("Application with the name \"" + duplicateApp.getName() + "\" already running under the current user \"" + duplicateApp.getUser() + "\". Please choose another name. You can change the name by setting " + appNameAttributeName);
+              throw new CliException("Application with the name \"" + duplicateApp.getName() + "\" already running under the current user \"" + duplicateApp.getUser() + "\". Please choose another name. You can change the name by setting " + LogicalPlanConfiguration.KEY_APPLICATION_NAME);
             }
 
             // This is for suppressing System.out printouts from applications so that the user of CLI will not be confused by those printouts
@@ -3788,9 +3786,11 @@
       while (it.hasNext()) {
         Entry<String, String> entry = it.next();
         // filter relevant entries
-        if (entry.getKey().startsWith(StreamingApplication.DT_PREFIX)) {
-          launchProperties.set(entry.getKey(), entry.getValue(), Scope.TRANSIENT, null);
-          requiredProperties.remove(entry.getKey());
+        String key = entry.getKey();
+        if (key.startsWith(StreamingApplication.DT_PREFIX)
+            || key.startsWith(StreamingApplication.APEX_PREFIX)) {
+          launchProperties.set(key, entry.getValue(), Scope.TRANSIENT, null);
+          requiredProperties.remove(key);
         }
       }
     }
diff --git a/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java b/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
index 3a61ee6..b374447 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
@@ -32,10 +32,10 @@
 public class PermissionsInfo
 {
 
-  private final Set<String> readOnlyRoles = new TreeSet<String>();
-  private final Set<String> readOnlyUsers = new TreeSet<String>();
-  private final Set<String> readWriteRoles = new TreeSet<String>();
-  private final Set<String> readWriteUsers = new TreeSet<String>();
+  private final Set<String> readOnlyRoles = new TreeSet<>();
+  private final Set<String> readOnlyUsers = new TreeSet<>();
+  private final Set<String> readWriteRoles = new TreeSet<>();
+  private final Set<String> readWriteUsers = new TreeSet<>();
   private boolean readOnlyEveryone = false;
   private boolean readWriteEveryone = false;
 
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index 050729d..15adab4 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -197,7 +197,7 @@
       this.conf = conf;
     }
 
-    public static interface AppStatusCallback
+    public interface AppStatusCallback
     {
       boolean exitLoop(ApplicationReport report);
 
@@ -271,7 +271,7 @@
       }
       Text rmTokenService = new Text(Joiner.on(',').join(services));
 
-      return new Token<RMDelegationTokenIdentifier>(
+      return new Token<>(
           rmDelegationToken.getIdentifier().array(),
           rmDelegationToken.getPassword().array(),
           new Text(rmDelegationToken.getKind()),
diff --git a/engine/src/main/java/com/datatorrent/stram/codec/StatefulStreamCodec.java b/engine/src/main/java/com/datatorrent/stram/codec/StatefulStreamCodec.java
index 3797830..dc47b4e 100644
--- a/engine/src/main/java/com/datatorrent/stram/codec/StatefulStreamCodec.java
+++ b/engine/src/main/java/com/datatorrent/stram/codec/StatefulStreamCodec.java
@@ -32,7 +32,7 @@
   /**
    * A convenience class which is used to hold 2 different values associated with each serialize/deserialize operation.
    */
-  public class DataStatePair
+  class DataStatePair
   {
     /**
      * This byte array corresponds to serialized form of the tuple of type T.
@@ -85,7 +85,7 @@
    * should not be confused with the resetState operation of upstream operator.
    *
    */
-  public void resetState();
+  void resetState();
 
   /**
    * Provide a new instance of the current object.
@@ -96,5 +96,5 @@
    *
    * @return new instance of this codec for which the state has been reset.
    */
-  public StatefulStreamCodec<T> newInstance();
+  StatefulStreamCodec<T> newInstance();
 }
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/ByteCounterStream.java b/engine/src/main/java/com/datatorrent/stram/engine/ByteCounterStream.java
index d71ae69..e217f64 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/ByteCounterStream.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/ByteCounterStream.java
@@ -25,5 +25,5 @@
  */
 public interface ByteCounterStream extends Stream
 {
-  public long getByteCount(boolean reset);
+  long getByteCount(boolean reset);
 }
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index d779afe..c84a249 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -48,6 +48,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.AsyncStorageAgent;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.base.Throwables;
@@ -70,7 +71,6 @@
 import com.datatorrent.api.StatsListener.OperatorRequest;
 import com.datatorrent.api.StorageAgent;
 import com.datatorrent.bufferserver.util.Codec;
-import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.Pair;
 import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.api.OperatorDeployInfo;
@@ -519,16 +519,16 @@
           checkpointStats = new Stats.CheckpointStats();
           checkpointStats.checkpointStartTime = System.currentTimeMillis();
           ba.save(operator, id, windowId);
-          if (ba instanceof AsyncFSStorageAgent) {
-            AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent)ba;
-            if (!asyncFSStorageAgent.isSyncCheckpoint()) {
+          if (ba instanceof AsyncStorageAgent) {
+            AsyncStorageAgent asyncStorageAgent = (AsyncStorageAgent)ba;
+            if (!asyncStorageAgent.isSyncCheckpoint()) {
               if (PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
                 CheckpointWindowInfo checkpointWindowInfo = new CheckpointWindowInfo();
                 checkpointWindowInfo.windowId = windowId;
                 checkpointWindowInfo.applicationWindowCount = applicationWindowCount;
                 checkpointWindowInfo.checkpointWindowCount = checkpointWindowCount;
                 CheckpointHandler checkpointHandler = new CheckpointHandler();
-                checkpointHandler.agent = asyncFSStorageAgent;
+                checkpointHandler.agent = asyncStorageAgent;
                 checkpointHandler.operatorId = id;
                 checkpointHandler.windowId = windowId;
                 checkpointHandler.stats = checkpointStats;
@@ -539,7 +539,7 @@
                 checkpointStats = null;
                 return;
               } else {
-                asyncFSStorageAgent.copyToHDFS(id, windowId);
+                asyncStorageAgent.finalize(id, windowId);
               }
             }
           }
@@ -680,8 +680,7 @@
 
   private class CheckpointHandler implements Callable<Stats.CheckpointStats>
   {
-
-    public AsyncFSStorageAgent agent;
+    public AsyncStorageAgent agent;
     public int operatorId;
     public long windowId;
     public Stats.CheckpointStats stats;
@@ -689,7 +688,7 @@
     @Override
     public Stats.CheckpointStats call() throws Exception
     {
-      agent.copyToHDFS(id, windowId);
+      agent.finalize(id, windowId);
       stats.checkpointTime = System.currentTimeMillis() - stats.checkpointStartTime;
       return stats;
     }
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
index 7113280..284aefb 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
@@ -50,8 +50,8 @@
   private final int id;
   private final String name;
   // the size of the circular queue should be configurable. hardcoded to 1024 for now.
-  private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<ContainerStats.OperatorStats>(1024);
-  private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<OperatorRequest>(1024);
+  private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<>(1024);
+  private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<>(1024);
   public final boolean stateless;
   private int windowsFromCheckpoint;
 
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Stream.java b/engine/src/main/java/com/datatorrent/stram/engine/Stream.java
index 196134f..1aa0641 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Stream.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Stream.java
@@ -35,9 +35,9 @@
  */
 public interface Stream extends Component<StreamContext>, ActivationListener<StreamContext>, ControlTupleEnabledSink<Object>
 {
-  public interface MultiSinkCapableStream extends Stream
+  interface MultiSinkCapableStream extends Stream
   {
-    public void setSink(String id, Sink<Object> sink);
+    void setSink(String id, Sink<Object> sink);
   }
 
 }
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 401eea9..62c4fd8 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -760,7 +760,7 @@
         codecs.put(sinkToPersistPortMeta, inputStreamCodec);
         InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port);
         StreamCodec<Object> specifiedCodecForPersistOperator = (StreamCodec<Object>)persistOperatorPortMeta.getStreamCodec();
-        StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator);
+        StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<>(codecs, specifiedCodecForPersistOperator);
         persistOperatorPortMeta.setStreamCodec(codec);
       }
     }
@@ -1907,14 +1907,14 @@
     HashMap<OperatorPair, AffinityRule> antiAffinities = new HashMap<>();
     HashMap<OperatorPair, AffinityRule> threadLocalAffinities = new HashMap<>();
 
-    List<String> operatorNames = new ArrayList<String>();
+    List<String> operatorNames = new ArrayList<>();
 
     for (OperatorMeta operator : getAllOperators()) {
       operatorNames.add(operator.getName());
-      Set<String> containerSet = new HashSet<String>();
+      Set<String> containerSet = new HashSet<>();
       containerSet.add(operator.getName());
       containerAffinities.put(operator.getName(), containerSet);
-      Set<String> nodeSet = new HashSet<String>();
+      Set<String> nodeSet = new HashSet<>();
       nodeSet.add(operator.getName());
       nodeAffinities.put(operator.getName(), nodeSet);
 
@@ -2073,7 +2073,7 @@
    */
   public void convertRegexToList(List<String> operatorNames, AffinityRule rule)
   {
-    List<String> operators = new LinkedList<String>();
+    List<String> operators = new LinkedList<>();
     Pattern p = Pattern.compile(rule.getOperatorRegex());
     for (String name : operatorNames) {
       if (p.matcher(name).matches()) {
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index ffe33f3..5a9030e 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -112,14 +112,14 @@
   public static final String GATEWAY_PREFIX = StreamingApplication.DT_PREFIX + "gateway.";
   public static final String GATEWAY_LISTEN_ADDRESS = GATEWAY_PREFIX + "listenAddress";
 
-  public static final String STREAM_PREFIX = StreamingApplication.DT_PREFIX + "stream.";
+  public static final String STREAM_PREFIX = StreamingApplication.APEX_PREFIX + "stream.";
   public static final String STREAM_SOURCE = "source";
   public static final String STREAM_SINKS = "sinks";
   public static final String STREAM_TEMPLATE = "template";
   public static final String STREAM_LOCALITY = "locality";
   public static final String STREAM_SCHEMA = "schema";
 
-  public static final String OPERATOR_PREFIX = StreamingApplication.DT_PREFIX + "operator.";
+  public static final String OPERATOR_PREFIX = StreamingApplication.APEX_PREFIX + "operator.";
   public static final String OPERATOR_CLASSNAME = "classname";
   public static final String OPERATOR_TEMPLATE = "template";
 
@@ -144,6 +144,16 @@
     LOG.debug("Initialized attributes {}", serial);
   }
 
+  public static final String KEY_APPLICATION_NAME = keyAndDeprecation(Context.DAGContext.APPLICATION_NAME);
+  public static final String KEY_GATEWAY_CONNECT_ADDRESS = keyAndDeprecation(Context.DAGContext.GATEWAY_CONNECT_ADDRESS);
+
+  private static String keyAndDeprecation(Attribute<?> attr)
+  {
+    String key = StreamingApplication.APEX_PREFIX + attr.getName();
+    Configuration.addDeprecation(StreamingApplication.DT_PREFIX + attr.getName(), key);
+    return key;
+  }
+
   private final DAGSetupPluginManager pluginManager;
 
   /**
@@ -286,7 +296,7 @@
         ambiguousAttributes.addAll(childElement.getAmbiguousAttributes());
 
         @SuppressWarnings("unchecked")
-        Set<String> intersection = (Set<String>)Sets.newHashSet(CollectionUtils.intersection(allChildAttributes, allAttributes));
+        Set<String> intersection = Sets.newHashSet(CollectionUtils.intersection(allChildAttributes, allAttributes));
         ambiguousAttributes.addAll(intersection);
         allChildAttributes.addAll(allAttributes);
       }
@@ -1661,19 +1671,16 @@
    */
   public final void addFromConfiguration(Configuration conf)
   {
-    addFromProperties(toProperties(conf, StreamingApplication.DT_PREFIX), null);
+    addFromProperties(toProperties(conf), null);
   }
 
-  public static Properties toProperties(Configuration conf, String prefix)
+  private static Properties toProperties(Configuration conf)
   {
     Iterator<Entry<String, String>> it = conf.iterator();
     Properties props = new Properties();
     while (it.hasNext()) {
       Entry<String, String> e = it.next();
-      // filter relevant entries
-      if (e.getKey().startsWith(prefix)) {
-        props.put(e.getKey(), e.getValue());
-      }
+      props.put(e.getKey(), e.getValue());
     }
     return props;
   }
@@ -1713,7 +1720,7 @@
     JSONArray operatorArray = json.getJSONArray("operators");
     for (int i = 0; i < operatorArray.length(); i++) {
       JSONObject operator = operatorArray.getJSONObject(i);
-      String operatorPrefix = StreamingApplication.DT_PREFIX + StramElement.OPERATOR.getValue() + KEY_SEPARATOR + operator.getString("name") + ".";
+      String operatorPrefix = StreamingApplication.APEX_PREFIX + StramElement.OPERATOR.getValue() + KEY_SEPARATOR + operator.getString("name") + ".";
       prop.setProperty(operatorPrefix + "classname", operator.getString("class"));
       JSONObject operatorProperties = operator.optJSONObject("properties");
       if (operatorProperties != null) {
@@ -1756,7 +1763,7 @@
 
     JSONObject appAttributes = json.optJSONObject("attributes");
     if (appAttributes != null) {
-      String attributesPrefix = StreamingApplication.DT_PREFIX + StramElement.ATTR.getValue() + KEY_SEPARATOR;
+      String attributesPrefix = StreamingApplication.APEX_PREFIX + StramElement.ATTR.getValue() + KEY_SEPARATOR;
       @SuppressWarnings("unchecked")
       Iterator<String> iter = appAttributes.keys();
       while (iter.hasNext()) {
@@ -1769,7 +1776,7 @@
     for (int i = 0; i < streamArray.length(); i++) {
       JSONObject stream = streamArray.getJSONObject(i);
       String name = stream.optString("name", "stream-" + i);
-      String streamPrefix = StreamingApplication.DT_PREFIX + StramElement.STREAM.getValue() + KEY_SEPARATOR + name + KEY_SEPARATOR;
+      String streamPrefix = StreamingApplication.APEX_PREFIX + StramElement.STREAM.getValue() + KEY_SEPARATOR + name + KEY_SEPARATOR;
       JSONObject source = stream.getJSONObject("source");
       prop.setProperty(streamPrefix + STREAM_SOURCE, source.getString("operatorName") + KEY_SEPARATOR + source.getString("portName"));
       JSONArray sinks = stream.getJSONArray("sinks");
@@ -1797,7 +1804,7 @@
 
 
   /**
-   * Read node configurations from opProps. The opProps can be in any
+   * Read operator configurations from properties. The properties can be in any
    * random order, as long as they represent a consistent configuration in their
    * entirety.
    *
@@ -1813,7 +1820,8 @@
     for (final String propertyName : props.stringPropertyNames()) {
       String propertyValue = props.getProperty(propertyName);
       this.properties.setProperty(propertyName, propertyValue);
-      if (propertyName.startsWith(StreamingApplication.DT_PREFIX)) {
+      if (propertyName.startsWith(StreamingApplication.DT_PREFIX) ||
+          propertyName.startsWith(StreamingApplication.APEX_PREFIX)) {
         String[] keyComps = propertyName.split(KEY_SEPARATOR_SPLIT_REGEX);
         parseStramPropertyTokens(keyComps, 1, propertyName, propertyValue, stramConf);
       }
@@ -2239,14 +2247,14 @@
   /**
    * Populate the logical plan from the streaming application definition and configuration.
    * Configuration is resolved based on application alias, if any.
-   * @param app The {@lin StreamingApplication} to be run.
+   * @param app The {@link StreamingApplication} to be run.
    * @param dag This will hold the {@link LogicalPlan} representation of the given {@link StreamingApplication}.
    * @param name The path of the application class in the jar.
    */
   public void prepareDAG(LogicalPlan dag, StreamingApplication app, String name)
   {
     // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
-    String connectAddress = conf.get(StreamingApplication.DT_PREFIX + Context.DAGContext.GATEWAY_CONNECT_ADDRESS.getName());
+    String connectAddress = conf.get(KEY_GATEWAY_CONNECT_ADDRESS);
     dag.setAttribute(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null ? conf.get(GATEWAY_LISTEN_ADDRESS) : connectAddress);
     DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf);
     if (app != null) {
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index ce22bfd..f4e2100 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -41,6 +41,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.AsyncStorageAgent;
 import org.apache.commons.lang.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -68,7 +69,6 @@
 import com.datatorrent.api.StorageAgent;
 import com.datatorrent.api.StreamCodec;
 import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.stram.Journal.Recoverable;
 import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.api.StramEvent;
@@ -477,13 +477,13 @@
     // Log container anti-affinity
     if (LOG.isDebugEnabled()) {
       for (PTContainer container : containers) {
-        List<String> antiOperators = new ArrayList<String>();
+        List<String> antiOperators = new ArrayList<>();
         for (PTContainer c : container.getStrictAntiPrefs()) {
           for (PTOperator operator : c.getOperators()) {
             antiOperators.add(operator.getName());
           }
         }
-        List<String> containerOperators = new ArrayList<String>();
+        List<String> containerOperators = new ArrayList<>();
         for (PTOperator operator : container.getOperators()) {
           containerOperators.add(operator.getName());
         }
@@ -1226,11 +1226,8 @@
       long windowId = oper.isOperatorStateLess() ? Stateless.WINDOW_ID : checkpoint.windowId;
       StorageAgent agent = oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT);
       agent.save(oo, oper.id, windowId);
-      if (agent instanceof AsyncFSStorageAgent) {
-        AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent)agent;
-        if (!asyncFSStorageAgent.isSyncCheckpoint()) {
-          asyncFSStorageAgent.copyToHDFS(oper.id, windowId);
-        }
+      if (agent instanceof AsyncStorageAgent) {
+        ((AsyncStorageAgent)agent).finalize(oper.id, windowId);
       }
     } catch (IOException e) {
       // inconsistent state, no recovery option, requires shutdown
diff --git a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
index 38c0f41..47986cb 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
@@ -116,10 +116,10 @@
    */
   public PubSubWebSocketClient()
   {
-    throwable = new AtomicReference<Throwable>();
+    throwable = new AtomicReference<>();
     ioThreadMultiplier = 1;
     mapper = (new JacksonObjectMapperProvider()).getContext(null);
-    codec = new PubSubMessageCodec<Object>(mapper);
+    codec = new PubSubMessageCodec<>(mapper);
 
     AsyncHttpClientConfigBean config = new AsyncHttpClientConfigBean();
     config.setIoThreadMultiplier(ioThreadMultiplier);
diff --git a/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java
index ad5a423..73572f2 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java
@@ -51,9 +51,9 @@
 
   public interface Handler
   {
-    public void onMessage(String type, String topic, Object data);
+    void onMessage(String type, String topic, Object data);
 
-    public void onClose();
+    void onClose();
 
   }
 
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java b/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
index dd75857..9fbb54d 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
@@ -148,7 +148,7 @@
     List<CompactAnnotationNode> annotations = new LinkedList<>();
     for (Object visibleAnnotation : fn.visibleAnnotations) {
       CompactAnnotationNode node = new CompactAnnotationNode();
-      Map<String, Object> annotationMap = new HashMap<String, Object>();
+      Map<String, Object> annotationMap = new HashMap<>();
       if (visibleAnnotation instanceof AnnotationNode) {
         AnnotationNode annotation = (AnnotationNode)visibleAnnotation;
         if (annotation.desc.contains("InputPortFieldAnnotation")
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java b/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
index 1e87b31..91a3cf3 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
@@ -87,7 +87,7 @@
 
     char boundChar;
 
-    ArrayList<Type> bounds = new ArrayList<Type>();
+    ArrayList<Type> bounds = new ArrayList<>();
 
     public Type[] getUpperBounds()
     {
@@ -154,7 +154,7 @@
   class ParameterizedTypeNode extends TypeNode
   {
 
-    ArrayList<Type> actualTypeArguments = new ArrayList<Type>();
+    ArrayList<Type> actualTypeArguments = new ArrayList<>();
 
     public Type[] getActualTypeArguments()
     {
diff --git a/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
index 5930e78..958907d 100644
--- a/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
+++ b/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
@@ -79,9 +79,7 @@
           throw new LauncherException(e);
         }
       }
-      if (StramUtils.getValueWithDefault(launchParameters, HEARTBEAT_MONITORING)) {
-        lc.setHeartbeatMonitoringEnabled(true);
-      }
+      lc.setHeartbeatMonitoringEnabled(StramUtils.getValueWithDefault(launchParameters, HEARTBEAT_MONITORING));
       if (StramUtils.getValueWithDefault(launchParameters, RUN_ASYNC)) {
         lc.runAsync();
         launched = true;
diff --git a/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPluginContext.java b/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPluginContext.java
index dc3153e..d466b23 100644
--- a/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPluginContext.java
+++ b/engine/src/main/java/org/apache/apex/engine/api/DAGExecutionPluginContext.java
@@ -69,21 +69,21 @@
     void handle(T data);
   }
 
-  public StramAppContext getApplicationContext();
+  StramAppContext getApplicationContext();
 
-  public AppInfo.AppStats getApplicationStats();
+  AppInfo.AppStats getApplicationStats();
 
-  public Configuration getLaunchConfig();
+  Configuration getLaunchConfig();
 
-  public DAG getDAG();
+  DAG getDAG();
 
-  public String getOperatorName(int id);
+  String getOperatorName(int id);
 
-  public BatchedOperatorStats getPhysicalOperatorStats(int id);
+  BatchedOperatorStats getPhysicalOperatorStats(int id);
 
-  public List<LogicalOperatorInfo> getLogicalOperatorInfoList();
+  List<LogicalOperatorInfo> getLogicalOperatorInfoList();
 
-  public Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String operatorName);
+  Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String operatorName);
 
-  public long windowIdToMillis(long windowId);
+  long windowIdToMillis(long windowId);
 }
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index d7f96d4..0c997ec 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -78,7 +78,7 @@
   private static class MockInputOperator extends BaseOperator implements InputOperator, Operator.CheckpointNotificationListener
   {
     @OutputPortFieldAnnotation( optional = true)
-    public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<>();
     private transient int windowCount;
 
     private int checkpointState;
@@ -326,7 +326,7 @@
 
   public List<Checkpoint> getCheckpoints(Long... windowIds)
   {
-    List<Checkpoint> list = new ArrayList<Checkpoint>(windowIds.length);
+    List<Checkpoint> list = new ArrayList<>(windowIds.length);
     for (Long windowId : windowIds) {
       list.add(new Checkpoint(windowId, 0, 0));
     }
diff --git a/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java b/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
index b1509e0..b1f3363 100644
--- a/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
@@ -41,7 +41,7 @@
   public void testGenericOperatorPropertyCodec()
   {
     LogicalPlan dag = new LogicalPlan();
-    Map<Class<?>, Class<? extends StringCodec<?>>> codecs = new HashMap<Class<?>, Class<? extends StringCodec<?>>>();
+    Map<Class<?>, Class<? extends StringCodec<?>>> codecs = new HashMap<>();
     codecs.put(GenericOperatorProperty.class, GenericOperatorProperty.GenericOperatorPropertyStringCodec.class);
     dag.setAttribute(com.datatorrent.api.Context.DAGContext.STRING_CODECS, codecs);
     dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
index ecbeeb6..f0199f9 100644
--- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
@@ -86,7 +86,7 @@
     /*
      * Received tuples are stored in a map keyed with the system assigned operator id.
      */
-    public static final ConcurrentHashMap<String, List<Object>> receivedTuples = new ConcurrentHashMap<String, List<Object>>();
+    public static final ConcurrentHashMap<String, List<Object>> receivedTuples = new ConcurrentHashMap<>();
     private transient int operatorId;
     public String prefix = "";
 
@@ -107,7 +107,7 @@
         synchronized (receivedTuples) {
           List<Object> l = receivedTuples.get(id);
           if (l == null) {
-            l = Collections.synchronizedList(new ArrayList<Object>());
+            l = Collections.synchronizedList(new ArrayList<>());
             //LOG.debug("adding {} {}", id, l);
             receivedTuples.put(id, l);
           }
@@ -121,12 +121,12 @@
 
     };
     @OutputPortFieldAnnotation( optional = true)
-    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
   }
 
   public static class TestInputOperator<T> extends BaseOperator implements InputOperator
   {
-    public final transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+    public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
     transient boolean first;
     transient long windowId;
     boolean blockEndStream = false;
@@ -178,9 +178,9 @@
     CollectorOperator.receivedTuples.clear();
 
     TestInputOperator<Integer> input = dag.addOperator("input", new TestInputOperator<Integer>());
-    input.testTuples = new ArrayList<List<Integer>>();
+    input.testTuples = new ArrayList<>();
     for (Integer[] tuples: testData) {
-      input.testTuples.add(new ArrayList<Integer>(Arrays.asList(tuples)));
+      input.testTuples.add(new ArrayList<>(Arrays.asList(tuples)));
     }
     CollectorOperator collector = dag.addOperator("collector", new CollectorOperator());
     collector.prefix = "" + System.identityHashCode(collector);
@@ -234,7 +234,7 @@
     {
       Map<Integer, Integer> m = loadIndicators.get();
       if (m == null) {
-        loadIndicators.set(m = new ConcurrentHashMap<Integer, Integer>());
+        loadIndicators.set(m = new ConcurrentHashMap<>());
       }
       m.put(oper.getId(), load);
     }
@@ -341,7 +341,7 @@
     Assert.assertNotNull("" + nodeMap, inputDeployed);
 
     // add tuple that matches the partition key and check that each partition receives it
-    ArrayList<Integer> inputTuples = new ArrayList<Integer>();
+    ArrayList<Integer> inputTuples = new ArrayList<>();
     LOG.debug("Number of partitions {}", partitions.size());
     for (PTOperator p: partitions) {
       // default partitioning has one port mapping with a single partition key
@@ -391,7 +391,7 @@
     @Override
     public Collection<Partition<PartitionableInputOperator>> definePartitions(Collection<Partition<PartitionableInputOperator>> partitions, PartitioningContext context)
     {
-      List<Partition<PartitionableInputOperator>> newPartitions = new ArrayList<Partition<PartitionableInputOperator>>(3);
+      List<Partition<PartitionableInputOperator>> newPartitions = new ArrayList<>(3);
       Iterator<? extends Partition<PartitionableInputOperator>> iterator = partitions.iterator();
       Partition<PartitionableInputOperator> templatePartition;
       for (int i = 0; i < 3; i++) {
@@ -401,7 +401,7 @@
           op.partitionProperty = templatePartition.getPartitionedInstance().partitionProperty;
         }
         op.partitionProperty += "_" + i;
-        newPartitions.add(new DefaultPartition<PartitionableInputOperator>(op));
+        newPartitions.add(new DefaultPartition<>(op));
       }
       return newPartitions;
     }
@@ -431,7 +431,7 @@
       lc.runAsync();
 
       List<PTOperator> partitions = assertNumberPartitions(3, lc, dag.getMeta(input));
-      Set<String> partProperties = new HashSet<String>();
+      Set<String> partProperties = new HashSet<>();
       for (PTOperator p : partitions) {
         LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, p);
         Map<Integer, Node<?>> nodeMap = c.getNodes();
@@ -460,7 +460,7 @@
       PartitionLoadWatch.remove(partitions.get(0));
 
       partitions = assertNumberPartitions(3, lc, dag.getMeta(input));
-      partProperties = new HashSet<String>();
+      partProperties = new HashSet<>();
       for (PTOperator p: partitions) {
         LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, p);
         Map<Integer, Node<?>> nodeMap = c.getNodes();
diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
index 3ec9882..ca85f5d 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
@@ -200,31 +200,25 @@
     Properties dagProps = new Properties();
 
     // input module (ensure shutdown works while windows are generated)
-    dagProps.put(StreamingApplication.DT_PREFIX + "operator.numGen.classname", TestGeneratorInputOperator.class.getName());
-    dagProps.put(StreamingApplication.DT_PREFIX + "operator.numGen.maxTuples", "1");
+    dagProps.put(StreamingApplication.APEX_PREFIX + "operator.numGen.classname", TestGeneratorInputOperator.class.getName());
+    dagProps.put(StreamingApplication.APEX_PREFIX + "operator.numGen.maxTuples", "1");
 
-    // fake output adapter - to be ignored when determine shutdown
-    //props.put(DAGContext.DT_PREFIX + "stream.output.classname", HDFSOutputStream.class.getName());
-    //props.put(DAGContext.DT_PREFIX + "stream.output.inputNode", "module2");
-    //props.put(DAGContext.DT_PREFIX + "stream.output.filepath", "miniclustertest-testSetupShutdown.out");
+    dagProps.put(StreamingApplication.APEX_PREFIX + "operator.module1.classname", GenericTestOperator.class.getName());
 
-    dagProps.put(StreamingApplication.DT_PREFIX + "operator.module1.classname", GenericTestOperator.class.getName());
+    dagProps.put(StreamingApplication.APEX_PREFIX + "operator.module2.classname", GenericTestOperator.class.getName());
 
-    dagProps.put(StreamingApplication.DT_PREFIX + "operator.module2.classname", GenericTestOperator.class.getName());
+    dagProps.put(StreamingApplication.APEX_PREFIX + "stream.fromNumGen.source", "numGen.outport");
+    dagProps.put(StreamingApplication.APEX_PREFIX + "stream.fromNumGen.sinks", "module1.inport1");
 
-    dagProps.put(StreamingApplication.DT_PREFIX + "stream.fromNumGen.source", "numGen.outport");
-    dagProps.put(StreamingApplication.DT_PREFIX + "stream.fromNumGen.sinks", "module1.inport1");
+    dagProps.put(StreamingApplication.APEX_PREFIX + "stream.n1n2.source", "module1.outport1");
+    dagProps.put(StreamingApplication.APEX_PREFIX + "stream.n1n2.sinks", "module2.inport1");
 
-    dagProps.put(StreamingApplication.DT_PREFIX + "stream.n1n2.source", "module1.outport1");
-    dagProps.put(StreamingApplication.DT_PREFIX + "stream.n1n2.sinks", "module2.inport1");
-
-    dagProps.setProperty(StreamingApplication.DT_PREFIX + LogicalPlan.MASTER_MEMORY_MB.getName(), "128");
-    dagProps.setProperty(StreamingApplication.DT_PREFIX + LogicalPlan.CONTAINER_JVM_OPTIONS.getName(), "-Dlog4j.properties=custom_log4j.properties");
-    dagProps.setProperty(StreamingApplication.DT_PREFIX + "operator.*." + OperatorContext.MEMORY_MB.getName(), "64");
-    dagProps.setProperty(StreamingApplication.DT_PREFIX + "operator.*." + OperatorContext.VCORES.getName(), "1");
-    dagProps.setProperty(StreamingApplication.DT_PREFIX + "operator.*.port.*." + Context.PortContext.BUFFER_MEMORY_MB.getName(), "32");
-    dagProps.setProperty(StreamingApplication.DT_PREFIX + LogicalPlan.DEBUG.getName(), "true");
-    //dagProps.setProperty(StreamingApplication.DT_PREFIX + LogicalPlan.CONTAINERS_MAX_COUNT.getName(), "2");
+    dagProps.setProperty(StreamingApplication.APEX_PREFIX + LogicalPlan.MASTER_MEMORY_MB.getName(), "128");
+    dagProps.setProperty(StreamingApplication.APEX_PREFIX + LogicalPlan.CONTAINER_JVM_OPTIONS.getName(), "-Dlog4j.properties=custom_log4j.properties");
+    dagProps.setProperty(StreamingApplication.APEX_PREFIX + "operator.*." + OperatorContext.MEMORY_MB.getName(), "64");
+    dagProps.setProperty(StreamingApplication.APEX_PREFIX + "operator.*." + OperatorContext.VCORES.getName(), "1");
+    dagProps.setProperty(StreamingApplication.APEX_PREFIX + "operator.*.port.*." + Context.PortContext.BUFFER_MEMORY_MB.getName(), "32");
+    dagProps.setProperty(StreamingApplication.APEX_PREFIX + LogicalPlan.DEBUG.getName(), "true");
     LOG.info("dag properties: {}", dagProps);
 
     LOG.info("Initializing Client");
@@ -272,9 +266,9 @@
 
     // single container topology of inline input and module
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "stream.input.classname", TestGeneratorInputOperator.class.getName());
-    props.put(StreamingApplication.DT_PREFIX + "stream.input.outputNode", "module1");
-    props.put(StreamingApplication.DT_PREFIX + "module.module1.classname", GenericTestOperator.class.getName());
+    props.put(StreamingApplication.APEX_PREFIX + "stream.input.classname", TestGeneratorInputOperator.class.getName());
+    props.put(StreamingApplication.APEX_PREFIX + "stream.input.outputNode", "module1");
+    props.put(StreamingApplication.APEX_PREFIX + "module.module1.classname", GenericTestOperator.class.getName());
 
     LOG.info("Initializing Client");
     LogicalPlanConfiguration tb = new LogicalPlanConfiguration(new Configuration(false));
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 645598d..2f46049 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -44,6 +44,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.CascadeStorageAgent;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
@@ -428,6 +429,7 @@
     o1p1.getContainer().setExternalId("cid1");
     scm.writeJournal(o1p1.getContainer().getSetContainerState());
 
+    /* simulate application restart from app1 */
     dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath2);
     dag.setAttribute(LogicalPlan.APPLICATION_ID, appId2);
@@ -447,9 +449,50 @@
     o1p1 = plan.getOperators(dag.getOperatorMeta("o1")).get(0);
     assertEquals("journal copied", "cid1", o1p1.getContainer().getExternalId());
 
-    ids = new FSStorageAgent(appPath2 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, new Configuration()).getWindowIds(o1p1.getId());
+    CascadeStorageAgent csa = (CascadeStorageAgent)dag.getAttributes().get(OperatorContext.STORAGE_AGENT);
+    Assert.assertEquals("storage agent is replaced by cascade", csa.getClass(), CascadeStorageAgent.class);
+    Assert.assertEquals("current storage agent is of same type", csa.getCurrentStorageAgent().getClass(), agent.getClass());
+    Assert.assertEquals("parent storage agent is of same type ", csa.getParentStorageAgent().getClass(), agent.getClass());
+    /* parent and current points to expected location */
+    Assert.assertEquals(true, ((FSStorageAgent)csa.getParentStorageAgent()).path.contains("app1"));
+    Assert.assertEquals(true, ((FSStorageAgent)csa.getCurrentStorageAgent()).path.contains("app2"));
+
+    ids = csa.getWindowIds(o1p1.getId());
     Assert.assertArrayEquals("checkpoints copied", new long[] {o1p1.getRecoveryCheckpoint().getWindowId()}, ids);
 
+
+    /* simulate another application restart from app2 */
+    String appId3 = "app3";
+    String appPath3 = testMeta.getPath() + "/" + appId3;
+    dag = new LogicalPlan();
+    dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath3);
+    dag.setAttribute(LogicalPlan.APPLICATION_ID, appId3);
+    sc = new StramClient(new Configuration(), dag);
+    try {
+      sc.start();
+      sc.copyInitialState(new Path(appPath2)); // copy state from app2.
+    } finally {
+      sc.stop();
+    }
+    scm = StreamingContainerManager.getInstance(new FSRecoveryHandler(dag.assertAppPath(), new Configuration(false)), dag, false);
+    plan = scm.getPhysicalPlan();
+    dag = plan.getLogicalPlan();
+
+    csa = (CascadeStorageAgent)dag.getAttributes().get(OperatorContext.STORAGE_AGENT);
+    Assert.assertEquals("storage agent is replaced by cascade", csa.getClass(), CascadeStorageAgent.class);
+    Assert.assertEquals("current storage agent is of same type", csa.getCurrentStorageAgent().getClass(), agent.getClass());
+    Assert.assertEquals("parent storage agent is of same type ", csa.getParentStorageAgent().getClass(), CascadeStorageAgent.class);
+
+    CascadeStorageAgent parent = (CascadeStorageAgent)csa.getParentStorageAgent();
+    Assert.assertEquals("current storage agent is of same type ", parent.getCurrentStorageAgent().getClass(), agent.getClass());
+    Assert.assertEquals("parent storage agent is cascade ", parent.getParentStorageAgent().getClass(), agent.getClass());
+    /* verify paths */
+    Assert.assertEquals(true, ((FSStorageAgent)parent.getParentStorageAgent()).path.contains("app1"));
+    Assert.assertEquals(true, ((FSStorageAgent)parent.getCurrentStorageAgent()).path.contains("app2"));
+    Assert.assertEquals(true, ((FSStorageAgent)csa.getCurrentStorageAgent()).path.contains("app3"));
+
+    ids = csa.getWindowIds(o1p1.getId());
+    Assert.assertArrayEquals("checkpoints copied", new long[] {o1p1.getRecoveryCheckpoint().getWindowId()}, ids);
   }
 
   @Test
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 35bb363..cee8247 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -1007,7 +1007,7 @@
       lastId = assignNewContainers(dnm, lastId);
 
       List<PTOperator> operators = plan.getOperators(n2meta);
-      List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+      List<PTOperator> upstreamOperators = new ArrayList<>();
       for (PTOperator operator : operators) {
         upstreamOperators.addAll(operator.upstreamMerge.values());
         /*
@@ -1036,7 +1036,7 @@
       lastId = assignNewContainers(dnm, lastId);
 
       List<PTOperator> operators = plan.getOperators(n3meta);
-      List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+      List<PTOperator> upstreamOperators = new ArrayList<>();
       for (PTOperator operator : operators) {
         upstreamOperators.addAll(operator.upstreamMerge.values());
       }
@@ -1063,7 +1063,7 @@
       lastId = assignNewContainers(dnm, lastId);
 
       List<PTOperator> operators = plan.getOperators(n2meta);
-      List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+      List<PTOperator> upstreamOperators = new ArrayList<>();
       for (PTOperator operator : operators) {
         upstreamOperators.addAll(operator.upstreamMerge.values());
         /*
@@ -1144,7 +1144,7 @@
 
   private Set<PTOperator> getUnifiers(PhysicalPlan plan)
   {
-    Set<PTOperator> unifiers = new HashSet<PTOperator>();
+    Set<PTOperator> unifiers = new HashSet<>();
     for (PTContainer container : plan.getContainers()) {
       for (PTOperator operator : container.getOperators()) {
         if (operator.isUnifier()) {
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 84622c4..cb2d760 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -124,14 +124,14 @@
     input.portName = "inputPortNameOnNode";
     input.sourceNodeId = 99;
 
-    ndi.inputs = new ArrayList<OperatorDeployInfo.InputDeployInfo>();
+    ndi.inputs = new ArrayList<>();
     ndi.inputs.add(input);
 
     OperatorDeployInfo.OutputDeployInfo output = new OperatorDeployInfo.OutputDeployInfo();
     output.declaredStreamId = "streamFromNode";
     output.portName = "outputPortNameOnNode";
 
-    ndi.outputs = new ArrayList<OperatorDeployInfo.OutputDeployInfo>();
+    ndi.outputs = new ArrayList<>();
     ndi.outputs.add(output);
 
     ContainerHeartbeatResponse scc = new ContainerHeartbeatResponse();
@@ -875,7 +875,6 @@
   private void testAppDataSources(boolean appendQIDToTopic) throws Exception
   {
     StramLocalCluster lc = new StramLocalCluster(dag);
-    lc.runAsync();
     StreamingContainerManager dnmgr = lc.dnmgr;
     List<AppDataSource> appDataSources = dnmgr.getAppDataSources();
     Assert.assertEquals("There should be exactly one data source", 1, appDataSources.size());
@@ -890,7 +889,6 @@
     Assert.assertEquals("Result topic verification", "xyz.result", result.topic);
     Assert.assertEquals("Result URL verification", "ws://123.123.123.124:9090/pubsub", result.url);
     Assert.assertEquals("Result QID append verification", appendQIDToTopic, result.appendQIDToTopic);
-    lc.shutdown();
   }
 
   @Test
diff --git a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
index f6b7277..59f9dcc 100644
--- a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
@@ -34,7 +34,7 @@
 {
   ApexCli cli;
 
-  static Map<String, String> env = new HashMap<String, String>();
+  static Map<String, String> env = new HashMap<>();
   static String userHome;
 
   @Before
diff --git a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
index 2ac1c50..f1356df 100644
--- a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
@@ -59,7 +59,7 @@
   static TemporaryFolder testFolder = new TemporaryFolder();
   ApexCli cli;
 
-  static Map<String, String> env = new HashMap<String, String>();
+  static Map<String, String> env = new HashMap<>();
   static String userHome;
 
   @BeforeClass
diff --git a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
index d1a18ae..26aced8 100644
--- a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
@@ -112,8 +112,8 @@
   @Test
   public void testString()
   {
-    StatefulStreamCodec<Object> coder = new DefaultStatefulStreamCodec<Object>();
-    StatefulStreamCodec<Object> decoder = new DefaultStatefulStreamCodec<Object>();
+    StatefulStreamCodec<Object> coder = new DefaultStatefulStreamCodec<>();
+    StatefulStreamCodec<Object> decoder = new DefaultStatefulStreamCodec<>();
 
     String hello = "hello";
 
@@ -182,7 +182,7 @@
   public void testFinalFieldSerialization() throws Exception
   {
     TestTuple t1 = new TestTuple(5);
-    DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<Object>();
+    DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<>();
     DataStatePair dsp = c.toDataStatePair(t1);
     TestTuple t2 = (TestTuple)c.fromDataStatePair(dsp);
     Assert.assertEquals("", t1.finalField, t2.finalField);
@@ -208,7 +208,7 @@
     Object inner = outer.new InnerClass();
 
     for (Object o: new Object[] {outer, inner}) {
-      DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<Object>();
+      DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<>();
       DataStatePair dsp = c.toDataStatePair(o);
       c.fromDataStatePair(dsp);
 
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
index cc777f7..64298de 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
@@ -77,7 +77,7 @@
   @Override
   public void testNonLinearOperatorRecovery() throws InterruptedException
   {
-    final HashSet<Object> collection = new HashSet<Object>();
+    final HashSet<Object> collection = new HashSet<>();
 
     com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap map = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
     map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0);
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index da5c7b7..66f1b84 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -197,7 +197,7 @@
 
     };
     @OutputPortFieldAnnotation( optional = true)
-    DefaultOutputPort<Object> op = new DefaultOutputPort<Object>();
+    DefaultOutputPort<Object> op = new DefaultOutputPort<>();
 
     @Override
     public void beginWindow(long windowId)
@@ -226,7 +226,7 @@
 
   public static class CheckpointDistanceOperator extends GenericOperator
   {
-    List<Integer> distances = new ArrayList<Integer>();
+    List<Integer> distances = new ArrayList<>();
     int numWindows = 0;
     int maxWindows = 0;
 
@@ -245,7 +245,7 @@
   public void testSynchingLogic() throws InterruptedException
   {
     long sleeptime = 25L;
-    final ArrayList<Object> list = new ArrayList<Object>();
+    final ArrayList<Object> list = new ArrayList<>();
     GenericOperator go = new GenericOperator();
     final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
         new DefaultAttributeMap(), null));
@@ -376,8 +376,8 @@
     final Server bufferServer = new Server(eventloop, 0); // find random port
     final int bufferServerPort = bufferServer.run().getPort();
 
-    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
-    final BlockingQueue<Object> tuples = new ArrayBlockingQueue<Object>(10);
+    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>();
+    final BlockingQueue<Object> tuples = new ArrayBlockingQueue<>(10);
 
     GenericTestOperator go = new GenericTestOperator();
     final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
@@ -905,7 +905,7 @@
     CheckpointDistanceOperator go = new CheckpointDistanceOperator();
     go.maxWindows = maxWindows;
 
-    List<Integer> checkpoints = new ArrayList<Integer>();
+    List<Integer> checkpoints = new ArrayList<>();
 
     int window = 0;
     while (window < maxWindows) {
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
index 84217eb..bb2e72f 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
@@ -51,10 +51,10 @@
 
   public static class EvenOddIntegerGeneratorInputOperator implements InputOperator, com.datatorrent.api.Operator.ActivationListener<OperatorContext>
   {
-    public final transient DefaultOutputPort<Integer> even = new DefaultOutputPort<Integer>();
-    public final transient DefaultOutputPort<Integer> odd = new DefaultOutputPort<Integer>();
-    private final transient CircularBuffer<Integer> evenBuffer = new CircularBuffer<Integer>(1024);
-    private final transient CircularBuffer<Integer> oddBuffer = new CircularBuffer<Integer>(1024);
+    public final transient DefaultOutputPort<Integer> even = new DefaultOutputPort<>();
+    public final transient DefaultOutputPort<Integer> odd = new DefaultOutputPort<>();
+    private final transient CircularBuffer<Integer> evenBuffer = new CircularBuffer<>(1024);
+    private final transient CircularBuffer<Integer> oddBuffer = new CircularBuffer<>(1024);
     private volatile Thread dataGeneratorThread;
 
     @Override
@@ -179,7 +179,7 @@
     public void setConnected(boolean flag)
     {
       if (flag) {
-        collections.put(id, list = new ArrayList<T>());
+        collections.put(id, list = new ArrayList<>());
       }
     }
   }
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
index 55b5eab..f669832 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
@@ -171,7 +171,7 @@
 
     }
 
-    static final ArrayList<Call> calls = new ArrayList<Call>();
+    static final ArrayList<Call> calls = new ArrayList<>();
 
     @Override
     public void save(Object object, int operatorId, long windowId) throws IOException
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
index 3e208c8..952a36b 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
@@ -152,8 +152,8 @@
 
   public static class CollectorOperator extends BaseOperator implements com.datatorrent.api.Operator.CheckpointListener
   {
-    public static HashSet<Long> collection = new HashSet<Long>(20);
-    public static ArrayList<Long> duplicates = new ArrayList<Long>();
+    public static HashSet<Long> collection = new HashSet<>(20);
+    public static ArrayList<Long> duplicates = new ArrayList<>();
     private boolean simulateFailure;
     private long checkPointWindowId;
     public final transient DefaultInputPort<Long> input = new DefaultInputPort<Long>()
@@ -211,7 +211,7 @@
   {
     public final transient MyInputPort input1 = new MyInputPort(100);
     public final transient MyInputPort input2 = new MyInputPort(200);
-    public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+    public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
 
     public class MyInputPort extends DefaultInputPort<Integer>
     {
@@ -255,7 +255,7 @@
   @SuppressWarnings("SleepWhileInLoop")
   public void testNonLinearOperatorRecovery() throws InterruptedException
   {
-    final HashSet<Object> collection = new HashSet<Object>();
+    final HashSet<Object> collection = new HashSet<>();
     com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap map = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
     map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0);
     map.put(OperatorContext.PROCESSING_MODE, processingMode);
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
index 8ba57be..d8d97e0 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
@@ -47,7 +47,7 @@
       emit = true;
     }
 
-    public final transient DefaultOutputPort<Integer> defaultOutputPort = new DefaultOutputPort<Integer>();
+    public final transient DefaultOutputPort<Integer> defaultOutputPort = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
index 4625368..47f1ea0 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
@@ -83,7 +83,7 @@
     public static class TestInputStatsListener implements StatsListener, Serializable
     {
       private static final long serialVersionUID = 1L;
-      private List<OperatorStats> inputOperatorStats = new ArrayList<OperatorStats>();
+      private List<OperatorStats> inputOperatorStats = new ArrayList<>();
 
       @Override
       public Response processStats(BatchedOperatorStats stats)
@@ -101,7 +101,7 @@
   public static class TestCollector extends GenericTestOperator implements StatsListener
   {
     transient long windowId;
-    List<OperatorStats> collectorOperatorStats = new ArrayList<OperatorStats>();
+    List<OperatorStats> collectorOperatorStats = new ArrayList<>();
 
     @Override
     public Response processStats(BatchedOperatorStats stats)
@@ -129,7 +129,7 @@
     public static class TestCollectorStatsListener implements StatsListener, Serializable
     {
       private static final long serialVersionUID = 1L;
-      List<OperatorStats> collectorOperatorStats = new ArrayList<OperatorStats>();
+      List<OperatorStats> collectorOperatorStats = new ArrayList<>();
 
       @Override
       public Response processStats(BatchedOperatorStats stats)
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
index 451972e..36f9d63 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
@@ -93,7 +93,7 @@
   private static class CommitAwareOperator extends BaseOperator implements CheckpointListener, InputOperator
   {
     private transient String name;
-    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
 
     @InputPortFieldAnnotation(optional = true)
     public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java b/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java
index e6b5cd5..9e6c788 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java
@@ -41,9 +41,9 @@
   private int emitInterval = 1000;
   private final int spinMillis = 50;
   private String myStringProperty;
-  private final ConcurrentLinkedQueue<String> externallyAddedTuples = new ConcurrentLinkedQueue<String>();
+  private final ConcurrentLinkedQueue<String> externallyAddedTuples = new ConcurrentLinkedQueue<>();
   @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<Object>();
+  public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<>();
 
   public int getMaxTuples()
   {
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
index 0c95b75..85125c7 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
@@ -256,7 +256,7 @@
 
   static class RandomNumberGenerator implements InputOperator
   {
-    public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+    public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
diff --git a/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java b/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java
index 0730289..7dc0686 100644
--- a/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java
@@ -220,7 +220,7 @@
     public transient String transientProperty = "transientProperty";
 
     public java.util.concurrent.ConcurrentHashMap<String, String> mapProperty = new java.util.concurrent
-        .ConcurrentHashMap<String, String>();
+        .ConcurrentHashMap<>();
 
     public java.util.concurrent.ConcurrentHashMap<String, String> getMapProperty()
     {
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
index d40fd7b..9d66ca3 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
@@ -570,12 +570,12 @@
       }
     };
 
-    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
 
     @Override
     public Collection definePartitions(Collection partitions, PartitioningContext context)
     {
-      Collection<Partition> newPartitions = new ArrayList<Partition>();
+      Collection<Partition> newPartitions = new ArrayList<>();
 
       // Mostly for 1 partition we dont need to do this
       int partitionBits = (Integer.numberOfLeadingZeros(0) - Integer.numberOfLeadingZeros(1));
@@ -590,7 +590,7 @@
         // No partitioning done so far..
         // Single partition again, but with only even numbers ok?
         PassThruOperatorWithCodec newInstance = new PassThruOperatorWithCodec();
-        Partition partition = new DefaultPartition<PassThruOperatorWithCodec>(newInstance);
+        Partition partition = new DefaultPartition<>(newInstance);
 
         // Consider partitions are 1 & 2 and we are sending only 1 partition
         // Partition 1 = even numbers
@@ -796,7 +796,7 @@
       }
     };
 
-    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
 
     @Override
     public Collection definePartitions(Collection partitions, PartitioningContext context)
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java b/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java
index 14f2b1b..705904e 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java
@@ -45,7 +45,7 @@
 
 public class TestPlanContext implements PlanContext, StorageAgent
 {
-  public List<Runnable> events = new ArrayList<Runnable>();
+  public List<Runnable> events = new ArrayList<>();
   public Collection<PTOperator> undeploy;
   public Collection<PTOperator> deploy;
   public Set<PTContainer> releaseContainers;
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
index 18dfd99..d3240c5 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
@@ -113,7 +113,7 @@
   }
 
   /**
-   * Test read from dt-site.xml in Hadoop configuration format.
+   * Test read from configuration file in Hadoop configuration format.
    */
   @Test
   public void testLoadFromConfigXml()
@@ -217,7 +217,7 @@
 
     StreamMeta s1 = dag.getStream("n1n2");
     assertNotNull(s1);
-    assertTrue("n1n2 inline", DAG.Locality.CONTAINER_LOCAL == s1.getLocality());
+    assertTrue("n1n2 locality", DAG.Locality.CONTAINER_LOCAL == s1.getLocality());
 
     OperatorMeta operator3 = dag.getOperatorMeta("operator3");
     assertEquals("operator3.classname", GenericTestOperator.class, operator3.getOperator().getClass());
@@ -245,6 +245,46 @@
   }
 
   @Test
+  public void testLoadFromPropertiesFileWithLegacyPrefix() throws IOException
+  {
+    Properties props = new Properties();
+    String resourcePath = "/testTopologyLegacyPrefix.properties";
+    InputStream is = this.getClass().getResourceAsStream(resourcePath);
+    if (is == null) {
+      fail("Could not load " + resourcePath);
+    }
+    props.load(is);
+    LogicalPlanConfiguration pb = new LogicalPlanConfiguration(new Configuration(false)).addFromProperties(props, null);
+
+    LogicalPlan dag = new LogicalPlan();
+    pb.populateDAG(dag);
+    dag.validate();
+
+    assertEquals("number of operators", 2, dag.getAllOperators().size());
+    assertEquals("number of root operators", 1, dag.getRootOperators().size());
+
+    StreamMeta s1 = dag.getStream("s1");
+    assertNotNull(s1);
+    assertTrue("s1 locality", DAG.Locality.CONTAINER_LOCAL == s1.getLocality());
+
+    OperatorMeta o2m = dag.getOperatorMeta("o2");
+    assertEquals(GenericTestOperator.class, o2m.getOperator().getClass());
+    GenericTestOperator o2 = (GenericTestOperator)o2m.getOperator();
+    assertEquals("myStringProperty " + o2, "myStringPropertyValue", o2.getMyStringProperty());
+  }
+
+  @Test
+  public void testDeprecation()
+  {
+    String value = "bar";
+    String oldKey = StreamingApplication.DT_PREFIX + Context.DAGContext.APPLICATION_NAME.getName();
+    String newKey = LogicalPlanConfiguration.KEY_APPLICATION_NAME;
+    Configuration config = new Configuration(false);
+    config.set(oldKey, value);
+    Assert.assertEquals(value, config.get(newKey));
+  }
+
+  @Test
   public void testLoadFromJson() throws Exception
   {
     String resourcePath = "/testTopology.json";
@@ -258,7 +298,7 @@
     JSONObject json = new JSONObject(writer.toString());
 
     Configuration conf = new Configuration(false);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf");
 
     LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
     LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
@@ -319,11 +359,11 @@
     String appName = "app1";
 
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + DAG.MASTER_MEMORY_MB.getName(), "123");
-    props.put(StreamingApplication.DT_PREFIX + DAG.CONTAINER_JVM_OPTIONS.getName(), "-Dlog4j.properties=custom_log4j.properties");
-    props.put(StreamingApplication.DT_PREFIX + DAG.APPLICATION_PATH.getName(), "/defaultdir");
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + "." + DAG.APPLICATION_PATH.getName(), "/otherdir");
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + "." + DAG.STREAMING_WINDOW_SIZE_MILLIS.getName(), "1000");
+    props.put(StreamingApplication.APEX_PREFIX + DAG.MASTER_MEMORY_MB.getName(), "123");
+    props.put(StreamingApplication.APEX_PREFIX + DAG.CONTAINER_JVM_OPTIONS.getName(), "-Dlog4j.properties=custom_log4j.properties");
+    props.put(StreamingApplication.APEX_PREFIX + DAG.APPLICATION_PATH.getName(), "/defaultdir");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + "." + DAG.APPLICATION_PATH.getName(), "/otherdir");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + "." + DAG.STREAMING_WINDOW_SIZE_MILLIS.getName(), "1000");
 
     LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
     dagBuilder.addFromProperties(props, null);
@@ -347,10 +387,10 @@
   {
     String appName = "app1";
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".testprop1", "10");
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".prop.testprop2", "100");
-    props.put(StreamingApplication.DT_PREFIX + "application.*.prop.testprop3", "1000");
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".inncls.a", "10000");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".testprop1", "10");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".prop.testprop2", "100");
+    props.put(StreamingApplication.APEX_PREFIX + "application.*.prop.testprop3", "1000");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".inncls.a", "10000");
     LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
     dagBuilder.addFromProperties(props, null);
 
@@ -396,20 +436,20 @@
     Properties props = new Properties();
 
     // match operator by name
-    props.put(StreamingApplication.DT_PREFIX + "template.matchId1.matchIdRegExp", ".*operator1.*");
-    props.put(StreamingApplication.DT_PREFIX + "template.matchId1.stringProperty2", "stringProperty2Value-matchId1");
-    props.put(StreamingApplication.DT_PREFIX + "template.matchId1.nested.property", "nested.propertyValue-matchId1");
+    props.put(StreamingApplication.APEX_PREFIX + "template.matchId1.matchIdRegExp", ".*operator1.*");
+    props.put(StreamingApplication.APEX_PREFIX + "template.matchId1.stringProperty2", "stringProperty2Value-matchId1");
+    props.put(StreamingApplication.APEX_PREFIX + "template.matchId1.nested.property", "nested.propertyValue-matchId1");
 
     // match class name, lower priority
-    props.put(StreamingApplication.DT_PREFIX + "template.matchClass1.matchClassNameRegExp", ".*" + ValidationTestOperator.class.getSimpleName());
-    props.put(StreamingApplication.DT_PREFIX + "template.matchClass1.stringProperty2", "stringProperty2Value-matchClass1");
+    props.put(StreamingApplication.APEX_PREFIX + "template.matchClass1.matchClassNameRegExp", ".*" + ValidationTestOperator.class.getSimpleName());
+    props.put(StreamingApplication.APEX_PREFIX + "template.matchClass1.stringProperty2", "stringProperty2Value-matchClass1");
 
     // match class name
-    props.put(StreamingApplication.DT_PREFIX + "template.t2.matchClassNameRegExp", ".*" + GenericTestOperator.class.getSimpleName());
-    props.put(StreamingApplication.DT_PREFIX + "template.t2.myStringProperty", "myStringPropertyValue");
+    props.put(StreamingApplication.APEX_PREFIX + "template.t2.matchClassNameRegExp", ".*" + GenericTestOperator.class.getSimpleName());
+    props.put(StreamingApplication.APEX_PREFIX + "template.t2.myStringProperty", "myStringPropertyValue");
 
     // direct setting
-    props.put(StreamingApplication.DT_PREFIX + "operator.operator3.emitFormat", "emitFormatValue");
+    props.put(StreamingApplication.APEX_PREFIX + "operator.operator3.emitFormat", "emitFormatValue");
 
     LogicalPlan dag = new LogicalPlan();
     Operator operator1 = dag.addOperator("operator1", new ValidationTestOperator());
@@ -441,11 +481,11 @@
   {
 
     Configuration conf = new Configuration(false);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.myStringProperty", "myStringPropertyValue");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.stringArrayField", "a,b,c");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty.key1", "key1Val");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key1.dot)", "key1dotVal");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key2.dot)", "key2dotVal");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o1.prop.myStringProperty", "myStringPropertyValue");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.stringArrayField", "a,b,c");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty.key1", "key1Val");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty(key1.dot)", "key1dotVal");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty(key2.dot)", "key2dotVal");
 
     LogicalPlan dag = new LogicalPlan();
     GenericTestOperator o1 = dag.addOperator("o1", new GenericTestOperator());
@@ -484,7 +524,7 @@
     LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf);
 
     Properties properties = new Properties();
-    properties.put(StreamingApplication.DT_PREFIX + "application.TestAliasApp.class", app.getClass().getName());
+    properties.put(StreamingApplication.APEX_PREFIX + "application.TestAliasApp.class", app.getClass().getName());
 
     builder.addFromProperties(properties, null);
 
@@ -506,7 +546,7 @@
     LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf);
 
     Properties properties = new Properties();
-    properties.put(StreamingApplication.DT_PREFIX + "application.TestAliasApp.class", app.getClass().getName());
+    properties.put(StreamingApplication.APEX_PREFIX + "application.TestAliasApp.class", app.getClass().getName());
 
     builder.addFromProperties(properties, null);
 
@@ -549,10 +589,10 @@
     };
 
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
-    props.put(StreamingApplication.DT_PREFIX + "operator.*." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2");
-    props.put(StreamingApplication.DT_PREFIX + "operator.*." + OperatorContext.STATS_LISTENERS.getName(), PartitionLoadWatch.class.getName());
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "20");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".class", app.getClass().getName());
+    props.put(StreamingApplication.APEX_PREFIX + "operator.*." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2");
+    props.put(StreamingApplication.APEX_PREFIX + "operator.*." + OperatorContext.STATS_LISTENERS.getName(), PartitionLoadWatch.class.getName());
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator1." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "20");
 
     LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
     dagBuilder.addFromProperties(props, null);
@@ -586,9 +626,9 @@
     };
 
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.outputport.outport1.unifier." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2");
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.outputport.outport1.unifier." + OperatorContext.MEMORY_MB.getName(), "512");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".class", app.getClass().getName());
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator1.outputport.outport1.unifier." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator1.outputport.outport1.unifier." + OperatorContext.MEMORY_MB.getName(), "512");
     LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
     dagBuilder.addFromProperties(props, null);
 
@@ -625,7 +665,7 @@
           output.emit(tuple);
         }
       };
-      public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+      public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
     }
 
     class DummyOutputOperator extends BaseOperator
@@ -644,8 +684,8 @@
 
     class TestUnifierAttributeModule implements Module
     {
-      public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<Integer>();
-      public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<Integer>();
+      public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<>();
+      public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<>();
 
       @Override
       public void populateDAG(DAG dag, Configuration conf)
@@ -723,10 +763,10 @@
     };
 
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
-    props.put(StreamingApplication.DT_PREFIX + "operator.*.myStringProperty", "pv1");
-    props.put(StreamingApplication.DT_PREFIX + "operator.*.booleanProperty", Boolean.TRUE.toString());
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.myStringProperty", "apv1");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".class", app.getClass().getName());
+    props.put(StreamingApplication.APEX_PREFIX + "operator.*.myStringProperty", "pv1");
+    props.put(StreamingApplication.APEX_PREFIX + "operator.*.booleanProperty", Boolean.TRUE.toString());
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator1.myStringProperty", "apv1");
 
     LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
     dagBuilder.addFromProperties(props, null);
@@ -758,10 +798,10 @@
     };
 
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
-    props.put(StreamingApplication.DT_PREFIX + "operator.*.myStringProperty", "foo ${xyz} bar ${zzz} baz");
-    props.put(StreamingApplication.DT_PREFIX + "operator.*.booleanProperty", Boolean.TRUE.toString());
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.myStringProperty", "apv1");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".class", app.getClass().getName());
+    props.put(StreamingApplication.APEX_PREFIX + "operator.*.myStringProperty", "foo ${xyz} bar ${zzz} baz");
+    props.put(StreamingApplication.APEX_PREFIX + "operator.*.booleanProperty", Boolean.TRUE.toString());
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator1.myStringProperty", "apv1");
 
     LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
 
@@ -788,12 +828,12 @@
     SimpleTestApplication app = new SimpleTestApplication();
 
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.port.*." + PortContext.QUEUE_CAPACITY.getName(), "" + 16 * 1024);
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator2.inputport.inport1." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator2.outputport.outport1." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator3.port.*." + PortContext.QUEUE_CAPACITY.getName(), "" + 16 * 1024);
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator3.inputport.inport2." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".class", app.getClass().getName());
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator1.port.*." + PortContext.QUEUE_CAPACITY.getName(), "" + 16 * 1024);
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator2.inputport.inport1." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator2.outputport.outport1." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator3.port.*." + PortContext.QUEUE_CAPACITY.getName(), "" + 16 * 1024);
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator3.inputport.inport2." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
 
     LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
     dagBuilder.addFromProperties(props, null);
@@ -831,7 +871,7 @@
     // attribute that cannot be configured
 
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "attr.NOT_CONFIGURABLE", "value");
+    props.put(StreamingApplication.APEX_PREFIX + "attr.NOT_CONFIGURABLE", "value");
 
     LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
     dagBuilder.addFromProperties(props, null);
@@ -849,7 +889,7 @@
 
     // invalid attribute name
     props = new Properties();
-    String invalidAttribute = StreamingApplication.DT_PREFIX + "attr.INVALID_NAME";
+    String invalidAttribute = StreamingApplication.APEX_PREFIX + "attr.INVALID_NAME";
     props.put(invalidAttribute, "value");
 
     try {
@@ -930,7 +970,7 @@
   public void testTestTupleClassAttrSetFromConfig()
   {
     Configuration conf = new Configuration(false);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.port.schemaRequiredPort.attr.TUPLE_CLASS",
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.port.schemaRequiredPort.attr.TUPLE_CLASS",
         "com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest$TestSchema");
 
     StreamingApplication streamingApplication = new StreamingApplication()
@@ -986,7 +1026,7 @@
     }
 
     Properties props = new Properties();
-    String propName = StreamingApplication.DT_PREFIX + StramElement.ATTR.getValue() + LogicalPlanConfiguration.KEY_SEPARATOR + attributeName;
+    String propName = StreamingApplication.APEX_PREFIX + StramElement.ATTR.getValue() + LogicalPlanConfiguration.KEY_SEPARATOR + attributeName;
     props.put(propName, "5");
 
     SimpleTestApplicationWithName app = new SimpleTestApplicationWithName();
@@ -1015,7 +1055,7 @@
   public void testRootLevelAmbiguousAttributeSimple()
   {
     testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
-        StreamingApplication.DT_PREFIX, null, Boolean.TRUE, true, true);
+        StreamingApplication.APEX_PREFIX, null, Boolean.TRUE, true, true);
   }
 
   /**
@@ -1025,7 +1065,7 @@
   public void testApplicationLevelAmbiguousAttributeSimple()
   {
     testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
-        StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, true, true);
+        StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, true, true);
   }
 
   /**
@@ -1035,7 +1075,7 @@
   public void testOperatorLevelAmbiguousAttributeSimple()
   {
     testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
-        StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, true, false);
+        StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, true, false);
   }
 
   /**
@@ -1045,7 +1085,7 @@
   public void testPortLevelAmbiguousAttributeSimple()
   {
     testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
-        StreamingApplication.DT_PREFIX + "port" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, false, true);
+        StreamingApplication.APEX_PREFIX + "port" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, false, true);
   }
 
   /**
@@ -1054,7 +1094,7 @@
   @Test
   public void testRootLevelAmbiguousAttributeComplex()
   {
-    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, StreamingApplication.DT_PREFIX,
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, StreamingApplication.APEX_PREFIX,
         PortContext.class.getCanonicalName(), Boolean.TRUE, false, true);
   }
 
@@ -1065,7 +1105,7 @@
   public void testApplicationLevelAmbiguousAttributeComplex()
   {
     testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
-        StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(),
+        StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(),
         Boolean.TRUE, false, true);
   }
 
@@ -1076,7 +1116,7 @@
   public void testOperatorLevelAmbiguousAttributeComplex()
   {
     testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
-        StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, OperatorContext.class.getCanonicalName(),
+        StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, OperatorContext.class.getCanonicalName(),
         Boolean.TRUE, true, false);
   }
 
@@ -1087,7 +1127,7 @@
   public void testOperatorLevelAmbiguousAttributeComplex2()
   {
     testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
-        StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(),
+        StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(),
         Boolean.TRUE, false, true);
   }
 
@@ -1098,7 +1138,7 @@
   public void testPortLevelAmbiguousAttributeComplex()
   {
     testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
-        StreamingApplication.DT_PREFIX + "port" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(),
+        StreamingApplication.APEX_PREFIX + "port" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(),
         Boolean.TRUE, false, true);
   }
 
@@ -1116,7 +1156,7 @@
   @Test
   public void testRootLevelAttributeSimpleNameOperator()
   {
-    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.DT_PREFIX, true, 4096, true, true);
+    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.APEX_PREFIX, true, 4096, true, true);
   }
 
   @Test
@@ -1124,19 +1164,19 @@
   {
     MockStorageAgent mockAgent = new MockStorageAgent();
 
-    simpleAttributeOperatorHelper(OperatorContext.STORAGE_AGENT, StreamingApplication.DT_PREFIX, true, mockAgent, true, false);
+    simpleAttributeOperatorHelper(OperatorContext.STORAGE_AGENT, StreamingApplication.APEX_PREFIX, true, mockAgent, true, false);
   }
 
   @Test
   public void testRootLevelAttributeSimpleNameOperatorNoScope()
   {
-    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.DT_PREFIX, true, 4096, true, false);
+    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.APEX_PREFIX, true, 4096, true, false);
   }
 
   @Test
   public void testApplicationLevelAttributeSimpleNameOperator()
   {
-    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
+    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
         true, 4096, true, true);
   }
 
@@ -1190,137 +1230,137 @@
   @Test
   public void testRootLevelAttributeSimpleNamePort()
   {
-    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX,
-        true, (Integer)4096, true, true);
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX,
+        true, 4096, true, true);
   }
 
   @Test
   public void testRootLevelAttributeSimpleNamePortNoScope()
   {
-    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX,
-        true, (Integer)4096, true, false);
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX,
+        true, 4096, true, false);
   }
 
   @Test
   public void testOperatorLevelAttributeSimpleNamePort()
   {
-    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR,
-        true, (Integer)4096, true, true);
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR,
+        true, 4096, true, true);
   }
 
   @Test
   public void testApplicationLevelAttributeSimpleNamePort()
   {
-    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
-        true, (Integer)4096, true, true);
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
+        true, 4096, true, true);
   }
 
   @Test
   public void testRootLevelAttributeComplexNamePort()
   {
-    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, false,
-        (Integer)4096, true, true);
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX, false,
+        4096, true, true);
   }
 
   @Test
   public void testRootLevelAttributeComplexNamePortNoScope()
   {
-    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, false,
-        (Integer)4096, true, false);
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX, false,
+        4096, true, false);
   }
 
   @Test
   public void testOperatorLevelAttributeComplexNamePort()
   {
-    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR,
-        false, (Integer)4096, true, true);
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR,
+        false, 4096, true, true);
   }
 
   @Test
   public void testApplicationLevelAttributeComplexNamePort()
   {
-    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
-        false, (Integer)4096, true, true);
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
+        false, 4096, true, true);
   }
 
   /* Input port tests */
   @Test
   public void testRootLevelAttributeSimpleNameInputPort()
   {
-    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, true,
-        (Integer)4096, true);
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX, true,
+        4096, true);
   }
 
   @Test
   public void testOperatorLevelAttributeSimpleNameInputPort()
   {
-    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, true, (Integer)4096, true);
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, true, 4096, true);
   }
 
   @Test
   public void testApplicationLevelAttributeSimpleNameInputPort()
   {
-    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
-        true, (Integer)4096, true);
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
+        true, 4096, true);
   }
 
   @Test
   public void testRootLevelAttributeComplexNameInputPort()
   {
-    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, false, (Integer)4096, true);
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX, false, 4096, true);
   }
 
   @Test
   public void testOperatorLevelAttributeComplexNameInputPort()
   {
-    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, false,
-        (Integer)4096, true);
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, false,
+        4096, true);
   }
 
   @Test
   public void testApplicationLevelAttributeComplexNameInputPort()
   {
-    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
-        false, (Integer)4096, true);
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
+        false, 4096, true);
   }
 
   /* Output port tests */
   @Test
   public void testRootLevelAttributeSimpleNameOutputPort()
   {
-    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, true, (Integer)4096, true);
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX, true, 4096, true);
   }
 
   @Test
   public void testOperatorLevelAttributeSimpleNameOutputPort()
   {
-    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, true, (Integer)4096, true);
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, true, 4096, true);
   }
 
   @Test
   public void testApplicationLevelAttributeSimpleNameOutputPort()
   {
-    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR +
-        "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, true, (Integer)4096, true);
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR +
+        "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, true, 4096, true);
   }
 
   @Test
   public void testRootLevelAttributeComplexNameOutputPort()
   {
-    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, false, (Integer)4096, true);
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX, false, 4096, true);
   }
 
   @Test
   public void testOperatorLevelAttributeComplexNameOutputPort()
   {
-    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, false, (Integer)4096, true);
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, false, 4096, true);
   }
 
   @Test
   public void testApplicationLevelAttributeComplexNameOutputPort()
   {
-    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR +
-        "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, false, (Integer)4096, true);
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR +
+        "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, false, 4096, true);
   }
 
   /* Helpers for building ports */
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
index dd32cc7..1507e2d 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
@@ -173,9 +173,9 @@
 
   public static class ValidationOperator extends BaseOperator
   {
-    public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<>();
 
-    public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<>();
   }
 
   public static class CounterOperator extends BaseOperator
@@ -641,7 +641,7 @@
 
   private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator
   {
-    public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
@@ -653,9 +653,9 @@
   private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator
   {
     @OutputPortFieldAnnotation(optional = true)
-    public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<>();
     @OutputPortFieldAnnotation(optional = true)
-    public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
@@ -773,7 +773,7 @@
   public void testAttributeValuesSerializableCheck() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException
   {
     LogicalPlan dag = new LogicalPlan();
-    Attribute<Object> attr = new Attribute<Object>(new TestAttributeValue(), new Object2String());
+    Attribute<Object> attr = new Attribute<>(new TestAttributeValue(), new Object2String());
     Field nameField = Attribute.class.getDeclaredField("name");
     nameField.setAccessible(true);
     nameField.set(attr, "Test_Attribute");
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
index 1966678..64aaa44 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
@@ -50,7 +50,7 @@
   {
 
     Random r = new Random();
-    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
@@ -73,7 +73,7 @@
         output.emit(tuple);
       }
     };
-    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
   }
 
   /*
@@ -92,7 +92,7 @@
         output.emit(tuple);
       }
     };
-    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
   }
 
   /*
@@ -118,8 +118,8 @@
   static class TestModule implements Module
   {
 
-    public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<Integer>();
-    public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<Integer>();
+    public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<>();
+    public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<>();
 
     @Override
     public void populateDAG(DAG dag, Configuration conf)
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
index 5b5583a..6747fd7 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
@@ -59,7 +59,7 @@
     private int inputOperatorProp = 0;
 
     Random r = new Random();
-    public transient DefaultOutputPort<Integer> out = new DefaultOutputPort<Integer>();
+    public transient DefaultOutputPort<Integer> out = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
@@ -709,7 +709,7 @@
     JSONObject json = new JSONObject(writer.toString());
 
     Configuration conf = new Configuration(false);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf");
 
     LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
     LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java
index 7951e26..a556235 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java
@@ -33,11 +33,11 @@
   public void testModuleProperties()
   {
     Configuration conf = new Configuration(false);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.myStringProperty", "myStringPropertyValue");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.stringArrayField", "a,b,c");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty.key1", "key1Val");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key1.dot)", "key1dotVal");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key2.dot)", "key2dotVal");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o1.prop.myStringProperty", "myStringPropertyValue");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.stringArrayField", "a,b,c");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty.key1", "key1Val");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty(key1.dot)", "key1dotVal");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty(key2.dot)", "key2dotVal");
 
     LogicalPlan dag = new LogicalPlan();
     TestModules.GenericModule o1 = dag.addModule("o1", new TestModules.GenericModule());
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
index 8fad613..956db88 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
@@ -49,10 +49,10 @@
     public volatile Object inport1Tuple = null;
 
     @OutputPortFieldAnnotation(optional = true)
-    public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<>();
 
     @OutputPortFieldAnnotation(optional = true)
-    public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<>();
 
     private String emitFormat;
 
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
index 61a85a5..941f5a4 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
@@ -476,7 +476,7 @@
 
     OperatorMeta o1Meta = dag.getMeta(o1);
     dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
-    TestPartitioner<TestInputOperator<Object>> partitioner = new TestPartitioner<TestInputOperator<Object>>();
+    TestPartitioner<TestInputOperator<Object>> partitioner = new TestPartitioner<>();
     dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, partitioner);
 
     TestPlanContext ctx = new TestPlanContext();
@@ -497,7 +497,7 @@
     plan.onStatusUpdate(o1p1);
     Assert.assertEquals("scale up triggered", 1, ctx.events.size());
     // add another partition, keep existing as is
-    partitioner.extraPartitions.add(new DefaultPartition<TestInputOperator<Object>>(o1));
+    partitioner.extraPartitions.add(new DefaultPartition<>(o1));
     Runnable r = ctx.events.remove(0);
     r.run();
     partitioner.extraPartitions.clear();
@@ -745,7 +745,7 @@
       partitions.add(new DefaultPartition<Operator>(operator, p1Keys, 1, null));
     }
 
-    ArrayList<Partition<Operator>> lowLoadPartitions = new ArrayList<Partition<Operator>>();
+    ArrayList<Partition<Operator>> lowLoadPartitions = new ArrayList<>();
     for (Partition<Operator> p : partitions) {
       lowLoadPartitions.add(new DefaultPartition<>(p.getPartitionedInstance(), p.getPartitionKeys(), -1, null));
     }
@@ -793,7 +793,7 @@
     for (Set<PartitionKeys> expectedKeys: expectedKeysSets) {
       List<Partition<Operator>> clonePartitions = Lists.newArrayList();
       for (PartitionKeys pks: twoBitPartitionKeys) {
-        Map<InputPort<?>, PartitionKeys> p1Keys = new HashMap<InputPort<?>, PartitionKeys>();
+        Map<InputPort<?>, PartitionKeys> p1Keys = new HashMap<>();
         p1Keys.put(operator.inport1, pks);
         int load = expectedKeys.contains(pks) ? 0 : -1;
         clonePartitions.add(new DefaultPartition<Operator>(operator, p1Keys, load, null));
@@ -876,7 +876,7 @@
 
     Assert.assertEquals("operators container 0", 1, plan.getContainers().get(0).getOperators().size());
     Set<OperatorMeta> c2ExpNodes = Sets.newHashSet(dag.getMeta(o2), dag.getMeta(o3));
-    Set<OperatorMeta> c2ActNodes = new HashSet<OperatorMeta>();
+    Set<OperatorMeta> c2ActNodes = new HashSet<>();
     PTContainer c2 = plan.getContainers().get(1);
     for (PTOperator pNode : c2.getOperators()) {
       c2ActNodes.add(pNode.getOperatorMeta());
@@ -1139,7 +1139,7 @@
     LogicalPlan dag = new LogicalPlan();
 
     TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
-    TestPartitioner<TestGeneratorInputOperator> o1Partitioner = new TestPartitioner<TestGeneratorInputOperator>();
+    TestPartitioner<TestGeneratorInputOperator> o1Partitioner = new TestPartitioner<>();
     o1Partitioner.setPartitionCount(2);
     dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, o1Partitioner);
     dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
@@ -1312,7 +1312,7 @@
       }
 
       Assert.assertEquals("repartition event", 1, ctx.events.size());
-      o1Partitioner.extraPartitions.add(new DefaultPartition<TestGeneratorInputOperator>(o1));
+      o1Partitioner.extraPartitions.add(new DefaultPartition<>(o1));
       ctx.events.remove(0).run();
       o1Partitioner.extraPartitions.clear();
 
@@ -1607,7 +1607,7 @@
       }
       T paritionable = first.getPartitionedInstance();
       for (int i = partitions.size(); i < numTotal; ++i) {
-        newPartitions.add(new DefaultPartition<T>(paritionable));
+        newPartitions.add(new DefaultPartition<>(paritionable));
       }
       return newPartitions;
     }
@@ -1767,7 +1767,7 @@
 
     List<PTOperator> o1Unifiers = plan.getMergeOperators(o1Meta);
     Assert.assertEquals("o1Unifiers " + o1Meta, 3, o1Unifiers.size()); // 2 cascadingUnifiers and one-downstream partition unifier
-    List<PTOperator> finalUnifiers = new ArrayList<PTOperator>();
+    List<PTOperator> finalUnifiers = new ArrayList<>();
     for (PTOperator o : o1Unifiers) {
       Assert.assertEquals("inputs " + o, 2, o.getInputs().size());
       Assert.assertEquals("outputs " + o, 1, o.getOutputs().size());
@@ -2054,7 +2054,7 @@
       if (context.getParallelPartitionCount() > 0 && newPartitions.size() < context.getParallelPartitionCount()) {
         // parallel partitioned, fill to requested count
         for (int i = newPartitions.size(); i < context.getParallelPartitionCount(); i++) {
-          newPartitions.add(new DefaultPartition<T>(partitions.iterator().next().getPartitionedInstance()));
+          newPartitions.add(new DefaultPartition<>(partitions.iterator().next().getPartitionedInstance()));
         }
       }
       return newPartitions;
@@ -2215,9 +2215,9 @@
     GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
     dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
     dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
-    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2));
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2));
     dag.getOperatorMeta("o1").getMeta(o1.outport1).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB, 1024);
-    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2));
+    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2));
     dag.setOperatorAttribute(o2, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
     dag.setOperatorAttribute(o2, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
 
@@ -2237,7 +2237,7 @@
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
     GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
     dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
-    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2));
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2));
     dag.setInputPortAttribute(o2.inport1, PortContext.PARTITION_PARALLEL, true);
     dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
 
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java b/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java
index 77334db..2cb3e58 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java
@@ -41,7 +41,7 @@
   @Test
   public void testEmergencySinks() throws InterruptedException
   {
-    final List<Object> list = new ArrayList<Object>();
+    final List<Object> list = new ArrayList<>();
     final StreamCodec<Object> myserde = new StreamCodec<Object>()
     {
       @Override
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java b/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java
index e094d44..a487176 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java
@@ -50,7 +50,7 @@
     byte[] buffer = publisher.consume();
 
     FastSubscriber subscriber = new FastSubscriber("subscriber", 1024);
-    subscriber.serde = subscriber.statefulSerde = new DefaultStatefulStreamCodec<Object>();
+    subscriber.serde = subscriber.statefulSerde = new DefaultStatefulStreamCodec<>();
     SweepableReservoir sr = subscriber.acquireReservoir("res", 1024);
     sr.setSink(new Sink<Object>()
     {
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
index fd67121..32e5095 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
@@ -94,7 +94,7 @@
   @SuppressWarnings({"SleepWhileInLoop"})
   public void testBufferServerStream() throws Exception
   {
-    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
+    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>();
     final AtomicInteger messageCount = new AtomicInteger();
     Sink<Object> sink = new Sink<Object>()
     {
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
index ed145c7..9db6fe9 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
@@ -55,13 +55,13 @@
   {
     final int totalTupleCount = 5000;
 
-    final PassThroughNode<Object> operator1 = new PassThroughNode<Object>();
+    final PassThroughNode<Object> operator1 = new PassThroughNode<>();
     final GenericNode node1 = new GenericNode(operator1, new OperatorContext(1, "operator1", new DefaultAttributeMap(),
         null));
     node1.setId(1);
     operator1.setup(node1.context);
 
-    final PassThroughNode<Object> operator2 = new PassThroughNode<Object>();
+    final PassThroughNode<Object> operator2 = new PassThroughNode<>();
     final GenericNode node2 = new GenericNode(operator2, new OperatorContext(2, "operator2", new DefaultAttributeMap(),
         null));
     node2.setId(2);
@@ -115,7 +115,7 @@
     AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input", 1024 * 5);
     node1.connectInputPort("input", reservoir1);
 
-    Map<Integer, Node<?>> activeNodes = new ConcurrentHashMap<Integer, Node<?>>();
+    Map<Integer, Node<?>> activeNodes = new ConcurrentHashMap<>();
     launchNodeThread(node1, activeNodes);
     launchNodeThread(node2, activeNodes);
     stream.activate(streamContext);
@@ -206,7 +206,7 @@
       }
 
     };
-    public final DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+    public final DefaultOutputPort<T> output = new DefaultOutputPort<>();
     private boolean logMessages = false;
 
     public boolean isLogMessages()
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
index f0748eb..287a796 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
@@ -47,7 +47,7 @@
 
   public static class TestInputOperator extends BaseOperator implements InputOperator
   {
-    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
+    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
@@ -60,7 +60,7 @@
   public static class FirstGenericOperator extends BaseOperator
   {
     public static long endwindowCount;
-    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
+    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
     public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>()
     {
       @Override
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
index e614750..26e913b 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
@@ -298,7 +298,7 @@
 
   public static class ThreadIdValidatingInputOperator implements InputOperator
   {
-    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
+    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
     public static long threadId;
 
     @Override
@@ -416,12 +416,12 @@
       assert (threadList.contains(Thread.currentThread().getId()));
     }
 
-    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
+    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
   }
 
   public static class ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts extends ThreadIdValidatingGenericIntermediateOperator
   {
-    public final transient DefaultOutputPort<Long> output2 = new DefaultOutputPort<Long>();
+    public final transient DefaultOutputPort<Long> output2 = new DefaultOutputPort<>();
   }
 
   public static class ThreadIdValidatingGenericOperatorWithTwoInputPorts implements Operator
@@ -621,9 +621,9 @@
     slc.run();
 
     Assert.assertEquals("nonOIO: Number of threads ThreadIdValidatingGenericIntermediateOperator", 3, ThreadIdValidatingGenericIntermediateOperator.threadList.size());
-    Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 3, (new HashSet<Long>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size());
+    Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 3, (new HashSet<>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size());
     Assert.assertEquals("nonOIO: Number of threads ThreadIdValidatingOutputOperator", 4, ThreadIdValidatingOutputOperator.threadList.size());
-    Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingOutputOperator", 4, (new HashSet<Long>(ThreadIdValidatingOutputOperator.threadList)).size());
+    Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingOutputOperator", 4, (new HashSet<>(ThreadIdValidatingOutputOperator.threadList)).size());
     Assert.assertFalse("nonOIO:: inputOperator1 : ThreadIdValidatingOutputOperator", ThreadIdValidatingOutputOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId));
     Assert.assertFalse("nonOIO:: inputOperator1 : ThreadIdValidatingGenericIntermediateOperator", ThreadIdValidatingGenericIntermediateOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId));
 
@@ -640,9 +640,9 @@
     slc.run();
 
     Assert.assertEquals("OIO: Number of threads ThreadIdValidatingGenericIntermediateOperator", 3, ThreadIdValidatingGenericIntermediateOperator.threadList.size());
-    Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 1, (new HashSet<Long>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size());
+    Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 1, (new HashSet<>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size());
     Assert.assertEquals("OIO: Number of threads ThreadIdValidatingOutputOperator", 4, ThreadIdValidatingOutputOperator.threadList.size());
-    Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingOutputOperator", 3, (new HashSet<Long>(ThreadIdValidatingOutputOperator.threadList)).size());
+    Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingOutputOperator", 3, (new HashSet<>(ThreadIdValidatingOutputOperator.threadList)).size());
     Assert.assertTrue("OIO:: inputOperator1 : ThreadIdValidatingOutputOperator", ThreadIdValidatingOutputOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId));
     Assert.assertTrue("OIO:: inputOperator1 : ThreadIdValidatingGenericIntermediateOperator", ThreadIdValidatingGenericIntermediateOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId));
   }
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
index 38460ea..15de177 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
@@ -149,7 +149,7 @@
   @Before
   public void init()
   {
-    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
+    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>();
     messageCount = new AtomicInteger(0);
 
     Sink<Object> sink = new Sink<Object>()
diff --git a/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java b/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java
index 902b8b6..af821ea 100644
--- a/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java
+++ b/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java
@@ -37,7 +37,7 @@
     public long time;
   }
 
-  PriorityQueue<TimedRunnable> queue = new PriorityQueue<TimedRunnable>(16, new Comparator<TimedRunnable>()
+  PriorityQueue<TimedRunnable> queue = new PriorityQueue<>(16, new Comparator<TimedRunnable>()
   {
     @Override
     public int compare(TimedRunnable o1, TimedRunnable o2)
diff --git a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
index 0326b6a..7399aff 100644
--- a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
+++ b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
@@ -322,7 +322,7 @@
   public static class TestHomeDirectory extends TestWatcher
   {
 
-    Map<String, String> env = new HashMap<String, String>();
+    Map<String, String> env = new HashMap<>();
     String userHome;
 
     @Override
@@ -444,7 +444,7 @@
       private static final long serialVersionUID = 201404091805L;
     }
 
-    transient HashMap<OperatorWindowIdPair, Object> store = new HashMap<OperatorWindowIdPair, Object>();
+    transient HashMap<OperatorWindowIdPair, Object> store = new HashMap<>();
 
     @Override
     public synchronized void save(Object object, int operatorId, long windowId) throws IOException
@@ -467,7 +467,7 @@
     @Override
     public synchronized long[] getWindowIds(int operatorId) throws IOException
     {
-      ArrayList<Long> windowIds = new ArrayList<Long>();
+      ArrayList<Long> windowIds = new ArrayList<>();
       for (OperatorWindowIdPair key : store.keySet()) {
         if (key.operatorId == operatorId) {
           windowIds.add(key.windowId);
diff --git a/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java b/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java
index 5bc70dc..9d12fdc 100644
--- a/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java
@@ -65,7 +65,7 @@
   @Test
   public void testElement()
   {
-    StablePriorityQueue<Integer> instance = new StablePriorityQueue<Integer>(1);
+    StablePriorityQueue<Integer> instance = new StablePriorityQueue<>(1);
     Integer i = 10;
     instance.add(i);
     Object result = instance.element();
@@ -78,7 +78,7 @@
   @Test
   public void testOffer()
   {
-    StablePriorityQueue<Integer> instance = new StablePriorityQueue<Integer>(1);
+    StablePriorityQueue<Integer> instance = new StablePriorityQueue<>(1);
     Integer i = 10;
     assertTrue(instance.offer(i));
     Object result = instance.peek();
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
index 9ac28c0..e1fb860 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
@@ -109,9 +109,9 @@
     };
 
     @OutputPortFieldAnnotation(optional = false, error = true)
-    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
 
-    public final transient DefaultOutputPort<Double> output1 = new DefaultOutputPort<Double>();
+    public final transient DefaultOutputPort<Double> output1 = new DefaultOutputPort<>();
 
     @Override
     public String getName()
@@ -417,7 +417,7 @@
   {
     String classpath = System.getProperty("java.class.path");
     String[] paths = classpath.split(":");
-    List<String> fnames = new LinkedList<String>();
+    List<String> fnames = new LinkedList<>();
     for (String cp : paths) {
       File f = new File(cp);
       if (!f.isDirectory()) {
@@ -480,7 +480,7 @@
   @Test
   public void testValueSerialization() throws Exception
   {
-    TestOperator<String, Map<String, Number>> bean = new TestOperator<String, Map<String, Number>>();
+    TestOperator<String, Map<String, Number>> bean = new TestOperator<>();
     bean.map.put("key1", new Structured());
     bean.stringArray = new String[]{"one", "two", "three"};
     bean.stringList = Lists.newArrayList("four", "five");
@@ -491,7 +491,7 @@
     bean.structuredArray[0].name = "s1";
     bean.color = Color.BLUE;
     bean.booleanProp = true;
-    bean.nestedList = new LinkedList<OperatorDiscoveryTest.Structured>();
+    bean.nestedList = new LinkedList<>();
     Structured st = new Structured();
     st.name = "nestedone";
     st.size = 10;
@@ -640,15 +640,15 @@
     private List<Structured> nestedList;
     private Properties props;
     private Structured nested;
-    private Map<String, Structured> map = new HashMap<String, Structured>();
+    private Map<String, Structured> map = new HashMap<>();
     private String[] stringArray;
     private Color color;
     private Structured[] structuredArray;
     private T[] genericArray;
-    private Map<String, List<Map<String, Number>>> nestedParameterizedType = new HashMap<String, List<Map<String, Number>>>();
+    private Map<String, List<Map<String, Number>>> nestedParameterizedType = new HashMap<>();
     private Map<?, ? super Long> wildcardType = new HashMap<Object, Number>();
-    private List<int[]> listofIntArray = new LinkedList<int[]>();
-    private List<T> parameterizedTypeVariable = new LinkedList<T>();
+    private List<int[]> listofIntArray = new LinkedList<>();
+    private List<T> parameterizedTypeVariable = new LinkedList<>();
     private Z genericType;
     private int[][] multiDimensionPrimitiveArray;
     private Structured[][] multiDimensionComplexArray;
@@ -1056,7 +1056,7 @@
   @Test
   public void testLogicalPlanConfiguration() throws Exception
   {
-    TestOperator<String, Map<String, Number>> bean = new InputTestOperator<String, Map<String, Number>>();
+    TestOperator<String, Map<String, Number>> bean = new InputTestOperator<>();
     bean.map.put("key1", new Structured());
     bean.stringArray = new String[]{"one", "two", "three"};
     bean.stringList = Lists.newArrayList("four", "five");
@@ -1113,12 +1113,12 @@
   public static class SchemaRequiredOperator extends BaseOperator implements InputOperator
   {
     @OutputPortFieldAnnotation(schemaRequired = true)
-    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
 
     @OutputPortFieldAnnotation(schemaRequired = false)
-    public final transient DefaultOutputPort<Object> output1 = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output1 = new DefaultOutputPort<>();
 
-    public final transient DefaultOutputPort<Object> output2 = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output2 = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
index 9f414dc..76fa963 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
@@ -113,7 +113,7 @@
         lastRequests = requests;
 
         // delegate processing to dispatch thread
-        FutureTask<Object> future = new FutureTask<Object>(new Callable<Object>()
+        FutureTask<Object> future = new FutureTask<>(new Callable<Object>()
         {
           @Override
           public Object call() throws Exception
@@ -351,7 +351,7 @@
   @Test
   public void testSubmitLogicalPlanChange() throws JSONException, Exception
   {
-    List<LogicalPlanRequest> requests = new ArrayList<LogicalPlanRequest>();
+    List<LogicalPlanRequest> requests = new ArrayList<>();
     WebResource r = resource();
 
     CreateOperatorRequest request1 = new CreateOperatorRequest();
@@ -366,7 +366,7 @@
     requests.add(request2);
 
     ObjectMapper mapper = new ObjectMapper();
-    final Map<String, Object> m = new HashMap<String, Object>();
+    final Map<String, Object> m = new HashMap<>();
     m.put("requests", requests);
     final JSONObject jsonRequest = new JSONObject(mapper.writeValueAsString(m));
 
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java
index 09b8785..2f0b018 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java
@@ -39,7 +39,7 @@
 public class TypeDiscoveryTest
 {
 
-  private static interface GenericInterface<T>
+  private interface GenericInterface<T>
   {
   }
 
@@ -87,10 +87,10 @@
       {
       }
     };
-    final OutputPort<T2> outportT2 = new DefaultOutputPort<T2>();
-    final OutputPort<Number> outportNumberParam = new DefaultOutputPort<Number>();
+    final OutputPort<T2> outportT2 = new DefaultOutputPort<>();
+    final OutputPort<Number> outportNumberParam = new DefaultOutputPort<>();
     final StringOutputPort outportString = new StringOutputPort(this);
-    final OutputPort<List<T0>> outportList = new DefaultOutputPort<List<T0>>();
+    final OutputPort<List<T0>> outportList = new DefaultOutputPort<>();
     final GenericSubClassOutputPort outClassObject = new GenericSubClassOutputPort(this);
   }
 
@@ -107,8 +107,8 @@
       {
       }
     };
-    final OutputPort<Map<String, Number>> outportT2 = new DefaultOutputPort<Map<String, Number>>();
-    final OutputPort<Number> outportNumberParam = new DefaultOutputPort<Number>();
+    final OutputPort<Map<String, Number>> outportT2 = new DefaultOutputPort<>();
+    final OutputPort<Number> outportNumberParam = new DefaultOutputPort<>();
     final StringOutputPort outportString = new StringOutputPort(this);
   }
 
@@ -163,7 +163,7 @@
 
   static class ParameterizedTypeOperator<T> extends BaseOperator
   {
-    final OutputPort<T> output = new DefaultOutputPort<T>();
+    final OutputPort<T> output = new DefaultOutputPort<>();
   }
 
   static class StringParameterOperator extends ParameterizedTypeOperator<String>
diff --git a/engine/src/test/resources/clusterTest.app.properties b/engine/src/test/resources/clusterTest.app.properties
deleted file mode 100644
index 2a0b292..0000000
--- a/engine/src/test/resources/clusterTest.app.properties
+++ /dev/null
@@ -1,44 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# run number generator with pass through operator on cluster continuously
-
-#template for operator definition
-dt.template.nt1.classname=com.datatorrent.dt.engine.GenericTestOperator
-#dt.template.nt1.logMessages=true
-
-dt.template.streamtempl1.inline=true
-
-dt.operator.operator1.template=nt1
-dt.operator.operator2.template=nt1
-
-dt.operator.inputOperator.classname=com.datatorrent.dt.engine.TestGeneratorInputOperator
-dt.operator.inputOperator.maxTuples=10
-
-#dt.operator.httpOut.classname=com.datatorrent.lib.io.HttpOutputOperator
-#dt.operator.httpOut.resourceUrl=http://localhost:9999/resourcecontext
-
-dt.stream.input1.source=inputOperator.outputPort
-dt.stream.input1.sinks=operator1.input1
-dt.stream.input1.template=streamtempl1
-
-dt.stream.n1n2.source=operator1.output1
-#dt.stream.n1n2.sinks=operator2.input1,httpOut.input
-dt.stream.n1n2.sinks=operator2.input1
-dt.stream.n1n2.template=streamtempl1
diff --git a/engine/src/test/resources/dt-site.xml b/engine/src/test/resources/dt-site.xml
index 20748e2..e3cffbf 100644
--- a/engine/src/test/resources/dt-site.xml
+++ b/engine/src/test/resources/dt-site.xml
@@ -34,14 +34,14 @@
   <!--  Operator 1 -->
 
   <property>
-    <name>dt.operator.operator1.classname</name>
+    <name>apex.operator.operator1.classname</name>
     <value>com.datatorrent.stram.engine.TestGeneratorInputOperator</value>
     <description>The root operator</description>
   </property>
 
   <!-- operator subclass specific property -->
   <property>
-    <name>dt.operator.operator1.myStringProperty</name>
+    <name>apex.operator.operator1.myStringProperty</name>
     <value>myStringPropertyValue</value>
     <description>subclass specific property</description>
   </property>
@@ -49,7 +49,7 @@
   <!-- Operator 2, with stream partitioning policy -->
 
   <property>
-    <name>dt.operator.operator2.classname</name>
+    <name>apex.operator.operator2.classname</name>
     <value>com.datatorrent.stram.engine.GenericTestOperator</value>
     <description>Another operator, which gets input from root</description>
   </property>
@@ -57,19 +57,19 @@
   <!--  Stream connecting operator1 and operator2 -->
 
   <property>
-    <name>dt.stream.n1n2.source</name>
+    <name>apex.stream.n1n2.source</name>
     <value>operator1.outport</value>
     <description></description>
   </property>
 
   <property>
-    <name>dt.stream.n1n2.sinks</name>
+    <name>apex.stream.n1n2.sinks</name>
     <value>operator2.inport1</value>
     <description></description>
   </property>
 
   <property>
-    <name>dt.stream.n1n2.partitionPolicy</name>
+    <name>apex.stream.n1n2.partitionPolicy</name>
     <value>someTargetPolicy</value>
     <description>The partition policy for this stream.</description>
   </property>
@@ -77,7 +77,7 @@
   <!-- Operator 3, receives input from operator 2 -->
 
   <property>
-    <name>dt.operator.operator3.classname</name>
+    <name>apex.operator.operator3.classname</name>
     <value>com.datatorrent.stram.engine.GenericTestOperator</value>
     <description>Another operator, which gets input from root</description>
   </property>
@@ -85,19 +85,19 @@
   <!-- Operator 4, also receives input from operator 2 -->
 
   <property>
-    <name>dt.operator.operator4.classname</name>
+    <name>apex.operator.operator4.classname</name>
     <value>com.datatorrent.stram.engine.GenericTestOperator</value>
     <description>Another operator, which gets input from root</description>
   </property>
 
   <property>
-    <name>dt.stream.n2n3.source</name>
+    <name>apex.stream.n2n3.source</name>
     <value>operator2.outport1</value>
     <description>Operator 3 receives input from operator 2.</description>
   </property>
 
   <property>
-    <name>dt.stream.n2n3.sinks</name>
+    <name>apex.stream.n2n3.sinks</name>
     <value>operator3.inport1,operator4.inport1</value>
     <description>operator3 and operator4 receives input from operator 2.</description>
   </property>
@@ -105,18 +105,18 @@
   <!-- Operator 5 -->
 
   <property>
-    <name>dt.operator.operator5.classname</name>
+    <name>apex.operator.operator5.classname</name>
     <value>com.datatorrent.stram.engine.GenericTestOperator</value>
   </property>
 
   <property>
-    <name>dt.stream.n4n5.source</name>
+    <name>apex.stream.n4n5.source</name>
     <value>operator4.outport1</value>
     <description>Operator 5 receives input from operator 4.</description>
   </property>
 
   <property>
-    <name>dt.stream.n4n5.sinks</name>
+    <name>apex.stream.n4n5.sinks</name>
     <value>operator5.inport1</value>
     <description>Operator 5 receives input from operator 4.</description>
   </property>
@@ -125,7 +125,7 @@
   <!-- Operator 6, single operator w/o links -->
 
   <property>
-    <name>dt.operator.operator6.classname</name>
+    <name>apex.operator.operator6.classname</name>
     <value>com.datatorrent.stram.engine.TestGeneratorInputOperator</value>
     <description>Another operator, which gets input from root</description>
   </property>
diff --git a/engine/src/test/resources/testModuleTopology.properties b/engine/src/test/resources/testModuleTopology.properties
index 0679e26..8a67f36 100644
--- a/engine/src/test/resources/testModuleTopology.properties
+++ b/engine/src/test/resources/testModuleTopology.properties
@@ -18,45 +18,45 @@
 #
 
 # test for defining topology as property file
-dt.operator.O1.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyInputOperator
-dt.operator.O1.inputOperatorProp=1
+apex.operator.O1.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyInputOperator
+apex.operator.O1.inputOperatorProp=1
 
-dt.operator.O2.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyOperator
-dt.operator.O2.operatorProp=2
+apex.operator.O2.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyOperator
+apex.operator.O2.operatorProp=2
 
-dt.operator.Ma.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA
-dt.operator.Ma.level2ModuleAProp1=11
-dt.operator.Ma.level2ModuleAProp2=12
-dt.operator.Ma.level2ModuleAProp3=13
+apex.operator.Ma.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA
+apex.operator.Ma.level2ModuleAProp1=11
+apex.operator.Ma.level2ModuleAProp2=12
+apex.operator.Ma.level2ModuleAProp3=13
 
-dt.operator.Mb.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB
-dt.operator.Mb.level2ModuleBProp1=21
-dt.operator.Mb.level2ModuleBProp2=22
-dt.operator.Mb.level2ModuleBProp3=23
+apex.operator.Mb.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB
+apex.operator.Mb.level2ModuleBProp1=21
+apex.operator.Mb.level2ModuleBProp2=22
+apex.operator.Mb.level2ModuleBProp3=23
 
-dt.operator.Mc.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA
-dt.operator.Mc.level2ModuleAProp1=31
-dt.operator.Mc.level2ModuleAProp2=32
-dt.operator.Mc.level2ModuleAProp3=33
+apex.operator.Mc.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA
+apex.operator.Mc.level2ModuleAProp1=31
+apex.operator.Mc.level2ModuleAProp2=32
+apex.operator.Mc.level2ModuleAProp3=33
 
-dt.operator.Md.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB
-dt.operator.Md.level2ModuleBProp1=41
-dt.operator.Md.level2ModuleBProp2=42
-dt.operator.Md.level2ModuleBProp3=43
+apex.operator.Md.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB
+apex.operator.Md.level2ModuleBProp1=41
+apex.operator.Md.level2ModuleBProp2=42
+apex.operator.Md.level2ModuleBProp3=43
 
-dt.operator.Me.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level3Module
+apex.operator.Me.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level3Module
 
-dt.stream.O1_O2.source=O1.out
-dt.stream.O1_O2.sinks=O2.in,Me.mIn
+apex.stream.O1_O2.source=O1.out
+apex.stream.O1_O2.sinks=O2.in,Me.mIn
 
-dt.stream.O2_Ma.source=O2.out1
-dt.stream.O2_Ma.sinks=Ma.mIn
+apex.stream.O2_Ma.source=O2.out1
+apex.stream.O2_Ma.sinks=Ma.mIn
 
-dt.stream.Ma_Mb.source=Ma.mOut1
-dt.stream.Ma_Mb.sinks=Mb.mIn
+apex.stream.Ma_Mb.source=Ma.mOut1
+apex.stream.Ma_Mb.sinks=Mb.mIn
 
-dt.stream.Ma_Md.source=Ma.mOut2
-dt.stream.Ma_Md.sinks=Md.mIn
+apex.stream.Ma_Md.source=Ma.mOut2
+apex.stream.Ma_Md.sinks=Md.mIn
 
-dt.stream.Mb_Mc.source=Mb.mOut2
-dt.stream.Mb_Mc.sinks=Mc.mIn
+apex.stream.Mb_Mc.source=Mb.mOut2
+apex.stream.Mb_Mc.sinks=Mc.mIn
diff --git a/engine/src/test/resources/testTopology.properties b/engine/src/test/resources/testTopology.properties
index abb7040..3160424 100644
--- a/engine/src/test/resources/testTopology.properties
+++ b/engine/src/test/resources/testTopology.properties
@@ -19,29 +19,29 @@
 
 # test for defining topology as property file
 
-dt.operator.inputOperator.classname=com.datatorrent.stram.engine.TestGeneratorInputOperator
-dt.operator.inputOperator.myConfigProperty=myConfigPropertyValue
+apex.operator.inputOperator.classname=com.datatorrent.stram.engine.TestGeneratorInputOperator
+apex.operator.inputOperator.myConfigProperty=myConfigPropertyValue
 
-dt.operator.operator1.classname=com.datatorrent.stram.engine.GenericTestOperator
-dt.operator.operator1.myStringProperty=myStringPropertyValue
+apex.operator.operator1.classname=com.datatorrent.stram.engine.GenericTestOperator
+apex.operator.operator1.myStringProperty=myStringPropertyValue
 
-dt.operator.operator2.classname=com.datatorrent.stram.engine.GenericTestOperator
+apex.operator.operator2.classname=com.datatorrent.stram.engine.GenericTestOperator
 
 #define a template for operator definition
-dt.template.nt1.classname=com.datatorrent.stram.engine.GenericTestOperator
-dt.template.nt1.myStringProperty=myStringPropertyValueFromTemplate
+apex.template.nt1.classname=com.datatorrent.stram.engine.GenericTestOperator
+apex.template.nt1.myStringProperty=myStringPropertyValueFromTemplate
 
-dt.operator.operator3.template=nt1
+apex.operator.operator3.template=nt1
 
-dt.operator.operator4.template=nt1
-dt.operator.operator4.myStringProperty=overrideOperator4
-dt.operator.operator4.stringPropertySetterOnly=setterOnlyOperator4
-dt.operator.operator4.booleanProperty=true
+apex.operator.operator4.template=nt1
+apex.operator.operator4.myStringProperty=overrideOperator4
+apex.operator.operator4.stringPropertySetterOnly=setterOnlyOperator4
+apex.operator.operator4.booleanProperty=true
 
-dt.stream.n1n2.source=operator1.outport1
-dt.stream.n1n2.sinks=operator2.inport1
-dt.stream.n1n2.template=defaultstream
-dt.stream.n1n2.locality=CONTAINER_LOCAL
+apex.stream.n1n2.source=operator1.outport1
+apex.stream.n1n2.sinks=operator2.inport1
+apex.stream.n1n2.template=defaultstream
+apex.stream.n1n2.locality=CONTAINER_LOCAL
 
-dt.stream.inputStream.source=inputOperator.outport
-dt.stream.inputStream.sinks=operator1.inport1,operator3.inport1,operator4.inport1
+apex.stream.inputStream.source=inputOperator.outport
+apex.stream.inputStream.sinks=operator1.inport1,operator3.inport1,operator4.inport1
diff --git a/engine/src/test/resources/testTopologyLegacyPrefix.properties b/engine/src/test/resources/testTopologyLegacyPrefix.properties
new file mode 100644
index 0000000..012c9cb
--- /dev/null
+++ b/engine/src/test/resources/testTopologyLegacyPrefix.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+dt.operator.o1.classname=com.datatorrent.stram.engine.TestGeneratorInputOperator
+
+dt.operator.o2.classname=com.datatorrent.stram.engine.GenericTestOperator
+dt.operator.o2.myStringProperty=myStringPropertyValue
+
+dt.stream.s1.source=o1.outport
+dt.stream.s1.sinks=o2.inport1
+dt.stream.s1.locality=CONTAINER_LOCAL