APEXMALHAR-2502 #FixKuduOutputOperator for extensibility
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
index 250334b..ff668b0 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
@@ -114,7 +114,7 @@
   private static final transient Logger LOG = LoggerFactory.getLogger(AbstractKuduOutputOperator.class);
 
   @NotNull
-  private WindowDataManager windowDataManager;
+  protected WindowDataManager windowDataManager;
 
   private transient long currentWindowId;
 
@@ -231,7 +231,10 @@
   private void performCommonProcessing(Operation currentOperation, KuduExecutionContext kuduExecutionContext)
   {
     currentOperation.setExternalConsistencyMode(kuduExecutionContext.getExternalConsistencyMode());
-    currentOperation.setPropagatedTimestamp(kuduExecutionContext.getPropagatedTimestamp());
+    Long propagatedTimeStamp = kuduExecutionContext.getPropagatedTimestamp();
+    if ( propagatedTimeStamp != null) { // set propagation timestamp only if enabled
+      currentOperation.setPropagatedTimestamp(propagatedTimeStamp);
+    }
     PartialRow partialRow = currentOperation.getRow();
     Object payload = kuduExecutionContext.getPayload();
     Set<String> doNotWriteColumns = kuduExecutionContext.getDoNotWriteColumns();
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
index aed6b8b..99eaf1b 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
@@ -213,7 +213,7 @@
       return this;
     }
 
-    protected ApexKuduConnection build()
+    public ApexKuduConnection build()
     {
       ApexKuduConnection apexKuduConnection = new ApexKuduConnection(this);
       return apexKuduConnection;
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
index 6de7190..6c7e9a6 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
@@ -24,6 +24,8 @@
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.kudu.client.ExternalConsistencyMode;
 import org.apache.kudu.client.SessionConfiguration;
 
@@ -65,11 +67,13 @@
 
   public BaseKuduOutputOperator() throws IOException, ClassNotFoundException
   {
+    windowDataManager = new FSWindowDataManager();
     initConnectionBuilderProperties(DEFAULT_CONNECTION_PROPS_FILE_NAME);
   }
 
   public BaseKuduOutputOperator(String configFileInClasspath) throws IOException, ClassNotFoundException
   {
+    windowDataManager = new FSWindowDataManager();
     initConnectionBuilderProperties(configFileInClasspath);
   }
 
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
index 27d382d..a346705 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
@@ -50,7 +50,7 @@
 
   private ExternalConsistencyMode externalConsistencyMode;
 
-  private long propagatedTimestamp;
+  private Long propagatedTimestamp;
 
   public T getPayload()
   {
@@ -82,12 +82,12 @@
     this.externalConsistencyMode = externalConsistencyMode;
   }
 
-  public long getPropagatedTimestamp()
+  public Long getPropagatedTimestamp()
   {
     return propagatedTimestamp;
   }
 
-  public void setPropagatedTimestamp(long propagatedTimestamp)
+  public void setPropagatedTimestamp(Long propagatedTimestamp)
   {
     this.propagatedTimestamp = propagatedTimestamp;
   }