APEXCORE-691 Use type inference for generic instance creation
closes #505
diff --git a/api/src/main/java/com/datatorrent/api/AffinityRule.java b/api/src/main/java/com/datatorrent/api/AffinityRule.java
index 5e10ccd..304c086 100644
--- a/api/src/main/java/com/datatorrent/api/AffinityRule.java
+++ b/api/src/main/java/com/datatorrent/api/AffinityRule.java
@@ -92,7 +92,7 @@
   public AffinityRule(Type type, Locality locality, boolean relaxLocality, String firstOperator, String... otherOperators)
   {
     this(type, locality, relaxLocality);
-    LinkedList<String> operators = new LinkedList<String>();
+    LinkedList<String> operators = new LinkedList<>();
     if (firstOperator != null && otherOperators.length >= 1) {
       operators.add(firstOperator);
 
diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java
index 2efc84f..821ecb2 100644
--- a/api/src/main/java/com/datatorrent/api/Attribute.java
+++ b/api/src/main/java/com/datatorrent/api/Attribute.java
@@ -236,11 +236,11 @@
      */
     public static class AttributeInitializer
     {
-      static final HashMap<Class<?>, Set<Attribute<Object>>> map = new HashMap<Class<?>, Set<Attribute<Object>>>();
+      static final HashMap<Class<?>, Set<Attribute<Object>>> map = new HashMap<>();
 
       public static Map<Attribute<Object>, Object> getAllAttributes(Context context, Class<?> clazz)
       {
-        Map<Attribute<Object>, Object> result = new HashMap<Attribute<Object>, Object>();
+        Map<Attribute<Object>, Object> result = new HashMap<>();
         try {
           for (Field f: clazz.getDeclaredFields()) {
             if (Modifier.isStatic(f.getModifiers()) && Attribute.class.isAssignableFrom(f.getType())) {
@@ -273,7 +273,7 @@
         if (map.containsKey(clazz)) {
           return 0;
         }
-        Set<Attribute<Object>> set = new HashSet<Attribute<Object>>();
+        Set<Attribute<Object>> set = new HashSet<>();
         try {
           for (Field f: clazz.getDeclaredFields()) {
             if (Modifier.isStatic(f.getModifiers()) && Attribute.class.isAssignableFrom(f.getType())) {
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 3d3cffe..94022ff 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -121,17 +121,17 @@
     /**
      * Number of tuples the poll buffer can cache without blocking the input stream to the port.
      */
-    Attribute<Integer> QUEUE_CAPACITY = new Attribute<Integer>(1024);
+    Attribute<Integer> QUEUE_CAPACITY = new Attribute<>(1024);
     /**
      * The amount of buffer memory this port requires. There is a buffer server in each container. This is used to calculate total buffer server memory for container.
      * Also due to the nature of the application, if buffer server needs to use more RAM, from time to time, this number may
      * not be adhered to.
      */
-    Attribute<Integer> BUFFER_MEMORY_MB = new Attribute<Integer>(8 * 64);
+    Attribute<Integer> BUFFER_MEMORY_MB = new Attribute<>(8 * 64);
     /**
      * Poll period in milliseconds when the port buffer reaches its limits.
      */
-    Attribute<Integer> SPIN_MILLIS = new Attribute<Integer>(10);
+    Attribute<Integer> SPIN_MILLIS = new Attribute<>(10);
     /**
      * Input port attribute. Extend partitioning of an upstream operator w/o intermediate merge.
      * Can be used to form parallel partitions that span a group of operators.
@@ -139,7 +139,7 @@
      * If multiple ports of an operator have the setting, incoming streams must track back to
      * a common root partition, i.e. the operator join forks of the same origin.
      */
-    Attribute<Boolean> PARTITION_PARALLEL = new Attribute<Boolean>(false);
+    Attribute<Boolean> PARTITION_PARALLEL = new Attribute<>(false);
     /**
      * Attribute of output port to specify how many partitions should be merged by a single unifier instance. If the
      * number of partitions exceeds the limit set, a cascading unifier plan will be created. For example, 4 partitions
@@ -147,7 +147,7 @@
      * network I/O or other resource requirement for each unifier container (depends on the specific functionality of
      * the unifier), enabling horizontal scale by overcoming the single unifier bottleneck.
      */
-    Attribute<Integer> UNIFIER_LIMIT = new Attribute<Integer>(Integer.MAX_VALUE);
+    Attribute<Integer> UNIFIER_LIMIT = new Attribute<>(Integer.MAX_VALUE);
 
     /**
      * Attribute to specify that the final unifier be always a single unifier. This is useful when in MxN partitioning
@@ -158,16 +158,16 @@
      * the inputs. In this case the default unifier behavior can be specified on the output port and individual
      * exceptions can be specified on the corresponding input ports.
      */
-    Attribute<Boolean> UNIFIER_SINGLE_FINAL = new Attribute<Boolean>(Boolean.FALSE);
+    Attribute<Boolean> UNIFIER_SINGLE_FINAL = new Attribute<>(Boolean.FALSE);
     /**
      * Whether or not to auto record the tuples
      */
-    Attribute<Boolean> AUTO_RECORD = new Attribute<Boolean>(false);
+    Attribute<Boolean> AUTO_RECORD = new Attribute<>(false);
     /**
      * Whether the output is unified.
      * This is a read-only attribute to query that whether the output of the operator from multiple instances is being unified.
      */
-    Attribute<Boolean> IS_OUTPUT_UNIFIED = new Attribute<Boolean>(false);
+    Attribute<Boolean> IS_OUTPUT_UNIFIED = new Attribute<>(false);
     /**
      * Provide the codec which can be used to serialize or deserialize the data
      * that can be received on the port. If it is unspecified the engine may use
@@ -193,13 +193,13 @@
      * of the operator. On subsequent run, it's the windowId of the checkpoint from which the operator state
      * is recovered.
      */
-    Attribute<Long> ACTIVATION_WINDOW_ID = new Attribute<Long>(Stateless.WINDOW_ID);
+    Attribute<Long> ACTIVATION_WINDOW_ID = new Attribute<>(Stateless.WINDOW_ID);
     /**
      * It is a maximum poll period in milliseconds when there are no tuples available on any of the input ports of the
      * operator. Platform uses the heuristic to change poll period from 0 to SPIN_MILLIS seconds.
      * Default value is 10 milliseconds.
      */
-    Attribute<Integer> SPIN_MILLIS = new Attribute<Integer>(10);
+    Attribute<Integer> SPIN_MILLIS = new Attribute<>(10);
     /**
      * The maximum number of attempts to restart a failing operator before shutting down the application.
      * Until this number is reached, when an operator fails to start it is re-spawned in a new container. Once all the
@@ -218,15 +218,15 @@
      * by the engine. The attribute is ignored when the operator was already declared stateless through the
      * {@link Stateless} annotation.
      */
-    Attribute<Boolean> STATELESS = new Attribute<Boolean>(false);
+    Attribute<Boolean> STATELESS = new Attribute<>(false);
     /**
      * Memory resource that the operator requires for optimal functioning. Used to calculate total memory requirement for containers.
      */
-    Attribute<Integer> MEMORY_MB = new Attribute<Integer>(1024);
+    Attribute<Integer> MEMORY_MB = new Attribute<>(1024);
     /**
      * CPU Cores that the operator requires for optimal functioning. Used to calculate total CPU Cores requirement for containers.
      */
-    Attribute<Integer> VCORES = new Attribute<Integer>(0);
+    Attribute<Integer> VCORES = new Attribute<>(0);
 
     /**
      * The options to be pass to JVM when launching the operator. Options such as java maximum heap size can be specified here.
@@ -235,7 +235,7 @@
     /**
      * Attribute of the operator that tells the platform how many streaming windows make 1 application window.
      */
-    Attribute<Integer> APPLICATION_WINDOW_COUNT = new Attribute<Integer>(1);
+    Attribute<Integer> APPLICATION_WINDOW_COUNT = new Attribute<>(1);
     /**
      * When set it changes the computation to sliding window computation where duration is determined using {@link #APPLICATION_WINDOW_COUNT} that is
      * slided by duration determined using value of this attribute. Default value is null which is equivalent to that of {@link #APPLICATION_WINDOW_COUNT}.
@@ -251,7 +251,7 @@
      * value. Typically user would define this value to be the same as that of APPLICATION_WINDOW_COUNT so checkpointing
      * will be done at application window boundary.
      */
-    Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<Integer>(1);
+    Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<>(1);
     /**
      * Name of host to directly control locality of an operator. Complementary to stream locality (NODE_LOCAL affinity).
      * For example, the user may wish to specify a locality constraint for an input operator relative to its data source.
@@ -274,18 +274,18 @@
      * If the processing mode for an operator is specified as EXACTLY_ONCE then the processing mode for all downstream operators
      * should be specified as AT_MOST_ONCE otherwise it will result in an error.
      */
-    Attribute<Operator.ProcessingMode> PROCESSING_MODE = new Attribute<Operator.ProcessingMode>(ProcessingMode.AT_LEAST_ONCE);
+    Attribute<Operator.ProcessingMode> PROCESSING_MODE = new Attribute<>(ProcessingMode.AT_LEAST_ONCE);
     /**
      * Timeout to identify stalled processing, specified as count of streaming windows. If the last processed
      * window does not advance within the specified timeout count, the operator will be considered stuck and the
      * container restart. There are multiple reasons this could happen: clock drift, hardware issue, networking issue,
      * blocking operator logic, etc.
      */
-    Attribute<Integer> TIMEOUT_WINDOW_COUNT = new Attribute<Integer>(120);
+    Attribute<Integer> TIMEOUT_WINDOW_COUNT = new Attribute<>(120);
     /**
      * Whether or not to auto record the tuples
      */
-    Attribute<Boolean> AUTO_RECORD = new Attribute<Boolean>(false);
+    Attribute<Boolean> AUTO_RECORD = new Attribute<>(false);
     /**
      * How the operator distributes its state and share the input can be influenced by setting the Partitioner attribute.
      * If this attribute is set to non null value, the instance of the partitioner is used to partition and merge the
@@ -348,7 +348,7 @@
      * Name under which the application will be shown in the resource manager.
      * If not set, the default is the configuration Java class or property file name.
      */
-    Attribute<String> APPLICATION_NAME = new Attribute<String>("unknown-application-name");
+    Attribute<String> APPLICATION_NAME = new Attribute<>("unknown-application-name");
     /**
      * URL to the application's documentation.
      */
@@ -387,7 +387,7 @@
     /**
      * Dump extra debug information in launcher, master and containers.
      */
-    Attribute<Boolean> DEBUG = new Attribute<Boolean>(false);
+    Attribute<Boolean> DEBUG = new Attribute<>(false);
     /**
      * The options to be pass to JVM when launching the containers. Options such as java maximum heap size can be specified here.
      */
@@ -396,20 +396,20 @@
      * The amount of memory to be requested for the application master. Not used in local mode.
      * Default value is 1GB.
      */
-    Attribute<Integer> MASTER_MEMORY_MB = new Attribute<Integer>(1024);
+    Attribute<Integer> MASTER_MEMORY_MB = new Attribute<>(1024);
     /**
      * Where to spool the data once the buffer server capacity is reached.
      */
-    Attribute<Boolean> BUFFER_SPOOLING = new Attribute<Boolean>(true);
+    Attribute<Boolean> BUFFER_SPOOLING = new Attribute<>(true);
     /**
      * The streaming window size to use for the application. It is specified in milliseconds. Default value is 500ms.
      */
-    Attribute<Integer> STREAMING_WINDOW_SIZE_MILLIS = new Attribute<Integer>(500);
+    Attribute<Integer> STREAMING_WINDOW_SIZE_MILLIS = new Attribute<>(500);
     /**
      * The time interval for saving the operator state. It is specified as a multiple of streaming windows. The operator
      * state is saved periodically with interval equal to the checkpoint interval. Default value is 60 streaming windows.
      */
-    Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<Integer>(60);
+    Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<>(60);
     /**
      * The path to store application dependencies, recording and other generated files for application master and containers.
      */
@@ -418,13 +418,13 @@
      * The size limit for a file where tuple recordings are stored. When tuples are being recorded they are stored
      * in files. When a file size reaches this limit a new file is created and tuples start getting stored in the new file. Default value is 128k.
      */
-    Attribute<Integer> TUPLE_RECORDING_PART_FILE_SIZE = new Attribute<Integer>(128 * 1024);
+    Attribute<Integer> TUPLE_RECORDING_PART_FILE_SIZE = new Attribute<>(128 * 1024);
     /**
      * The time limit for a file where tuple recordings are stored. When tuples are being recorded they are stored
      * in files. When a tuple recording file creation time falls beyond the time limit window from the current time a new file
      * is created and the tuples start getting stored in the new file. Default value is 30hrs.
      */
-    Attribute<Integer> TUPLE_RECORDING_PART_FILE_TIME_MILLIS = new Attribute<Integer>(30 * 60 * 60 * 1000);
+    Attribute<Integer> TUPLE_RECORDING_PART_FILE_TIME_MILLIS = new Attribute<>(30 * 60 * 60 * 1000);
     /**
      * Address to which the application side connects to DT Gateway, in the form of host:port. This will override "dt.gateway.listenAddress" in the configuration.
      */
@@ -432,7 +432,7 @@
     /**
      * Whether or not gateway is expecting SSL connection.
      */
-    Attribute<Boolean> GATEWAY_USE_SSL = new Attribute<Boolean>(false);
+    Attribute<Boolean> GATEWAY_USE_SSL = new Attribute<>(false);
     /**
      * The username for logging in to the gateway, if authentication is enabled.
      */
@@ -448,48 +448,48 @@
     /**
      * Maximum number of simultaneous heartbeat connections to process. Default value is 30.
      */
-    Attribute<Integer> HEARTBEAT_LISTENER_THREAD_COUNT = new Attribute<Integer>(30);
+    Attribute<Integer> HEARTBEAT_LISTENER_THREAD_COUNT = new Attribute<>(30);
     /**
      * How frequently should operators heartbeat to stram. Recommended setting is
      * 1000ms. Value 0 will disable heartbeat (for unit testing). Default value is 1000ms.
      */
-    Attribute<Integer> HEARTBEAT_INTERVAL_MILLIS = new Attribute<Integer>(1000);
+    Attribute<Integer> HEARTBEAT_INTERVAL_MILLIS = new Attribute<>(1000);
     /**
      * Timeout for master to identify a hung container (full GC etc.). Timeout will result in container restart.
      * Default value is 30s.
      */
-    Attribute<Integer> HEARTBEAT_TIMEOUT_MILLIS = new Attribute<Integer>(30 * 1000);
+    Attribute<Integer> HEARTBEAT_TIMEOUT_MILLIS = new Attribute<>(30 * 1000);
     /**
      * Timeout for allocating container resources. Default value is 60s.
      */
-    Attribute<Integer> RESOURCE_ALLOCATION_TIMEOUT_MILLIS = new Attribute<Integer>(Integer.MAX_VALUE);
+    Attribute<Integer> RESOURCE_ALLOCATION_TIMEOUT_MILLIS = new Attribute<>(Integer.MAX_VALUE);
     /**
      * Maximum number of windows that can be pending for statistics calculation. Statistics are computed when
      * the metrics are available from all operators for a window. If the information is not available from all operators then
      * the window is pending. When the number of pending windows reaches this limit the information for the oldest window
      * is purged. Default value is 1000 windows.
      */
-    Attribute<Integer> STATS_MAX_ALLOWABLE_WINDOWS_LAG = new Attribute<Integer>(1000);
+    Attribute<Integer> STATS_MAX_ALLOWABLE_WINDOWS_LAG = new Attribute<>(1000);
     /**
      * Whether or not we record statistics. The statistics are recorded for each heartbeat if enabled. The default value is false.
      */
-    Attribute<Boolean> ENABLE_STATS_RECORDING = new Attribute<Boolean>(false);
+    Attribute<Boolean> ENABLE_STATS_RECORDING = new Attribute<>(false);
     /**
      * The time interval for throughput calculation. The throughput is periodically calculated with interval greater than or
      * equal to the throughput calculation interval. The default value is 10s.
      */
-    Attribute<Integer> THROUGHPUT_CALCULATION_INTERVAL = new Attribute<Integer>(10000);
+    Attribute<Integer> THROUGHPUT_CALCULATION_INTERVAL = new Attribute<>(10000);
     /**
      * The maximum number of samples to use when calculating throughput. In practice fewer samples may be used
      * if the THROUGHPUT_CALCULATION_INTERVAL is exceeded. Default value is 1000 samples.
      */
-    Attribute<Integer> THROUGHPUT_CALCULATION_MAX_SAMPLES = new Attribute<Integer>(1000);
+    Attribute<Integer> THROUGHPUT_CALCULATION_MAX_SAMPLES = new Attribute<>(1000);
     /**
      * The number of samples to use when using RPC latency to compensate for clock skews and network latency when
      * calculating stats. Specify 0 if RPC latency should not be used at all to calculate stats. Default value is 100
      * samples.
      */
-    Attribute<Integer> RPC_LATENCY_COMPENSATION_SAMPLES = new Attribute<Integer>(100);
+    Attribute<Integer> RPC_LATENCY_COMPENSATION_SAMPLES = new Attribute<>(100);
     /**
      * The agent which can be used to find the jvm options for the container.
      */
@@ -511,12 +511,12 @@
      * blacklisting of nodes by application master
      * Blacklisting for nodes is disabled for the default value
      */
-    Attribute<Integer> MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST = new Attribute<Integer>(Integer.MAX_VALUE);
+    Attribute<Integer> MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST = new Attribute<>(Integer.MAX_VALUE);
 
     /**
      * The amount of time to wait before removing failed nodes from blacklist
      */
-    Attribute<Long> BLACKLISTED_NODE_REMOVAL_TIME_MILLIS = new Attribute<Long>(new Long(60 * 60 * 1000));
+    Attribute<Long> BLACKLISTED_NODE_REMOVAL_TIME_MILLIS = new Attribute<>(new Long(60 * 60 * 1000));
 
     /**
      * Affinity rules for specifying affinity and anti-affinity between logical operators
diff --git a/api/src/main/java/com/datatorrent/api/StringCodec.java b/api/src/main/java/com/datatorrent/api/StringCodec.java
index d4a0a41..fa8ab23 100644
--- a/api/src/main/java/com/datatorrent/api/StringCodec.java
+++ b/api/src/main/java/com/datatorrent/api/StringCodec.java
@@ -302,7 +302,7 @@
           return clazz.getConstructor(String.class).newInstance(parts[1]);
         } else {
           T object = clazz.getConstructor(String.class).newInstance(parts[1]);
-          HashMap<String, String> hashMap = new HashMap<String, String>();
+          HashMap<String, String> hashMap = new HashMap<>();
           for (int i = 2; i < parts.length; i++) {
             String[] keyValPair = parts[i].split(propertySeparator, 2);
             hashMap.put(keyValPair[0], keyValPair[1]);
@@ -365,11 +365,11 @@
       }
 
       if (string.isEmpty()) {
-        return new HashMap<K, V>();
+        return new HashMap<>();
       }
 
       String[] parts = string.split(separator);
-      HashMap<K, V> map = new HashMap<K, V>();
+      HashMap<K, V> map = new HashMap<>();
       for (String part : parts) {
         String[] kvpair = part.split(equal, 2);
         map.put(keyCodec.fromString(kvpair[0]), valueCodec.fromString(kvpair[1]));
@@ -433,7 +433,7 @@
       }
 
       String[] parts = string.split(separator);
-      ArrayList<T> arrayList = new ArrayList<T>(parts.length);
+      ArrayList<T> arrayList = new ArrayList<>(parts.length);
       for (String part : parts) {
         arrayList.add(codec.fromString(part));
       }
diff --git a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
index 82cf50e..8ff0205 100644
--- a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
+++ b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
@@ -36,17 +36,17 @@
   /**
    * Parameter to specify extra jars for launch.
    */
-  public static final Attribute<String> LIB_JARS = new Attribute<String>(new StringCodec.String2String());
+  public static final Attribute<String> LIB_JARS = new Attribute<>(new StringCodec.String2String());
 
   /**
    * Parameter to specify the previous application id to use to resume launch from.
    */
-  public static final Attribute<String> ORIGINAL_APP_ID = new Attribute<String>(new StringCodec.String2String());
+  public static final Attribute<String> ORIGINAL_APP_ID = new Attribute<>(new StringCodec.String2String());
 
   /**
    * Parameter to specify the queue name to use for launch.
    */
-  public static final Attribute<String> QUEUE_NAME = new Attribute<String>(new StringCodec.String2String());
+  public static final Attribute<String> QUEUE_NAME = new Attribute<>(new StringCodec.String2String());
 
   static {
     Attribute.AttributeMap.AttributeInitializer.initialize(YarnAppLauncher.class);
diff --git a/api/src/test/java/com/datatorrent/api/AttributeMapTest.java b/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
index fcb1809..b463619 100644
--- a/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
+++ b/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
@@ -51,7 +51,7 @@
 
   interface iface
   {
-    Attribute<Greeting> greeting = new Attribute<Greeting>(Greeting.hello);
+    Attribute<Greeting> greeting = new Attribute<>(Greeting.hello);
   }
 
   @Test
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 84999fa..d08b9fc 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -336,14 +336,14 @@
   {
     all_listeners.add(dl);
     //logger.debug("total {} listeners {} -> {}", all_listeners.size(), dl, this);
-    ArrayList<BitVector> partitions = new ArrayList<BitVector>();
+    ArrayList<BitVector> partitions = new ArrayList<>();
     if (dl.getPartitions(partitions) > 0) {
       for (BitVector partition : partitions) {
         HashSet<DataListener> set;
         if (listeners.containsKey(partition)) {
           set = listeners.get(partition);
         } else {
-          set = new HashSet<DataListener>();
+          set = new HashSet<>();
           listeners.put(partition, set);
         }
         set.add(dl);
@@ -353,7 +353,7 @@
       if (listeners.containsKey(DataListener.NULL_PARTITION)) {
         set = listeners.get(DataListener.NULL_PARTITION);
       } else {
-        set = new HashSet<DataListener>();
+        set = new HashSet<>();
         listeners.put(DataListener.NULL_PARTITION, set);
       }
 
@@ -363,7 +363,7 @@
 
   public void removeDataListener(DataListener dl)
   {
-    ArrayList<BitVector> partitions = new ArrayList<BitVector>();
+    ArrayList<BitVector> partitions = new ArrayList<>();
     if (dl.getPartitions(partitions) > 0) {
       for (BitVector partition : partitions) {
         if (listeners.containsKey(partition)) {
@@ -459,7 +459,7 @@
 
     // When the number of subscribers becomes high or the number of blocks becomes high, consider optimize it.
     Block b = first;
-    Map<Block, Integer> indices = new HashMap<Block, Integer>();
+    Map<Block, Integer> indices = new HashMap<>();
     int i = 0;
     while (b != null) {
       indices.put(b, i++);
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index 08a483a..b06e60a 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -71,8 +71,8 @@
     this.identifier = identifier;
     this.upstream = upstream;
     this.group = group;
-    this.physicalNodes = new HashSet<PhysicalNode>();
-    this.partitions = new HashSet<BitVector>();
+    this.physicalNodes = new HashSet<>();
+    this.partitions = new HashSet<>();
     this.iterator = iterator;
     this.skipWindowId = skipWindowId;
     this.eventloop = eventloop;
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 7ac518b..8a56b51 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -260,7 +260,7 @@
   }
 
   private final ConcurrentHashMap<String, DataList> publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1);
-  private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<String, LogicalNode>();
+  private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<>();
   private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<>();
   private final int blockSize;
   private final int numberOfCacheBlocks;
@@ -883,7 +883,7 @@
         }
       }
 
-      ArrayList<LogicalNode> list = new ArrayList<LogicalNode>();
+      ArrayList<LogicalNode> list = new ArrayList<>();
       String publisherIdentifier = datalist.getIdentifier();
       Iterator<LogicalNode> iterator = subscriberGroups.values().iterator();
       while (iterator.hasNext()) {
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
index 124cc5f..000ce00 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
@@ -31,7 +31,7 @@
  */
 public class System
 {
-  private static final HashMap<String, DefaultEventLoop> eventloops = new HashMap<String, DefaultEventLoop>();
+  private static final HashMap<String, DefaultEventLoop> eventloops = new HashMap<>();
 
   public static void startup(String identifier)
   {
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
index f7b8829..371f98a 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
@@ -42,7 +42,7 @@
     String down_type = "SubscriberId/StreamType";
     String upstream_id = "PublisherId";
     int mask = 7;
-    ArrayList<Integer> partitions = new ArrayList<Integer>();
+    ArrayList<Integer> partitions = new ArrayList<>();
     partitions.add(5);
     long startingWindowId = 0xcafebabe00000078L;
 
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
index 3c0cb0e..d6e9b1a 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
@@ -32,7 +32,7 @@
  */
 public class Subscriber extends com.datatorrent.bufferserver.client.Subscriber
 {
-  public final ArrayList<Object> resetPayloads = new ArrayList<Object>();
+  public final ArrayList<Object> resetPayloads = new ArrayList<>();
   public AtomicInteger tupleCount = new AtomicInteger(0);
   public WindowIdHolder firstPayload;
   public WindowIdHolder lastPayload;
diff --git a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
index 165d8cf..d77b1ae 100644
--- a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
+++ b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
@@ -114,7 +114,7 @@
       newPartitions = Lists.newArrayList();
 
       for (int partitionCounter = 0; partitionCounter < newPartitionCount; partitionCounter++) {
-        newPartitions.add(new DefaultPartition<T>(partition.getPartitionedInstance()));
+        newPartitions.add(new DefaultPartition<>(partition.getPartitionedInstance()));
       }
 
       // partition the stream that was first connected in the DAG and send full data to remaining input ports
@@ -156,8 +156,8 @@
    */
   public static <T extends Operator> Collection<Partition<T>> repartition(Collection<Partition<T>> partitions)
   {
-    List<Partition<T>> newPartitions = new ArrayList<Partition<T>>();
-    HashMap<Integer, Partition<T>> lowLoadPartitions = new HashMap<Integer, Partition<T>>();
+    List<Partition<T>> newPartitions = new ArrayList<>();
+    HashMap<Integer, Partition<T>> lowLoadPartitions = new HashMap<>();
     for (Partition<T> p: partitions) {
       int load = p.getLoad();
       if (load < 0) {
@@ -201,7 +201,7 @@
         }
 
         for (int key: newKeys) {
-          Partition<T> newPartition = new DefaultPartition<T>(p.getPartitionedInstance());
+          Partition<T> newPartition = new DefaultPartition<>(p.getPartitionedInstance());
           newPartition.getPartitionKeys().put(e.getKey(), new PartitionKeys(newMask, Sets.newHashSet(key)));
           newPartitions.add(newPartition);
         }
@@ -224,8 +224,8 @@
    */
   public static <T extends Operator> Collection<Partition<T>> repartitionInputOperator(Collection<Partition<T>> partitions)
   {
-    List<Partition<T>> newPartitions = new ArrayList<Partition<T>>();
-    List<Partition<T>> lowLoadPartitions = new ArrayList<Partition<T>>();
+    List<Partition<T>> newPartitions = new ArrayList<>();
+    List<Partition<T>> lowLoadPartitions = new ArrayList<>();
     for (Partition<T> p: partitions) {
       int load = p.getLoad();
       if (load < 0) {
@@ -235,8 +235,8 @@
           lowLoadPartitions.add(p);
         }
       } else if (load > 0) {
-        newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance()));
-        newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance()));
+        newPartitions.add(new DefaultPartition<>(p.getPartitionedInstance()));
+        newPartitions.add(new DefaultPartition<>(p.getPartitionedInstance()));
       } else {
         newPartitions.add(p);
       }
@@ -274,7 +274,7 @@
       T anOperator = newPartitions.iterator().next().getPartitionedInstance();
 
       while (morePartitionsToCreate-- > 0) {
-        DefaultPartition<T> partition = new DefaultPartition<T>(anOperator);
+        DefaultPartition<T> partition = new DefaultPartition<>(anOperator);
         newPartitions.add(partition);
       }
     }
diff --git a/common/src/main/java/com/datatorrent/common/security/SecurityContext.java b/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
index dccd7b7..3dc4dda 100644
--- a/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
+++ b/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
@@ -32,17 +32,17 @@
   /**
    * Attribute for the user name for login.
    */
-  Attribute<String> USER_NAME = new Attribute<String>((String)null);
+  Attribute<String> USER_NAME = new Attribute<>((String)null);
 
   /**
    * Attribute for the password for login.
    */
 
-  Attribute<char[]> PASSWORD = new Attribute<char[]>((char[])null);
+  Attribute<char[]> PASSWORD = new Attribute<>((char[])null);
 
   /**
    * Attribute for the realm for login.
    */
-  Attribute<String> REALM = new Attribute<String>((String)null);
+  Attribute<String> REALM = new Attribute<>((String)null);
 
 }
diff --git a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
index ca7490d..f90a888 100644
--- a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
+++ b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
@@ -48,7 +48,7 @@
     }
   };
 
-  public transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+  public transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
 
   protected List<T> lastWindowTuples = new ArrayList<>();
 
diff --git a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
index 7723fed..ef837a8 100644
--- a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
+++ b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
@@ -55,9 +55,9 @@
     this.objectMapper = new ObjectMapper();
     objectMapper.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, true);
     objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
-    module.addSerializer(ObjectMapperString.class, new RawSerializer<Object>(Object.class));
-    module.addSerializer(JSONObject.class, new RawSerializer<Object>(Object.class));
-    module.addSerializer(JSONArray.class, new RawSerializer<Object>(Object.class));
+    module.addSerializer(ObjectMapperString.class, new RawSerializer<>(Object.class));
+    module.addSerializer(JSONObject.class, new RawSerializer<>(Object.class));
+    module.addSerializer(JSONArray.class, new RawSerializer<>(Object.class));
     objectMapper.registerModule(module);
   }
 
diff --git a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
index 63d1646..af5e10e 100644
--- a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
+++ b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
@@ -53,7 +53,7 @@
    */
   public static <T> String constructPublishMessage(String topic, T data, PubSubMessageCodec<T> codec) throws IOException
   {
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.PUBLISH);
     pubSubMessage.setTopic(topic);
     pubSubMessage.setData(data);
@@ -72,7 +72,7 @@
    */
   public static <T> String constructSubscribeMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
   {
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.SUBSCRIBE);
     pubSubMessage.setTopic(topic);
 
@@ -90,7 +90,7 @@
    */
   public static <T> String constructUnsubscribeMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
   {
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.UNSUBSCRIBE);
     pubSubMessage.setTopic(topic);
 
@@ -108,7 +108,7 @@
    */
   public static <T> String constructSubscribeNumSubscribersMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
   {
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS);
     pubSubMessage.setTopic(topic);
 
@@ -126,7 +126,7 @@
    */
   public static <T> String constructUnsubscribeNumSubscribersMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
   {
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS);
     pubSubMessage.setTopic(topic);
 
@@ -135,7 +135,7 @@
 
   public String formatMessage(PubSubMessage<T> pubSubMessage) throws IOException
   {
-    HashMap<String, Object> map = new HashMap<String, Object>();
+    HashMap<String, Object> map = new HashMap<>();
     map.put(PubSubMessage.TYPE_KEY, pubSubMessage.getType().getIdentifier());
     map.put(PubSubMessage.TOPIC_KEY, pubSubMessage.getTopic());
     T data = pubSubMessage.getData();
@@ -156,7 +156,7 @@
   public PubSubMessage<T> parseMessage(String message) throws IOException
   {
     HashMap<String, Object> map = mapper.readValue(message, HashMap.class);
-    PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+    PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
     pubSubMessage.setType(PubSubMessageType.getPubSubMessageType((String)map.get(PubSubMessage.TYPE_KEY)));
     pubSubMessage.setTopic((String)map.get(PubSubMessage.TOPIC_KEY));
     pubSubMessage.setData((T)map.get(PubSubMessage.DATA_KEY));
diff --git a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
index e7c4887..2e48f54 100644
--- a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
+++ b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
@@ -41,7 +41,7 @@
 
   public static class DummyOperator implements Operator
   {
-    public final DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+    public final DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
 
     private Integer value;
 
@@ -93,10 +93,10 @@
   public void partition1Test()
   {
     DummyOperator dummyOperator = new DummyOperator(5);
-    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>();
+    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>();
 
     Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
-    DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<DummyOperator>(dummyOperator);
+    DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<>(dummyOperator);
     partitions.add(defaultPartition);
 
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
@@ -111,10 +111,10 @@
   public void partition5Test()
   {
     DummyOperator dummyOperator = new DummyOperator(5);
-    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>(5);
+    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>(5);
 
     Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
-    DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<DummyOperator>(dummyOperator);
+    DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<>(dummyOperator);
     partitions.add(defaultPartition);
 
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
@@ -137,10 +137,10 @@
   public void testParallelPartitionScaleUP()
   {
     DummyOperator dummyOperator = new DummyOperator(5);
-    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>();
+    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>();
 
     Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
-    partitions.add(new DefaultPartition<DummyOperator>(dummyOperator));
+    partitions.add(new DefaultPartition<>(dummyOperator));
 
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
         new PartitioningContextImpl(null, 5));
@@ -151,12 +151,12 @@
   public void testParallelPartitionScaleDown()
   {
     DummyOperator dummyOperator = new DummyOperator(5);
-    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>();
+    StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>();
 
     Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
 
     for (int i = 5; i-- > 0; ) {
-      partitions.add(new DefaultPartition<DummyOperator>(dummyOperator));
+      partitions.add(new DefaultPartition<>(dummyOperator));
     }
 
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
diff --git a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
index 97debe3..79fdac7 100644
--- a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
+++ b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
@@ -50,7 +50,7 @@
       }
 
     };
-    public final transient OutputPort<T> output = new DefaultOutputPort<T>();
+    public final transient OutputPort<T> output = new DefaultOutputPort<>();
     private int i;
 
     public void setI(int i)
@@ -109,7 +109,7 @@
   @Test
   public void testReadResolve() throws Exception
   {
-    SerializableOperator<Object> pre = new SerializableOperator<Object>();
+    SerializableOperator<Object> pre = new SerializableOperator<>();
     pre.setI(10);
 
     FileOutputStream fos = new FileOutputStream(filename);
diff --git a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
index 53d91a5..80314c7 100644
--- a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
+++ b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
@@ -73,7 +73,7 @@
       for (ContainerRequest cr : requests) {
         ContainerStartRequest csr = hostSpecificRequests.get(cr);
         ContainerRequest newCr = new ContainerRequest(cr.getCapability(), null, null, cr.getPriority());
-        MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, newCr);
+        MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, newCr);
         requestedResources.put(csr, pair);
         containerRequests.add(newCr);
         hostSpecificRequests.remove(cr);
@@ -91,7 +91,7 @@
         for (Entry<ContainerRequest, ContainerStartRequest> entry : otherContainerRequests.entrySet()) {
           ContainerRequest cr = entry.getKey();
           ContainerStartRequest csr = entry.getValue();
-          MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, cr);
+          MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, cr);
           requestedResources.put(csr, pair);
           containerRequests.add(cr);
         }
diff --git a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
index e7f9672..45206bc 100644
--- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
+++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
@@ -102,7 +102,7 @@
    */
   public void addContainerRequest(Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, List<ContainerRequest> containerRequests, StreamingContainerAgent.ContainerStartRequest csr, ContainerRequest cr)
   {
-    MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, cr);
+    MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, cr);
     requestedResources.put(csr, pair);
     containerRequests.add(cr);
   }
@@ -164,7 +164,7 @@
 
   public List<String> getNodesExceptHost(List<String> hostNames)
   {
-    List<String> nodesList = new ArrayList<String>();
+    List<String> nodesList = new ArrayList<>();
     Set<String> hostNameSet = Sets.newHashSet();
     hostNameSet.addAll(hostNames);
     for (String host : nodeReportMap.keySet()) {
diff --git a/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java b/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
index 3a61ee6..b374447 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
@@ -32,10 +32,10 @@
 public class PermissionsInfo
 {
 
-  private final Set<String> readOnlyRoles = new TreeSet<String>();
-  private final Set<String> readOnlyUsers = new TreeSet<String>();
-  private final Set<String> readWriteRoles = new TreeSet<String>();
-  private final Set<String> readWriteUsers = new TreeSet<String>();
+  private final Set<String> readOnlyRoles = new TreeSet<>();
+  private final Set<String> readOnlyUsers = new TreeSet<>();
+  private final Set<String> readWriteRoles = new TreeSet<>();
+  private final Set<String> readWriteUsers = new TreeSet<>();
   private boolean readOnlyEveryone = false;
   private boolean readWriteEveryone = false;
 
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index 050729d..8076d4a 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -271,7 +271,7 @@
       }
       Text rmTokenService = new Text(Joiner.on(',').join(services));
 
-      return new Token<RMDelegationTokenIdentifier>(
+      return new Token<>(
           rmDelegationToken.getIdentifier().array(),
           rmDelegationToken.getPassword().array(),
           new Text(rmDelegationToken.getKind()),
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
index 7113280..284aefb 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
@@ -50,8 +50,8 @@
   private final int id;
   private final String name;
   // the size of the circular queue should be configurable. hardcoded to 1024 for now.
-  private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<ContainerStats.OperatorStats>(1024);
-  private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<OperatorRequest>(1024);
+  private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<>(1024);
+  private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<>(1024);
   public final boolean stateless;
   private int windowsFromCheckpoint;
 
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 401eea9..62c4fd8 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -760,7 +760,7 @@
         codecs.put(sinkToPersistPortMeta, inputStreamCodec);
         InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port);
         StreamCodec<Object> specifiedCodecForPersistOperator = (StreamCodec<Object>)persistOperatorPortMeta.getStreamCodec();
-        StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator);
+        StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<>(codecs, specifiedCodecForPersistOperator);
         persistOperatorPortMeta.setStreamCodec(codec);
       }
     }
@@ -1907,14 +1907,14 @@
     HashMap<OperatorPair, AffinityRule> antiAffinities = new HashMap<>();
     HashMap<OperatorPair, AffinityRule> threadLocalAffinities = new HashMap<>();
 
-    List<String> operatorNames = new ArrayList<String>();
+    List<String> operatorNames = new ArrayList<>();
 
     for (OperatorMeta operator : getAllOperators()) {
       operatorNames.add(operator.getName());
-      Set<String> containerSet = new HashSet<String>();
+      Set<String> containerSet = new HashSet<>();
       containerSet.add(operator.getName());
       containerAffinities.put(operator.getName(), containerSet);
-      Set<String> nodeSet = new HashSet<String>();
+      Set<String> nodeSet = new HashSet<>();
       nodeSet.add(operator.getName());
       nodeAffinities.put(operator.getName(), nodeSet);
 
@@ -2073,7 +2073,7 @@
    */
   public void convertRegexToList(List<String> operatorNames, AffinityRule rule)
   {
-    List<String> operators = new LinkedList<String>();
+    List<String> operators = new LinkedList<>();
     Pattern p = Pattern.compile(rule.getOperatorRegex());
     for (String name : operatorNames) {
       if (p.matcher(name).matches()) {
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index ce22bfd..a1da94a 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -477,13 +477,13 @@
     // Log container anti-affinity
     if (LOG.isDebugEnabled()) {
       for (PTContainer container : containers) {
-        List<String> antiOperators = new ArrayList<String>();
+        List<String> antiOperators = new ArrayList<>();
         for (PTContainer c : container.getStrictAntiPrefs()) {
           for (PTOperator operator : c.getOperators()) {
             antiOperators.add(operator.getName());
           }
         }
-        List<String> containerOperators = new ArrayList<String>();
+        List<String> containerOperators = new ArrayList<>();
         for (PTOperator operator : container.getOperators()) {
           containerOperators.add(operator.getName());
         }
diff --git a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
index 38c0f41..47986cb 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
@@ -116,10 +116,10 @@
    */
   public PubSubWebSocketClient()
   {
-    throwable = new AtomicReference<Throwable>();
+    throwable = new AtomicReference<>();
     ioThreadMultiplier = 1;
     mapper = (new JacksonObjectMapperProvider()).getContext(null);
-    codec = new PubSubMessageCodec<Object>(mapper);
+    codec = new PubSubMessageCodec<>(mapper);
 
     AsyncHttpClientConfigBean config = new AsyncHttpClientConfigBean();
     config.setIoThreadMultiplier(ioThreadMultiplier);
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java b/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
index dd75857..9fbb54d 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
@@ -148,7 +148,7 @@
     List<CompactAnnotationNode> annotations = new LinkedList<>();
     for (Object visibleAnnotation : fn.visibleAnnotations) {
       CompactAnnotationNode node = new CompactAnnotationNode();
-      Map<String, Object> annotationMap = new HashMap<String, Object>();
+      Map<String, Object> annotationMap = new HashMap<>();
       if (visibleAnnotation instanceof AnnotationNode) {
         AnnotationNode annotation = (AnnotationNode)visibleAnnotation;
         if (annotation.desc.contains("InputPortFieldAnnotation")
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java b/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
index 1e87b31..91a3cf3 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
@@ -87,7 +87,7 @@
 
     char boundChar;
 
-    ArrayList<Type> bounds = new ArrayList<Type>();
+    ArrayList<Type> bounds = new ArrayList<>();
 
     public Type[] getUpperBounds()
     {
@@ -154,7 +154,7 @@
   class ParameterizedTypeNode extends TypeNode
   {
 
-    ArrayList<Type> actualTypeArguments = new ArrayList<Type>();
+    ArrayList<Type> actualTypeArguments = new ArrayList<>();
 
     public Type[] getActualTypeArguments()
     {
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index d7f96d4..0c997ec 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -78,7 +78,7 @@
   private static class MockInputOperator extends BaseOperator implements InputOperator, Operator.CheckpointNotificationListener
   {
     @OutputPortFieldAnnotation( optional = true)
-    public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<>();
     private transient int windowCount;
 
     private int checkpointState;
@@ -326,7 +326,7 @@
 
   public List<Checkpoint> getCheckpoints(Long... windowIds)
   {
-    List<Checkpoint> list = new ArrayList<Checkpoint>(windowIds.length);
+    List<Checkpoint> list = new ArrayList<>(windowIds.length);
     for (Long windowId : windowIds) {
       list.add(new Checkpoint(windowId, 0, 0));
     }
diff --git a/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java b/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
index b1509e0..b1f3363 100644
--- a/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
@@ -41,7 +41,7 @@
   public void testGenericOperatorPropertyCodec()
   {
     LogicalPlan dag = new LogicalPlan();
-    Map<Class<?>, Class<? extends StringCodec<?>>> codecs = new HashMap<Class<?>, Class<? extends StringCodec<?>>>();
+    Map<Class<?>, Class<? extends StringCodec<?>>> codecs = new HashMap<>();
     codecs.put(GenericOperatorProperty.class, GenericOperatorProperty.GenericOperatorPropertyStringCodec.class);
     dag.setAttribute(com.datatorrent.api.Context.DAGContext.STRING_CODECS, codecs);
     dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
index ecbeeb6..f0199f9 100644
--- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
@@ -86,7 +86,7 @@
     /*
      * Received tuples are stored in a map keyed with the system assigned operator id.
      */
-    public static final ConcurrentHashMap<String, List<Object>> receivedTuples = new ConcurrentHashMap<String, List<Object>>();
+    public static final ConcurrentHashMap<String, List<Object>> receivedTuples = new ConcurrentHashMap<>();
     private transient int operatorId;
     public String prefix = "";
 
@@ -107,7 +107,7 @@
         synchronized (receivedTuples) {
           List<Object> l = receivedTuples.get(id);
           if (l == null) {
-            l = Collections.synchronizedList(new ArrayList<Object>());
+            l = Collections.synchronizedList(new ArrayList<>());
             //LOG.debug("adding {} {}", id, l);
             receivedTuples.put(id, l);
           }
@@ -121,12 +121,12 @@
 
     };
     @OutputPortFieldAnnotation( optional = true)
-    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
   }
 
   public static class TestInputOperator<T> extends BaseOperator implements InputOperator
   {
-    public final transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+    public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
     transient boolean first;
     transient long windowId;
     boolean blockEndStream = false;
@@ -178,9 +178,9 @@
     CollectorOperator.receivedTuples.clear();
 
     TestInputOperator<Integer> input = dag.addOperator("input", new TestInputOperator<Integer>());
-    input.testTuples = new ArrayList<List<Integer>>();
+    input.testTuples = new ArrayList<>();
     for (Integer[] tuples: testData) {
-      input.testTuples.add(new ArrayList<Integer>(Arrays.asList(tuples)));
+      input.testTuples.add(new ArrayList<>(Arrays.asList(tuples)));
     }
     CollectorOperator collector = dag.addOperator("collector", new CollectorOperator());
     collector.prefix = "" + System.identityHashCode(collector);
@@ -234,7 +234,7 @@
     {
       Map<Integer, Integer> m = loadIndicators.get();
       if (m == null) {
-        loadIndicators.set(m = new ConcurrentHashMap<Integer, Integer>());
+        loadIndicators.set(m = new ConcurrentHashMap<>());
       }
       m.put(oper.getId(), load);
     }
@@ -341,7 +341,7 @@
     Assert.assertNotNull("" + nodeMap, inputDeployed);
 
     // add tuple that matches the partition key and check that each partition receives it
-    ArrayList<Integer> inputTuples = new ArrayList<Integer>();
+    ArrayList<Integer> inputTuples = new ArrayList<>();
     LOG.debug("Number of partitions {}", partitions.size());
     for (PTOperator p: partitions) {
       // default partitioning has one port mapping with a single partition key
@@ -391,7 +391,7 @@
     @Override
     public Collection<Partition<PartitionableInputOperator>> definePartitions(Collection<Partition<PartitionableInputOperator>> partitions, PartitioningContext context)
     {
-      List<Partition<PartitionableInputOperator>> newPartitions = new ArrayList<Partition<PartitionableInputOperator>>(3);
+      List<Partition<PartitionableInputOperator>> newPartitions = new ArrayList<>(3);
       Iterator<? extends Partition<PartitionableInputOperator>> iterator = partitions.iterator();
       Partition<PartitionableInputOperator> templatePartition;
       for (int i = 0; i < 3; i++) {
@@ -401,7 +401,7 @@
           op.partitionProperty = templatePartition.getPartitionedInstance().partitionProperty;
         }
         op.partitionProperty += "_" + i;
-        newPartitions.add(new DefaultPartition<PartitionableInputOperator>(op));
+        newPartitions.add(new DefaultPartition<>(op));
       }
       return newPartitions;
     }
@@ -431,7 +431,7 @@
       lc.runAsync();
 
       List<PTOperator> partitions = assertNumberPartitions(3, lc, dag.getMeta(input));
-      Set<String> partProperties = new HashSet<String>();
+      Set<String> partProperties = new HashSet<>();
       for (PTOperator p : partitions) {
         LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, p);
         Map<Integer, Node<?>> nodeMap = c.getNodes();
@@ -460,7 +460,7 @@
       PartitionLoadWatch.remove(partitions.get(0));
 
       partitions = assertNumberPartitions(3, lc, dag.getMeta(input));
-      partProperties = new HashSet<String>();
+      partProperties = new HashSet<>();
       for (PTOperator p: partitions) {
         LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, p);
         Map<Integer, Node<?>> nodeMap = c.getNodes();
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 35bb363..cee8247 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -1007,7 +1007,7 @@
       lastId = assignNewContainers(dnm, lastId);
 
       List<PTOperator> operators = plan.getOperators(n2meta);
-      List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+      List<PTOperator> upstreamOperators = new ArrayList<>();
       for (PTOperator operator : operators) {
         upstreamOperators.addAll(operator.upstreamMerge.values());
         /*
@@ -1036,7 +1036,7 @@
       lastId = assignNewContainers(dnm, lastId);
 
       List<PTOperator> operators = plan.getOperators(n3meta);
-      List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+      List<PTOperator> upstreamOperators = new ArrayList<>();
       for (PTOperator operator : operators) {
         upstreamOperators.addAll(operator.upstreamMerge.values());
       }
@@ -1063,7 +1063,7 @@
       lastId = assignNewContainers(dnm, lastId);
 
       List<PTOperator> operators = plan.getOperators(n2meta);
-      List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+      List<PTOperator> upstreamOperators = new ArrayList<>();
       for (PTOperator operator : operators) {
         upstreamOperators.addAll(operator.upstreamMerge.values());
         /*
@@ -1144,7 +1144,7 @@
 
   private Set<PTOperator> getUnifiers(PhysicalPlan plan)
   {
-    Set<PTOperator> unifiers = new HashSet<PTOperator>();
+    Set<PTOperator> unifiers = new HashSet<>();
     for (PTContainer container : plan.getContainers()) {
       for (PTOperator operator : container.getOperators()) {
         if (operator.isUnifier()) {
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index c606f47..cb2d760 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -124,14 +124,14 @@
     input.portName = "inputPortNameOnNode";
     input.sourceNodeId = 99;
 
-    ndi.inputs = new ArrayList<OperatorDeployInfo.InputDeployInfo>();
+    ndi.inputs = new ArrayList<>();
     ndi.inputs.add(input);
 
     OperatorDeployInfo.OutputDeployInfo output = new OperatorDeployInfo.OutputDeployInfo();
     output.declaredStreamId = "streamFromNode";
     output.portName = "outputPortNameOnNode";
 
-    ndi.outputs = new ArrayList<OperatorDeployInfo.OutputDeployInfo>();
+    ndi.outputs = new ArrayList<>();
     ndi.outputs.add(output);
 
     ContainerHeartbeatResponse scc = new ContainerHeartbeatResponse();
diff --git a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
index f6b7277..59f9dcc 100644
--- a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
@@ -34,7 +34,7 @@
 {
   ApexCli cli;
 
-  static Map<String, String> env = new HashMap<String, String>();
+  static Map<String, String> env = new HashMap<>();
   static String userHome;
 
   @Before
diff --git a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
index 2ac1c50..f1356df 100644
--- a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
@@ -59,7 +59,7 @@
   static TemporaryFolder testFolder = new TemporaryFolder();
   ApexCli cli;
 
-  static Map<String, String> env = new HashMap<String, String>();
+  static Map<String, String> env = new HashMap<>();
   static String userHome;
 
   @BeforeClass
diff --git a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
index d1a18ae..26aced8 100644
--- a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
@@ -112,8 +112,8 @@
   @Test
   public void testString()
   {
-    StatefulStreamCodec<Object> coder = new DefaultStatefulStreamCodec<Object>();
-    StatefulStreamCodec<Object> decoder = new DefaultStatefulStreamCodec<Object>();
+    StatefulStreamCodec<Object> coder = new DefaultStatefulStreamCodec<>();
+    StatefulStreamCodec<Object> decoder = new DefaultStatefulStreamCodec<>();
 
     String hello = "hello";
 
@@ -182,7 +182,7 @@
   public void testFinalFieldSerialization() throws Exception
   {
     TestTuple t1 = new TestTuple(5);
-    DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<Object>();
+    DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<>();
     DataStatePair dsp = c.toDataStatePair(t1);
     TestTuple t2 = (TestTuple)c.fromDataStatePair(dsp);
     Assert.assertEquals("", t1.finalField, t2.finalField);
@@ -208,7 +208,7 @@
     Object inner = outer.new InnerClass();
 
     for (Object o: new Object[] {outer, inner}) {
-      DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<Object>();
+      DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<>();
       DataStatePair dsp = c.toDataStatePair(o);
       c.fromDataStatePair(dsp);
 
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
index cc777f7..64298de 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
@@ -77,7 +77,7 @@
   @Override
   public void testNonLinearOperatorRecovery() throws InterruptedException
   {
-    final HashSet<Object> collection = new HashSet<Object>();
+    final HashSet<Object> collection = new HashSet<>();
 
     com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap map = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
     map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0);
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index da5c7b7..66f1b84 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -197,7 +197,7 @@
 
     };
     @OutputPortFieldAnnotation( optional = true)
-    DefaultOutputPort<Object> op = new DefaultOutputPort<Object>();
+    DefaultOutputPort<Object> op = new DefaultOutputPort<>();
 
     @Override
     public void beginWindow(long windowId)
@@ -226,7 +226,7 @@
 
   public static class CheckpointDistanceOperator extends GenericOperator
   {
-    List<Integer> distances = new ArrayList<Integer>();
+    List<Integer> distances = new ArrayList<>();
     int numWindows = 0;
     int maxWindows = 0;
 
@@ -245,7 +245,7 @@
   public void testSynchingLogic() throws InterruptedException
   {
     long sleeptime = 25L;
-    final ArrayList<Object> list = new ArrayList<Object>();
+    final ArrayList<Object> list = new ArrayList<>();
     GenericOperator go = new GenericOperator();
     final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
         new DefaultAttributeMap(), null));
@@ -376,8 +376,8 @@
     final Server bufferServer = new Server(eventloop, 0); // find random port
     final int bufferServerPort = bufferServer.run().getPort();
 
-    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
-    final BlockingQueue<Object> tuples = new ArrayBlockingQueue<Object>(10);
+    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>();
+    final BlockingQueue<Object> tuples = new ArrayBlockingQueue<>(10);
 
     GenericTestOperator go = new GenericTestOperator();
     final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
@@ -905,7 +905,7 @@
     CheckpointDistanceOperator go = new CheckpointDistanceOperator();
     go.maxWindows = maxWindows;
 
-    List<Integer> checkpoints = new ArrayList<Integer>();
+    List<Integer> checkpoints = new ArrayList<>();
 
     int window = 0;
     while (window < maxWindows) {
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
index 84217eb..bb2e72f 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
@@ -51,10 +51,10 @@
 
   public static class EvenOddIntegerGeneratorInputOperator implements InputOperator, com.datatorrent.api.Operator.ActivationListener<OperatorContext>
   {
-    public final transient DefaultOutputPort<Integer> even = new DefaultOutputPort<Integer>();
-    public final transient DefaultOutputPort<Integer> odd = new DefaultOutputPort<Integer>();
-    private final transient CircularBuffer<Integer> evenBuffer = new CircularBuffer<Integer>(1024);
-    private final transient CircularBuffer<Integer> oddBuffer = new CircularBuffer<Integer>(1024);
+    public final transient DefaultOutputPort<Integer> even = new DefaultOutputPort<>();
+    public final transient DefaultOutputPort<Integer> odd = new DefaultOutputPort<>();
+    private final transient CircularBuffer<Integer> evenBuffer = new CircularBuffer<>(1024);
+    private final transient CircularBuffer<Integer> oddBuffer = new CircularBuffer<>(1024);
     private volatile Thread dataGeneratorThread;
 
     @Override
@@ -179,7 +179,7 @@
     public void setConnected(boolean flag)
     {
       if (flag) {
-        collections.put(id, list = new ArrayList<T>());
+        collections.put(id, list = new ArrayList<>());
       }
     }
   }
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
index 55b5eab..f669832 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
@@ -171,7 +171,7 @@
 
     }
 
-    static final ArrayList<Call> calls = new ArrayList<Call>();
+    static final ArrayList<Call> calls = new ArrayList<>();
 
     @Override
     public void save(Object object, int operatorId, long windowId) throws IOException
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
index 3e208c8..952a36b 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
@@ -152,8 +152,8 @@
 
   public static class CollectorOperator extends BaseOperator implements com.datatorrent.api.Operator.CheckpointListener
   {
-    public static HashSet<Long> collection = new HashSet<Long>(20);
-    public static ArrayList<Long> duplicates = new ArrayList<Long>();
+    public static HashSet<Long> collection = new HashSet<>(20);
+    public static ArrayList<Long> duplicates = new ArrayList<>();
     private boolean simulateFailure;
     private long checkPointWindowId;
     public final transient DefaultInputPort<Long> input = new DefaultInputPort<Long>()
@@ -211,7 +211,7 @@
   {
     public final transient MyInputPort input1 = new MyInputPort(100);
     public final transient MyInputPort input2 = new MyInputPort(200);
-    public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+    public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
 
     public class MyInputPort extends DefaultInputPort<Integer>
     {
@@ -255,7 +255,7 @@
   @SuppressWarnings("SleepWhileInLoop")
   public void testNonLinearOperatorRecovery() throws InterruptedException
   {
-    final HashSet<Object> collection = new HashSet<Object>();
+    final HashSet<Object> collection = new HashSet<>();
     com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap map = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
     map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0);
     map.put(OperatorContext.PROCESSING_MODE, processingMode);
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
index 8ba57be..d8d97e0 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
@@ -47,7 +47,7 @@
       emit = true;
     }
 
-    public final transient DefaultOutputPort<Integer> defaultOutputPort = new DefaultOutputPort<Integer>();
+    public final transient DefaultOutputPort<Integer> defaultOutputPort = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
index 4625368..47f1ea0 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
@@ -83,7 +83,7 @@
     public static class TestInputStatsListener implements StatsListener, Serializable
     {
       private static final long serialVersionUID = 1L;
-      private List<OperatorStats> inputOperatorStats = new ArrayList<OperatorStats>();
+      private List<OperatorStats> inputOperatorStats = new ArrayList<>();
 
       @Override
       public Response processStats(BatchedOperatorStats stats)
@@ -101,7 +101,7 @@
   public static class TestCollector extends GenericTestOperator implements StatsListener
   {
     transient long windowId;
-    List<OperatorStats> collectorOperatorStats = new ArrayList<OperatorStats>();
+    List<OperatorStats> collectorOperatorStats = new ArrayList<>();
 
     @Override
     public Response processStats(BatchedOperatorStats stats)
@@ -129,7 +129,7 @@
     public static class TestCollectorStatsListener implements StatsListener, Serializable
     {
       private static final long serialVersionUID = 1L;
-      List<OperatorStats> collectorOperatorStats = new ArrayList<OperatorStats>();
+      List<OperatorStats> collectorOperatorStats = new ArrayList<>();
 
       @Override
       public Response processStats(BatchedOperatorStats stats)
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
index 451972e..36f9d63 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
@@ -93,7 +93,7 @@
   private static class CommitAwareOperator extends BaseOperator implements CheckpointListener, InputOperator
   {
     private transient String name;
-    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
 
     @InputPortFieldAnnotation(optional = true)
     public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java b/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java
index e6b5cd5..9e6c788 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java
@@ -41,9 +41,9 @@
   private int emitInterval = 1000;
   private final int spinMillis = 50;
   private String myStringProperty;
-  private final ConcurrentLinkedQueue<String> externallyAddedTuples = new ConcurrentLinkedQueue<String>();
+  private final ConcurrentLinkedQueue<String> externallyAddedTuples = new ConcurrentLinkedQueue<>();
   @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<Object>();
+  public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<>();
 
   public int getMaxTuples()
   {
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
index 0c95b75..85125c7 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
@@ -256,7 +256,7 @@
 
   static class RandomNumberGenerator implements InputOperator
   {
-    public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+    public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
diff --git a/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java b/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java
index 0730289..7dc0686 100644
--- a/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java
@@ -220,7 +220,7 @@
     public transient String transientProperty = "transientProperty";
 
     public java.util.concurrent.ConcurrentHashMap<String, String> mapProperty = new java.util.concurrent
-        .ConcurrentHashMap<String, String>();
+        .ConcurrentHashMap<>();
 
     public java.util.concurrent.ConcurrentHashMap<String, String> getMapProperty()
     {
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
index d40fd7b..9d66ca3 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
@@ -570,12 +570,12 @@
       }
     };
 
-    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
 
     @Override
     public Collection definePartitions(Collection partitions, PartitioningContext context)
     {
-      Collection<Partition> newPartitions = new ArrayList<Partition>();
+      Collection<Partition> newPartitions = new ArrayList<>();
 
       // Mostly for 1 partition we dont need to do this
       int partitionBits = (Integer.numberOfLeadingZeros(0) - Integer.numberOfLeadingZeros(1));
@@ -590,7 +590,7 @@
         // No partitioning done so far..
         // Single partition again, but with only even numbers ok?
         PassThruOperatorWithCodec newInstance = new PassThruOperatorWithCodec();
-        Partition partition = new DefaultPartition<PassThruOperatorWithCodec>(newInstance);
+        Partition partition = new DefaultPartition<>(newInstance);
 
         // Consider partitions are 1 & 2 and we are sending only 1 partition
         // Partition 1 = even numbers
@@ -796,7 +796,7 @@
       }
     };
 
-    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
 
     @Override
     public Collection definePartitions(Collection partitions, PartitioningContext context)
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java b/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java
index 14f2b1b..705904e 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java
@@ -45,7 +45,7 @@
 
 public class TestPlanContext implements PlanContext, StorageAgent
 {
-  public List<Runnable> events = new ArrayList<Runnable>();
+  public List<Runnable> events = new ArrayList<>();
   public Collection<PTOperator> undeploy;
   public Collection<PTOperator> deploy;
   public Set<PTContainer> releaseContainers;
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
index 18dfd99..3999ace 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
@@ -625,7 +625,7 @@
           output.emit(tuple);
         }
       };
-      public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+      public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
     }
 
     class DummyOutputOperator extends BaseOperator
@@ -644,8 +644,8 @@
 
     class TestUnifierAttributeModule implements Module
     {
-      public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<Integer>();
-      public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<Integer>();
+      public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<>();
+      public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<>();
 
       @Override
       public void populateDAG(DAG dag, Configuration conf)
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
index dd32cc7..1507e2d 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
@@ -173,9 +173,9 @@
 
   public static class ValidationOperator extends BaseOperator
   {
-    public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<>();
 
-    public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<>();
   }
 
   public static class CounterOperator extends BaseOperator
@@ -641,7 +641,7 @@
 
   private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator
   {
-    public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
@@ -653,9 +653,9 @@
   private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator
   {
     @OutputPortFieldAnnotation(optional = true)
-    public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<>();
     @OutputPortFieldAnnotation(optional = true)
-    public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
@@ -773,7 +773,7 @@
   public void testAttributeValuesSerializableCheck() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException
   {
     LogicalPlan dag = new LogicalPlan();
-    Attribute<Object> attr = new Attribute<Object>(new TestAttributeValue(), new Object2String());
+    Attribute<Object> attr = new Attribute<>(new TestAttributeValue(), new Object2String());
     Field nameField = Attribute.class.getDeclaredField("name");
     nameField.setAccessible(true);
     nameField.set(attr, "Test_Attribute");
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
index 1966678..64aaa44 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
@@ -50,7 +50,7 @@
   {
 
     Random r = new Random();
-    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
@@ -73,7 +73,7 @@
         output.emit(tuple);
       }
     };
-    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
   }
 
   /*
@@ -92,7 +92,7 @@
         output.emit(tuple);
       }
     };
-    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
   }
 
   /*
@@ -118,8 +118,8 @@
   static class TestModule implements Module
   {
 
-    public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<Integer>();
-    public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<Integer>();
+    public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<>();
+    public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<>();
 
     @Override
     public void populateDAG(DAG dag, Configuration conf)
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
index 5b5583a..7759363 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
@@ -59,7 +59,7 @@
     private int inputOperatorProp = 0;
 
     Random r = new Random();
-    public transient DefaultOutputPort<Integer> out = new DefaultOutputPort<Integer>();
+    public transient DefaultOutputPort<Integer> out = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
index 8fad613..956db88 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
@@ -49,10 +49,10 @@
     public volatile Object inport1Tuple = null;
 
     @OutputPortFieldAnnotation(optional = true)
-    public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<>();
 
     @OutputPortFieldAnnotation(optional = true)
-    public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<>();
 
     private String emitFormat;
 
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
index 61a85a5..941f5a4 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
@@ -476,7 +476,7 @@
 
     OperatorMeta o1Meta = dag.getMeta(o1);
     dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
-    TestPartitioner<TestInputOperator<Object>> partitioner = new TestPartitioner<TestInputOperator<Object>>();
+    TestPartitioner<TestInputOperator<Object>> partitioner = new TestPartitioner<>();
     dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, partitioner);
 
     TestPlanContext ctx = new TestPlanContext();
@@ -497,7 +497,7 @@
     plan.onStatusUpdate(o1p1);
     Assert.assertEquals("scale up triggered", 1, ctx.events.size());
     // add another partition, keep existing as is
-    partitioner.extraPartitions.add(new DefaultPartition<TestInputOperator<Object>>(o1));
+    partitioner.extraPartitions.add(new DefaultPartition<>(o1));
     Runnable r = ctx.events.remove(0);
     r.run();
     partitioner.extraPartitions.clear();
@@ -745,7 +745,7 @@
       partitions.add(new DefaultPartition<Operator>(operator, p1Keys, 1, null));
     }
 
-    ArrayList<Partition<Operator>> lowLoadPartitions = new ArrayList<Partition<Operator>>();
+    ArrayList<Partition<Operator>> lowLoadPartitions = new ArrayList<>();
     for (Partition<Operator> p : partitions) {
       lowLoadPartitions.add(new DefaultPartition<>(p.getPartitionedInstance(), p.getPartitionKeys(), -1, null));
     }
@@ -793,7 +793,7 @@
     for (Set<PartitionKeys> expectedKeys: expectedKeysSets) {
       List<Partition<Operator>> clonePartitions = Lists.newArrayList();
       for (PartitionKeys pks: twoBitPartitionKeys) {
-        Map<InputPort<?>, PartitionKeys> p1Keys = new HashMap<InputPort<?>, PartitionKeys>();
+        Map<InputPort<?>, PartitionKeys> p1Keys = new HashMap<>();
         p1Keys.put(operator.inport1, pks);
         int load = expectedKeys.contains(pks) ? 0 : -1;
         clonePartitions.add(new DefaultPartition<Operator>(operator, p1Keys, load, null));
@@ -876,7 +876,7 @@
 
     Assert.assertEquals("operators container 0", 1, plan.getContainers().get(0).getOperators().size());
     Set<OperatorMeta> c2ExpNodes = Sets.newHashSet(dag.getMeta(o2), dag.getMeta(o3));
-    Set<OperatorMeta> c2ActNodes = new HashSet<OperatorMeta>();
+    Set<OperatorMeta> c2ActNodes = new HashSet<>();
     PTContainer c2 = plan.getContainers().get(1);
     for (PTOperator pNode : c2.getOperators()) {
       c2ActNodes.add(pNode.getOperatorMeta());
@@ -1139,7 +1139,7 @@
     LogicalPlan dag = new LogicalPlan();
 
     TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
-    TestPartitioner<TestGeneratorInputOperator> o1Partitioner = new TestPartitioner<TestGeneratorInputOperator>();
+    TestPartitioner<TestGeneratorInputOperator> o1Partitioner = new TestPartitioner<>();
     o1Partitioner.setPartitionCount(2);
     dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, o1Partitioner);
     dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
@@ -1312,7 +1312,7 @@
       }
 
       Assert.assertEquals("repartition event", 1, ctx.events.size());
-      o1Partitioner.extraPartitions.add(new DefaultPartition<TestGeneratorInputOperator>(o1));
+      o1Partitioner.extraPartitions.add(new DefaultPartition<>(o1));
       ctx.events.remove(0).run();
       o1Partitioner.extraPartitions.clear();
 
@@ -1607,7 +1607,7 @@
       }
       T paritionable = first.getPartitionedInstance();
       for (int i = partitions.size(); i < numTotal; ++i) {
-        newPartitions.add(new DefaultPartition<T>(paritionable));
+        newPartitions.add(new DefaultPartition<>(paritionable));
       }
       return newPartitions;
     }
@@ -1767,7 +1767,7 @@
 
     List<PTOperator> o1Unifiers = plan.getMergeOperators(o1Meta);
     Assert.assertEquals("o1Unifiers " + o1Meta, 3, o1Unifiers.size()); // 2 cascadingUnifiers and one-downstream partition unifier
-    List<PTOperator> finalUnifiers = new ArrayList<PTOperator>();
+    List<PTOperator> finalUnifiers = new ArrayList<>();
     for (PTOperator o : o1Unifiers) {
       Assert.assertEquals("inputs " + o, 2, o.getInputs().size());
       Assert.assertEquals("outputs " + o, 1, o.getOutputs().size());
@@ -2054,7 +2054,7 @@
       if (context.getParallelPartitionCount() > 0 && newPartitions.size() < context.getParallelPartitionCount()) {
         // parallel partitioned, fill to requested count
         for (int i = newPartitions.size(); i < context.getParallelPartitionCount(); i++) {
-          newPartitions.add(new DefaultPartition<T>(partitions.iterator().next().getPartitionedInstance()));
+          newPartitions.add(new DefaultPartition<>(partitions.iterator().next().getPartitionedInstance()));
         }
       }
       return newPartitions;
@@ -2215,9 +2215,9 @@
     GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
     dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
     dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
-    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2));
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2));
     dag.getOperatorMeta("o1").getMeta(o1.outport1).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB, 1024);
-    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2));
+    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2));
     dag.setOperatorAttribute(o2, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
     dag.setOperatorAttribute(o2, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
 
@@ -2237,7 +2237,7 @@
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
     GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
     dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
-    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2));
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2));
     dag.setInputPortAttribute(o2.inport1, PortContext.PARTITION_PARALLEL, true);
     dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
 
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java b/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java
index 77334db..2cb3e58 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java
@@ -41,7 +41,7 @@
   @Test
   public void testEmergencySinks() throws InterruptedException
   {
-    final List<Object> list = new ArrayList<Object>();
+    final List<Object> list = new ArrayList<>();
     final StreamCodec<Object> myserde = new StreamCodec<Object>()
     {
       @Override
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java b/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java
index e094d44..a487176 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java
@@ -50,7 +50,7 @@
     byte[] buffer = publisher.consume();
 
     FastSubscriber subscriber = new FastSubscriber("subscriber", 1024);
-    subscriber.serde = subscriber.statefulSerde = new DefaultStatefulStreamCodec<Object>();
+    subscriber.serde = subscriber.statefulSerde = new DefaultStatefulStreamCodec<>();
     SweepableReservoir sr = subscriber.acquireReservoir("res", 1024);
     sr.setSink(new Sink<Object>()
     {
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
index fd67121..32e5095 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
@@ -94,7 +94,7 @@
   @SuppressWarnings({"SleepWhileInLoop"})
   public void testBufferServerStream() throws Exception
   {
-    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
+    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>();
     final AtomicInteger messageCount = new AtomicInteger();
     Sink<Object> sink = new Sink<Object>()
     {
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
index ed145c7..9db6fe9 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
@@ -55,13 +55,13 @@
   {
     final int totalTupleCount = 5000;
 
-    final PassThroughNode<Object> operator1 = new PassThroughNode<Object>();
+    final PassThroughNode<Object> operator1 = new PassThroughNode<>();
     final GenericNode node1 = new GenericNode(operator1, new OperatorContext(1, "operator1", new DefaultAttributeMap(),
         null));
     node1.setId(1);
     operator1.setup(node1.context);
 
-    final PassThroughNode<Object> operator2 = new PassThroughNode<Object>();
+    final PassThroughNode<Object> operator2 = new PassThroughNode<>();
     final GenericNode node2 = new GenericNode(operator2, new OperatorContext(2, "operator2", new DefaultAttributeMap(),
         null));
     node2.setId(2);
@@ -115,7 +115,7 @@
     AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input", 1024 * 5);
     node1.connectInputPort("input", reservoir1);
 
-    Map<Integer, Node<?>> activeNodes = new ConcurrentHashMap<Integer, Node<?>>();
+    Map<Integer, Node<?>> activeNodes = new ConcurrentHashMap<>();
     launchNodeThread(node1, activeNodes);
     launchNodeThread(node2, activeNodes);
     stream.activate(streamContext);
@@ -206,7 +206,7 @@
       }
 
     };
-    public final DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+    public final DefaultOutputPort<T> output = new DefaultOutputPort<>();
     private boolean logMessages = false;
 
     public boolean isLogMessages()
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
index f0748eb..287a796 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
@@ -47,7 +47,7 @@
 
   public static class TestInputOperator extends BaseOperator implements InputOperator
   {
-    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
+    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
@@ -60,7 +60,7 @@
   public static class FirstGenericOperator extends BaseOperator
   {
     public static long endwindowCount;
-    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
+    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
     public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>()
     {
       @Override
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
index e614750..26e913b 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
@@ -298,7 +298,7 @@
 
   public static class ThreadIdValidatingInputOperator implements InputOperator
   {
-    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
+    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
     public static long threadId;
 
     @Override
@@ -416,12 +416,12 @@
       assert (threadList.contains(Thread.currentThread().getId()));
     }
 
-    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
+    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
   }
 
   public static class ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts extends ThreadIdValidatingGenericIntermediateOperator
   {
-    public final transient DefaultOutputPort<Long> output2 = new DefaultOutputPort<Long>();
+    public final transient DefaultOutputPort<Long> output2 = new DefaultOutputPort<>();
   }
 
   public static class ThreadIdValidatingGenericOperatorWithTwoInputPorts implements Operator
@@ -621,9 +621,9 @@
     slc.run();
 
     Assert.assertEquals("nonOIO: Number of threads ThreadIdValidatingGenericIntermediateOperator", 3, ThreadIdValidatingGenericIntermediateOperator.threadList.size());
-    Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 3, (new HashSet<Long>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size());
+    Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 3, (new HashSet<>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size());
     Assert.assertEquals("nonOIO: Number of threads ThreadIdValidatingOutputOperator", 4, ThreadIdValidatingOutputOperator.threadList.size());
-    Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingOutputOperator", 4, (new HashSet<Long>(ThreadIdValidatingOutputOperator.threadList)).size());
+    Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingOutputOperator", 4, (new HashSet<>(ThreadIdValidatingOutputOperator.threadList)).size());
     Assert.assertFalse("nonOIO:: inputOperator1 : ThreadIdValidatingOutputOperator", ThreadIdValidatingOutputOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId));
     Assert.assertFalse("nonOIO:: inputOperator1 : ThreadIdValidatingGenericIntermediateOperator", ThreadIdValidatingGenericIntermediateOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId));
 
@@ -640,9 +640,9 @@
     slc.run();
 
     Assert.assertEquals("OIO: Number of threads ThreadIdValidatingGenericIntermediateOperator", 3, ThreadIdValidatingGenericIntermediateOperator.threadList.size());
-    Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 1, (new HashSet<Long>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size());
+    Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 1, (new HashSet<>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size());
     Assert.assertEquals("OIO: Number of threads ThreadIdValidatingOutputOperator", 4, ThreadIdValidatingOutputOperator.threadList.size());
-    Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingOutputOperator", 3, (new HashSet<Long>(ThreadIdValidatingOutputOperator.threadList)).size());
+    Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingOutputOperator", 3, (new HashSet<>(ThreadIdValidatingOutputOperator.threadList)).size());
     Assert.assertTrue("OIO:: inputOperator1 : ThreadIdValidatingOutputOperator", ThreadIdValidatingOutputOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId));
     Assert.assertTrue("OIO:: inputOperator1 : ThreadIdValidatingGenericIntermediateOperator", ThreadIdValidatingGenericIntermediateOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId));
   }
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
index 38460ea..15de177 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
@@ -149,7 +149,7 @@
   @Before
   public void init()
   {
-    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
+    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>();
     messageCount = new AtomicInteger(0);
 
     Sink<Object> sink = new Sink<Object>()
diff --git a/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java b/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java
index 902b8b6..af821ea 100644
--- a/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java
+++ b/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java
@@ -37,7 +37,7 @@
     public long time;
   }
 
-  PriorityQueue<TimedRunnable> queue = new PriorityQueue<TimedRunnable>(16, new Comparator<TimedRunnable>()
+  PriorityQueue<TimedRunnable> queue = new PriorityQueue<>(16, new Comparator<TimedRunnable>()
   {
     @Override
     public int compare(TimedRunnable o1, TimedRunnable o2)
diff --git a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
index 0326b6a..7399aff 100644
--- a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
+++ b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
@@ -322,7 +322,7 @@
   public static class TestHomeDirectory extends TestWatcher
   {
 
-    Map<String, String> env = new HashMap<String, String>();
+    Map<String, String> env = new HashMap<>();
     String userHome;
 
     @Override
@@ -444,7 +444,7 @@
       private static final long serialVersionUID = 201404091805L;
     }
 
-    transient HashMap<OperatorWindowIdPair, Object> store = new HashMap<OperatorWindowIdPair, Object>();
+    transient HashMap<OperatorWindowIdPair, Object> store = new HashMap<>();
 
     @Override
     public synchronized void save(Object object, int operatorId, long windowId) throws IOException
@@ -467,7 +467,7 @@
     @Override
     public synchronized long[] getWindowIds(int operatorId) throws IOException
     {
-      ArrayList<Long> windowIds = new ArrayList<Long>();
+      ArrayList<Long> windowIds = new ArrayList<>();
       for (OperatorWindowIdPair key : store.keySet()) {
         if (key.operatorId == operatorId) {
           windowIds.add(key.windowId);
diff --git a/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java b/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java
index 5bc70dc..9d12fdc 100644
--- a/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java
@@ -65,7 +65,7 @@
   @Test
   public void testElement()
   {
-    StablePriorityQueue<Integer> instance = new StablePriorityQueue<Integer>(1);
+    StablePriorityQueue<Integer> instance = new StablePriorityQueue<>(1);
     Integer i = 10;
     instance.add(i);
     Object result = instance.element();
@@ -78,7 +78,7 @@
   @Test
   public void testOffer()
   {
-    StablePriorityQueue<Integer> instance = new StablePriorityQueue<Integer>(1);
+    StablePriorityQueue<Integer> instance = new StablePriorityQueue<>(1);
     Integer i = 10;
     assertTrue(instance.offer(i));
     Object result = instance.peek();
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
index 9ac28c0..e1fb860 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
@@ -109,9 +109,9 @@
     };
 
     @OutputPortFieldAnnotation(optional = false, error = true)
-    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
 
-    public final transient DefaultOutputPort<Double> output1 = new DefaultOutputPort<Double>();
+    public final transient DefaultOutputPort<Double> output1 = new DefaultOutputPort<>();
 
     @Override
     public String getName()
@@ -417,7 +417,7 @@
   {
     String classpath = System.getProperty("java.class.path");
     String[] paths = classpath.split(":");
-    List<String> fnames = new LinkedList<String>();
+    List<String> fnames = new LinkedList<>();
     for (String cp : paths) {
       File f = new File(cp);
       if (!f.isDirectory()) {
@@ -480,7 +480,7 @@
   @Test
   public void testValueSerialization() throws Exception
   {
-    TestOperator<String, Map<String, Number>> bean = new TestOperator<String, Map<String, Number>>();
+    TestOperator<String, Map<String, Number>> bean = new TestOperator<>();
     bean.map.put("key1", new Structured());
     bean.stringArray = new String[]{"one", "two", "three"};
     bean.stringList = Lists.newArrayList("four", "five");
@@ -491,7 +491,7 @@
     bean.structuredArray[0].name = "s1";
     bean.color = Color.BLUE;
     bean.booleanProp = true;
-    bean.nestedList = new LinkedList<OperatorDiscoveryTest.Structured>();
+    bean.nestedList = new LinkedList<>();
     Structured st = new Structured();
     st.name = "nestedone";
     st.size = 10;
@@ -640,15 +640,15 @@
     private List<Structured> nestedList;
     private Properties props;
     private Structured nested;
-    private Map<String, Structured> map = new HashMap<String, Structured>();
+    private Map<String, Structured> map = new HashMap<>();
     private String[] stringArray;
     private Color color;
     private Structured[] structuredArray;
     private T[] genericArray;
-    private Map<String, List<Map<String, Number>>> nestedParameterizedType = new HashMap<String, List<Map<String, Number>>>();
+    private Map<String, List<Map<String, Number>>> nestedParameterizedType = new HashMap<>();
     private Map<?, ? super Long> wildcardType = new HashMap<Object, Number>();
-    private List<int[]> listofIntArray = new LinkedList<int[]>();
-    private List<T> parameterizedTypeVariable = new LinkedList<T>();
+    private List<int[]> listofIntArray = new LinkedList<>();
+    private List<T> parameterizedTypeVariable = new LinkedList<>();
     private Z genericType;
     private int[][] multiDimensionPrimitiveArray;
     private Structured[][] multiDimensionComplexArray;
@@ -1056,7 +1056,7 @@
   @Test
   public void testLogicalPlanConfiguration() throws Exception
   {
-    TestOperator<String, Map<String, Number>> bean = new InputTestOperator<String, Map<String, Number>>();
+    TestOperator<String, Map<String, Number>> bean = new InputTestOperator<>();
     bean.map.put("key1", new Structured());
     bean.stringArray = new String[]{"one", "two", "three"};
     bean.stringList = Lists.newArrayList("four", "five");
@@ -1113,12 +1113,12 @@
   public static class SchemaRequiredOperator extends BaseOperator implements InputOperator
   {
     @OutputPortFieldAnnotation(schemaRequired = true)
-    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
 
     @OutputPortFieldAnnotation(schemaRequired = false)
-    public final transient DefaultOutputPort<Object> output1 = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output1 = new DefaultOutputPort<>();
 
-    public final transient DefaultOutputPort<Object> output2 = new DefaultOutputPort<Object>();
+    public final transient DefaultOutputPort<Object> output2 = new DefaultOutputPort<>();
 
     @Override
     public void emitTuples()
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
index 9f414dc..76fa963 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
@@ -113,7 +113,7 @@
         lastRequests = requests;
 
         // delegate processing to dispatch thread
-        FutureTask<Object> future = new FutureTask<Object>(new Callable<Object>()
+        FutureTask<Object> future = new FutureTask<>(new Callable<Object>()
         {
           @Override
           public Object call() throws Exception
@@ -351,7 +351,7 @@
   @Test
   public void testSubmitLogicalPlanChange() throws JSONException, Exception
   {
-    List<LogicalPlanRequest> requests = new ArrayList<LogicalPlanRequest>();
+    List<LogicalPlanRequest> requests = new ArrayList<>();
     WebResource r = resource();
 
     CreateOperatorRequest request1 = new CreateOperatorRequest();
@@ -366,7 +366,7 @@
     requests.add(request2);
 
     ObjectMapper mapper = new ObjectMapper();
-    final Map<String, Object> m = new HashMap<String, Object>();
+    final Map<String, Object> m = new HashMap<>();
     m.put("requests", requests);
     final JSONObject jsonRequest = new JSONObject(mapper.writeValueAsString(m));
 
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java
index 09b8785..8674e9f 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java
@@ -87,10 +87,10 @@
       {
       }
     };
-    final OutputPort<T2> outportT2 = new DefaultOutputPort<T2>();
-    final OutputPort<Number> outportNumberParam = new DefaultOutputPort<Number>();
+    final OutputPort<T2> outportT2 = new DefaultOutputPort<>();
+    final OutputPort<Number> outportNumberParam = new DefaultOutputPort<>();
     final StringOutputPort outportString = new StringOutputPort(this);
-    final OutputPort<List<T0>> outportList = new DefaultOutputPort<List<T0>>();
+    final OutputPort<List<T0>> outportList = new DefaultOutputPort<>();
     final GenericSubClassOutputPort outClassObject = new GenericSubClassOutputPort(this);
   }
 
@@ -107,8 +107,8 @@
       {
       }
     };
-    final OutputPort<Map<String, Number>> outportT2 = new DefaultOutputPort<Map<String, Number>>();
-    final OutputPort<Number> outportNumberParam = new DefaultOutputPort<Number>();
+    final OutputPort<Map<String, Number>> outportT2 = new DefaultOutputPort<>();
+    final OutputPort<Number> outportNumberParam = new DefaultOutputPort<>();
     final StringOutputPort outportString = new StringOutputPort(this);
   }
 
@@ -163,7 +163,7 @@
 
   static class ParameterizedTypeOperator<T> extends BaseOperator
   {
-    final OutputPort<T> output = new DefaultOutputPort<T>();
+    final OutputPort<T> output = new DefaultOutputPort<>();
   }
 
   static class StringParameterOperator extends ParameterizedTypeOperator<String>