APEXMALHAR-2502 #FixKuduOutputOperator for extensibility
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
index 250334b..ff668b0 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
@@ -114,7 +114,7 @@
private static final transient Logger LOG = LoggerFactory.getLogger(AbstractKuduOutputOperator.class);
@NotNull
- private WindowDataManager windowDataManager;
+ protected WindowDataManager windowDataManager;
private transient long currentWindowId;
@@ -231,7 +231,10 @@
private void performCommonProcessing(Operation currentOperation, KuduExecutionContext kuduExecutionContext)
{
currentOperation.setExternalConsistencyMode(kuduExecutionContext.getExternalConsistencyMode());
- currentOperation.setPropagatedTimestamp(kuduExecutionContext.getPropagatedTimestamp());
+ Long propagatedTimeStamp = kuduExecutionContext.getPropagatedTimestamp();
+ if ( propagatedTimeStamp != null) { // set propagation timestamp only if enabled
+ currentOperation.setPropagatedTimestamp(propagatedTimeStamp);
+ }
PartialRow partialRow = currentOperation.getRow();
Object payload = kuduExecutionContext.getPayload();
Set<String> doNotWriteColumns = kuduExecutionContext.getDoNotWriteColumns();
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
index aed6b8b..99eaf1b 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
@@ -213,7 +213,7 @@
return this;
}
- protected ApexKuduConnection build()
+ public ApexKuduConnection build()
{
ApexKuduConnection apexKuduConnection = new ApexKuduConnection(this);
return apexKuduConnection;
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
index 6de7190..6c7e9a6 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
@@ -24,6 +24,8 @@
import java.util.List;
import java.util.Properties;
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.kudu.client.ExternalConsistencyMode;
import org.apache.kudu.client.SessionConfiguration;
@@ -65,11 +67,13 @@
public BaseKuduOutputOperator() throws IOException, ClassNotFoundException
{
+ windowDataManager = new FSWindowDataManager();
initConnectionBuilderProperties(DEFAULT_CONNECTION_PROPS_FILE_NAME);
}
public BaseKuduOutputOperator(String configFileInClasspath) throws IOException, ClassNotFoundException
{
+ windowDataManager = new FSWindowDataManager();
initConnectionBuilderProperties(configFileInClasspath);
}
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
index 27d382d..a346705 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
@@ -50,7 +50,7 @@
private ExternalConsistencyMode externalConsistencyMode;
- private long propagatedTimestamp;
+ private Long propagatedTimestamp;
public T getPayload()
{
@@ -82,12 +82,12 @@
this.externalConsistencyMode = externalConsistencyMode;
}
- public long getPropagatedTimestamp()
+ public Long getPropagatedTimestamp()
{
return propagatedTimestamp;
}
- public void setPropagatedTimestamp(long propagatedTimestamp)
+ public void setPropagatedTimestamp(Long propagatedTimestamp)
{
this.propagatedTimestamp = propagatedTimestamp;
}