APEXCORE-691 Use type inference for generic instance creation
closes #505
diff --git a/api/src/main/java/com/datatorrent/api/AffinityRule.java b/api/src/main/java/com/datatorrent/api/AffinityRule.java
index 5e10ccd..304c086 100644
--- a/api/src/main/java/com/datatorrent/api/AffinityRule.java
+++ b/api/src/main/java/com/datatorrent/api/AffinityRule.java
@@ -92,7 +92,7 @@
public AffinityRule(Type type, Locality locality, boolean relaxLocality, String firstOperator, String... otherOperators)
{
this(type, locality, relaxLocality);
- LinkedList<String> operators = new LinkedList<String>();
+ LinkedList<String> operators = new LinkedList<>();
if (firstOperator != null && otherOperators.length >= 1) {
operators.add(firstOperator);
diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java
index 2efc84f..821ecb2 100644
--- a/api/src/main/java/com/datatorrent/api/Attribute.java
+++ b/api/src/main/java/com/datatorrent/api/Attribute.java
@@ -236,11 +236,11 @@
*/
public static class AttributeInitializer
{
- static final HashMap<Class<?>, Set<Attribute<Object>>> map = new HashMap<Class<?>, Set<Attribute<Object>>>();
+ static final HashMap<Class<?>, Set<Attribute<Object>>> map = new HashMap<>();
public static Map<Attribute<Object>, Object> getAllAttributes(Context context, Class<?> clazz)
{
- Map<Attribute<Object>, Object> result = new HashMap<Attribute<Object>, Object>();
+ Map<Attribute<Object>, Object> result = new HashMap<>();
try {
for (Field f: clazz.getDeclaredFields()) {
if (Modifier.isStatic(f.getModifiers()) && Attribute.class.isAssignableFrom(f.getType())) {
@@ -273,7 +273,7 @@
if (map.containsKey(clazz)) {
return 0;
}
- Set<Attribute<Object>> set = new HashSet<Attribute<Object>>();
+ Set<Attribute<Object>> set = new HashSet<>();
try {
for (Field f: clazz.getDeclaredFields()) {
if (Modifier.isStatic(f.getModifiers()) && Attribute.class.isAssignableFrom(f.getType())) {
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 3d3cffe..94022ff 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -121,17 +121,17 @@
/**
* Number of tuples the poll buffer can cache without blocking the input stream to the port.
*/
- Attribute<Integer> QUEUE_CAPACITY = new Attribute<Integer>(1024);
+ Attribute<Integer> QUEUE_CAPACITY = new Attribute<>(1024);
/**
* The amount of buffer memory this port requires. There is a buffer server in each container. This is used to calculate total buffer server memory for container.
* Also due to the nature of the application, if buffer server needs to use more RAM, from time to time, this number may
* not be adhered to.
*/
- Attribute<Integer> BUFFER_MEMORY_MB = new Attribute<Integer>(8 * 64);
+ Attribute<Integer> BUFFER_MEMORY_MB = new Attribute<>(8 * 64);
/**
* Poll period in milliseconds when the port buffer reaches its limits.
*/
- Attribute<Integer> SPIN_MILLIS = new Attribute<Integer>(10);
+ Attribute<Integer> SPIN_MILLIS = new Attribute<>(10);
/**
* Input port attribute. Extend partitioning of an upstream operator w/o intermediate merge.
* Can be used to form parallel partitions that span a group of operators.
@@ -139,7 +139,7 @@
* If multiple ports of an operator have the setting, incoming streams must track back to
* a common root partition, i.e. the operator join forks of the same origin.
*/
- Attribute<Boolean> PARTITION_PARALLEL = new Attribute<Boolean>(false);
+ Attribute<Boolean> PARTITION_PARALLEL = new Attribute<>(false);
/**
* Attribute of output port to specify how many partitions should be merged by a single unifier instance. If the
* number of partitions exceeds the limit set, a cascading unifier plan will be created. For example, 4 partitions
@@ -147,7 +147,7 @@
* network I/O or other resource requirement for each unifier container (depends on the specific functionality of
* the unifier), enabling horizontal scale by overcoming the single unifier bottleneck.
*/
- Attribute<Integer> UNIFIER_LIMIT = new Attribute<Integer>(Integer.MAX_VALUE);
+ Attribute<Integer> UNIFIER_LIMIT = new Attribute<>(Integer.MAX_VALUE);
/**
* Attribute to specify that the final unifier be always a single unifier. This is useful when in MxN partitioning
@@ -158,16 +158,16 @@
* the inputs. In this case the default unifier behavior can be specified on the output port and individual
* exceptions can be specified on the corresponding input ports.
*/
- Attribute<Boolean> UNIFIER_SINGLE_FINAL = new Attribute<Boolean>(Boolean.FALSE);
+ Attribute<Boolean> UNIFIER_SINGLE_FINAL = new Attribute<>(Boolean.FALSE);
/**
* Whether or not to auto record the tuples
*/
- Attribute<Boolean> AUTO_RECORD = new Attribute<Boolean>(false);
+ Attribute<Boolean> AUTO_RECORD = new Attribute<>(false);
/**
* Whether the output is unified.
* This is a read-only attribute to query that whether the output of the operator from multiple instances is being unified.
*/
- Attribute<Boolean> IS_OUTPUT_UNIFIED = new Attribute<Boolean>(false);
+ Attribute<Boolean> IS_OUTPUT_UNIFIED = new Attribute<>(false);
/**
* Provide the codec which can be used to serialize or deserialize the data
* that can be received on the port. If it is unspecified the engine may use
@@ -193,13 +193,13 @@
* of the operator. On subsequent run, it's the windowId of the checkpoint from which the operator state
* is recovered.
*/
- Attribute<Long> ACTIVATION_WINDOW_ID = new Attribute<Long>(Stateless.WINDOW_ID);
+ Attribute<Long> ACTIVATION_WINDOW_ID = new Attribute<>(Stateless.WINDOW_ID);
/**
* It is a maximum poll period in milliseconds when there are no tuples available on any of the input ports of the
* operator. Platform uses the heuristic to change poll period from 0 to SPIN_MILLIS seconds.
* Default value is 10 milliseconds.
*/
- Attribute<Integer> SPIN_MILLIS = new Attribute<Integer>(10);
+ Attribute<Integer> SPIN_MILLIS = new Attribute<>(10);
/**
* The maximum number of attempts to restart a failing operator before shutting down the application.
* Until this number is reached, when an operator fails to start it is re-spawned in a new container. Once all the
@@ -218,15 +218,15 @@
* by the engine. The attribute is ignored when the operator was already declared stateless through the
* {@link Stateless} annotation.
*/
- Attribute<Boolean> STATELESS = new Attribute<Boolean>(false);
+ Attribute<Boolean> STATELESS = new Attribute<>(false);
/**
* Memory resource that the operator requires for optimal functioning. Used to calculate total memory requirement for containers.
*/
- Attribute<Integer> MEMORY_MB = new Attribute<Integer>(1024);
+ Attribute<Integer> MEMORY_MB = new Attribute<>(1024);
/**
* CPU Cores that the operator requires for optimal functioning. Used to calculate total CPU Cores requirement for containers.
*/
- Attribute<Integer> VCORES = new Attribute<Integer>(0);
+ Attribute<Integer> VCORES = new Attribute<>(0);
/**
* The options to be pass to JVM when launching the operator. Options such as java maximum heap size can be specified here.
@@ -235,7 +235,7 @@
/**
* Attribute of the operator that tells the platform how many streaming windows make 1 application window.
*/
- Attribute<Integer> APPLICATION_WINDOW_COUNT = new Attribute<Integer>(1);
+ Attribute<Integer> APPLICATION_WINDOW_COUNT = new Attribute<>(1);
/**
* When set it changes the computation to sliding window computation where duration is determined using {@link #APPLICATION_WINDOW_COUNT} that is
* slided by duration determined using value of this attribute. Default value is null which is equivalent to that of {@link #APPLICATION_WINDOW_COUNT}.
@@ -251,7 +251,7 @@
* value. Typically user would define this value to be the same as that of APPLICATION_WINDOW_COUNT so checkpointing
* will be done at application window boundary.
*/
- Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<Integer>(1);
+ Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<>(1);
/**
* Name of host to directly control locality of an operator. Complementary to stream locality (NODE_LOCAL affinity).
* For example, the user may wish to specify a locality constraint for an input operator relative to its data source.
@@ -274,18 +274,18 @@
* If the processing mode for an operator is specified as EXACTLY_ONCE then the processing mode for all downstream operators
* should be specified as AT_MOST_ONCE otherwise it will result in an error.
*/
- Attribute<Operator.ProcessingMode> PROCESSING_MODE = new Attribute<Operator.ProcessingMode>(ProcessingMode.AT_LEAST_ONCE);
+ Attribute<Operator.ProcessingMode> PROCESSING_MODE = new Attribute<>(ProcessingMode.AT_LEAST_ONCE);
/**
* Timeout to identify stalled processing, specified as count of streaming windows. If the last processed
* window does not advance within the specified timeout count, the operator will be considered stuck and the
* container restart. There are multiple reasons this could happen: clock drift, hardware issue, networking issue,
* blocking operator logic, etc.
*/
- Attribute<Integer> TIMEOUT_WINDOW_COUNT = new Attribute<Integer>(120);
+ Attribute<Integer> TIMEOUT_WINDOW_COUNT = new Attribute<>(120);
/**
* Whether or not to auto record the tuples
*/
- Attribute<Boolean> AUTO_RECORD = new Attribute<Boolean>(false);
+ Attribute<Boolean> AUTO_RECORD = new Attribute<>(false);
/**
* How the operator distributes its state and share the input can be influenced by setting the Partitioner attribute.
* If this attribute is set to non null value, the instance of the partitioner is used to partition and merge the
@@ -348,7 +348,7 @@
* Name under which the application will be shown in the resource manager.
* If not set, the default is the configuration Java class or property file name.
*/
- Attribute<String> APPLICATION_NAME = new Attribute<String>("unknown-application-name");
+ Attribute<String> APPLICATION_NAME = new Attribute<>("unknown-application-name");
/**
* URL to the application's documentation.
*/
@@ -387,7 +387,7 @@
/**
* Dump extra debug information in launcher, master and containers.
*/
- Attribute<Boolean> DEBUG = new Attribute<Boolean>(false);
+ Attribute<Boolean> DEBUG = new Attribute<>(false);
/**
* The options to be pass to JVM when launching the containers. Options such as java maximum heap size can be specified here.
*/
@@ -396,20 +396,20 @@
* The amount of memory to be requested for the application master. Not used in local mode.
* Default value is 1GB.
*/
- Attribute<Integer> MASTER_MEMORY_MB = new Attribute<Integer>(1024);
+ Attribute<Integer> MASTER_MEMORY_MB = new Attribute<>(1024);
/**
* Where to spool the data once the buffer server capacity is reached.
*/
- Attribute<Boolean> BUFFER_SPOOLING = new Attribute<Boolean>(true);
+ Attribute<Boolean> BUFFER_SPOOLING = new Attribute<>(true);
/**
* The streaming window size to use for the application. It is specified in milliseconds. Default value is 500ms.
*/
- Attribute<Integer> STREAMING_WINDOW_SIZE_MILLIS = new Attribute<Integer>(500);
+ Attribute<Integer> STREAMING_WINDOW_SIZE_MILLIS = new Attribute<>(500);
/**
* The time interval for saving the operator state. It is specified as a multiple of streaming windows. The operator
* state is saved periodically with interval equal to the checkpoint interval. Default value is 60 streaming windows.
*/
- Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<Integer>(60);
+ Attribute<Integer> CHECKPOINT_WINDOW_COUNT = new Attribute<>(60);
/**
* The path to store application dependencies, recording and other generated files for application master and containers.
*/
@@ -418,13 +418,13 @@
* The size limit for a file where tuple recordings are stored. When tuples are being recorded they are stored
* in files. When a file size reaches this limit a new file is created and tuples start getting stored in the new file. Default value is 128k.
*/
- Attribute<Integer> TUPLE_RECORDING_PART_FILE_SIZE = new Attribute<Integer>(128 * 1024);
+ Attribute<Integer> TUPLE_RECORDING_PART_FILE_SIZE = new Attribute<>(128 * 1024);
/**
* The time limit for a file where tuple recordings are stored. When tuples are being recorded they are stored
* in files. When a tuple recording file creation time falls beyond the time limit window from the current time a new file
* is created and the tuples start getting stored in the new file. Default value is 30hrs.
*/
- Attribute<Integer> TUPLE_RECORDING_PART_FILE_TIME_MILLIS = new Attribute<Integer>(30 * 60 * 60 * 1000);
+ Attribute<Integer> TUPLE_RECORDING_PART_FILE_TIME_MILLIS = new Attribute<>(30 * 60 * 60 * 1000);
/**
* Address to which the application side connects to DT Gateway, in the form of host:port. This will override "dt.gateway.listenAddress" in the configuration.
*/
@@ -432,7 +432,7 @@
/**
* Whether or not gateway is expecting SSL connection.
*/
- Attribute<Boolean> GATEWAY_USE_SSL = new Attribute<Boolean>(false);
+ Attribute<Boolean> GATEWAY_USE_SSL = new Attribute<>(false);
/**
* The username for logging in to the gateway, if authentication is enabled.
*/
@@ -448,48 +448,48 @@
/**
* Maximum number of simultaneous heartbeat connections to process. Default value is 30.
*/
- Attribute<Integer> HEARTBEAT_LISTENER_THREAD_COUNT = new Attribute<Integer>(30);
+ Attribute<Integer> HEARTBEAT_LISTENER_THREAD_COUNT = new Attribute<>(30);
/**
* How frequently should operators heartbeat to stram. Recommended setting is
* 1000ms. Value 0 will disable heartbeat (for unit testing). Default value is 1000ms.
*/
- Attribute<Integer> HEARTBEAT_INTERVAL_MILLIS = new Attribute<Integer>(1000);
+ Attribute<Integer> HEARTBEAT_INTERVAL_MILLIS = new Attribute<>(1000);
/**
* Timeout for master to identify a hung container (full GC etc.). Timeout will result in container restart.
* Default value is 30s.
*/
- Attribute<Integer> HEARTBEAT_TIMEOUT_MILLIS = new Attribute<Integer>(30 * 1000);
+ Attribute<Integer> HEARTBEAT_TIMEOUT_MILLIS = new Attribute<>(30 * 1000);
/**
* Timeout for allocating container resources. Default value is 60s.
*/
- Attribute<Integer> RESOURCE_ALLOCATION_TIMEOUT_MILLIS = new Attribute<Integer>(Integer.MAX_VALUE);
+ Attribute<Integer> RESOURCE_ALLOCATION_TIMEOUT_MILLIS = new Attribute<>(Integer.MAX_VALUE);
/**
* Maximum number of windows that can be pending for statistics calculation. Statistics are computed when
* the metrics are available from all operators for a window. If the information is not available from all operators then
* the window is pending. When the number of pending windows reaches this limit the information for the oldest window
* is purged. Default value is 1000 windows.
*/
- Attribute<Integer> STATS_MAX_ALLOWABLE_WINDOWS_LAG = new Attribute<Integer>(1000);
+ Attribute<Integer> STATS_MAX_ALLOWABLE_WINDOWS_LAG = new Attribute<>(1000);
/**
* Whether or not we record statistics. The statistics are recorded for each heartbeat if enabled. The default value is false.
*/
- Attribute<Boolean> ENABLE_STATS_RECORDING = new Attribute<Boolean>(false);
+ Attribute<Boolean> ENABLE_STATS_RECORDING = new Attribute<>(false);
/**
* The time interval for throughput calculation. The throughput is periodically calculated with interval greater than or
* equal to the throughput calculation interval. The default value is 10s.
*/
- Attribute<Integer> THROUGHPUT_CALCULATION_INTERVAL = new Attribute<Integer>(10000);
+ Attribute<Integer> THROUGHPUT_CALCULATION_INTERVAL = new Attribute<>(10000);
/**
* The maximum number of samples to use when calculating throughput. In practice fewer samples may be used
* if the THROUGHPUT_CALCULATION_INTERVAL is exceeded. Default value is 1000 samples.
*/
- Attribute<Integer> THROUGHPUT_CALCULATION_MAX_SAMPLES = new Attribute<Integer>(1000);
+ Attribute<Integer> THROUGHPUT_CALCULATION_MAX_SAMPLES = new Attribute<>(1000);
/**
* The number of samples to use when using RPC latency to compensate for clock skews and network latency when
* calculating stats. Specify 0 if RPC latency should not be used at all to calculate stats. Default value is 100
* samples.
*/
- Attribute<Integer> RPC_LATENCY_COMPENSATION_SAMPLES = new Attribute<Integer>(100);
+ Attribute<Integer> RPC_LATENCY_COMPENSATION_SAMPLES = new Attribute<>(100);
/**
* The agent which can be used to find the jvm options for the container.
*/
@@ -511,12 +511,12 @@
* blacklisting of nodes by application master
* Blacklisting for nodes is disabled for the default value
*/
- Attribute<Integer> MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST = new Attribute<Integer>(Integer.MAX_VALUE);
+ Attribute<Integer> MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST = new Attribute<>(Integer.MAX_VALUE);
/**
* The amount of time to wait before removing failed nodes from blacklist
*/
- Attribute<Long> BLACKLISTED_NODE_REMOVAL_TIME_MILLIS = new Attribute<Long>(new Long(60 * 60 * 1000));
+ Attribute<Long> BLACKLISTED_NODE_REMOVAL_TIME_MILLIS = new Attribute<>(new Long(60 * 60 * 1000));
/**
* Affinity rules for specifying affinity and anti-affinity between logical operators
diff --git a/api/src/main/java/com/datatorrent/api/StringCodec.java b/api/src/main/java/com/datatorrent/api/StringCodec.java
index d4a0a41..fa8ab23 100644
--- a/api/src/main/java/com/datatorrent/api/StringCodec.java
+++ b/api/src/main/java/com/datatorrent/api/StringCodec.java
@@ -302,7 +302,7 @@
return clazz.getConstructor(String.class).newInstance(parts[1]);
} else {
T object = clazz.getConstructor(String.class).newInstance(parts[1]);
- HashMap<String, String> hashMap = new HashMap<String, String>();
+ HashMap<String, String> hashMap = new HashMap<>();
for (int i = 2; i < parts.length; i++) {
String[] keyValPair = parts[i].split(propertySeparator, 2);
hashMap.put(keyValPair[0], keyValPair[1]);
@@ -365,11 +365,11 @@
}
if (string.isEmpty()) {
- return new HashMap<K, V>();
+ return new HashMap<>();
}
String[] parts = string.split(separator);
- HashMap<K, V> map = new HashMap<K, V>();
+ HashMap<K, V> map = new HashMap<>();
for (String part : parts) {
String[] kvpair = part.split(equal, 2);
map.put(keyCodec.fromString(kvpair[0]), valueCodec.fromString(kvpair[1]));
@@ -433,7 +433,7 @@
}
String[] parts = string.split(separator);
- ArrayList<T> arrayList = new ArrayList<T>(parts.length);
+ ArrayList<T> arrayList = new ArrayList<>(parts.length);
for (String part : parts) {
arrayList.add(codec.fromString(part));
}
diff --git a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
index 82cf50e..8ff0205 100644
--- a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
+++ b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
@@ -36,17 +36,17 @@
/**
* Parameter to specify extra jars for launch.
*/
- public static final Attribute<String> LIB_JARS = new Attribute<String>(new StringCodec.String2String());
+ public static final Attribute<String> LIB_JARS = new Attribute<>(new StringCodec.String2String());
/**
* Parameter to specify the previous application id to use to resume launch from.
*/
- public static final Attribute<String> ORIGINAL_APP_ID = new Attribute<String>(new StringCodec.String2String());
+ public static final Attribute<String> ORIGINAL_APP_ID = new Attribute<>(new StringCodec.String2String());
/**
* Parameter to specify the queue name to use for launch.
*/
- public static final Attribute<String> QUEUE_NAME = new Attribute<String>(new StringCodec.String2String());
+ public static final Attribute<String> QUEUE_NAME = new Attribute<>(new StringCodec.String2String());
static {
Attribute.AttributeMap.AttributeInitializer.initialize(YarnAppLauncher.class);
diff --git a/api/src/test/java/com/datatorrent/api/AttributeMapTest.java b/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
index fcb1809..b463619 100644
--- a/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
+++ b/api/src/test/java/com/datatorrent/api/AttributeMapTest.java
@@ -51,7 +51,7 @@
interface iface
{
- Attribute<Greeting> greeting = new Attribute<Greeting>(Greeting.hello);
+ Attribute<Greeting> greeting = new Attribute<>(Greeting.hello);
}
@Test
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 84999fa..d08b9fc 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -336,14 +336,14 @@
{
all_listeners.add(dl);
//logger.debug("total {} listeners {} -> {}", all_listeners.size(), dl, this);
- ArrayList<BitVector> partitions = new ArrayList<BitVector>();
+ ArrayList<BitVector> partitions = new ArrayList<>();
if (dl.getPartitions(partitions) > 0) {
for (BitVector partition : partitions) {
HashSet<DataListener> set;
if (listeners.containsKey(partition)) {
set = listeners.get(partition);
} else {
- set = new HashSet<DataListener>();
+ set = new HashSet<>();
listeners.put(partition, set);
}
set.add(dl);
@@ -353,7 +353,7 @@
if (listeners.containsKey(DataListener.NULL_PARTITION)) {
set = listeners.get(DataListener.NULL_PARTITION);
} else {
- set = new HashSet<DataListener>();
+ set = new HashSet<>();
listeners.put(DataListener.NULL_PARTITION, set);
}
@@ -363,7 +363,7 @@
public void removeDataListener(DataListener dl)
{
- ArrayList<BitVector> partitions = new ArrayList<BitVector>();
+ ArrayList<BitVector> partitions = new ArrayList<>();
if (dl.getPartitions(partitions) > 0) {
for (BitVector partition : partitions) {
if (listeners.containsKey(partition)) {
@@ -459,7 +459,7 @@
// When the number of subscribers becomes high or the number of blocks becomes high, consider optimize it.
Block b = first;
- Map<Block, Integer> indices = new HashMap<Block, Integer>();
+ Map<Block, Integer> indices = new HashMap<>();
int i = 0;
while (b != null) {
indices.put(b, i++);
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index 08a483a..b06e60a 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -71,8 +71,8 @@
this.identifier = identifier;
this.upstream = upstream;
this.group = group;
- this.physicalNodes = new HashSet<PhysicalNode>();
- this.partitions = new HashSet<BitVector>();
+ this.physicalNodes = new HashSet<>();
+ this.partitions = new HashSet<>();
this.iterator = iterator;
this.skipWindowId = skipWindowId;
this.eventloop = eventloop;
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 7ac518b..8a56b51 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -260,7 +260,7 @@
}
private final ConcurrentHashMap<String, DataList> publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1);
- private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<String, LogicalNode>();
+ private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<>();
private final int blockSize;
private final int numberOfCacheBlocks;
@@ -883,7 +883,7 @@
}
}
- ArrayList<LogicalNode> list = new ArrayList<LogicalNode>();
+ ArrayList<LogicalNode> list = new ArrayList<>();
String publisherIdentifier = datalist.getIdentifier();
Iterator<LogicalNode> iterator = subscriberGroups.values().iterator();
while (iterator.hasNext()) {
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
index 124cc5f..000ce00 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
@@ -31,7 +31,7 @@
*/
public class System
{
- private static final HashMap<String, DefaultEventLoop> eventloops = new HashMap<String, DefaultEventLoop>();
+ private static final HashMap<String, DefaultEventLoop> eventloops = new HashMap<>();
public static void startup(String identifier)
{
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
index f7b8829..371f98a 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
@@ -42,7 +42,7 @@
String down_type = "SubscriberId/StreamType";
String upstream_id = "PublisherId";
int mask = 7;
- ArrayList<Integer> partitions = new ArrayList<Integer>();
+ ArrayList<Integer> partitions = new ArrayList<>();
partitions.add(5);
long startingWindowId = 0xcafebabe00000078L;
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
index 3c0cb0e..d6e9b1a 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
@@ -32,7 +32,7 @@
*/
public class Subscriber extends com.datatorrent.bufferserver.client.Subscriber
{
- public final ArrayList<Object> resetPayloads = new ArrayList<Object>();
+ public final ArrayList<Object> resetPayloads = new ArrayList<>();
public AtomicInteger tupleCount = new AtomicInteger(0);
public WindowIdHolder firstPayload;
public WindowIdHolder lastPayload;
diff --git a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
index 165d8cf..d77b1ae 100644
--- a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
+++ b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java
@@ -114,7 +114,7 @@
newPartitions = Lists.newArrayList();
for (int partitionCounter = 0; partitionCounter < newPartitionCount; partitionCounter++) {
- newPartitions.add(new DefaultPartition<T>(partition.getPartitionedInstance()));
+ newPartitions.add(new DefaultPartition<>(partition.getPartitionedInstance()));
}
// partition the stream that was first connected in the DAG and send full data to remaining input ports
@@ -156,8 +156,8 @@
*/
public static <T extends Operator> Collection<Partition<T>> repartition(Collection<Partition<T>> partitions)
{
- List<Partition<T>> newPartitions = new ArrayList<Partition<T>>();
- HashMap<Integer, Partition<T>> lowLoadPartitions = new HashMap<Integer, Partition<T>>();
+ List<Partition<T>> newPartitions = new ArrayList<>();
+ HashMap<Integer, Partition<T>> lowLoadPartitions = new HashMap<>();
for (Partition<T> p: partitions) {
int load = p.getLoad();
if (load < 0) {
@@ -201,7 +201,7 @@
}
for (int key: newKeys) {
- Partition<T> newPartition = new DefaultPartition<T>(p.getPartitionedInstance());
+ Partition<T> newPartition = new DefaultPartition<>(p.getPartitionedInstance());
newPartition.getPartitionKeys().put(e.getKey(), new PartitionKeys(newMask, Sets.newHashSet(key)));
newPartitions.add(newPartition);
}
@@ -224,8 +224,8 @@
*/
public static <T extends Operator> Collection<Partition<T>> repartitionInputOperator(Collection<Partition<T>> partitions)
{
- List<Partition<T>> newPartitions = new ArrayList<Partition<T>>();
- List<Partition<T>> lowLoadPartitions = new ArrayList<Partition<T>>();
+ List<Partition<T>> newPartitions = new ArrayList<>();
+ List<Partition<T>> lowLoadPartitions = new ArrayList<>();
for (Partition<T> p: partitions) {
int load = p.getLoad();
if (load < 0) {
@@ -235,8 +235,8 @@
lowLoadPartitions.add(p);
}
} else if (load > 0) {
- newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance()));
- newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance()));
+ newPartitions.add(new DefaultPartition<>(p.getPartitionedInstance()));
+ newPartitions.add(new DefaultPartition<>(p.getPartitionedInstance()));
} else {
newPartitions.add(p);
}
@@ -274,7 +274,7 @@
T anOperator = newPartitions.iterator().next().getPartitionedInstance();
while (morePartitionsToCreate-- > 0) {
- DefaultPartition<T> partition = new DefaultPartition<T>(anOperator);
+ DefaultPartition<T> partition = new DefaultPartition<>(anOperator);
newPartitions.add(partition);
}
}
diff --git a/common/src/main/java/com/datatorrent/common/security/SecurityContext.java b/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
index dccd7b7..3dc4dda 100644
--- a/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
+++ b/common/src/main/java/com/datatorrent/common/security/SecurityContext.java
@@ -32,17 +32,17 @@
/**
* Attribute for the user name for login.
*/
- Attribute<String> USER_NAME = new Attribute<String>((String)null);
+ Attribute<String> USER_NAME = new Attribute<>((String)null);
/**
* Attribute for the password for login.
*/
- Attribute<char[]> PASSWORD = new Attribute<char[]>((char[])null);
+ Attribute<char[]> PASSWORD = new Attribute<>((char[])null);
/**
* Attribute for the realm for login.
*/
- Attribute<String> REALM = new Attribute<String>((String)null);
+ Attribute<String> REALM = new Attribute<>((String)null);
}
diff --git a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
index ca7490d..f90a888 100644
--- a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
+++ b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
@@ -48,7 +48,7 @@
}
};
- public transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+ public transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
protected List<T> lastWindowTuples = new ArrayList<>();
diff --git a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
index 7723fed..ef837a8 100644
--- a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
+++ b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java
@@ -55,9 +55,9 @@
this.objectMapper = new ObjectMapper();
objectMapper.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, true);
objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
- module.addSerializer(ObjectMapperString.class, new RawSerializer<Object>(Object.class));
- module.addSerializer(JSONObject.class, new RawSerializer<Object>(Object.class));
- module.addSerializer(JSONArray.class, new RawSerializer<Object>(Object.class));
+ module.addSerializer(ObjectMapperString.class, new RawSerializer<>(Object.class));
+ module.addSerializer(JSONObject.class, new RawSerializer<>(Object.class));
+ module.addSerializer(JSONArray.class, new RawSerializer<>(Object.class));
objectMapper.registerModule(module);
}
diff --git a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
index 63d1646..af5e10e 100644
--- a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
+++ b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java
@@ -53,7 +53,7 @@
*/
public static <T> String constructPublishMessage(String topic, T data, PubSubMessageCodec<T> codec) throws IOException
{
- PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+ PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
pubSubMessage.setType(PubSubMessageType.PUBLISH);
pubSubMessage.setTopic(topic);
pubSubMessage.setData(data);
@@ -72,7 +72,7 @@
*/
public static <T> String constructSubscribeMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
{
- PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+ PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
pubSubMessage.setType(PubSubMessageType.SUBSCRIBE);
pubSubMessage.setTopic(topic);
@@ -90,7 +90,7 @@
*/
public static <T> String constructUnsubscribeMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
{
- PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+ PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
pubSubMessage.setType(PubSubMessageType.UNSUBSCRIBE);
pubSubMessage.setTopic(topic);
@@ -108,7 +108,7 @@
*/
public static <T> String constructSubscribeNumSubscribersMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
{
- PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+ PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
pubSubMessage.setType(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS);
pubSubMessage.setTopic(topic);
@@ -126,7 +126,7 @@
*/
public static <T> String constructUnsubscribeNumSubscribersMessage(String topic, PubSubMessageCodec<T> codec) throws IOException
{
- PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+ PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
pubSubMessage.setType(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS);
pubSubMessage.setTopic(topic);
@@ -135,7 +135,7 @@
public String formatMessage(PubSubMessage<T> pubSubMessage) throws IOException
{
- HashMap<String, Object> map = new HashMap<String, Object>();
+ HashMap<String, Object> map = new HashMap<>();
map.put(PubSubMessage.TYPE_KEY, pubSubMessage.getType().getIdentifier());
map.put(PubSubMessage.TOPIC_KEY, pubSubMessage.getTopic());
T data = pubSubMessage.getData();
@@ -156,7 +156,7 @@
public PubSubMessage<T> parseMessage(String message) throws IOException
{
HashMap<String, Object> map = mapper.readValue(message, HashMap.class);
- PubSubMessage<T> pubSubMessage = new PubSubMessage<T>();
+ PubSubMessage<T> pubSubMessage = new PubSubMessage<>();
pubSubMessage.setType(PubSubMessageType.getPubSubMessageType((String)map.get(PubSubMessage.TYPE_KEY)));
pubSubMessage.setTopic((String)map.get(PubSubMessage.TOPIC_KEY));
pubSubMessage.setData((T)map.get(PubSubMessage.DATA_KEY));
diff --git a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
index e7c4887..2e48f54 100644
--- a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
+++ b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
@@ -41,7 +41,7 @@
public static class DummyOperator implements Operator
{
- public final DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+ public final DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
private Integer value;
@@ -93,10 +93,10 @@
public void partition1Test()
{
DummyOperator dummyOperator = new DummyOperator(5);
- StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>();
+ StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>();
Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
- DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<DummyOperator>(dummyOperator);
+ DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<>(dummyOperator);
partitions.add(defaultPartition);
Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
@@ -111,10 +111,10 @@
public void partition5Test()
{
DummyOperator dummyOperator = new DummyOperator(5);
- StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>(5);
+ StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>(5);
Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
- DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<DummyOperator>(dummyOperator);
+ DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<>(dummyOperator);
partitions.add(defaultPartition);
Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
@@ -137,10 +137,10 @@
public void testParallelPartitionScaleUP()
{
DummyOperator dummyOperator = new DummyOperator(5);
- StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>();
+ StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>();
Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
- partitions.add(new DefaultPartition<DummyOperator>(dummyOperator));
+ partitions.add(new DefaultPartition<>(dummyOperator));
Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
new PartitioningContextImpl(null, 5));
@@ -151,12 +151,12 @@
public void testParallelPartitionScaleDown()
{
DummyOperator dummyOperator = new DummyOperator(5);
- StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>();
+ StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<>();
Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
for (int i = 5; i-- > 0; ) {
- partitions.add(new DefaultPartition<DummyOperator>(dummyOperator));
+ partitions.add(new DefaultPartition<>(dummyOperator));
}
Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
diff --git a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
index 97debe3..79fdac7 100644
--- a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
+++ b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java
@@ -50,7 +50,7 @@
}
};
- public final transient OutputPort<T> output = new DefaultOutputPort<T>();
+ public final transient OutputPort<T> output = new DefaultOutputPort<>();
private int i;
public void setI(int i)
@@ -109,7 +109,7 @@
@Test
public void testReadResolve() throws Exception
{
- SerializableOperator<Object> pre = new SerializableOperator<Object>();
+ SerializableOperator<Object> pre = new SerializableOperator<>();
pre.setI(10);
FileOutputStream fos = new FileOutputStream(filename);
diff --git a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
index 53d91a5..80314c7 100644
--- a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
+++ b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
@@ -73,7 +73,7 @@
for (ContainerRequest cr : requests) {
ContainerStartRequest csr = hostSpecificRequests.get(cr);
ContainerRequest newCr = new ContainerRequest(cr.getCapability(), null, null, cr.getPriority());
- MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, newCr);
+ MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, newCr);
requestedResources.put(csr, pair);
containerRequests.add(newCr);
hostSpecificRequests.remove(cr);
@@ -91,7 +91,7 @@
for (Entry<ContainerRequest, ContainerStartRequest> entry : otherContainerRequests.entrySet()) {
ContainerRequest cr = entry.getKey();
ContainerStartRequest csr = entry.getValue();
- MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, cr);
+ MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, cr);
requestedResources.put(csr, pair);
containerRequests.add(cr);
}
diff --git a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
index e7f9672..45206bc 100644
--- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
+++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
@@ -102,7 +102,7 @@
*/
public void addContainerRequest(Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, List<ContainerRequest> containerRequests, StreamingContainerAgent.ContainerStartRequest csr, ContainerRequest cr)
{
- MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, cr);
+ MutablePair<Integer, ContainerRequest> pair = new MutablePair<>(loopCounter, cr);
requestedResources.put(csr, pair);
containerRequests.add(cr);
}
@@ -164,7 +164,7 @@
public List<String> getNodesExceptHost(List<String> hostNames)
{
- List<String> nodesList = new ArrayList<String>();
+ List<String> nodesList = new ArrayList<>();
Set<String> hostNameSet = Sets.newHashSet();
hostNameSet.addAll(hostNames);
for (String host : nodeReportMap.keySet()) {
diff --git a/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java b/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
index 3a61ee6..b374447 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java
@@ -32,10 +32,10 @@
public class PermissionsInfo
{
- private final Set<String> readOnlyRoles = new TreeSet<String>();
- private final Set<String> readOnlyUsers = new TreeSet<String>();
- private final Set<String> readWriteRoles = new TreeSet<String>();
- private final Set<String> readWriteUsers = new TreeSet<String>();
+ private final Set<String> readOnlyRoles = new TreeSet<>();
+ private final Set<String> readOnlyUsers = new TreeSet<>();
+ private final Set<String> readWriteRoles = new TreeSet<>();
+ private final Set<String> readWriteUsers = new TreeSet<>();
private boolean readOnlyEveryone = false;
private boolean readWriteEveryone = false;
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index 050729d..8076d4a 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -271,7 +271,7 @@
}
Text rmTokenService = new Text(Joiner.on(',').join(services));
- return new Token<RMDelegationTokenIdentifier>(
+ return new Token<>(
rmDelegationToken.getIdentifier().array(),
rmDelegationToken.getPassword().array(),
new Text(rmDelegationToken.getKind()),
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
index 7113280..284aefb 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
@@ -50,8 +50,8 @@
private final int id;
private final String name;
// the size of the circular queue should be configurable. hardcoded to 1024 for now.
- private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<ContainerStats.OperatorStats>(1024);
- private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<OperatorRequest>(1024);
+ private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<>(1024);
+ private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<>(1024);
public final boolean stateless;
private int windowsFromCheckpoint;
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 401eea9..62c4fd8 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -760,7 +760,7 @@
codecs.put(sinkToPersistPortMeta, inputStreamCodec);
InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port);
StreamCodec<Object> specifiedCodecForPersistOperator = (StreamCodec<Object>)persistOperatorPortMeta.getStreamCodec();
- StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator);
+ StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<>(codecs, specifiedCodecForPersistOperator);
persistOperatorPortMeta.setStreamCodec(codec);
}
}
@@ -1907,14 +1907,14 @@
HashMap<OperatorPair, AffinityRule> antiAffinities = new HashMap<>();
HashMap<OperatorPair, AffinityRule> threadLocalAffinities = new HashMap<>();
- List<String> operatorNames = new ArrayList<String>();
+ List<String> operatorNames = new ArrayList<>();
for (OperatorMeta operator : getAllOperators()) {
operatorNames.add(operator.getName());
- Set<String> containerSet = new HashSet<String>();
+ Set<String> containerSet = new HashSet<>();
containerSet.add(operator.getName());
containerAffinities.put(operator.getName(), containerSet);
- Set<String> nodeSet = new HashSet<String>();
+ Set<String> nodeSet = new HashSet<>();
nodeSet.add(operator.getName());
nodeAffinities.put(operator.getName(), nodeSet);
@@ -2073,7 +2073,7 @@
*/
public void convertRegexToList(List<String> operatorNames, AffinityRule rule)
{
- List<String> operators = new LinkedList<String>();
+ List<String> operators = new LinkedList<>();
Pattern p = Pattern.compile(rule.getOperatorRegex());
for (String name : operatorNames) {
if (p.matcher(name).matches()) {
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index ce22bfd..a1da94a 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -477,13 +477,13 @@
// Log container anti-affinity
if (LOG.isDebugEnabled()) {
for (PTContainer container : containers) {
- List<String> antiOperators = new ArrayList<String>();
+ List<String> antiOperators = new ArrayList<>();
for (PTContainer c : container.getStrictAntiPrefs()) {
for (PTOperator operator : c.getOperators()) {
antiOperators.add(operator.getName());
}
}
- List<String> containerOperators = new ArrayList<String>();
+ List<String> containerOperators = new ArrayList<>();
for (PTOperator operator : container.getOperators()) {
containerOperators.add(operator.getName());
}
diff --git a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
index 38c0f41..47986cb 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
@@ -116,10 +116,10 @@
*/
public PubSubWebSocketClient()
{
- throwable = new AtomicReference<Throwable>();
+ throwable = new AtomicReference<>();
ioThreadMultiplier = 1;
mapper = (new JacksonObjectMapperProvider()).getContext(null);
- codec = new PubSubMessageCodec<Object>(mapper);
+ codec = new PubSubMessageCodec<>(mapper);
AsyncHttpClientConfigBean config = new AsyncHttpClientConfigBean();
config.setIoThreadMultiplier(ioThreadMultiplier);
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java b/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
index dd75857..9fbb54d 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java
@@ -148,7 +148,7 @@
List<CompactAnnotationNode> annotations = new LinkedList<>();
for (Object visibleAnnotation : fn.visibleAnnotations) {
CompactAnnotationNode node = new CompactAnnotationNode();
- Map<String, Object> annotationMap = new HashMap<String, Object>();
+ Map<String, Object> annotationMap = new HashMap<>();
if (visibleAnnotation instanceof AnnotationNode) {
AnnotationNode annotation = (AnnotationNode)visibleAnnotation;
if (annotation.desc.contains("InputPortFieldAnnotation")
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java b/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
index 1e87b31..91a3cf3 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java
@@ -87,7 +87,7 @@
char boundChar;
- ArrayList<Type> bounds = new ArrayList<Type>();
+ ArrayList<Type> bounds = new ArrayList<>();
public Type[] getUpperBounds()
{
@@ -154,7 +154,7 @@
class ParameterizedTypeNode extends TypeNode
{
- ArrayList<Type> actualTypeArguments = new ArrayList<Type>();
+ ArrayList<Type> actualTypeArguments = new ArrayList<>();
public Type[] getActualTypeArguments()
{
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index d7f96d4..0c997ec 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -78,7 +78,7 @@
private static class MockInputOperator extends BaseOperator implements InputOperator, Operator.CheckpointNotificationListener
{
@OutputPortFieldAnnotation( optional = true)
- public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<>();
private transient int windowCount;
private int checkpointState;
@@ -326,7 +326,7 @@
public List<Checkpoint> getCheckpoints(Long... windowIds)
{
- List<Checkpoint> list = new ArrayList<Checkpoint>(windowIds.length);
+ List<Checkpoint> list = new ArrayList<>(windowIds.length);
for (Long windowId : windowIds) {
list.add(new Checkpoint(windowId, 0, 0));
}
diff --git a/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java b/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
index b1509e0..b1f3363 100644
--- a/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java
@@ -41,7 +41,7 @@
public void testGenericOperatorPropertyCodec()
{
LogicalPlan dag = new LogicalPlan();
- Map<Class<?>, Class<? extends StringCodec<?>>> codecs = new HashMap<Class<?>, Class<? extends StringCodec<?>>>();
+ Map<Class<?>, Class<? extends StringCodec<?>>> codecs = new HashMap<>();
codecs.put(GenericOperatorProperty.class, GenericOperatorProperty.GenericOperatorPropertyStringCodec.class);
dag.setAttribute(com.datatorrent.api.Context.DAGContext.STRING_CODECS, codecs);
dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
index ecbeeb6..f0199f9 100644
--- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
@@ -86,7 +86,7 @@
/*
* Received tuples are stored in a map keyed with the system assigned operator id.
*/
- public static final ConcurrentHashMap<String, List<Object>> receivedTuples = new ConcurrentHashMap<String, List<Object>>();
+ public static final ConcurrentHashMap<String, List<Object>> receivedTuples = new ConcurrentHashMap<>();
private transient int operatorId;
public String prefix = "";
@@ -107,7 +107,7 @@
synchronized (receivedTuples) {
List<Object> l = receivedTuples.get(id);
if (l == null) {
- l = Collections.synchronizedList(new ArrayList<Object>());
+ l = Collections.synchronizedList(new ArrayList<>());
//LOG.debug("adding {} {}", id, l);
receivedTuples.put(id, l);
}
@@ -121,12 +121,12 @@
};
@OutputPortFieldAnnotation( optional = true)
- public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
}
public static class TestInputOperator<T> extends BaseOperator implements InputOperator
{
- public final transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+ public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
transient boolean first;
transient long windowId;
boolean blockEndStream = false;
@@ -178,9 +178,9 @@
CollectorOperator.receivedTuples.clear();
TestInputOperator<Integer> input = dag.addOperator("input", new TestInputOperator<Integer>());
- input.testTuples = new ArrayList<List<Integer>>();
+ input.testTuples = new ArrayList<>();
for (Integer[] tuples: testData) {
- input.testTuples.add(new ArrayList<Integer>(Arrays.asList(tuples)));
+ input.testTuples.add(new ArrayList<>(Arrays.asList(tuples)));
}
CollectorOperator collector = dag.addOperator("collector", new CollectorOperator());
collector.prefix = "" + System.identityHashCode(collector);
@@ -234,7 +234,7 @@
{
Map<Integer, Integer> m = loadIndicators.get();
if (m == null) {
- loadIndicators.set(m = new ConcurrentHashMap<Integer, Integer>());
+ loadIndicators.set(m = new ConcurrentHashMap<>());
}
m.put(oper.getId(), load);
}
@@ -341,7 +341,7 @@
Assert.assertNotNull("" + nodeMap, inputDeployed);
// add tuple that matches the partition key and check that each partition receives it
- ArrayList<Integer> inputTuples = new ArrayList<Integer>();
+ ArrayList<Integer> inputTuples = new ArrayList<>();
LOG.debug("Number of partitions {}", partitions.size());
for (PTOperator p: partitions) {
// default partitioning has one port mapping with a single partition key
@@ -391,7 +391,7 @@
@Override
public Collection<Partition<PartitionableInputOperator>> definePartitions(Collection<Partition<PartitionableInputOperator>> partitions, PartitioningContext context)
{
- List<Partition<PartitionableInputOperator>> newPartitions = new ArrayList<Partition<PartitionableInputOperator>>(3);
+ List<Partition<PartitionableInputOperator>> newPartitions = new ArrayList<>(3);
Iterator<? extends Partition<PartitionableInputOperator>> iterator = partitions.iterator();
Partition<PartitionableInputOperator> templatePartition;
for (int i = 0; i < 3; i++) {
@@ -401,7 +401,7 @@
op.partitionProperty = templatePartition.getPartitionedInstance().partitionProperty;
}
op.partitionProperty += "_" + i;
- newPartitions.add(new DefaultPartition<PartitionableInputOperator>(op));
+ newPartitions.add(new DefaultPartition<>(op));
}
return newPartitions;
}
@@ -431,7 +431,7 @@
lc.runAsync();
List<PTOperator> partitions = assertNumberPartitions(3, lc, dag.getMeta(input));
- Set<String> partProperties = new HashSet<String>();
+ Set<String> partProperties = new HashSet<>();
for (PTOperator p : partitions) {
LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, p);
Map<Integer, Node<?>> nodeMap = c.getNodes();
@@ -460,7 +460,7 @@
PartitionLoadWatch.remove(partitions.get(0));
partitions = assertNumberPartitions(3, lc, dag.getMeta(input));
- partProperties = new HashSet<String>();
+ partProperties = new HashSet<>();
for (PTOperator p: partitions) {
LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, p);
Map<Integer, Node<?>> nodeMap = c.getNodes();
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 35bb363..cee8247 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -1007,7 +1007,7 @@
lastId = assignNewContainers(dnm, lastId);
List<PTOperator> operators = plan.getOperators(n2meta);
- List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+ List<PTOperator> upstreamOperators = new ArrayList<>();
for (PTOperator operator : operators) {
upstreamOperators.addAll(operator.upstreamMerge.values());
/*
@@ -1036,7 +1036,7 @@
lastId = assignNewContainers(dnm, lastId);
List<PTOperator> operators = plan.getOperators(n3meta);
- List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+ List<PTOperator> upstreamOperators = new ArrayList<>();
for (PTOperator operator : operators) {
upstreamOperators.addAll(operator.upstreamMerge.values());
}
@@ -1063,7 +1063,7 @@
lastId = assignNewContainers(dnm, lastId);
List<PTOperator> operators = plan.getOperators(n2meta);
- List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
+ List<PTOperator> upstreamOperators = new ArrayList<>();
for (PTOperator operator : operators) {
upstreamOperators.addAll(operator.upstreamMerge.values());
/*
@@ -1144,7 +1144,7 @@
private Set<PTOperator> getUnifiers(PhysicalPlan plan)
{
- Set<PTOperator> unifiers = new HashSet<PTOperator>();
+ Set<PTOperator> unifiers = new HashSet<>();
for (PTContainer container : plan.getContainers()) {
for (PTOperator operator : container.getOperators()) {
if (operator.isUnifier()) {
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index c606f47..cb2d760 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -124,14 +124,14 @@
input.portName = "inputPortNameOnNode";
input.sourceNodeId = 99;
- ndi.inputs = new ArrayList<OperatorDeployInfo.InputDeployInfo>();
+ ndi.inputs = new ArrayList<>();
ndi.inputs.add(input);
OperatorDeployInfo.OutputDeployInfo output = new OperatorDeployInfo.OutputDeployInfo();
output.declaredStreamId = "streamFromNode";
output.portName = "outputPortNameOnNode";
- ndi.outputs = new ArrayList<OperatorDeployInfo.OutputDeployInfo>();
+ ndi.outputs = new ArrayList<>();
ndi.outputs.add(output);
ContainerHeartbeatResponse scc = new ContainerHeartbeatResponse();
diff --git a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
index f6b7277..59f9dcc 100644
--- a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java
@@ -34,7 +34,7 @@
{
ApexCli cli;
- static Map<String, String> env = new HashMap<String, String>();
+ static Map<String, String> env = new HashMap<>();
static String userHome;
@Before
diff --git a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
index 2ac1c50..f1356df 100644
--- a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java
@@ -59,7 +59,7 @@
static TemporaryFolder testFolder = new TemporaryFolder();
ApexCli cli;
- static Map<String, String> env = new HashMap<String, String>();
+ static Map<String, String> env = new HashMap<>();
static String userHome;
@BeforeClass
diff --git a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
index d1a18ae..26aced8 100644
--- a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
@@ -112,8 +112,8 @@
@Test
public void testString()
{
- StatefulStreamCodec<Object> coder = new DefaultStatefulStreamCodec<Object>();
- StatefulStreamCodec<Object> decoder = new DefaultStatefulStreamCodec<Object>();
+ StatefulStreamCodec<Object> coder = new DefaultStatefulStreamCodec<>();
+ StatefulStreamCodec<Object> decoder = new DefaultStatefulStreamCodec<>();
String hello = "hello";
@@ -182,7 +182,7 @@
public void testFinalFieldSerialization() throws Exception
{
TestTuple t1 = new TestTuple(5);
- DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<Object>();
+ DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<>();
DataStatePair dsp = c.toDataStatePair(t1);
TestTuple t2 = (TestTuple)c.fromDataStatePair(dsp);
Assert.assertEquals("", t1.finalField, t2.finalField);
@@ -208,7 +208,7 @@
Object inner = outer.new InnerClass();
for (Object o: new Object[] {outer, inner}) {
- DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<Object>();
+ DefaultStatefulStreamCodec<Object> c = new DefaultStatefulStreamCodec<>();
DataStatePair dsp = c.toDataStatePair(o);
c.fromDataStatePair(dsp);
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
index cc777f7..64298de 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
@@ -77,7 +77,7 @@
@Override
public void testNonLinearOperatorRecovery() throws InterruptedException
{
- final HashSet<Object> collection = new HashSet<Object>();
+ final HashSet<Object> collection = new HashSet<>();
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap map = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0);
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index da5c7b7..66f1b84 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -197,7 +197,7 @@
};
@OutputPortFieldAnnotation( optional = true)
- DefaultOutputPort<Object> op = new DefaultOutputPort<Object>();
+ DefaultOutputPort<Object> op = new DefaultOutputPort<>();
@Override
public void beginWindow(long windowId)
@@ -226,7 +226,7 @@
public static class CheckpointDistanceOperator extends GenericOperator
{
- List<Integer> distances = new ArrayList<Integer>();
+ List<Integer> distances = new ArrayList<>();
int numWindows = 0;
int maxWindows = 0;
@@ -245,7 +245,7 @@
public void testSynchingLogic() throws InterruptedException
{
long sleeptime = 25L;
- final ArrayList<Object> list = new ArrayList<Object>();
+ final ArrayList<Object> list = new ArrayList<>();
GenericOperator go = new GenericOperator();
final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
new DefaultAttributeMap(), null));
@@ -376,8 +376,8 @@
final Server bufferServer = new Server(eventloop, 0); // find random port
final int bufferServerPort = bufferServer.run().getPort();
- final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
- final BlockingQueue<Object> tuples = new ArrayBlockingQueue<Object>(10);
+ final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>();
+ final BlockingQueue<Object> tuples = new ArrayBlockingQueue<>(10);
GenericTestOperator go = new GenericTestOperator();
final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
@@ -905,7 +905,7 @@
CheckpointDistanceOperator go = new CheckpointDistanceOperator();
go.maxWindows = maxWindows;
- List<Integer> checkpoints = new ArrayList<Integer>();
+ List<Integer> checkpoints = new ArrayList<>();
int window = 0;
while (window < maxWindows) {
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
index 84217eb..bb2e72f 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java
@@ -51,10 +51,10 @@
public static class EvenOddIntegerGeneratorInputOperator implements InputOperator, com.datatorrent.api.Operator.ActivationListener<OperatorContext>
{
- public final transient DefaultOutputPort<Integer> even = new DefaultOutputPort<Integer>();
- public final transient DefaultOutputPort<Integer> odd = new DefaultOutputPort<Integer>();
- private final transient CircularBuffer<Integer> evenBuffer = new CircularBuffer<Integer>(1024);
- private final transient CircularBuffer<Integer> oddBuffer = new CircularBuffer<Integer>(1024);
+ public final transient DefaultOutputPort<Integer> even = new DefaultOutputPort<>();
+ public final transient DefaultOutputPort<Integer> odd = new DefaultOutputPort<>();
+ private final transient CircularBuffer<Integer> evenBuffer = new CircularBuffer<>(1024);
+ private final transient CircularBuffer<Integer> oddBuffer = new CircularBuffer<>(1024);
private volatile Thread dataGeneratorThread;
@Override
@@ -179,7 +179,7 @@
public void setConnected(boolean flag)
{
if (flag) {
- collections.put(id, list = new ArrayList<T>());
+ collections.put(id, list = new ArrayList<>());
}
}
}
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
index 55b5eab..f669832 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
@@ -171,7 +171,7 @@
}
- static final ArrayList<Call> calls = new ArrayList<Call>();
+ static final ArrayList<Call> calls = new ArrayList<>();
@Override
public void save(Object object, int operatorId, long windowId) throws IOException
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
index 3e208c8..952a36b 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
@@ -152,8 +152,8 @@
public static class CollectorOperator extends BaseOperator implements com.datatorrent.api.Operator.CheckpointListener
{
- public static HashSet<Long> collection = new HashSet<Long>(20);
- public static ArrayList<Long> duplicates = new ArrayList<Long>();
+ public static HashSet<Long> collection = new HashSet<>(20);
+ public static ArrayList<Long> duplicates = new ArrayList<>();
private boolean simulateFailure;
private long checkPointWindowId;
public final transient DefaultInputPort<Long> input = new DefaultInputPort<Long>()
@@ -211,7 +211,7 @@
{
public final transient MyInputPort input1 = new MyInputPort(100);
public final transient MyInputPort input2 = new MyInputPort(200);
- public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+ public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
public class MyInputPort extends DefaultInputPort<Integer>
{
@@ -255,7 +255,7 @@
@SuppressWarnings("SleepWhileInLoop")
public void testNonLinearOperatorRecovery() throws InterruptedException
{
- final HashSet<Object> collection = new HashSet<Object>();
+ final HashSet<Object> collection = new HashSet<>();
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap map = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0);
map.put(OperatorContext.PROCESSING_MODE, processingMode);
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
index 8ba57be..d8d97e0 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
@@ -47,7 +47,7 @@
emit = true;
}
- public final transient DefaultOutputPort<Integer> defaultOutputPort = new DefaultOutputPort<Integer>();
+ public final transient DefaultOutputPort<Integer> defaultOutputPort = new DefaultOutputPort<>();
@Override
public void emitTuples()
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
index 4625368..47f1ea0 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
@@ -83,7 +83,7 @@
public static class TestInputStatsListener implements StatsListener, Serializable
{
private static final long serialVersionUID = 1L;
- private List<OperatorStats> inputOperatorStats = new ArrayList<OperatorStats>();
+ private List<OperatorStats> inputOperatorStats = new ArrayList<>();
@Override
public Response processStats(BatchedOperatorStats stats)
@@ -101,7 +101,7 @@
public static class TestCollector extends GenericTestOperator implements StatsListener
{
transient long windowId;
- List<OperatorStats> collectorOperatorStats = new ArrayList<OperatorStats>();
+ List<OperatorStats> collectorOperatorStats = new ArrayList<>();
@Override
public Response processStats(BatchedOperatorStats stats)
@@ -129,7 +129,7 @@
public static class TestCollectorStatsListener implements StatsListener, Serializable
{
private static final long serialVersionUID = 1L;
- List<OperatorStats> collectorOperatorStats = new ArrayList<OperatorStats>();
+ List<OperatorStats> collectorOperatorStats = new ArrayList<>();
@Override
public Response processStats(BatchedOperatorStats stats)
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
index 451972e..36f9d63 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java
@@ -93,7 +93,7 @@
private static class CommitAwareOperator extends BaseOperator implements CheckpointListener, InputOperator
{
private transient String name;
- public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+ public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
@InputPortFieldAnnotation(optional = true)
public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java b/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java
index e6b5cd5..9e6c788 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java
@@ -41,9 +41,9 @@
private int emitInterval = 1000;
private final int spinMillis = 50;
private String myStringProperty;
- private final ConcurrentLinkedQueue<String> externallyAddedTuples = new ConcurrentLinkedQueue<String>();
+ private final ConcurrentLinkedQueue<String> externallyAddedTuples = new ConcurrentLinkedQueue<>();
@OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<>();
public int getMaxTuples()
{
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
index 0c95b75..85125c7 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
@@ -256,7 +256,7 @@
static class RandomNumberGenerator implements InputOperator
{
- public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+ public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
@Override
public void emitTuples()
diff --git a/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java b/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java
index 0730289..7dc0686 100644
--- a/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java
@@ -220,7 +220,7 @@
public transient String transientProperty = "transientProperty";
public java.util.concurrent.ConcurrentHashMap<String, String> mapProperty = new java.util.concurrent
- .ConcurrentHashMap<String, String>();
+ .ConcurrentHashMap<>();
public java.util.concurrent.ConcurrentHashMap<String, String> getMapProperty()
{
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
index d40fd7b..9d66ca3 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
@@ -570,12 +570,12 @@
}
};
- public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
@Override
public Collection definePartitions(Collection partitions, PartitioningContext context)
{
- Collection<Partition> newPartitions = new ArrayList<Partition>();
+ Collection<Partition> newPartitions = new ArrayList<>();
// Mostly for 1 partition we dont need to do this
int partitionBits = (Integer.numberOfLeadingZeros(0) - Integer.numberOfLeadingZeros(1));
@@ -590,7 +590,7 @@
// No partitioning done so far..
// Single partition again, but with only even numbers ok?
PassThruOperatorWithCodec newInstance = new PassThruOperatorWithCodec();
- Partition partition = new DefaultPartition<PassThruOperatorWithCodec>(newInstance);
+ Partition partition = new DefaultPartition<>(newInstance);
// Consider partitions are 1 & 2 and we are sending only 1 partition
// Partition 1 = even numbers
@@ -796,7 +796,7 @@
}
};
- public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
@Override
public Collection definePartitions(Collection partitions, PartitioningContext context)
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java b/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java
index 14f2b1b..705904e 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java
@@ -45,7 +45,7 @@
public class TestPlanContext implements PlanContext, StorageAgent
{
- public List<Runnable> events = new ArrayList<Runnable>();
+ public List<Runnable> events = new ArrayList<>();
public Collection<PTOperator> undeploy;
public Collection<PTOperator> deploy;
public Set<PTContainer> releaseContainers;
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
index 18dfd99..3999ace 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
@@ -625,7 +625,7 @@
output.emit(tuple);
}
};
- public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+ public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
}
class DummyOutputOperator extends BaseOperator
@@ -644,8 +644,8 @@
class TestUnifierAttributeModule implements Module
{
- public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<Integer>();
- public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<Integer>();
+ public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<>();
+ public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<>();
@Override
public void populateDAG(DAG dag, Configuration conf)
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
index dd32cc7..1507e2d 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
@@ -173,9 +173,9 @@
public static class ValidationOperator extends BaseOperator
{
- public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<>();
- public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<>();
}
public static class CounterOperator extends BaseOperator
@@ -641,7 +641,7 @@
private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator
{
- public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<>();
@Override
public void emitTuples()
@@ -653,9 +653,9 @@
private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator
{
@OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<>();
@OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<>();
@Override
public void emitTuples()
@@ -773,7 +773,7 @@
public void testAttributeValuesSerializableCheck() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException
{
LogicalPlan dag = new LogicalPlan();
- Attribute<Object> attr = new Attribute<Object>(new TestAttributeValue(), new Object2String());
+ Attribute<Object> attr = new Attribute<>(new TestAttributeValue(), new Object2String());
Field nameField = Attribute.class.getDeclaredField("name");
nameField.setAccessible(true);
nameField.set(attr, "Test_Attribute");
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
index 1966678..64aaa44 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
@@ -50,7 +50,7 @@
{
Random r = new Random();
- public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+ public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
@Override
public void emitTuples()
@@ -73,7 +73,7 @@
output.emit(tuple);
}
};
- public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+ public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
}
/*
@@ -92,7 +92,7 @@
output.emit(tuple);
}
};
- public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+ public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
}
/*
@@ -118,8 +118,8 @@
static class TestModule implements Module
{
- public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<Integer>();
- public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<Integer>();
+ public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<>();
+ public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<>();
@Override
public void populateDAG(DAG dag, Configuration conf)
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
index 5b5583a..7759363 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
@@ -59,7 +59,7 @@
private int inputOperatorProp = 0;
Random r = new Random();
- public transient DefaultOutputPort<Integer> out = new DefaultOutputPort<Integer>();
+ public transient DefaultOutputPort<Integer> out = new DefaultOutputPort<>();
@Override
public void emitTuples()
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
index 8fad613..956db88 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
@@ -49,10 +49,10 @@
public volatile Object inport1Tuple = null;
@OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<>();
@OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<>();
private String emitFormat;
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
index 61a85a5..941f5a4 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
@@ -476,7 +476,7 @@
OperatorMeta o1Meta = dag.getMeta(o1);
dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
- TestPartitioner<TestInputOperator<Object>> partitioner = new TestPartitioner<TestInputOperator<Object>>();
+ TestPartitioner<TestInputOperator<Object>> partitioner = new TestPartitioner<>();
dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, partitioner);
TestPlanContext ctx = new TestPlanContext();
@@ -497,7 +497,7 @@
plan.onStatusUpdate(o1p1);
Assert.assertEquals("scale up triggered", 1, ctx.events.size());
// add another partition, keep existing as is
- partitioner.extraPartitions.add(new DefaultPartition<TestInputOperator<Object>>(o1));
+ partitioner.extraPartitions.add(new DefaultPartition<>(o1));
Runnable r = ctx.events.remove(0);
r.run();
partitioner.extraPartitions.clear();
@@ -745,7 +745,7 @@
partitions.add(new DefaultPartition<Operator>(operator, p1Keys, 1, null));
}
- ArrayList<Partition<Operator>> lowLoadPartitions = new ArrayList<Partition<Operator>>();
+ ArrayList<Partition<Operator>> lowLoadPartitions = new ArrayList<>();
for (Partition<Operator> p : partitions) {
lowLoadPartitions.add(new DefaultPartition<>(p.getPartitionedInstance(), p.getPartitionKeys(), -1, null));
}
@@ -793,7 +793,7 @@
for (Set<PartitionKeys> expectedKeys: expectedKeysSets) {
List<Partition<Operator>> clonePartitions = Lists.newArrayList();
for (PartitionKeys pks: twoBitPartitionKeys) {
- Map<InputPort<?>, PartitionKeys> p1Keys = new HashMap<InputPort<?>, PartitionKeys>();
+ Map<InputPort<?>, PartitionKeys> p1Keys = new HashMap<>();
p1Keys.put(operator.inport1, pks);
int load = expectedKeys.contains(pks) ? 0 : -1;
clonePartitions.add(new DefaultPartition<Operator>(operator, p1Keys, load, null));
@@ -876,7 +876,7 @@
Assert.assertEquals("operators container 0", 1, plan.getContainers().get(0).getOperators().size());
Set<OperatorMeta> c2ExpNodes = Sets.newHashSet(dag.getMeta(o2), dag.getMeta(o3));
- Set<OperatorMeta> c2ActNodes = new HashSet<OperatorMeta>();
+ Set<OperatorMeta> c2ActNodes = new HashSet<>();
PTContainer c2 = plan.getContainers().get(1);
for (PTOperator pNode : c2.getOperators()) {
c2ActNodes.add(pNode.getOperatorMeta());
@@ -1139,7 +1139,7 @@
LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
- TestPartitioner<TestGeneratorInputOperator> o1Partitioner = new TestPartitioner<TestGeneratorInputOperator>();
+ TestPartitioner<TestGeneratorInputOperator> o1Partitioner = new TestPartitioner<>();
o1Partitioner.setPartitionCount(2);
dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, o1Partitioner);
dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
@@ -1312,7 +1312,7 @@
}
Assert.assertEquals("repartition event", 1, ctx.events.size());
- o1Partitioner.extraPartitions.add(new DefaultPartition<TestGeneratorInputOperator>(o1));
+ o1Partitioner.extraPartitions.add(new DefaultPartition<>(o1));
ctx.events.remove(0).run();
o1Partitioner.extraPartitions.clear();
@@ -1607,7 +1607,7 @@
}
T paritionable = first.getPartitionedInstance();
for (int i = partitions.size(); i < numTotal; ++i) {
- newPartitions.add(new DefaultPartition<T>(paritionable));
+ newPartitions.add(new DefaultPartition<>(paritionable));
}
return newPartitions;
}
@@ -1767,7 +1767,7 @@
List<PTOperator> o1Unifiers = plan.getMergeOperators(o1Meta);
Assert.assertEquals("o1Unifiers " + o1Meta, 3, o1Unifiers.size()); // 2 cascadingUnifiers and one-downstream partition unifier
- List<PTOperator> finalUnifiers = new ArrayList<PTOperator>();
+ List<PTOperator> finalUnifiers = new ArrayList<>();
for (PTOperator o : o1Unifiers) {
Assert.assertEquals("inputs " + o, 2, o.getInputs().size());
Assert.assertEquals("outputs " + o, 1, o.getOutputs().size());
@@ -2054,7 +2054,7 @@
if (context.getParallelPartitionCount() > 0 && newPartitions.size() < context.getParallelPartitionCount()) {
// parallel partitioned, fill to requested count
for (int i = newPartitions.size(); i < context.getParallelPartitionCount(); i++) {
- newPartitions.add(new DefaultPartition<T>(partitions.iterator().next().getPartitionedInstance()));
+ newPartitions.add(new DefaultPartition<>(partitions.iterator().next().getPartitionedInstance()));
}
}
return newPartitions;
@@ -2215,9 +2215,9 @@
GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
- dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2));
+ dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2));
dag.getOperatorMeta("o1").getMeta(o1.outport1).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB, 1024);
- dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2));
+ dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2));
dag.setOperatorAttribute(o2, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
dag.setOperatorAttribute(o2, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
@@ -2237,7 +2237,7 @@
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
- dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2));
+ dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2));
dag.setInputPortAttribute(o2.inport1, PortContext.PARTITION_PARALLEL, true);
dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java b/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java
index 77334db..2cb3e58 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java
@@ -41,7 +41,7 @@
@Test
public void testEmergencySinks() throws InterruptedException
{
- final List<Object> list = new ArrayList<Object>();
+ final List<Object> list = new ArrayList<>();
final StreamCodec<Object> myserde = new StreamCodec<Object>()
{
@Override
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java b/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java
index e094d44..a487176 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java
@@ -50,7 +50,7 @@
byte[] buffer = publisher.consume();
FastSubscriber subscriber = new FastSubscriber("subscriber", 1024);
- subscriber.serde = subscriber.statefulSerde = new DefaultStatefulStreamCodec<Object>();
+ subscriber.serde = subscriber.statefulSerde = new DefaultStatefulStreamCodec<>();
SweepableReservoir sr = subscriber.acquireReservoir("res", 1024);
sr.setSink(new Sink<Object>()
{
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
index fd67121..32e5095 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
@@ -94,7 +94,7 @@
@SuppressWarnings({"SleepWhileInLoop"})
public void testBufferServerStream() throws Exception
{
- final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
+ final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>();
final AtomicInteger messageCount = new AtomicInteger();
Sink<Object> sink = new Sink<Object>()
{
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
index ed145c7..9db6fe9 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
@@ -55,13 +55,13 @@
{
final int totalTupleCount = 5000;
- final PassThroughNode<Object> operator1 = new PassThroughNode<Object>();
+ final PassThroughNode<Object> operator1 = new PassThroughNode<>();
final GenericNode node1 = new GenericNode(operator1, new OperatorContext(1, "operator1", new DefaultAttributeMap(),
null));
node1.setId(1);
operator1.setup(node1.context);
- final PassThroughNode<Object> operator2 = new PassThroughNode<Object>();
+ final PassThroughNode<Object> operator2 = new PassThroughNode<>();
final GenericNode node2 = new GenericNode(operator2, new OperatorContext(2, "operator2", new DefaultAttributeMap(),
null));
node2.setId(2);
@@ -115,7 +115,7 @@
AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input", 1024 * 5);
node1.connectInputPort("input", reservoir1);
- Map<Integer, Node<?>> activeNodes = new ConcurrentHashMap<Integer, Node<?>>();
+ Map<Integer, Node<?>> activeNodes = new ConcurrentHashMap<>();
launchNodeThread(node1, activeNodes);
launchNodeThread(node2, activeNodes);
stream.activate(streamContext);
@@ -206,7 +206,7 @@
}
};
- public final DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+ public final DefaultOutputPort<T> output = new DefaultOutputPort<>();
private boolean logMessages = false;
public boolean isLogMessages()
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
index f0748eb..287a796 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java
@@ -47,7 +47,7 @@
public static class TestInputOperator extends BaseOperator implements InputOperator
{
- public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
+ public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
@Override
public void emitTuples()
@@ -60,7 +60,7 @@
public static class FirstGenericOperator extends BaseOperator
{
public static long endwindowCount;
- public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
+ public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>()
{
@Override
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
index e614750..26e913b 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
@@ -298,7 +298,7 @@
public static class ThreadIdValidatingInputOperator implements InputOperator
{
- public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
+ public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
public static long threadId;
@Override
@@ -416,12 +416,12 @@
assert (threadList.contains(Thread.currentThread().getId()));
}
- public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
+ public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
}
public static class ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts extends ThreadIdValidatingGenericIntermediateOperator
{
- public final transient DefaultOutputPort<Long> output2 = new DefaultOutputPort<Long>();
+ public final transient DefaultOutputPort<Long> output2 = new DefaultOutputPort<>();
}
public static class ThreadIdValidatingGenericOperatorWithTwoInputPorts implements Operator
@@ -621,9 +621,9 @@
slc.run();
Assert.assertEquals("nonOIO: Number of threads ThreadIdValidatingGenericIntermediateOperator", 3, ThreadIdValidatingGenericIntermediateOperator.threadList.size());
- Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 3, (new HashSet<Long>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size());
+ Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 3, (new HashSet<>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size());
Assert.assertEquals("nonOIO: Number of threads ThreadIdValidatingOutputOperator", 4, ThreadIdValidatingOutputOperator.threadList.size());
- Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingOutputOperator", 4, (new HashSet<Long>(ThreadIdValidatingOutputOperator.threadList)).size());
+ Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingOutputOperator", 4, (new HashSet<>(ThreadIdValidatingOutputOperator.threadList)).size());
Assert.assertFalse("nonOIO:: inputOperator1 : ThreadIdValidatingOutputOperator", ThreadIdValidatingOutputOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId));
Assert.assertFalse("nonOIO:: inputOperator1 : ThreadIdValidatingGenericIntermediateOperator", ThreadIdValidatingGenericIntermediateOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId));
@@ -640,9 +640,9 @@
slc.run();
Assert.assertEquals("OIO: Number of threads ThreadIdValidatingGenericIntermediateOperator", 3, ThreadIdValidatingGenericIntermediateOperator.threadList.size());
- Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 1, (new HashSet<Long>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size());
+ Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 1, (new HashSet<>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size());
Assert.assertEquals("OIO: Number of threads ThreadIdValidatingOutputOperator", 4, ThreadIdValidatingOutputOperator.threadList.size());
- Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingOutputOperator", 3, (new HashSet<Long>(ThreadIdValidatingOutputOperator.threadList)).size());
+ Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingOutputOperator", 3, (new HashSet<>(ThreadIdValidatingOutputOperator.threadList)).size());
Assert.assertTrue("OIO:: inputOperator1 : ThreadIdValidatingOutputOperator", ThreadIdValidatingOutputOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId));
Assert.assertTrue("OIO:: inputOperator1 : ThreadIdValidatingGenericIntermediateOperator", ThreadIdValidatingGenericIntermediateOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId));
}
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
index 38460ea..15de177 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
@@ -149,7 +149,7 @@
@Before
public void init()
{
- final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
+ final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>();
messageCount = new AtomicInteger(0);
Sink<Object> sink = new Sink<Object>()
diff --git a/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java b/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java
index 902b8b6..af821ea 100644
--- a/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java
+++ b/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java
@@ -37,7 +37,7 @@
public long time;
}
- PriorityQueue<TimedRunnable> queue = new PriorityQueue<TimedRunnable>(16, new Comparator<TimedRunnable>()
+ PriorityQueue<TimedRunnable> queue = new PriorityQueue<>(16, new Comparator<TimedRunnable>()
{
@Override
public int compare(TimedRunnable o1, TimedRunnable o2)
diff --git a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
index 0326b6a..7399aff 100644
--- a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
+++ b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
@@ -322,7 +322,7 @@
public static class TestHomeDirectory extends TestWatcher
{
- Map<String, String> env = new HashMap<String, String>();
+ Map<String, String> env = new HashMap<>();
String userHome;
@Override
@@ -444,7 +444,7 @@
private static final long serialVersionUID = 201404091805L;
}
- transient HashMap<OperatorWindowIdPair, Object> store = new HashMap<OperatorWindowIdPair, Object>();
+ transient HashMap<OperatorWindowIdPair, Object> store = new HashMap<>();
@Override
public synchronized void save(Object object, int operatorId, long windowId) throws IOException
@@ -467,7 +467,7 @@
@Override
public synchronized long[] getWindowIds(int operatorId) throws IOException
{
- ArrayList<Long> windowIds = new ArrayList<Long>();
+ ArrayList<Long> windowIds = new ArrayList<>();
for (OperatorWindowIdPair key : store.keySet()) {
if (key.operatorId == operatorId) {
windowIds.add(key.windowId);
diff --git a/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java b/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java
index 5bc70dc..9d12fdc 100644
--- a/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java
@@ -65,7 +65,7 @@
@Test
public void testElement()
{
- StablePriorityQueue<Integer> instance = new StablePriorityQueue<Integer>(1);
+ StablePriorityQueue<Integer> instance = new StablePriorityQueue<>(1);
Integer i = 10;
instance.add(i);
Object result = instance.element();
@@ -78,7 +78,7 @@
@Test
public void testOffer()
{
- StablePriorityQueue<Integer> instance = new StablePriorityQueue<Integer>(1);
+ StablePriorityQueue<Integer> instance = new StablePriorityQueue<>(1);
Integer i = 10;
assertTrue(instance.offer(i));
Object result = instance.peek();
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
index 9ac28c0..e1fb860 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
@@ -109,9 +109,9 @@
};
@OutputPortFieldAnnotation(optional = false, error = true)
- public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+ public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
- public final transient DefaultOutputPort<Double> output1 = new DefaultOutputPort<Double>();
+ public final transient DefaultOutputPort<Double> output1 = new DefaultOutputPort<>();
@Override
public String getName()
@@ -417,7 +417,7 @@
{
String classpath = System.getProperty("java.class.path");
String[] paths = classpath.split(":");
- List<String> fnames = new LinkedList<String>();
+ List<String> fnames = new LinkedList<>();
for (String cp : paths) {
File f = new File(cp);
if (!f.isDirectory()) {
@@ -480,7 +480,7 @@
@Test
public void testValueSerialization() throws Exception
{
- TestOperator<String, Map<String, Number>> bean = new TestOperator<String, Map<String, Number>>();
+ TestOperator<String, Map<String, Number>> bean = new TestOperator<>();
bean.map.put("key1", new Structured());
bean.stringArray = new String[]{"one", "two", "three"};
bean.stringList = Lists.newArrayList("four", "five");
@@ -491,7 +491,7 @@
bean.structuredArray[0].name = "s1";
bean.color = Color.BLUE;
bean.booleanProp = true;
- bean.nestedList = new LinkedList<OperatorDiscoveryTest.Structured>();
+ bean.nestedList = new LinkedList<>();
Structured st = new Structured();
st.name = "nestedone";
st.size = 10;
@@ -640,15 +640,15 @@
private List<Structured> nestedList;
private Properties props;
private Structured nested;
- private Map<String, Structured> map = new HashMap<String, Structured>();
+ private Map<String, Structured> map = new HashMap<>();
private String[] stringArray;
private Color color;
private Structured[] structuredArray;
private T[] genericArray;
- private Map<String, List<Map<String, Number>>> nestedParameterizedType = new HashMap<String, List<Map<String, Number>>>();
+ private Map<String, List<Map<String, Number>>> nestedParameterizedType = new HashMap<>();
private Map<?, ? super Long> wildcardType = new HashMap<Object, Number>();
- private List<int[]> listofIntArray = new LinkedList<int[]>();
- private List<T> parameterizedTypeVariable = new LinkedList<T>();
+ private List<int[]> listofIntArray = new LinkedList<>();
+ private List<T> parameterizedTypeVariable = new LinkedList<>();
private Z genericType;
private int[][] multiDimensionPrimitiveArray;
private Structured[][] multiDimensionComplexArray;
@@ -1056,7 +1056,7 @@
@Test
public void testLogicalPlanConfiguration() throws Exception
{
- TestOperator<String, Map<String, Number>> bean = new InputTestOperator<String, Map<String, Number>>();
+ TestOperator<String, Map<String, Number>> bean = new InputTestOperator<>();
bean.map.put("key1", new Structured());
bean.stringArray = new String[]{"one", "two", "three"};
bean.stringList = Lists.newArrayList("four", "five");
@@ -1113,12 +1113,12 @@
public static class SchemaRequiredOperator extends BaseOperator implements InputOperator
{
@OutputPortFieldAnnotation(schemaRequired = true)
- public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
@OutputPortFieldAnnotation(schemaRequired = false)
- public final transient DefaultOutputPort<Object> output1 = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> output1 = new DefaultOutputPort<>();
- public final transient DefaultOutputPort<Object> output2 = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> output2 = new DefaultOutputPort<>();
@Override
public void emitTuples()
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
index 9f414dc..76fa963 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
@@ -113,7 +113,7 @@
lastRequests = requests;
// delegate processing to dispatch thread
- FutureTask<Object> future = new FutureTask<Object>(new Callable<Object>()
+ FutureTask<Object> future = new FutureTask<>(new Callable<Object>()
{
@Override
public Object call() throws Exception
@@ -351,7 +351,7 @@
@Test
public void testSubmitLogicalPlanChange() throws JSONException, Exception
{
- List<LogicalPlanRequest> requests = new ArrayList<LogicalPlanRequest>();
+ List<LogicalPlanRequest> requests = new ArrayList<>();
WebResource r = resource();
CreateOperatorRequest request1 = new CreateOperatorRequest();
@@ -366,7 +366,7 @@
requests.add(request2);
ObjectMapper mapper = new ObjectMapper();
- final Map<String, Object> m = new HashMap<String, Object>();
+ final Map<String, Object> m = new HashMap<>();
m.put("requests", requests);
final JSONObject jsonRequest = new JSONObject(mapper.writeValueAsString(m));
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java
index 09b8785..8674e9f 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java
@@ -87,10 +87,10 @@
{
}
};
- final OutputPort<T2> outportT2 = new DefaultOutputPort<T2>();
- final OutputPort<Number> outportNumberParam = new DefaultOutputPort<Number>();
+ final OutputPort<T2> outportT2 = new DefaultOutputPort<>();
+ final OutputPort<Number> outportNumberParam = new DefaultOutputPort<>();
final StringOutputPort outportString = new StringOutputPort(this);
- final OutputPort<List<T0>> outportList = new DefaultOutputPort<List<T0>>();
+ final OutputPort<List<T0>> outportList = new DefaultOutputPort<>();
final GenericSubClassOutputPort outClassObject = new GenericSubClassOutputPort(this);
}
@@ -107,8 +107,8 @@
{
}
};
- final OutputPort<Map<String, Number>> outportT2 = new DefaultOutputPort<Map<String, Number>>();
- final OutputPort<Number> outportNumberParam = new DefaultOutputPort<Number>();
+ final OutputPort<Map<String, Number>> outportT2 = new DefaultOutputPort<>();
+ final OutputPort<Number> outportNumberParam = new DefaultOutputPort<>();
final StringOutputPort outportString = new StringOutputPort(this);
}
@@ -163,7 +163,7 @@
static class ParameterizedTypeOperator<T> extends BaseOperator
{
- final OutputPort<T> output = new DefaultOutputPort<T>();
+ final OutputPort<T> output = new DefaultOutputPort<>();
}
static class StringParameterOperator extends ParameterizedTypeOperator<String>