APEXCORE-722 Made data members for Default Port classes as private with protected access methods
diff --git a/api/src/main/java/com/datatorrent/api/DefaultInputPort.java b/api/src/main/java/com/datatorrent/api/DefaultInputPort.java
index dc8705c..6957105 100644
--- a/api/src/main/java/com/datatorrent/api/DefaultInputPort.java
+++ b/api/src/main/java/com/datatorrent/api/DefaultInputPort.java
@@ -18,6 +18,8 @@
*/
package com.datatorrent.api;
+import org.apache.hadoop.classification.InterfaceStability;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.Operator.InputPort;
@@ -31,8 +33,8 @@
*/
public abstract class DefaultInputPort<T> implements InputPort<T>, Sink<T>
{
- protected int count;
- protected boolean connected = false;
+ private int count;
+ private boolean connected = false;
/**
* <p>Constructor for DefaultInputPort.</p>
@@ -109,4 +111,9 @@
*/
public abstract void process(T tuple);
+ @InterfaceStability.Evolving
+ protected int incrementCount()
+ {
+ return ++count;
+ }
}
diff --git a/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java b/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
index acd562f..7892fdb 100644
--- a/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
+++ b/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
@@ -37,7 +37,7 @@
public static final String THREAD_AFFINITY_DISABLE_CHECK = "com.datatorrent.api.DefaultOutputPort.thread.check.disable";
private static final Logger logger = LoggerFactory.getLogger(DefaultOutputPort.class);
- protected transient Sink<Object> sink;
+ private transient Sink<Object> sink;
private transient Thread operatorThread;
/**
diff --git a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java
index f17d540..5f965bc 100644
--- a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java
+++ b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java
@@ -36,7 +36,7 @@
@Override
public boolean putControl(ControlTuple payload)
{
- count++;
+ incrementCount();
return processControl(payload);
}
diff --git a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java
index 9400bbd..ee55284 100644
--- a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java
+++ b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java
@@ -36,7 +36,7 @@
{
public ControlAwareDefaultOutputPort()
{
- sink = ControlTupleEnabledSink.BLACKHOLE;
+ setSink(ControlTupleEnabledSink.BLACKHOLE);
}
/**
@@ -46,18 +46,18 @@
public void emitControl(ControlTuple tuple)
{
verifyOperatorThread();
- ((ControlTupleEnabledSink)sink).putControl(tuple);
+ ((ControlTupleEnabledSink)getSink()).putControl(tuple);
}
public boolean isConnected()
{
- return sink != ControlTupleEnabledSink.BLACKHOLE;
+ return getSink() != ControlTupleEnabledSink.BLACKHOLE;
}
@Override
public void setSink(Sink<Object> s)
{
- this.sink = (s == null ? ControlTupleEnabledSink.BLACKHOLE : s);
+ super.setSink(s == null ? ControlTupleEnabledSink.BLACKHOLE : s);
}
}
diff --git a/pom.xml b/pom.xml
index 5774bb7..ff91946 100644
--- a/pom.xml
+++ b/pom.xml
@@ -440,6 +440,9 @@
<excludes>
<exclude>@org.apache.hadoop.classification.InterfaceStability$Evolving</exclude>
<exclude>@org.apache.hadoop.classification.InterfaceStability$Unstable</exclude>
+ <!-- TODO This needs to be removed once 3.7.0 is released and old version points to 3.7.0 -->
+ <exclude>com.datatorrent.api.DefaultInputPort</exclude>
+ <exclude>com.datatorrent.api.DefaultOutputPort</exclude>
</excludes>
</parameter>
</configuration>