[CARBONDATA-2151][Streaming] Fix filter query issue on streaming table

1.Fix filter query issue for timestamp, date, decimal
2.Add more test case
dataType: int, streaming, float, double, decimal, timestamp, date, complex
operation: =, <>, >=, >, <, <=, in, like, between, is null, is not null

This closes #1969
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 689da9f..8dcac30 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -1790,6 +1790,10 @@
   }
 
   public static void updateIndexOfColumnExpression(Expression exp, int dimOridnalMax) {
+    // if expression is null, not require to update index.
+    if (exp == null) {
+      return;
+    }
     if (exp.getChildren() == null || exp.getChildren().size() == 0) {
       if (exp instanceof ColumnExpression) {
         ColumnExpression ce = (ColumnExpression) exp;
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index 465bee6..fad37fc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -135,7 +135,7 @@
 
   @Override public boolean applyFilter(RowIntf value, int dimOrdinalMax) {
     if (isDimensionPresentInCurrentBlock) {
-      byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+      byte[][] filterValues = dimColumnExecuterInfo.getExcludeFilterKeys();
       byte[] col = (byte[])value.getVal(dimColEvaluatorInfo.getDimension().getOrdinal());
       for (int i = 0; i < filterValues.length; i++) {
         if (0 == ByteUtil.UnsafeComparer.INSTANCE.compareTo(col, 0, col.length,
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java
index 85891dc..53d3068 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java
@@ -35,6 +35,10 @@
   BitSetGroup applyFilter(BlocksChunkHolder blocksChunkHolder, boolean useBitsetPipeLine)
       throws FilterUnsupportedException, IOException;
 
+  /**
+   * apply range filter on a row
+   * @return true: if the value satisfy the filter; or else false.
+   */
   boolean applyFilter(RowIntf value, int dimOrdinalMax)
       throws FilterUnsupportedException, IOException;
 
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
index ee373c5..797fe9d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
@@ -37,6 +37,7 @@
 import org.apache.carbondata.core.scan.expression.conditional.LessThanExpression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -146,6 +147,34 @@
   }
 
   /**
+   * apply range filter on a row
+   */
+  public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+      throws FilterUnsupportedException, IOException {
+
+    byte[] col = (byte[]) value.getVal(dimColEvaluatorInfo.getDimension().getOrdinal());
+    byte[][] filterValues = this.filterRangesValues;
+
+    if (isDimensionPresentInCurrentBlock) {
+      boolean result;
+      if (greaterThanExp) {
+        result = ByteUtil.compare(filterValues[0], col) < 0;
+      } else {
+        result = ByteUtil.compare(filterValues[0], col) <= 0;
+      }
+
+      if (result) {
+        if (lessThanExp) {
+          return ByteUtil.compare(filterValues[1], col) > 0;
+        } else {
+          return ByteUtil.compare(filterValues[1], col) >= 0;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
    * Method to find presence of LessThan Expression.
    * @return
    */
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index 89489a2..8956f30 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -272,7 +272,8 @@
     return bitSetGroup;
   }
 
-  @Override public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+  @Override
+  public boolean applyFilter(RowIntf value, int dimOrdinalMax)
       throws FilterUnsupportedException, IOException {
     try {
       return exp.evaluate(value).getBoolean();
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index 306f3fa..3981211 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -34,6 +34,7 @@
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -248,6 +249,23 @@
     return null;
   }
 
+  @Override
+  public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+      throws FilterUnsupportedException, IOException {
+    if (isDimensionPresentInCurrentBlock[0]) {
+      byte[] col =
+          (byte[]) value.getVal(dimColEvaluatorInfoList.get(0).getDimension().getOrdinal());
+      return ByteUtil.compare(filterRangeValues[0], col) < 0;
+    }
+
+    if (isMeasurePresentInCurrentBlock[0]) {
+      Object col =
+          value.getVal(msrColEvalutorInfoList.get(0).getMeasure().getOrdinal() + dimOrdinalMax);
+      return comparator.compare(msrFilterRangeValues[0], col) < 0;
+    }
+    return false;
+  }
+
   private BitSet getFilteredIndexesForMeasures(ColumnPage columnPage,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index db55e42..f2ddcb6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -34,6 +34,7 @@
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -248,6 +249,23 @@
     return null;
   }
 
+  @Override
+  public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+      throws FilterUnsupportedException, IOException {
+    if (isDimensionPresentInCurrentBlock[0]) {
+      byte[] col =
+          (byte[]) value.getVal(dimColEvaluatorInfoList.get(0).getDimension().getOrdinal());
+      return ByteUtil.compare(filterRangeValues[0], col) <= 0;
+    }
+
+    if (isMeasurePresentInCurrentBlock[0]) {
+      Object col =
+          value.getVal(msrColEvalutorInfoList.get(0).getMeasure().getOrdinal() + dimOrdinalMax);
+      return comparator.compare(msrFilterRangeValues[0], col) <= 0;
+    }
+    return false;
+  }
+
   private BitSet getFilteredIndexesForMeasures(ColumnPage columnPage,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index 88cf75c..a44bc1a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -37,6 +37,7 @@
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -230,6 +231,23 @@
     return null;
   }
 
+  @Override
+  public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+      throws FilterUnsupportedException, IOException {
+    if (isDimensionPresentInCurrentBlock[0]) {
+      byte[] col =
+          (byte[]) value.getVal(dimColEvaluatorInfoList.get(0).getDimension().getOrdinal());
+      return ByteUtil.compare(filterRangeValues[0], col) >= 0;
+    }
+
+    if (isMeasurePresentInCurrentBlock[0]) {
+      Object col =
+          value.getVal(msrColEvalutorInfoList.get(0).getMeasure().getOrdinal() + dimOrdinalMax);
+      return comparator.compare(msrFilterRangeValues[0], col) >= 0;
+    }
+    return false;
+  }
+
   private BitSet getFilteredIndexesForMeasures(ColumnPage columnPage,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index 7f735c2..447ab46 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -37,6 +37,7 @@
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -232,6 +233,23 @@
     return null;
   }
 
+  @Override
+  public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+      throws FilterUnsupportedException, IOException {
+    if (isDimensionPresentInCurrentBlock[0]) {
+      byte[] col =
+          (byte[]) value.getVal(dimColEvaluatorInfoList.get(0).getDimension().getOrdinal());
+      return ByteUtil.compare(filterRangeValues[0], col) > 0;
+    }
+
+    if (isMeasurePresentInCurrentBlock[0]) {
+      Object col =
+          value.getVal(msrColEvalutorInfoList.get(0).getMeasure().getOrdinal() + dimOrdinalMax);
+      return comparator.compare(msrFilterRangeValues[0], col) > 0;
+    }
+    return false;
+  }
+
   private BitSet getFilteredIndexesForMeasures(ColumnPage columnPage,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
index 19626f0..773089b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
@@ -607,7 +607,7 @@
               filterValues[filterMap[colCount]] = v;
             }
             if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = v;
+              outputValues[projectionMap[colCount]] = Decimal.apply(v);
             }
           } else {
             input.skipBytes(len);
diff --git a/integration/spark-common-test/src/test/resources/streamSample.csv b/integration/spark-common-test/src/test/resources/streamSample.csv
index 590ea90..3443237 100644
--- a/integration/spark-common-test/src/test/resources/streamSample.csv
+++ b/integration/spark-common-test/src/test/resources/streamSample.csv
@@ -1,6 +1,6 @@
-id,name,city,salary,file
-100000001,batch_1,city_1,0.1,school_1:school_11$20
-100000002,batch_2,city_2,0.2,school_2:school_22$30
-100000003,batch_3,city_3,0.3,school_3:school_33$40
-100000004,batch_4,city_4,0.4,school_4:school_44$50
-100000005,batch_5,city_5,0.5,school_5:school_55$60
+id,name,city,salary,tax,percent,birthday,register,updated,file
+100000001,batch_1,city_1,0.1,0.01,80.01,1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01,school_1:school_11$20
+100000002,batch_2,city_2,0.2,0.02,80.02,1990-01-02,2010-01-02 10:01:01,2010-01-02 10:01:01,school_2:school_22$30
+100000003,batch_3,city_3,0.3,0.03,80.03,1990-01-03,2010-01-03 10:01:01,2010-01-03 10:01:01,school_3:school_33$40
+100000004,batch_4,city_4,0.4,0.04,80.04,1990-01-04,2010-01-04 10:01:01,2010-01-04 10:01:01,school_4:school_44$50
+100000005,batch_5,city_5,0.5,0.05,80.05,1990-01-05,2010-01-05 10:01:01,2010-01-05 10:01:01,school_5:school_55$60
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 4b3a957..94baf86 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.carbondata
 
 import java.io.{File, PrintWriter}
+import java.math.BigDecimal
 import java.net.{BindException, ServerSocket}
+import java.sql.{Date, Timestamp}
 import java.util.concurrent.Executors
 
 import scala.collection.mutable
@@ -28,14 +30,13 @@
 import org.apache.spark.sql.{CarbonEnv, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.types.StructType
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException}
-import org.apache.carbondata.streaming.CarbonStreamException
 
 class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
@@ -43,6 +44,12 @@
   private val dataFilePath = s"$resourcesPath/streamSample.csv"
 
   override def beforeAll {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
     sql("DROP DATABASE IF EXISTS streaming CASCADE")
     sql("CREATE DATABASE streaming")
     sql("USE streaming")
@@ -235,7 +242,8 @@
     )
 
     val row = sql("select * from streaming.stream_table_file order by id").head()
-    assertResult(Row(10, "name_10", "city_10", 100000.0))(row)
+    val exceptedRow = Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))
+    assertResult(exceptedRow)(row)
   }
 
   // bad records
@@ -287,12 +295,12 @@
     )
 
     // non-filter
-    val result = sql("select * from streaming.stream_table_filter order by id").collect()
+    val result = sql("select * from streaming.stream_table_filter order by id, name").collect()
     assert(result != null)
     assert(result.length == 55)
     // check one row of streaming data
-    assert(result(1).getInt(0) == 1)
-    assert(result(1).getString(1) == "name_1")
+    assert(result(1).isNullAt(0))
+    assert(result(1).getString(1) == "name_6")
     // check one row of batch loading
     assert(result(50).getInt(0) == 100000001)
     assert(result(50).getString(1) == "batch_1")
@@ -300,35 +308,259 @@
     // filter
     checkAnswer(
       sql("select * from stream_table_filter where id = 1"),
-      Seq(Row(1, "name_1", "city_1", 10000.0)))
-
-    checkAnswer(
-      sql("select * from stream_table_filter where name = 'name_2'"),
-      Seq(Row(2, "name_2", "", 20000.0)))
-
-    checkAnswer(
-      sql("select * from stream_table_filter where city = 'city_1'"),
-      Seq(Row(1, "name_1", "city_1", 10000.0),
-        Row(100000001, "batch_1", "city_1", 0.1)))
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
 
     checkAnswer(
       sql("select * from stream_table_filter where id > 49 and id < 100000002"),
-      Seq(Row(50, "name_50", "city_50", 500000.0),
-        Row(100000001, "batch_1", "city_1", 0.1)))
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
 
     checkAnswer(
-      sql("select * from stream_table_filter where id is null"),
-      Seq(Row(null, "name_6", "city_6", 60000.0)))
+      sql("select * from stream_table_filter where id between 50 and 100000001"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where name in ('name_9','name_10', 'name_11', 'name_12') and id <> 10 and id not in (11, 12)"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where name = 'name_3'"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where name like '%me_3%' and id < 30"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where name >= 'name_3' and id < 4"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10, 11, 12) and name <> 'name_10' and name not in ('name_11', 'name_12')"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where city = 'city_1'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where city like '%ty_1%' and ( id < 10 or id >= 100000001)"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where city > 'city_09' and city < 'city_10'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where city between 'city_09' and 'city_1'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10, 11, 12) and city <> 'city_10' and city not in ('city_11', 'city_12')"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where salary = 90000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where salary > 80000 and salary <= 100000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where salary between 80001 and 90000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10, 11, 12) and salary <> 100000.0 and salary not in (110000.0, 120000.0)"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where tax = 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where tax >= 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where tax < 0.05 and tax > 0.02 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where tax between 0.02 and 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10) and tax <> 0.01"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where percent = 80.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where percent >= 80.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where percent < 80.05 and percent > 80.02 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where percent between 80.02 and 80.05 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10) and percent <> 80.01"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where birthday = '1990-01-04'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where birthday > '1990-01-03' and birthday <= '1990-01-04'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10) and birthday <> '1990-01-01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where register = '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10) and register <> '2010-01-01 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where updated = '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10) and updated <> '2010-01-01 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null order by name"),
+      Seq(Row(null, "", "", null, null, null, null, null, null),
+        Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where name = ''"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and name <> ''"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
 
     checkAnswer(
       sql("select * from stream_table_filter where city = ''"),
-      Seq(Row(2, "name_2", "", 20000.0)))
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and city <> ''"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where salary is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and salary is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where tax is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and tax is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where percent is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and percent is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where birthday is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and birthday is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where register is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and register is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where updated is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and updated is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
 
     // agg
     checkAnswer(
       sql("select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " +
           "from stream_table_filter where id >= 2 and id <= 100000004"),
-      Seq(Row(52, 100000004, "batch_1", 7692332, 400001278)))
+      Seq(Row(51, 100000004, "batch_1", 7843162, 400001276)))
 
     checkAnswer(
       sql("select city, count(id), sum(id), cast(avg(id) as integer), " +
@@ -382,54 +614,238 @@
     )
 
     // non-filter
-    val result = sql("select * from streaming.stream_table_filter_complex order by id").collect()
+    val result = sql("select * from streaming.stream_table_filter_complex order by id, name").collect()
     assert(result != null)
     assert(result.length == 55)
     // check one row of streaming data
     assert(result(0).isNullAt(0))
-    assert(result(0).getString(1) == "name_6")
-    assert(result(0).getStruct(4).getInt(1) == 6)
+    assert(result(0).getString(1) == "")
+    assert(result(0).getStruct(9).isNullAt(1))
     // check one row of batch loading
     assert(result(50).getInt(0) == 100000001)
     assert(result(50).getString(1) == "batch_1")
-    assert(result(50).getStruct(4).getInt(1) == 20)
+    assert(result(50).getStruct(9).getInt(1) == 20)
 
     // filter
     checkAnswer(
       sql("select * from stream_table_filter_complex where id = 1"),
-      Seq(Row(1, "name_1", "city_1", 10000.0, Row(wrap(Array("school_1", "school_11")), 1))))
-
-    checkAnswer(
-      sql("select * from stream_table_filter_complex where name = 'name_2'"),
-      Seq(Row(2, "name_2", "", 20000.0, Row(wrap(Array("school_2", "school_22")), 2))))
-
-    checkAnswer(
-      sql("select * from stream_table_filter_complex where file.age = 3"),
-      Seq(Row(3, "name_3", "city_3", 30000.0, Row(wrap(Array("school_3", "school_33")), 3))))
-
-    checkAnswer(
-      sql("select * from stream_table_filter_complex where city = 'city_1'"),
-      Seq(Row(1, "name_1", "city_1", 10000.0, Row(wrap(Array("school_1", "school_11")), 1)),
-        Row(100000001, "batch_1", "city_1", 0.1, Row(wrap(Array("school_1", "school_11")), 20))))
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id > 49 and id < 100000002"),
-      Seq(Row(50, "name_50", "city_50", 500000.0, Row(wrap(Array("school_50", "school_5050")), 50)),
-        Row(100000001, "batch_1", "city_1", 0.1, Row(wrap(Array("school_1", "school_11")), 20))))
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_50", "school_5050")), 50)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
 
     checkAnswer(
-      sql("select * from stream_table_filter_complex where id is null"),
-      Seq(Row(null, "name_6", "city_6", 60000.0, Row(wrap(Array("school_6", "school_66")), 6))))
+      sql("select * from stream_table_filter_complex where id between 50 and 100000001"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_50", "school_5050")), 50)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name = 'name_3'"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name like '%me_3%' and id < 30"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name >= 'name_3' and id < 4"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city = 'city_1'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city like '%ty_1%' and ( id < 10 or id >= 100000001)"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city > 'city_09' and city < 'city_10'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city between 'city_09' and 'city_1'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where salary = 90000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where salary > 80000 and salary <= 100000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_10", "school_1010")), 10))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where salary between 80001 and 90000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax = 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax >= 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax < 0.05 and tax > 0.02 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax between 0.02 and 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent = 80.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent >= 80.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent < 80.05 and percent > 80.02 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent between 80.02 and 80.05 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday between '1990-01-04' and '1990-01-05'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday = '1990-01-04'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday > '1990-01-03' and birthday <= '1990-01-04'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday between '1990-01-04' and '1990-01-05'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where register = '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")),50)),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where updated = '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null order by name"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null)),
+        Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name = ''"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and name <> ''"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where city = ''"),
-      Seq(Row(2, "name_2", "", 20000.0, Row(wrap(Array("school_2", "school_22")), 2))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and city <> ''"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where salary is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and salary is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and tax is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and salary is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and birthday is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where register is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and register is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where updated is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and updated is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
 
     // agg
     checkAnswer(
       sql("select count(*), max(id), min(name), cast(avg(file.age) as integer), sum(file.age) " +
           "from stream_table_filter_complex where id >= 2 and id <= 100000004"),
-      Seq(Row(52, 100000004, "batch_1", 27, 1408)))
+      Seq(Row(51, 100000004, "batch_1", 27, 1406)))
 
     checkAnswer(
       sql("select city, count(id), sum(id), cast(avg(file.age) as integer), " +
@@ -715,6 +1131,7 @@
         val clientSocket = serverSocket.accept()
         val socketWriter = new PrintWriter(clientSocket.getOutputStream())
         var index = 0
+        var timeRow = true
         for (_ <- 1 to writeNums) {
           // write 5 records per iteration
           val stringBuilder = new StringBuilder()
@@ -723,22 +1140,32 @@
             if (badRecords) {
               if (index == 2) {
                 // null value
-                stringBuilder.append(index.toString + ",name_" + index
-                                     + ",," + (10000.00 * index).toString +
-                                     ",school_" + index + ":school_" + index + index + "$" + index)
+                stringBuilder.append(",,,,,,,,,")
               } else if (index == 6) {
                 // illegal number
                 stringBuilder.append(index.toString + "abc,name_" + index
-                                     + ",city_" + index + "," + (10000.00 * index).toString +
+                                     + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
+                                     ",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01" +
                                      ",school_" + index + ":school_" + index + index + "$" + index)
               } else {
-                stringBuilder.append(index.toString + ",name_" + index
-                                     + ",city_" + index + "," + (10000.00 * index).toString +
-                                     ",school_" + index + ":school_" + index + index + "$" + index)
+
+                if (index == 9 && timeRow) {
+                  timeRow = false
+                  stringBuilder.append(index.toString + ",name_" + index
+                                       + ",city_" + index + "," + (10000.00 * index).toString + ",0.04,80.04" +
+                                       ",1990-01-04,2010-01-04 10:01:01,2010-01-04 10:01:01" +
+                                       ",school_" + index + ":school_" + index + index + "$" + index)
+                } else {
+                  stringBuilder.append(index.toString + ",name_" + index
+                                       + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
+                                       ",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01" +
+                                       ",school_" + index + ":school_" + index + index + "$" + index)
+                }
               }
             } else {
               stringBuilder.append(index.toString + ",name_" + index
-                                   + ",city_" + index + "," + (10000.00 * index).toString +
+                                   + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
+                                   ",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01" +
                                    ",school_" + index + ":school_" + index + index + "$" + index)
             }
             stringBuilder.append("\n")
@@ -781,6 +1208,7 @@
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
             .option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
+            .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
             .option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff)
             .start()
           qry.awaitTermination()
@@ -857,9 +1285,15 @@
           "name_" + id,
           "city_" + id,
           10000.00 * id,
+          BigDecimal.valueOf(0.01),
+          80.01,
+          "1990-01-01",
+          "2010-01-01 10:01:01",
+          "2010-01-01 10:01:01",
           "school_" + id + ":school_" + id + id + "$" + id)
       }
-    val csvDataDF = spark.createDataFrame(csvRDD).toDF("id", "name", "city", "salary", "file")
+    val csvDataDF = spark.createDataFrame(csvRDD).toDF(
+      "id", "name", "city", "salary", "tax", "percent", "birthday", "register", "updated", "file")
 
     csvDataDF.write
       .option("header", "false")
@@ -875,12 +1309,6 @@
       tableIdentifier: TableIdentifier): Thread = {
     new Thread() {
       override def run(): Unit = {
-        val inputSchema = new StructType()
-          .add("id", "integer")
-          .add("name", "string")
-          .add("city", "string")
-          .add("salary", "float")
-          .add("file", "string")
         var qry: StreamingQuery = null
         try {
           val readSocketDF = spark.readStream.text(csvDataDir)
@@ -892,6 +1320,7 @@
             .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
+            .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
             .start()
 
           qry.awaitTermination()
@@ -914,11 +1343,16 @@
          | id INT,
          | name STRING,
          | city STRING,
-         | salary FLOAT
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
          | )
          | STORED BY 'carbondata'
          | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
-         | 'sort_columns'='name', 'dictionary_include'='city')
+         | 'sort_columns'='name', 'dictionary_include'='city,register')
          | """.stripMargin)
 
     if (withBatchLoad) {
@@ -938,11 +1372,16 @@
          | name STRING,
          | city STRING,
          | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP,
          | file struct<school:array<string>, age:int>
          | )
          | STORED BY 'carbondata'
          | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
-         | 'sort_columns'='name', 'dictionary_include'='city')
+         | 'sort_columns'='name', 'dictionary_include'='id,name,salary,tax,percent,updated')
          | """.stripMargin)
 
     if (withBatchLoad) {