[CARBONDATA-3830] Support reading array and struct of all primitive types from Presto

Why is this PR needed?
Currently, Presto cannot read CarbonData stores that contain complex data types. Sometimes it returns empty results and sometimes it throws an exception.

What changes were proposed in this PR?
Supported all 13 complex primitive types (including binary; refer to the added test case) with non-nested array and struct data types.

Supported complex types in the direct vector filling flow:
Currently, the Spark integration of CarbonData uses row-level filling for complex types instead of vector filling, but Presto supports only vector reading, so complex types need to be supported in vector filling.

Supported complex primitive vector handling in the DIRECT_COMPRESS and ADAPTIVE_CODEC flows:
All complex primitive types are encoded with either DIRECT_COMPRESS or ADAPTIVE_CODEC; they never use a legacy encoding. Because of this, vector filling for string, varchar (with/without local dictionary), binary, and date needs to be handled in DIRECT_COMPRESS. The parent column also arrives as DIRECT_COMPRESS, and its data is extracted from the parent column page here.
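As a rough illustration, each value in a complex primitive string/binary/varchar child page is stored as a 4-byte length followed by the value bytes; a simplified sketch of the decode loop (mirroring the new BYTE_ARRAY branch in DirectCompressCodec in this diff) looks like:

    // simplified from the new BYTE_ARRAY branch in DirectCompressCodec.fillPrimitiveType
    int offset = 0;
    for (int i = 0; i < pageSize; i++) {
      int len = ByteBuffer.wrap(pageData, offset, 4).getInt();    // 4-byte length prefix
      offset += 4;
      byte[] value = new byte[len];
      System.arraycopy(pageData, offset, value, 0, len);
      if (Arrays.equals(value, CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY)) {
        vector.putNull(i);                                        // stored null marker
      } else {
        vector.putObject(i, value);
      }
      offset += len;
    }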

Supported a vector stack in the complex column's vectorInfo to store all the children vectors.
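Roughly, the stack drives the decode of a complex page: the parent vector is pushed first, each child vector is pushed and its page is loaded, and the child decode pops the stack when it finishes. A condensed sketch (rearranged from the DirectCompressCodec and DimensionChunkReaderV3 changes in this diff):

    // condensed sketch of the stack-driven decode for a complex column
    if (vectorInfo.vector.getType().isComplexType() && vectorInfo.vectorStack.isEmpty()) {
      vectorInfo.vectorStack.push(vectorInfo.vector);          // start from the parent vector
    }
    CarbonColumnVector parent = vectorInfo.vectorStack.peek();
    for (CarbonColumnVector child : parent.getColumnVector().getChildrenVector()) {
      vectorInfo.vectorStack.push(child);                      // child decode reads the stack top
      ((CarbonColumnVectorImpl) child.getColumnVector()).loadPage();
      // the child decode pops its vector once the child page is filled
    }
    // finally the parent records how many child elements belong to each row
    // (childElementsForEachRow comes from setNumberOfChildElementsForArray/Struct on the parent)
    parent.getColumnVector().putComplexObject(childElementsForEachRow);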

Kept a list of children vectors inside CarbonColumnVectorImpl.java.
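A minimal sketch of the state this adds to CarbonColumnVectorImpl (the accessors shown appear in this diff; the field names are illustrative):

    // sketch of the new state on CarbonColumnVectorImpl
    private List<CarbonColumnVector> childrenVector;            // one vector per child column
    private List<Integer> numberOfChildrenElementsInEachRow;    // child count for every parent row

    public List<CarbonColumnVector> getChildrenVector() {
      return childrenVector;
    }

    public List<Integer> getNumberOfChildrenElementsInEachRow() {
      return numberOfChildrenElementsInEachRow;
    }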

Supported ComplexStreamReader to fill the Presto ROW (struct) block and ARRAY block.
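As a rough, hypothetical illustration only (the reader itself is not part of this diff, and the Presto SPI factory calls below are assumptions): once the child vectors are filled, an ARRAY block can be assembled from the child block plus per-row offsets, and a ROW block from the field blocks:

    // hypothetical sketch; ArrayBlock.fromElementBlock / RowBlock.fromFieldBlocks are assumed
    // Presto SPI factories, and offsets are derived from the per-row child element counts
    Block arrayBlock = ArrayBlock.fromElementBlock(
        positionCount, Optional.of(rowIsNull), arrayOffsets, childElementBlock);
    Block structBlock = RowBlock.fromFieldBlocks(
        positionCount, Optional.of(rowIsNull), childFieldBlocks);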

Handled null value filling by wrapping the children vectors with ColumnarVectorWrapperDirect.
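For reference, a child vector goes through the same direct wrapper factory used throughout this diff (the extended signature taking vectorInfo), so null bits set during decode land on the child vector; a minimal sketch:

    // sketch: wrap a child vector so the null bitset is applied during direct filling
    CarbonColumnVector wrapped = ColumnarVectorWrapperDirectFactory
        .getDirectVectorWrapperFactory(vectorInfo, childVector, null, nullBits,
            vectorInfo.deletedRows, true, false);
    // values written through 'wrapped' reach the child vector with nulls handled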

Limitations / next work:
Some pending TODOs are:

Local dictionary needs to be handled for string / varchar columns, as the DIRECT_COMPRESS flow does not have that handling
Support map of all primitive types
Support multilevel nested arrays and structs

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes [Added test cases for all 13 primitive types with array and struct, null values, and more than one page of data]

This closes #3887

Co-authored-by: akkio-97 <akshay.nuthala@gmail.com>
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/DimensionChunkReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/DimensionChunkReaderV3.java
index d53c9d3..2538687 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/DimensionChunkReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/DimensionChunkReaderV3.java
@@ -257,6 +257,11 @@
           .decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
               nullBitSet, isLocalDictEncodedPage, pageMetadata.numberOfRowsInpage,
               reusableDataBuffer);
+      if (vectorInfo.vector.getType().isComplexType() && !vectorInfo.vectorStack.isEmpty()) {
+        // For complex type, always top of the vector stack is processed in decodeAndFillVector.
+        // so, pop() the top vector as its processing is finished.
+        vectorInfo.vectorStack.pop();
+      }
       return null;
     } else {
       return decoder
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
index de2b720..c3aa63f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
@@ -70,11 +70,11 @@
     }
     BitSet nullBitset = new BitSet();
     CarbonColumnVector dictionaryVector = ColumnarVectorWrapperDirectFactory
-        .getDirectVectorWrapperFactory(vector.getDictionaryVector(), invertedIndex, nullBitset,
-            vectorInfo.deletedRows, false, true);
+        .getDirectVectorWrapperFactory(vectorInfo, vector.getDictionaryVector(), invertedIndex,
+            nullBitset, vectorInfo.deletedRows, false, true);
     vector = ColumnarVectorWrapperDirectFactory
-        .getDirectVectorWrapperFactory(vector, invertedIndex, nullBitset, vectorInfo.deletedRows,
-            false, false);
+        .getDirectVectorWrapperFactory(vectorInfo, vector, invertedIndex, nullBitset,
+            vectorInfo.deletedRows, false, false);
     for (int i = 0; i < rowsNum; i++) {
       int surrogate = CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
       if (surrogate == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
index 4327b7e..4c59181 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
@@ -56,7 +56,8 @@
     BitSet deletedRows = vectorInfo.deletedRows;
     BitSet nullBits = new BitSet(numOfRows);
     vector = ColumnarVectorWrapperDirectFactory
-        .getDirectVectorWrapperFactory(vector, invertedIndex, nullBits, deletedRows, false, false);
+        .getDirectVectorWrapperFactory(vectorInfo, vector, invertedIndex, nullBits, deletedRows,
+            false, false);
     fillVector(data, vectorInfo, vector);
     if (vector instanceof ConvertibleVector) {
       ((ConvertibleVector) vector).convert();
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index 972fa97..0f3e555 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -109,8 +109,8 @@
     AbstractNonDictionaryVectorFiller vectorFiller = NonDictionaryVectorFillerFactory
         .getVectorFiller(getLengthSize(), dt, numberOfRows, dataLength);
     vector = ColumnarVectorWrapperDirectFactory
-        .getDirectVectorWrapperFactory(vector, invertedIndex, new BitSet(), vectorInfo.deletedRows,
-            false, false);
+        .getDirectVectorWrapperFactory(vectorInfo, vector, invertedIndex, new BitSet(),
+            vectorInfo.deletedRows, false, false);
     vectorFiller.fillVector(data, vector);
     if (vector instanceof ConvertibleVector) {
       ((ConvertibleVector) vector).convert();
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index 235deb8..4928646 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -254,10 +254,16 @@
         DataType pageDataType, int pageSize) {
       CarbonColumnVector vector = vectorInfo.vector;
       BitSet deletedRows = vectorInfo.deletedRows;
-      DataType vectorDataType = vector.getType();
+      pageSize = ColumnVectorInfo.getUpdatedPageSizeForChildVector(vectorInfo, pageSize);
       vector = ColumnarVectorWrapperDirectFactory
-          .getDirectVectorWrapperFactory(vector, null, nullBits, deletedRows, true, false);
+          .getDirectVectorWrapperFactory(vectorInfo, vector, null, nullBits, deletedRows, true,
+              false);
+      DataType vectorDataType = vector.getType();
       int rowId = 0;
+      int shortSizeInBytes = DataTypes.SHORT.getSizeInBytes();
+      int shortIntSizeInBytes = DataTypes.SHORT_INT.getSizeInBytes();
+      int intSizeInBytes = DataTypes.INT.getSizeInBytes();
+      int longSizeInBytes = DataTypes.LONG.getSizeInBytes();
       if (vectorDataType == DataTypes.FLOAT) {
         float floatFactor = factor.floatValue();
         if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
@@ -265,23 +271,30 @@
             vector.putFloat(i, (max - pageData[i]) / floatFactor);
           }
         } else if (pageDataType == DataTypes.SHORT) {
-          int size = pageSize * DataTypes.SHORT.getSizeInBytes();
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          int size = pageSize * shortSizeInBytes;
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector
                 .putFloat(rowId++, (max - ByteUtil.toShortLittleEndian(pageData, i)) / floatFactor);
           }
 
         } else if (pageDataType == DataTypes.SHORT_INT) {
-          int size = pageSize * DataTypes.SHORT_INT.getSizeInBytes();
-          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+          int size = pageSize * shortIntSizeInBytes;
+          for (int i = 0; i < size; i += shortIntSizeInBytes) {
             int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
             vector.putFloat(rowId++, (max - shortInt) / floatFactor);
           }
         } else if (pageDataType == DataTypes.INT) {
-          int size = pageSize * DataTypes.INT.getSizeInBytes();
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+          int size = pageSize * intSizeInBytes;
+          for (int i = 0; i < size; i += intSizeInBytes) {
             vector.putFloat(rowId++, (max - ByteUtil.toIntLittleEndian(pageData, i)) / floatFactor);
           }
+        } else if (pageDataType == DataTypes.LONG) {
+          // complex primitive float type can enter here.
+          int size = pageSize * longSizeInBytes;
+          for (int i = 0; i < size; i += longSizeInBytes) {
+            vector.putFloat(rowId++,
+                (float) ((max - ByteUtil.toLongLittleEndian(pageData, i)) / factor));
+          }
         } else {
           throw new RuntimeException("internal error: " + this.toString());
         }
@@ -291,25 +304,25 @@
             vector.putDouble(rowId++, (max - pageData[i]) / factor);
           }
         } else if (pageDataType == DataTypes.SHORT) {
-          int size = pageSize * DataTypes.SHORT.getSizeInBytes();
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          int size = pageSize * shortSizeInBytes;
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putDouble(rowId++, (max - ByteUtil.toShortLittleEndian(pageData, i)) / factor);
           }
 
         } else if (pageDataType == DataTypes.SHORT_INT) {
-          int size = pageSize * DataTypes.SHORT_INT.getSizeInBytes();
-          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+          int size = pageSize * shortIntSizeInBytes;
+          for (int i = 0; i < size; i += shortIntSizeInBytes) {
             int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
             vector.putDouble(rowId++, (max - shortInt) / factor);
           }
         } else if (pageDataType == DataTypes.INT) {
-          int size = pageSize * DataTypes.INT.getSizeInBytes();
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+          int size = pageSize * intSizeInBytes;
+          for (int i = 0; i < size; i += intSizeInBytes) {
             vector.putDouble(rowId++, (max - ByteUtil.toIntLittleEndian(pageData, i)) / factor);
           }
         } else if (pageDataType == DataTypes.LONG) {
-          int size = pageSize * DataTypes.LONG.getSizeInBytes();
-          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+          int size = pageSize * longSizeInBytes;
+          for (int i = 0; i < size; i += longSizeInBytes) {
             vector.putDouble(rowId++, (max - ByteUtil.toLongLittleEndian(pageData, i)) / factor);
           }
         } else {
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index a7b9e5b..29c8b7b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -311,12 +311,8 @@
     public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, BitSet nullBits,
         DataType pageDataType, int pageSize) {
       CarbonColumnVector vector = vectorInfo.vector;
-      DataType vectorDataType = vector.getType();
       BitSet deletedRows = vectorInfo.deletedRows;
-      vector = ColumnarVectorWrapperDirectFactory
-          .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
-              true, false);
-      fillVector(pageData, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+      fillVector(pageData, vector, pageDataType, pageSize, vectorInfo, nullBits);
       if ((deletedRows == null || deletedRows.isEmpty())
           && !(vectorInfo.vector instanceof SequentialFill)) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
@@ -328,8 +324,14 @@
       }
     }
 
-    private void fillVector(byte[] pageData, CarbonColumnVector vector,
-        DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+    private void fillVector(byte[] pageData, CarbonColumnVector vector, DataType pageDataType,
+        int pageSize, ColumnVectorInfo vectorInfo, BitSet nullBits) {
+      // get the updated values if it is decode of child vector
+      pageSize = ColumnVectorInfo.getUpdatedPageSizeForChildVector(vectorInfo, pageSize);
+      vector = ColumnarVectorWrapperDirectFactory
+          .getDirectVectorWrapperFactory(vectorInfo, vector, null, nullBits, vectorInfo.deletedRows,
+              true, false);
+      DataType vectorDataType = vector.getType();
       int newScale = 0;
       if (vectorInfo.measure != null) {
         newScale = vectorInfo.measure.getMeasure().getScale();
@@ -352,7 +354,7 @@
           for (int i = 0; i < pageSize; i++) {
             vector.putLong(i, (max - (long) pageData[i]) * 1000);
           }
-        } else if (vectorDataType == DataTypes.BOOLEAN) {
+        } else if (vectorDataType == DataTypes.BOOLEAN || vectorDataType == DataTypes.BYTE) {
           for (int i = 0; i < pageSize; i++) {
             vector.putByte(i, (byte) (max - pageData[i]));
           }
@@ -376,28 +378,29 @@
           }
         }
       } else if (pageDataType == DataTypes.SHORT) {
-        int size = pageSize * DataTypes.SHORT.getSizeInBytes();
+        int shortSizeInBytes = DataTypes.SHORT.getSizeInBytes();
+        int size = pageSize * shortSizeInBytes;
         if (vectorDataType == DataTypes.SHORT) {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putShort(rowId++, (short) (max - ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.INT) {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putInt(rowId++, (int) (max - ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putLong(rowId++, (max - ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector
                 .putLong(rowId++, (max - (long) ByteUtil.toShortLittleEndian(pageData, i)) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
           int precision = vectorInfo.measure.getMeasure().getPrecision();
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             BigDecimal decimal =
                 decimalConverter.getDecimal(max - ByteUtil.toShortLittleEndian(pageData, i));
             if (decimal.scale() < newScale) {
@@ -406,28 +409,29 @@
             vector.putDecimal(rowId++, decimal, precision);
           }
         } else if (vectorDataType == DataTypes.FLOAT) {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putFloat(rowId++, (int) (max - ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putDouble(rowId++, (max - ByteUtil.toShortLittleEndian(pageData, i)));
           }
         }
       } else if (pageDataType == DataTypes.SHORT_INT) {
-        int size = pageSize * DataTypes.SHORT_INT.getSizeInBytes();
+        int shortIntSizeInBytes = DataTypes.SHORT_INT.getSizeInBytes();
+        int size = pageSize * shortIntSizeInBytes;
         if (vectorDataType == DataTypes.INT) {
-          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortIntSizeInBytes) {
             int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
             vector.putInt(rowId++, (int) (max - shortInt));
           }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortIntSizeInBytes) {
             int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
             vector.putLong(rowId++, (max - shortInt));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortIntSizeInBytes) {
             vector.putLong(rowId++, (max - (long) ByteUtil.valueOf3Bytes(pageData, i)) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
@@ -442,34 +446,35 @@
             vector.putDecimal(i, decimal, precision);
           }
         } else if (vectorDataType == DataTypes.FLOAT) {
-          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortIntSizeInBytes) {
             int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
             vector.putFloat(rowId++, (int) (max - shortInt));
           }
         } else {
-          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortIntSizeInBytes) {
             int shortInt = ByteUtil.valueOf3Bytes(pageData, i);
             vector.putDouble(rowId++, (max - shortInt));
           }
         }
       } else if (pageDataType == DataTypes.INT) {
-        int size = pageSize * DataTypes.INT.getSizeInBytes();
+        int intSizeInBytes = DataTypes.INT.getSizeInBytes();
+        int size = pageSize * intSizeInBytes;
         if (vectorDataType == DataTypes.INT) {
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += intSizeInBytes) {
             vector.putInt(rowId++, (int) (max - ByteUtil.toIntLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += intSizeInBytes) {
             vector.putLong(rowId++, (max - ByteUtil.toIntLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += intSizeInBytes) {
             vector.putLong(rowId++, (max - (long) ByteUtil.toIntLittleEndian(pageData, i)) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
           int precision = vectorInfo.measure.getMeasure().getPrecision();
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += intSizeInBytes) {
             BigDecimal decimal =
                 decimalConverter.getDecimal(max - ByteUtil.toIntLittleEndian(pageData, i));
             if (decimal.scale() < newScale) {
@@ -478,24 +483,25 @@
             vector.putDecimal(rowId++, decimal, precision);
           }
         } else {
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += intSizeInBytes) {
             vector.putDouble(rowId++, (max - ByteUtil.toIntLittleEndian(pageData, i)));
           }
         }
       } else if (pageDataType == DataTypes.LONG) {
-        int size = pageSize * DataTypes.LONG.getSizeInBytes();
+        int longSizeInBytes = DataTypes.LONG.getSizeInBytes();
+        int size = pageSize * longSizeInBytes;
         if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+          for (int i = 0; i < size; i += longSizeInBytes) {
             vector.putLong(rowId++, (max - ByteUtil.toLongLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+          for (int i = 0; i < size; i += longSizeInBytes) {
             vector.putLong(rowId++, (max - ByteUtil.toLongLittleEndian(pageData, i)) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
           int precision = vectorInfo.measure.getMeasure().getPrecision();
-          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+          for (int i = 0; i < size; i += longSizeInBytes) {
             BigDecimal decimal =
                 decimalConverter.getDecimal(max - ByteUtil.toLongLittleEndian(pageData, i));
             if (decimal.scale() < newScale) {
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 92ea181..d07eaa7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -244,31 +244,44 @@
         DataType pageDataType, int pageSize) {
       CarbonColumnVector vector = vectorInfo.vector;
       BitSet deletedRows = vectorInfo.deletedRows;
-      DataType vectorDataType = vector.getType();
-      vector = ColumnarVectorWrapperDirectFactory
-          .getDirectVectorWrapperFactory(vector, null, nullBits, deletedRows, true, false);
       int rowId = 0;
+      // get the updated values if it is decode of child vector
+      pageSize = ColumnVectorInfo.getUpdatedPageSizeForChildVector(vectorInfo, pageSize);
+      vector = ColumnarVectorWrapperDirectFactory
+          .getDirectVectorWrapperFactory(vectorInfo, vector, null, nullBits, vectorInfo.deletedRows,
+              true, false);
+      DataType vectorDataType = vector.getType();
+      int shortSizeInBytes = DataTypes.SHORT.getSizeInBytes();
+      int shortIntSizeInBytes = DataTypes.SHORT_INT.getSizeInBytes();
+      int intSizeInBytes = DataTypes.INT.getSizeInBytes();
+      int longSizeInBytes = DataTypes.LONG.getSizeInBytes();
       if (vectorDataType == DataTypes.FLOAT) {
         if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
           for (int i = 0; i < pageSize; i++) {
             vector.putFloat(i, (pageData[i] / floatFactor));
           }
         } else if (pageDataType == DataTypes.SHORT) {
-          int size = pageSize * DataTypes.SHORT.getSizeInBytes();
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          int size = pageSize * shortSizeInBytes;
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putFloat(rowId++, (ByteUtil.toShortLittleEndian(pageData, i) / floatFactor));
           }
 
         } else if (pageDataType == DataTypes.SHORT_INT) {
-          int size = pageSize * DataTypes.SHORT_INT.getSizeInBytes();
-          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+          int size = pageSize * shortIntSizeInBytes;
+          for (int i = 0; i < size; i += shortIntSizeInBytes) {
             vector.putFloat(rowId++, (ByteUtil.valueOf3Bytes(pageData, i) / floatFactor));
           }
         } else if (pageDataType == DataTypes.INT) {
-          int size = pageSize * DataTypes.INT.getSizeInBytes();
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+          int size = pageSize * intSizeInBytes;
+          for (int i = 0; i < size; i += intSizeInBytes) {
             vector.putFloat(rowId++, (ByteUtil.toIntLittleEndian(pageData, i) / floatFactor));
           }
+        } else if (pageDataType == DataTypes.LONG) {
+          // complex primitive float type can enter here.
+          int size = pageSize * longSizeInBytes;
+          for (int i = 0; i < size; i += longSizeInBytes) {
+            vector.putFloat(rowId++, (float) (ByteUtil.toLongLittleEndian(pageData, i) / factor));
+          }
         } else {
           throw new RuntimeException("internal error: " + this.toString());
         }
@@ -278,24 +291,24 @@
             vector.putDouble(i, (pageData[i] / factor));
           }
         } else if (pageDataType == DataTypes.SHORT) {
-          int size = pageSize * DataTypes.SHORT.getSizeInBytes();
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          int size = pageSize * shortSizeInBytes;
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putDouble(rowId++, (ByteUtil.toShortLittleEndian(pageData, i) / factor));
           }
         } else if (pageDataType == DataTypes.SHORT_INT) {
-          int size = pageSize * DataTypes.SHORT_INT.getSizeInBytes();
-          for (int i = 0; i < size; i += DataTypes.SHORT_INT.getSizeInBytes()) {
+          int size = pageSize * shortIntSizeInBytes;
+          for (int i = 0; i < size; i += shortIntSizeInBytes) {
             vector.putDouble(rowId++, (ByteUtil.valueOf3Bytes(pageData, i) / factor));
           }
 
         } else if (pageDataType == DataTypes.INT) {
-          int size = pageSize * DataTypes.INT.getSizeInBytes();
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+          int size = pageSize * intSizeInBytes;
+          for (int i = 0; i < size; i += intSizeInBytes) {
             vector.putDouble(rowId++, (ByteUtil.toIntLittleEndian(pageData, i) / factor));
           }
         } else if (pageDataType == DataTypes.LONG) {
-          int size = pageSize * DataTypes.LONG.getSizeInBytes();
-          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+          int size = pageSize * longSizeInBytes;
+          for (int i = 0; i < size; i += longSizeInBytes) {
             vector.putDouble(rowId++, (ByteUtil.toLongLittleEndian(pageData, i) / factor));
           }
         } else {
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index 812793f..7c8ff75 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -285,12 +285,8 @@
     public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, BitSet nullBits,
         DataType pageDataType, int pageSize) {
       CarbonColumnVector vector = vectorInfo.vector;
-      DataType vectorDataType = vector.getType();
       BitSet deletedRows = vectorInfo.deletedRows;
-      vector = ColumnarVectorWrapperDirectFactory
-          .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
-              true, false);
-      fillVector(pageData, vector, vectorDataType, pageDataType, pageSize, vectorInfo, nullBits);
+      fillVector(pageData, vector, pageDataType, pageSize, vectorInfo, nullBits);
       if ((deletedRows == null || deletedRows.isEmpty())
           && !(vectorInfo.vector instanceof SequentialFill)) {
         for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
@@ -303,8 +299,14 @@
 
     }
 
-    private void fillVector(byte[] pageData, CarbonColumnVector vector, DataType vectorDataType,
-        DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo, BitSet nullBits) {
+    private void fillVector(byte[] pageData, CarbonColumnVector vector, DataType pageDataType,
+        int pageSize, ColumnVectorInfo vectorInfo, BitSet nullBits) {
+      // get the updated values if it is decode of child vector
+      pageSize = ColumnVectorInfo.getUpdatedPageSizeForChildVector(vectorInfo, pageSize);
+      vector = ColumnarVectorWrapperDirectFactory
+          .getDirectVectorWrapperFactory(vectorInfo, vector, null, nullBits, vectorInfo.deletedRows,
+              true, false);
+      DataType vectorDataType = vector.getType();
       int rowId = 0;
       if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
         if (vectorDataType == DataTypes.SHORT) {
@@ -323,7 +325,7 @@
           for (int i = 0; i < pageSize; i++) {
             vector.putLong(i, (long) pageData[i] * 1000);
           }
-        } else if (vectorDataType == DataTypes.BOOLEAN) {
+        } else if (vectorDataType == DataTypes.BOOLEAN || vectorDataType == DataTypes.BYTE) {
           vector.putBytes(0, pageSize, pageData, 0);
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
@@ -338,32 +340,33 @@
           }
         }
       } else if (pageDataType == DataTypes.SHORT) {
-        int size = pageSize * DataTypes.SHORT.getSizeInBytes();
+        int shortSizeInBytes = DataTypes.SHORT.getSizeInBytes();
+        int size = pageSize * shortSizeInBytes;
         if (vectorDataType == DataTypes.SHORT) {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putShort(rowId++, (ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.INT) {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putInt(rowId++, (ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putLong(rowId++, (ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putLong(rowId++, ((long) ByteUtil.toShortLittleEndian(pageData, i)) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
           decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
         } else if (vectorDataType == DataTypes.FLOAT) {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putFloat(rowId++, (ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putDouble(rowId++, ByteUtil.toShortLittleEndian(pageData, i));
           }
         }
@@ -400,35 +403,37 @@
           }
         }
       } else if (pageDataType == DataTypes.INT) {
-        int size = pageSize * DataTypes.INT.getSizeInBytes();
+        int intSizeInBytes = DataTypes.INT.getSizeInBytes();
+        int size = pageSize * intSizeInBytes;
         if (vectorDataType == DataTypes.INT) {
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += intSizeInBytes) {
             vector.putInt(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
           }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += intSizeInBytes) {
             vector.putLong(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += intSizeInBytes) {
             vector.putLong(rowId++, (long) ByteUtil.toIntLittleEndian(pageData, i) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
           decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
         } else {
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += intSizeInBytes) {
             vector.putDouble(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
           }
         }
       } else if (pageDataType == DataTypes.LONG) {
-        int size = pageSize * DataTypes.LONG.getSizeInBytes();
+        int longSizeInBytes = DataTypes.LONG.getSizeInBytes();
+        int size = pageSize * longSizeInBytes;
         if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+          for (int i = 0; i < size; i += longSizeInBytes) {
             vector.putLong(rowId++, ByteUtil.toLongLittleEndian(pageData, i));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
+          for (int i = 0; i < size; i += longSizeInBytes) {
             vector.putLong(rowId++, ByteUtil.toLongLittleEndian(pageData, i) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
@@ -436,8 +441,9 @@
           decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
         }
       } else {
-        int size = pageSize * DataTypes.DOUBLE.getSizeInBytes();
-        for (int i = 0; i < size; i += DataTypes.DOUBLE.getSizeInBytes()) {
+        int doubleSizeInBytes = DataTypes.DOUBLE.getSizeInBytes();
+        int size = pageSize * doubleSizeInBytes;
+        for (int i = 0; i < size; i += doubleSizeInBytes) {
           vector.putDouble(rowId++, ByteUtil.toDoubleLittleEndian(pageData, i));
         }
       }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index 5e78bd3..27520c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -20,10 +20,12 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.ReusableDataBuffer;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
@@ -36,11 +38,14 @@
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
 import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
 import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertibleVector;
 import org.apache.carbondata.core.scan.result.vector.impl.directread.SequentialFill;
@@ -243,23 +248,74 @@
       CarbonColumnVector vector = vectorInfo.vector;
       DataType vectorDataType = vector.getType();
       BitSet deletedRows = vectorInfo.deletedRows;
-      vector = ColumnarVectorWrapperDirectFactory
-          .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
-              true, false);
-      fillVector(pageData, vector, vectorDataType, pageDataType, pageSize, vectorInfo, nullBits);
-      if ((deletedRows == null || deletedRows.isEmpty())
-          && !(vectorInfo.vector instanceof SequentialFill)) {
-        for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
-          vector.putNull(i);
-        }
+      if (vectorDataType.isComplexType() && vectorInfo.vectorStack.isEmpty()) {
+        // Only if vectorStack is empty, initialize with the parent vector
+        vectorInfo.vectorStack.push(vectorInfo.vector);
       }
-      if (vector instanceof ConvertibleVector) {
-        ((ConvertibleVector) vector).convert();
+      // If top of vector stack is a complex vector,
+      // then add their children into the stack and load them too.
+      if (!vectorInfo.vectorStack.isEmpty() && vectorInfo.vectorStack.peek().getType()
+          .isComplexType()) {
+        CarbonColumnVector parentVector = vectorInfo.vectorStack.peek();
+        CarbonColumnVectorImpl parentVectorImpl =
+            (CarbonColumnVectorImpl) (parentVector.getColumnVector());
+        // parse the parent page data,
+        // save the information about number of child in each row in parent vector
+        if (DataTypes.isStructType(parentVectorImpl.getType())) {
+          parentVectorImpl.setNumberOfChildElementsForStruct(pageData, pageSize);
+        } else {
+          parentVectorImpl.setNumberOfChildElementsForArray(pageData, pageSize);
+        }
+        for (CarbonColumnVector childVector : parentVector.getColumnVector().getChildrenVector()) {
+          // push each child
+          vectorInfo.vectorStack.push(childVector);
+          // load the child page, here child page loading flow updated vector from top of the stack
+          // and pop() the child vector once loading is finished.
+          ((CarbonColumnVectorImpl) (vectorInfo.vectorStack.peek().getColumnVector())).loadPage();
+        }
+        vector = ColumnarVectorWrapperDirectFactory
+            .getDirectVectorWrapperFactory(vectorInfo, parentVector, vectorInfo.invertedIndex,
+                nullBits, vectorInfo.deletedRows, true, false);
+        fillVectorBasedOnType(pageData, vector, vectorDataType, pageDataType, pageSize,
+            vectorInfo, nullBits);
+      } else {
+        pageSize = ColumnVectorInfo.getUpdatedPageSizeForChildVector(vectorInfo, pageSize);
+        vector = ColumnarVectorWrapperDirectFactory
+            .getDirectVectorWrapperFactory(vectorInfo, vector, vectorInfo.invertedIndex, nullBits,
+                deletedRows, true, false);
+        fillVectorBasedOnType(pageData, vector, vector.getType(), pageDataType, pageSize,
+            vectorInfo, nullBits);
+        if ((deletedRows == null || deletedRows.isEmpty())
+            && !(vectorInfo.vector instanceof SequentialFill)) {
+          for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+            vector.putNull(i);
+          }
+        }
+        if (vector instanceof ConvertibleVector) {
+          ((ConvertibleVector) vector).convert();
+        }
       }
     }
 
-    private void fillVector(byte[] pageData, CarbonColumnVector vector, DataType vectorDataType,
-        DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo, BitSet nullBits) {
+    private void fillVectorBasedOnType(byte[] pageData, CarbonColumnVector vector,
+        DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo,
+        BitSet nullBits) {
+      if (vectorInfo.vector.getColumnVector() != null
+          && vector.getColumnVector() == vectorInfo.vector.getColumnVector() && vectorInfo.vector
+          .getColumnVector().getType().isComplexType()) {
+        List<Integer> childElementsForEachRow =
+            ((CarbonColumnVectorImpl) vector.getColumnVector())
+                .getNumberOfChildrenElementsInEachRow();
+        vector.getColumnVector().putComplexObject(childElementsForEachRow);
+      } else {
+        fillPrimitiveType(pageData, vector, vectorDataType, pageDataType, pageSize, vectorInfo,
+            nullBits);
+      }
+    }
+
+    private void fillPrimitiveType(byte[] pageData, CarbonColumnVector vector,
+        DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo,
+        BitSet nullBits) {
       int rowId = 0;
       if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
         if (vectorDataType == DataTypes.SHORT) {
@@ -289,32 +345,32 @@
           }
         }
       } else if (pageDataType == DataTypes.SHORT) {
-        int size = pageSize * DataTypes.SHORT.getSizeInBytes();
+        int shortSizeInBytes = DataTypes.SHORT.getSizeInBytes();
+        int size = pageSize * shortSizeInBytes;
         if (vectorDataType == DataTypes.SHORT) {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putShort(rowId++, (ByteUtil.toShortLittleEndian(pageData, i)));
           }
         } else if (vectorDataType == DataTypes.INT) {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putInt(rowId++, ByteUtil.toShortLittleEndian(pageData, i));
           }
         } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putLong(rowId++, ByteUtil.toShortLittleEndian(pageData, i));
           }
         } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putLong(rowId++, (long) ByteUtil.toShortLittleEndian(pageData, i) * 1000);
           }
         } else if (DataTypes.isDecimal(vectorDataType)) {
           DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
           decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
         } else {
-          for (int i = 0; i < size; i += DataTypes.SHORT.getSizeInBytes()) {
+          for (int i = 0; i < size; i += shortSizeInBytes) {
             vector.putDouble(rowId++, ByteUtil.toShortLittleEndian(pageData, i));
           }
         }
-
       } else if (pageDataType == DataTypes.SHORT_INT) {
         if (vectorDataType == DataTypes.INT) {
           for (int i = 0; i < pageSize; i++) {
@@ -340,51 +396,104 @@
             vector.putDouble(i, shortInt);
           }
         }
-      } else if (pageDataType == DataTypes.INT) {
-        int size = pageSize * DataTypes.INT.getSizeInBytes();
-        if (vectorDataType == DataTypes.INT) {
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
-            vector.putInt(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
-          }
-        } else if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
-            vector.putLong(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
-          }
-        } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
-            vector.putLong(rowId++, (long) ByteUtil.toIntLittleEndian(pageData, i) * 1000);
-          }
-        } else if (DataTypes.isDecimal(vectorDataType)) {
-          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
-        } else {
-          for (int i = 0; i < size; i += DataTypes.INT.getSizeInBytes()) {
-            vector.putDouble(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
-          }
-        }
-      } else if (pageDataType == DataTypes.LONG) {
-        int size = pageSize * DataTypes.LONG.getSizeInBytes();
-        if (vectorDataType == DataTypes.LONG) {
-          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
-            vector.putLong(rowId++, ByteUtil.toLongLittleEndian(pageData, i));
-          }
-        } else if (vectorDataType == DataTypes.TIMESTAMP) {
-          for (int i = 0; i < size; i += DataTypes.LONG.getSizeInBytes()) {
-            vector.putLong(rowId++, ByteUtil.toLongLittleEndian(pageData, i) * 1000);
-          }
-        } else if (DataTypes.isDecimal(vectorDataType)) {
-          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
-          decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
-        }
-      } else if (vectorDataType == DataTypes.FLOAT) {
-        int size = pageSize * DataTypes.FLOAT.getSizeInBytes();
-        for (int i = 0; i < size; i += DataTypes.FLOAT.getSizeInBytes()) {
-          vector.putFloat(rowId++, ByteUtil.toFloatLittleEndian(pageData, i));
-        }
       } else {
-        int size = pageSize * DataTypes.DOUBLE.getSizeInBytes();
-        for (int i = 0; i < size; i += DataTypes.DOUBLE.getSizeInBytes()) {
-          vector.putDouble(rowId++, ByteUtil.toDoubleLittleEndian(pageData, i));
+        int intSizeInBytes = DataTypes.INT.getSizeInBytes();
+        if (pageDataType == DataTypes.INT) {
+          int size = pageSize * intSizeInBytes;
+          if (vectorDataType == DataTypes.INT) {
+            for (int i = 0; i < size; i += intSizeInBytes) {
+              vector.putInt(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
+            }
+          } else if (vectorDataType == DataTypes.LONG) {
+            for (int i = 0; i < size; i += intSizeInBytes) {
+              vector.putLong(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
+            }
+          } else if (vectorDataType == DataTypes.TIMESTAMP) {
+            for (int i = 0; i < size; i += intSizeInBytes) {
+              vector.putLong(rowId++, (long) ByteUtil.toIntLittleEndian(pageData, i) * 1000);
+            }
+          } else if (DataTypes.isDecimal(vectorDataType)) {
+            DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+            decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
+          } else {
+            for (int i = 0; i < size; i += intSizeInBytes) {
+              vector.putDouble(rowId++, ByteUtil.toIntLittleEndian(pageData, i));
+            }
+          }
+        } else if (pageDataType == DataTypes.LONG) {
+          int longSizeInBytes = DataTypes.LONG.getSizeInBytes();
+          int size = pageSize * longSizeInBytes;
+          if (vectorDataType == DataTypes.LONG) {
+            for (int i = 0; i < size; i += longSizeInBytes) {
+              vector.putLong(rowId++, ByteUtil.toLongLittleEndian(pageData, i));
+            }
+          } else if (vectorDataType == DataTypes.TIMESTAMP) {
+            for (int i = 0; i < size; i += longSizeInBytes) {
+              vector.putLong(rowId++, ByteUtil.toLongLittleEndian(pageData, i) * 1000);
+            }
+          } else if (DataTypes.isDecimal(vectorDataType)) {
+            DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+            decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
+          }
+        } else if (pageDataType == DataTypes.BYTE_ARRAY) {
+          // for complex primitive types
+          if (vectorDataType == DataTypes.STRING || vectorDataType == DataTypes.BINARY
+              || vectorDataType == DataTypes.VARCHAR) {
+            // for complex primitive string, binary, varchar type
+            int offset = 0;
+            for (int i = 0; i < pageSize; i++) {
+              int len = ByteBuffer.wrap(pageData, offset, intSizeInBytes).getInt();
+              offset += intSizeInBytes;
+              if (vectorDataType == DataTypes.BINARY && len == 0) {
+                vector.putNull(i);
+                continue;
+              }
+              byte[] row = new byte[len];
+              System.arraycopy(pageData, offset, row, 0, len);
+              if (Arrays.equals(row, CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY)) {
+                vector.putNull(i);
+              } else {
+                vector.putObject(i, row);
+              }
+              offset += len;
+            }
+          } else if (vectorDataType == DataTypes.DATE) {
+            // for complex primitive date type
+            int offset = 0;
+            for (int i = 0; i < pageSize; i++) {
+              int len = ByteBuffer.wrap(pageData, offset, intSizeInBytes).getInt();
+              offset += intSizeInBytes;
+              int surrogateInternal =
+                  ByteUtil.toXorInt(pageData, offset, intSizeInBytes);
+              if (len == 0) {
+                vector.putObject(0, null);
+              } else {
+                vector.putObject(0, surrogateInternal - DateDirectDictionaryGenerator.cutOffDate);
+              }
+              offset += len;
+            }
+          } else if (DataTypes.isDecimal(vectorDataType)) {
+            // for complex primitive decimal type
+            DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+            if (decimalConverter == null) {
+              decimalConverter = DecimalConverterFactory.INSTANCE
+                  .getDecimalConverter(((DecimalType) vectorDataType).getPrecision(),
+                      ((DecimalType) vectorDataType).getScale());
+            }
+            decimalConverter.fillVector(pageData, pageSize, vectorInfo, nullBits, pageDataType);
+          }
+        } else if (vectorDataType == DataTypes.FLOAT) {
+          int floatSizeInBytes = DataTypes.FLOAT.getSizeInBytes();
+          int size = pageSize * floatSizeInBytes;
+          for (int i = 0; i < size; i += floatSizeInBytes) {
+            vector.putFloat(rowId++, ByteUtil.toFloatLittleEndian(pageData, i));
+          }
+        } else {
+          int doubleSizeInBytes = DataTypes.DOUBLE.getSizeInBytes();
+          int size = pageSize * doubleSizeInBytes;
+          for (int i = 0; i < size; i += doubleSizeInBytes) {
+            vector.putDouble(rowId++, ByteUtil.toDoubleLittleEndian(pageData, i));
+          }
         }
       }
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
index 62e94d8..7659cba 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
@@ -19,6 +19,7 @@
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.BitSet;
 
@@ -112,8 +113,17 @@
       // TODO we need to find way to directly set to vector with out conversion. This way is very
       // inefficient.
       CarbonColumnVector vector = getCarbonColumnVector(vectorInfo, nullBitSet);
-      int precision = vectorInfo.measure.getMeasure().getPrecision();
-      int newMeasureScale = vectorInfo.measure.getMeasure().getScale();
+      int precision;
+      int newMeasureScale;
+      if (vectorInfo.measure == null) {
+        // complex primitive decimal flow comes as dimension
+        precision = ((DecimalType) vector.getType()).getPrecision();
+        newMeasureScale = ((DecimalType) vector.getType()).getScale();
+        size = ColumnVectorInfo.getUpdatedPageSizeForChildVector(vectorInfo, size);
+      } else {
+        precision = vectorInfo.measure.getMeasure().getPrecision();
+        newMeasureScale = vectorInfo.measure.getMeasure().getScale();
+      }
       if (!(valuesToBeConverted instanceof byte[])) {
         throw new UnsupportedOperationException("This object type " + valuesToBeConverted.getClass()
             + " is not supported in this method");
@@ -132,12 +142,13 @@
           }
         }
       } else if (pageType == DataTypes.SHORT) {
+        int shortSizeInBytes = DataTypes.SHORT.getSizeInBytes();
         for (int i = 0; i < size; i++) {
           if (nullBitSet.get(i)) {
             vector.putNull(i);
           } else {
             BigDecimal value = BigDecimal
-                .valueOf(ByteUtil.toShortLittleEndian(data, i * DataTypes.SHORT.getSizeInBytes()),
+                .valueOf(ByteUtil.toShortLittleEndian(data, i * shortSizeInBytes),
                     scale);
             if (value.scale() < newMeasureScale) {
               value = value.setScale(newMeasureScale);
@@ -146,12 +157,13 @@
           }
         }
       } else if (pageType == DataTypes.SHORT_INT) {
+        int shortIntSizeInBytes = DataTypes.SHORT_INT.getSizeInBytes();
         for (int i = 0; i < size; i++) {
           if (nullBitSet.get(i)) {
             vector.putNull(i);
           } else {
             BigDecimal value = BigDecimal
-                .valueOf(ByteUtil.valueOf3Bytes(data, i * DataTypes.SHORT_INT.getSizeInBytes()),
+                .valueOf(ByteUtil.valueOf3Bytes(data, i * shortIntSizeInBytes),
                     scale);
             if (value.scale() < newMeasureScale) {
               value = value.setScale(newMeasureScale);
@@ -159,32 +171,73 @@
             vector.putDecimal(i, value, precision);
           }
         }
-      } else if (pageType == DataTypes.INT) {
-        for (int i = 0; i < size; i++) {
-          if (nullBitSet.get(i)) {
-            vector.putNull(i);
-          } else {
-            BigDecimal value = BigDecimal
-                .valueOf(ByteUtil.toIntLittleEndian(data, i * DataTypes.INT.getSizeInBytes()),
-                    scale);
-            if (value.scale() < newMeasureScale) {
-              value = value.setScale(newMeasureScale);
+      } else {
+        int intSizeInBytes = DataTypes.INT.getSizeInBytes();
+        if (pageType == DataTypes.INT) {
+          for (int i = 0; i < size; i++) {
+            if (nullBitSet.get(i)) {
+              vector.putNull(i);
+            } else {
+              BigDecimal value = BigDecimal
+                  .valueOf(ByteUtil.toIntLittleEndian(data, i * intSizeInBytes),
+                      scale);
+              if (value.scale() < newMeasureScale) {
+                value = value.setScale(newMeasureScale);
+              }
+              vector.putDecimal(i, value, precision);
             }
-            vector.putDecimal(i, value, precision);
           }
-        }
-      } else if (pageType == DataTypes.LONG) {
-        for (int i = 0; i < size; i++) {
-          if (nullBitSet.get(i)) {
-            vector.putNull(i);
-          } else {
-            BigDecimal value = BigDecimal
-                .valueOf(ByteUtil.toLongLittleEndian(data, i * DataTypes.LONG.getSizeInBytes()),
-                    scale);
+        } else if (pageType == DataTypes.LONG) {
+          int longSizeInBytes = DataTypes.LONG.getSizeInBytes();
+          for (int i = 0; i < size; i++) {
+            if (nullBitSet.get(i)) {
+              vector.putNull(i);
+            } else {
+              BigDecimal value = BigDecimal
+                  .valueOf(ByteUtil.toLongLittleEndian(data, i * longSizeInBytes),
+                      scale);
+              if (value.scale() < newMeasureScale) {
+                value = value.setScale(newMeasureScale);
+              }
+              vector.putDecimal(i, value, precision);
+            }
+          }
+        } else if (pageType == DataTypes.BYTE_ARRAY) {
+          // complex primitive decimal dimension
+          int offset = 0;
+          for (int j = 0; j < size; j++) {
+            // decimal data layout per row:
+            // length [4 bytes], scale [1 byte], value [length - 1 bytes]
+            int len = ByteBuffer.wrap(data, offset, intSizeInBytes).getInt();
+            offset += intSizeInBytes;
+            if (len == 0) {
+              vector.putNull(j);
+              continue;
+            }
+            // jump the scale offset
+            offset += 1;
+            // remove scale from the length
+            len -= 1;
+            byte[] row = new byte[len];
+            System.arraycopy(data, offset, row, 0, len);
+            long val;
+            if (len == 1) {
+              val = row[0];
+            } else if (len == 2) {
+              val = ByteUtil.toShort(row, 0);
+            } else if (len == 4) {
+              val = ByteUtil.toInt(row, 0);
+            } else if (len == 3) {
+              val = ByteUtil.valueOf3Bytes(row, 0);
+            } else {
+              // TODO: check whether value lengths other than 1, 2, 3 or 4 bytes can occur
+              val = ByteUtil.toLong(row, 0, len);
+            }
+            BigDecimal value = BigDecimal.valueOf(val, scale);
             if (value.scale() < newMeasureScale) {
               value = value.setScale(newMeasureScale);
             }
-            vector.putDecimal(i, value, precision);
+            vector.putDecimal(j, value, precision);
+            offset += len;
           }
         }
       }
@@ -278,6 +331,7 @@
     public void fillVector(Object valuesToBeConverted, int size,
         ColumnVectorInfo vectorInfo, BitSet nullBitSet, DataType pageType) {
       CarbonColumnVector vector = getCarbonColumnVector(vectorInfo, nullBitSet);
+      // TODO: handle the complex child decimal flow (where vectorInfo.measure is null)
       int precision = vectorInfo.measure.getMeasure().getPrecision();
       int newMeasureScale = vectorInfo.measure.getMeasure().getScale();
       if (scale < newMeasureScale) {
@@ -329,6 +383,7 @@
     public void fillVector(Object valuesToBeConverted, int size,
         ColumnVectorInfo vectorInfo, BitSet nullBitSet, DataType pageType) {
       CarbonColumnVector vector = getCarbonColumnVector(vectorInfo, nullBitSet);
+      // TODO: handle the complex child decimal flow (where vectorInfo.measure is null)
       int precision = vectorInfo.measure.getMeasure().getPrecision();
       int newMeasureScale = vectorInfo.measure.getMeasure().getScale();
       if (valuesToBeConverted instanceof byte[][]) {
@@ -363,8 +418,8 @@
     CarbonColumnVector vector = vectorInfo.vector;
     BitSet deletedRows = vectorInfo.deletedRows;
     vector = ColumnarVectorWrapperDirectFactory
-        .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBitSet, deletedRows,
-            true, false);
+        .getDirectVectorWrapperFactory(vectorInfo, vector, vectorInfo.invertedIndex, nullBitSet,
+            deletedRows, true, false);
     return vector;
   }
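
A minimal sketch (not part of the patch) of decoding one row of the BYTE_ARRAY decimal layout introduced above: length [4 bytes], scale [1 byte], big-endian unscaled value. The patch converts fixed-width values through ByteUtil; this sketch uses BigInteger instead, and the class and method names are hypothetical.

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

final class ComplexDecimalRowSketch {
  static BigDecimal readRow(ByteBuffer page, int scale) {
    int length = page.getInt();     // scale byte + value bytes; 0 marks a null row
    if (length == 0) {
      return null;
    }
    page.get();                     // skip the stored scale byte
    byte[] unscaled = new byte[length - 1];
    page.get(unscaled);             // remaining bytes are the unscaled value (assumes >= 1 byte)
    return new BigDecimal(new BigInteger(unscaled), scale);
  }
}
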
 
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 84c9cd0..418a0f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -98,6 +98,14 @@
         columnVectorInfo.dimension = queryDimensions[i];
         columnVectorInfo.ordinal = queryDimensions[i].getDimension().getOrdinal();
         allColumnInfo[queryDimensions[i].getOrdinal()] = columnVectorInfo;
+      } else if (queryDimensions[i].getDimension().isComplex()) {
+        ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
+        complexList.add(columnVectorInfo);
+        columnVectorInfo.dimension = queryDimensions[i];
+        columnVectorInfo.ordinal = queryDimensions[i].getDimension().getOrdinal();
+        columnVectorInfo.genericQueryType =
+            executionInfo.getComplexDimensionInfoMap().get(columnVectorInfo.ordinal);
+        allColumnInfo[queryDimensions[i].getOrdinal()] = columnVectorInfo;
       } else if (queryDimensions[i].getDimension().getDataType() != DataTypes.DATE) {
         ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
         noDictInfoList.add(columnVectorInfo);
@@ -112,14 +120,6 @@
             .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
         columnVectorInfo.ordinal = queryDimensions[i].getDimension().getOrdinal();
         allColumnInfo[queryDimensions[i].getOrdinal()] = columnVectorInfo;
-      } else if (queryDimensions[i].getDimension().isComplex()) {
-        ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
-        complexList.add(columnVectorInfo);
-        columnVectorInfo.dimension = queryDimensions[i];
-        columnVectorInfo.ordinal = queryDimensions[i].getDimension().getOrdinal();
-        columnVectorInfo.genericQueryType =
-            executionInfo.getComplexDimensionInfoMap().get(columnVectorInfo.ordinal);
-        allColumnInfo[queryDimensions[i].getOrdinal()] = columnVectorInfo;
       } else {
         ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
         dictInfoList.add(columnVectorInfo);
@@ -251,7 +251,7 @@
 
   private void fillResultToColumnarBatch(BlockletScannedResult scannedResult) {
     scannedResult.fillDataChunks(dictionaryInfo, noDictionaryInfo, measureColumnInfo,
-        measureInfo.getMeasureOrdinals());
+        measureInfo.getMeasureOrdinals(), complexInfo);
 
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 0623177..549f85c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -98,6 +98,9 @@
    */
   protected CarbonIterator queryIterator;
 
+  // Size of the ReusableDataBuffer[] array, based on the number of dimension projection
+  // columns, including the children of complex columns
+  private int reusableDimensionBufferSize;
+
   public AbstractQueryExecutor(Configuration configuration) {
     ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration);
     queryProperties = new QueryExecutorProperties();
@@ -149,11 +152,26 @@
       QueryUtil.getAllFilterDimensionsAndMeasures(queryModel.getIndexFilter().getResolver(),
           queryProperties.complexFilterDimension, queryProperties.filterMeasures);
     }
-
     queryStatistic = new QueryStatistic();
     queryStatistic
         .addStatistics(QueryStatisticsConstants.LOAD_DICTIONARY, System.currentTimeMillis());
     queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
+    for (int i = 0; i < queryModel.getProjectionDimensions().size(); i++) {
+      reusableDimensionBufferSize++;
+      findChildrenSize(queryModel.getProjectionDimensions().get(i).getDimension());
+    }
+  }
+
+  // Recursively determine the size needed for ReusableDataBuffer[]
+  private void findChildrenSize(CarbonDimension carbonDimension) {
+    if (carbonDimension == null || carbonDimension.getListOfChildDimensions() == null) {
+      return;
+    }
+    List<CarbonDimension> childDimension = carbonDimension.getListOfChildDimensions();
+    reusableDimensionBufferSize += childDimension.size();
+    for (CarbonDimension dimension : childDimension) {
+      findChildrenSize(dimension);
+    }
   }
 
   /**
@@ -508,7 +526,7 @@
     int[] dimensionChunkIndexes = QueryUtil.getDimensionChunkIndexes(projectDimensions,
         segmentProperties.getDimensionOrdinalToChunkMapping(),
         currentBlockFilterDimensions, allProjectionListDimensionIndexes);
-    ReusableDataBuffer[] dimensionBuffer = new ReusableDataBuffer[projectDimensions.size()];
+    ReusableDataBuffer[] dimensionBuffer = new ReusableDataBuffer[reusableDimensionBufferSize];
     for (int i = 0; i < dimensionBuffer.length; i++) {
       dimensionBuffer[i] = new ReusableDataBuffer();
     }
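
A minimal sketch of the sizing that findChildrenSize() performs: one ReusableDataBuffer per projected dimension plus one per (transitively nested) child dimension. The Dim class is a hypothetical stand-in for CarbonDimension.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DimensionBufferCountSketch {
  static final class Dim {                       // hypothetical stand-in for CarbonDimension
    final List<Dim> children = new ArrayList<>();
  }

  static int bufferCount(List<Dim> projection) {
    int size = 0;
    for (Dim dim : projection) {
      size += 1 + countChildren(dim);            // one buffer for the column itself
    }
    return size;
  }

  private static int countChildren(Dim dim) {
    int size = dim.children.size();
    for (Dim child : dim.children) {
      size += countChildren(child);              // plus one per nested child
    }
    return size;
  }

  public static void main(String[] args) {
    Dim stringCol = new Dim();                   // plain string column -> 1 buffer
    Dim arrayOfInt = new Dim();                  // array<int> -> parent + child = 2 buffers
    arrayOfInt.children.add(new Dim());
    System.out.println(bufferCount(Arrays.asList(stringCol, arrayOfInt)));  // prints 3
  }
}
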
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index 8457244..fc1353e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -32,6 +32,7 @@
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.mutate.DeleteDeltaVo;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
@@ -153,6 +154,9 @@
 
   private ReusableDataBuffer[] measureReusableBuffer;
 
+  // Current index into dimensionReusableBuffer, advanced as each column's lazy page is set
+  private int dimensionReusableBufferIndex;
+
   public BlockletScannedResult(BlockExecutionInfo blockExecutionInfo,
       QueryStatisticsModel queryStatisticsModel) {
     this.dimensionReusableBuffer = blockExecutionInfo.getDimensionReusableDataBuffer();
@@ -394,11 +398,25 @@
         pageUncompressTime.getCount() + (System.currentTimeMillis() - startTime));
   }
 
+  void fillChildDataChunks(CarbonColumnVector columnVector, CarbonDimension dimension,
+      ColumnVectorInfo complexInfo) {
+    columnVector.setLazyPage(new LazyPageLoader(lazyBlockletLoader, dimension.getOrdinal(), false,
+        pageIdFiltered[pageCounter], complexInfo,
+        dimensionReusableBuffer[dimensionReusableBufferIndex++]));
+    if (columnVector.getChildrenVector() != null) {
+      int i = 0;
+      for (CarbonColumnVector childVector : columnVector.getChildrenVector()) {
+        fillChildDataChunks(childVector, dimension.getListOfChildDimensions().get(i++),
+            complexInfo);
+      }
+    }
+  }
+
   /**
    * Fill all the vectors with data by decompressing/decoding the column page
    */
   public void fillDataChunks(ColumnVectorInfo[] dictionaryInfo, ColumnVectorInfo[] noDictionaryInfo,
-      ColumnVectorInfo[] msrVectorInfo, int[] measuresOrdinal) {
+      ColumnVectorInfo[] msrVectorInfo, int[] measuresOrdinal, ColumnVectorInfo[] complexInfo) {
     freeDataChunkMemory();
     if (pageCounter >= pageFilteredRowCount.length) {
       return;
@@ -409,12 +427,29 @@
           new LazyPageLoader(lazyBlockletLoader, dictionaryColumnChunkIndexes[i], false,
               pageIdFiltered[pageCounter], dictionaryInfo[i], dimensionReusableBuffer[i]));
     }
-    int startIndex = dictionaryColumnChunkIndexes.length;
+    dimensionReusableBufferIndex = dictionaryColumnChunkIndexes.length;
     for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) {
       noDictionaryInfo[i].vector.setLazyPage(
           new LazyPageLoader(lazyBlockletLoader, noDictionaryColumnChunkIndexes[i], false,
               pageIdFiltered[pageCounter], noDictionaryInfo[i],
-              dimensionReusableBuffer[startIndex++]));
+              dimensionReusableBuffer[dimensionReusableBufferIndex++]));
+    }
+    dimensionReusableBufferIndex =
+        noDictionaryColumnChunkIndexes.length + dictionaryColumnChunkIndexes.length;
+
+    for (int i = 0; i < this.complexParentBlockIndexes.length; i++) {
+      complexInfo[i].vector.getColumnVector().setLazyPage(
+          new LazyPageLoader(lazyBlockletLoader, complexParentBlockIndexes[i], false,
+              pageIdFiltered[pageCounter], complexInfo[i],
+              dimensionReusableBuffer[dimensionReusableBufferIndex++]));
+      // set child pages
+      int dimIndex = 0;
+      for (CarbonColumnVector childVector : complexInfo[i].vector.getColumnVector()
+          .getChildrenVector()) {
+        fillChildDataChunks(childVector,
+            complexInfo[i].dimension.getDimension().getListOfChildDimensions().get(dimIndex++),
+            complexInfo[i]);
+      }
     }
 
     for (int i = 0; i < measuresOrdinal.length; i++) {
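
A minimal sketch of how fillChildDataChunks() consumes dimensionReusableBuffer slots after the dictionary and no-dictionary columns: the complex parent takes the next slot, then its children take the following slots depth first. VectorNode is a hypothetical stand-in for CarbonColumnVector.

import java.util.ArrayList;
import java.util.List;

final class ComplexBufferSlotSketch {
  static final class VectorNode {                // hypothetical stand-in for CarbonColumnVector
    final String name;
    final List<VectorNode> children = new ArrayList<>();
    VectorNode(String name) { this.name = name; }
  }

  // Assigns the next free buffer slot to the node, then to its children depth first,
  // and returns the next unused slot index.
  static int assignSlots(VectorNode node, int nextSlot, List<String> assignment) {
    assignment.add(node.name + " -> dimensionReusableBuffer[" + nextSlot++ + "]");
    for (VectorNode child : node.children) {
      nextSlot = assignSlots(child, nextSlot, assignment);
    }
    return nextSlot;
  }
}
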
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
index 52494ad..2c31b62 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
@@ -18,6 +18,8 @@
 package org.apache.carbondata.core.scan.result.vector;
 
 import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.scanner.LazyPageLoader;
@@ -114,4 +116,20 @@
 
   void setLazyPage(LazyPageLoader lazyPage);
 
+  // Default implementation so that Spark/core vector implementations
+  // need not implement this Presto-specific method.
+  default List<CarbonColumnVector> getChildrenVector() {
+    return new ArrayList<>(0);
+  }
+
+  // Default implementation so that Spark/core vector implementations
+  // need not implement this Presto-specific method.
+  default void putComplexObject(List<Integer> offsetVector) {
+  }
+
+  // Default implementation so that Spark/core vector implementations
+  // need not implement this Presto-specific method.
+  default CarbonColumnVector getColumnVector() {
+    return null;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
index bbbe78b..ed8be52 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
@@ -18,12 +18,16 @@
 package org.apache.carbondata.core.scan.result.vector;
 
 import java.util.BitSet;
+import java.util.List;
+import java.util.Stack;
 
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
 
 public class ColumnVectorInfo implements Comparable<ColumnVectorInfo> {
   public int offset;
@@ -39,6 +43,8 @@
   public int[] invertedIndex;
   public BitSet deletedRows;
   public DecimalConverterFactory.DecimalConverter decimalConverter;
+  // Vector stack is used in complex column vectorInfo to store all the children vectors.
+  public Stack<CarbonColumnVector> vectorStack = new Stack<>();
 
   @Override
   public int compareTo(ColumnVectorInfo o) {
@@ -69,4 +75,23 @@
     result = prime * result + (ordinal);
     return result;
   }
+
+  public static Integer getUpdatedPageSizeForChildVector(ColumnVectorInfo vectorInfo,
+      int parentPageSize) {
+    int newPageSize = 0;
+    CarbonColumnVector vector = vectorInfo.vector;
+    if (DataTypes.isArrayType(vector.getType())) {
+      // If it is an array child vector, the page size will not be the same as the
+      // parent page size, so it needs to be recalculated.
+      List<Integer> childElementsCountForEachRow =
+          ((CarbonColumnVectorImpl) vector.getColumnVector())
+              .getNumberOfChildrenElementsInEachRow();
+      for (int childElementsCount : childElementsCountForEachRow) {
+        newPageSize += childElementsCount;
+      }
+      return newPageSize;
+    } else {
+      return parentPageSize;
+    }
+  }
 }
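
A minimal sketch of the page-size rule applied by getUpdatedPageSizeForChildVector(): for an array child the page size is the sum of the per-row element counts, otherwise the parent page size is reused. Names are hypothetical.

import java.util.List;

final class ChildPageSizeSketch {
  static int childPageSize(List<Integer> elementsPerRow, int parentPageSize, boolean isArray) {
    if (!isArray) {
      return parentPageSize;                 // struct children keep the parent page size
    }
    int size = 0;
    for (int count : elementsPerRow) {
      size += count;                         // flattened array elements across all rows
    }
    return size;                             // e.g. rows [2, 0, 3] -> child page of 5 values
  }
}
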
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
index 8231d86..212befe 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
@@ -18,8 +18,11 @@
 package org.apache.carbondata.core.scan.result.vector.impl;
 
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.List;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -70,10 +73,14 @@
 
   private CarbonColumnVector dictionaryVector;
 
+  private List<CarbonColumnVector> childrenVector;
+
   private LazyPageLoader lazyPage;
 
   private boolean loaded;
 
+  private List<Integer> childElementsForEachRow;
+
   public CarbonColumnVectorImpl(int batchSize, DataType dataType) {
     this.batchSize = batchSize;
     nullBytes = new BitSet(batchSize);
@@ -103,6 +110,53 @@
   }
 
   @Override
+  public List<CarbonColumnVector> getChildrenVector() {
+    return childrenVector;
+  }
+
+  public void setChildrenVector(List<CarbonColumnVector> childrenVector) {
+    this.childrenVector = childrenVector;
+  }
+
+  public List<Integer> getNumberOfChildrenElementsInEachRow() {
+    return childElementsForEachRow;
+  }
+
+  public void setNumberOfChildElementsInEachRow(List<Integer> childrenElements) {
+    this.childElementsForEachRow = childrenElements;
+  }
+
+  public void setNumberOfChildElementsForArray(byte[] parentPageData, int pageSize) {
+    // for complex array type, go through parent page to get the child information
+    ByteBuffer childInfoBuffer = ByteBuffer.wrap(parentPageData);
+    List<Integer> childElementsForEachRow = new ArrayList<>();
+    // Parent page array data looks like
+    // number of children in each row [4 byte], Offset [4 byte],
+    // number of children in each row [4 byte], Offset [4 byte]...
+    while (pageSize != childElementsForEachRow.size()) {
+      // get the number of children in current row
+      childElementsForEachRow.add(childInfoBuffer.getInt());
+      // skip offset
+      childInfoBuffer.getInt();
+    }
+    setNumberOfChildElementsInEachRow(childElementsForEachRow);
+  }
+
+  public void setNumberOfChildElementsForStruct(byte[] parentPageData, int pageSize) {
+    // for complex struct type, go through parent page to get the child information
+    ByteBuffer childInfoBuffer = ByteBuffer.wrap(parentPageData);
+    List<Integer> childElementsForEachRow = new ArrayList<>();
+    // Parent page struct data stores the child count of each row as a 2-byte short:
+    // number of children [2 byte], number of children [2 byte], ...
+    while (pageSize != childElementsForEachRow.size()) {
+      int elements = childInfoBuffer.getShort();
+      childElementsForEachRow.add(elements);
+    }
+    setNumberOfChildElementsInEachRow(childElementsForEachRow);
+  }
+
+  @Override
   public void putBoolean(int rowId, boolean value) {
     byteArr[rowId] =  (byte)((value) ? 1 : 0);
   }
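
A minimal sketch of the parent-page decoding done by setNumberOfChildElementsForArray and setNumberOfChildElementsForStruct above, assuming the layouts described in their comments (array rows stored as an int child count plus an int offset, struct rows stored as a short child count). Names are hypothetical.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class ParentPageSketch {
  static List<Integer> arrayChildCounts(byte[] parentPage, int pageSize) {
    ByteBuffer buffer = ByteBuffer.wrap(parentPage);
    List<Integer> counts = new ArrayList<>(pageSize);
    while (counts.size() < pageSize) {
      counts.add(buffer.getInt());   // number of children in this row
      buffer.getInt();               // skip the offset
    }
    return counts;
  }

  static List<Integer> structChildCounts(byte[] parentPage, int pageSize) {
    ByteBuffer buffer = ByteBuffer.wrap(parentPage);
    List<Integer> counts = new ArrayList<>(pageSize);
    while (counts.size() < pageSize) {
      counts.add((int) buffer.getShort());   // one short per row
    }
    return counts;
  }
}
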
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java
index 4c7bb07..0e13090 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java
@@ -20,6 +20,7 @@
 import java.util.BitSet;
 
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 /**
  * Factory to create ColumnarVectors for inverted index and delete delta queries.
@@ -30,6 +31,7 @@
    * Gets carbon vector wrapper to fill the underlying vector based on inverted index and delete
    * delta.
    *
+   * @param vectorInfo       vectorInfo used to get the complex child vector
    * @param columnVector     Actual vector to be filled.
    * @param invertedIndex    Inverted index of column page
    * @param nullBitset       row locations of nulls in bitset
@@ -38,11 +40,15 @@
    *                         there is no null bitset.
    * @return wrapped CarbonColumnVector
    */
-  public static CarbonColumnVector getDirectVectorWrapperFactory(CarbonColumnVector columnVector,
-      int[] invertedIndex, BitSet nullBitset, BitSet deletedRows, boolean isnullBitsExists,
-      boolean isDictVector) {
+  public static CarbonColumnVector getDirectVectorWrapperFactory(ColumnVectorInfo vectorInfo,
+      CarbonColumnVector columnVector, int[] invertedIndex, BitSet nullBitset, BitSet deletedRows,
+      boolean isnullBitsExists, boolean isDictVector) {
     // If it is sequential data filler then add the null bitset.
     if (columnVector instanceof SequentialFill) {
+      if (columnVector.getType().isComplexType() && !vectorInfo.vectorStack.isEmpty()) {
+        // switch to the child vector, as this is the child-vector filling flow
+        columnVector = vectorInfo.vectorStack.peek();
+      }
       // If it has inverted index then create a dummy delete rows bitset so that it goes to
       // ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex, here it does the sequential
       // filling using another vector.
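
A minimal sketch of the intended vectorStack lifecycle, assuming the child vector is pushed before its page is decoded and popped once decoding finishes (both happen elsewhere in this patch); the factory above only peeks the stack to redirect filling to the child vector. Names are hypothetical.

import java.util.Stack;

final class VectorStackSketch {
  static String fillChildPage(Stack<String> vectorStack, String childVector) {
    vectorStack.push(childVector);        // before the child page is decoded
    String target = vectorStack.peek();   // wrapper factory redirects filling to the child
    // ... decode the child page into 'target' ...
    vectorStack.pop();                    // processing of this child vector is finished
    return target;
  }
}
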
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java
index e7aaac9..30efbb0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java
@@ -20,6 +20,7 @@
 import java.math.BigDecimal;
 import java.util.BitSet;
 
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 
 /**
@@ -42,6 +43,11 @@
   }
 
   @Override
+  public DataType getType() {
+    return columnVector.getType();
+  }
+
+  @Override
   public void putBoolean(int rowId, boolean value) {
     if (!deletedRows.get(rowId)) {
       if (nullBits.get(rowId)) {
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
index 49e1be0..b3ffb01 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
@@ -48,6 +48,10 @@
     }
   }
 
+  public CarbonColumnVector getColumnVector() {
+    return columnVector;
+  }
+
   @Override
   public void putBoolean(int rowId, boolean value) {
     if (!filteredRows[rowId]) {
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
index 94543fb..88c18a7 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
@@ -22,22 +22,9 @@
 import java.util.Set;
 
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.datatype.DecimalType;
-import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.datatype.*;
 import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
-import org.apache.carbondata.presto.readers.BooleanStreamReader;
-import org.apache.carbondata.presto.readers.ByteStreamReader;
-import org.apache.carbondata.presto.readers.DecimalSliceStreamReader;
-import org.apache.carbondata.presto.readers.DoubleStreamReader;
-import org.apache.carbondata.presto.readers.FloatStreamReader;
-import org.apache.carbondata.presto.readers.IntegerStreamReader;
-import org.apache.carbondata.presto.readers.LongStreamReader;
-import org.apache.carbondata.presto.readers.ObjectStreamReader;
-import org.apache.carbondata.presto.readers.ShortStreamReader;
-import org.apache.carbondata.presto.readers.SliceStreamReader;
-import org.apache.carbondata.presto.readers.TimestampStreamReader;
+import org.apache.carbondata.presto.readers.*;
 
 public class CarbonVectorBatch {
 
@@ -103,6 +90,8 @@
       } else {
         return null;
       }
+    } else if (field.getDataType().isComplexType()) {
+      return new ComplexTypeStreamReader(batchSize, field);
     } else {
       return new ObjectStreamReader(batchSize, field.getDataType());
     }
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapperDirect.java b/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapperDirect.java
index 5cbcdcb..099b631 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapperDirect.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapperDirect.java
@@ -19,6 +19,7 @@
 
 import java.math.BigDecimal;
 import java.util.BitSet;
+import java.util.List;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
@@ -30,7 +31,7 @@
 /**
  * Fills the vector directly with out considering any deleted rows.
  */
-class ColumnarVectorWrapperDirect implements CarbonColumnVector, SequentialFill {
+public class ColumnarVectorWrapperDirect implements CarbonColumnVector, SequentialFill {
   /**
    * It is adapter class of complete ColumnarBatch.
    */
@@ -45,7 +46,7 @@
 
   private BitSet nullBitSet;
 
-  ColumnarVectorWrapperDirect(CarbonColumnVectorImpl columnVector) {
+  public ColumnarVectorWrapperDirect(CarbonColumnVectorImpl columnVector) {
     this.columnVector = columnVector;
     this.dictionaryVector = columnVector.getDictionaryVector();
     this.nullBitSet = new BitSet();
@@ -203,8 +204,19 @@
 
   @Override
   public void putObject(int rowId, Object obj) {
-    throw new UnsupportedOperationException(
-        "Not supported this opeartion from " + this.getClass().getName());
+    columnVector.putObject(rowId, obj);
+  }
+
+  public CarbonColumnVectorImpl getColumnVector() {
+    return this.columnVector;
+  }
+
+  public List<CarbonColumnVector> getChildrenVector() {
+    return columnVector.getChildrenVector();
+  }
+
+  public void putComplexObject(List<Integer> offsetVector) {
+    columnVector.putComplexObject(offsetVector);
   }
 
   @Override
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
index b3438d8..04e0de5 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
@@ -26,6 +26,7 @@
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
@@ -163,6 +164,19 @@
     return 0;
   }
 
+  public StructField fillChildFields(CarbonDimension dimension) {
+    List<CarbonDimension> listOfChildDimensions = dimension.getListOfChildDimensions();
+    List<StructField> childFields = null;
+    if (listOfChildDimensions != null) {
+      childFields = new ArrayList<>();
+      for (CarbonDimension childDimension : listOfChildDimensions) {
+        childFields.add(fillChildFields(childDimension));
+      }
+    }
+    return new StructField(dimension.getColName(), dimension.getDataType(), childFields);
+  }
+
   /**
    * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
    * This object is reused. Calling this enables the vectorized reader. This should be called
@@ -173,11 +187,15 @@
     List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions();
     List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
     StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
-    for (int i = 0; i < queryDimension.size(); i++) {
-      ProjectionDimension dim = queryDimension.get(i);
+    for (ProjectionDimension dim : queryDimension) {
       if (dim.getDimension().isComplex()) {
+        List<CarbonDimension> childDimensions = dim.getDimension().getListOfChildDimensions();
+        List<StructField> childFields = new ArrayList<StructField>();
+        for (CarbonDimension childDimension : childDimensions) {
+          childFields.add(fillChildFields(childDimension));
+        }
         fields[dim.getOrdinal()] =
-            new StructField(dim.getColumnName(), dim.getDimension().getDataType());
+            new StructField(dim.getColumnName(), dim.getDimension().getDataType(), childFields);
       } else if (dim.getDimension().getDataType() == DataTypes.DATE) {
         DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
             .getDirectDictionaryGenerator(dim.getDimension().getDataType());
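
A minimal sketch of the nested StructField that fillChildFields() would produce for a projected array<int> column; the column and child field names are hypothetical.

import java.util.Collections;

import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.StructField;

final class ComplexSchemaSketch {
  static StructField arrayOfIntField() {
    // leaf child carries the primitive type and no further children
    StructField child = new StructField("arrayintfield.val", DataTypes.INT, null);
    return new StructField("arrayintfield", DataTypes.createArrayType(DataTypes.INT),
        Collections.singletonList(child));
  }
}
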
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java
new file mode 100644
index 0000000..d626cb9
--- /dev/null
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import io.prestosql.spi.block.ArrayBlock;
+import io.prestosql.spi.block.RowBlock;
+import io.prestosql.spi.type.*;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.block.BlockBuilder;
+
+import org.apache.carbondata.presto.CarbonVectorBatch;
+import org.apache.carbondata.presto.ColumnarVectorWrapperDirect;
+
+/**
+ * Class to read the complex type Stream [array/struct/map]
+ */
+
+public class ComplexTypeStreamReader extends CarbonColumnVectorImpl
+    implements PrestoVectorBlockBuilder {
+
+  protected int batchSize;
+
+  protected Type type;
+  protected BlockBuilder builder;
+
+  public ComplexTypeStreamReader(int batchSize, StructField field) {
+    super(batchSize, field.getDataType());
+    this.batchSize = batchSize;
+    this.type = getType(field);
+    List<CarbonColumnVector> childrenList = new ArrayList<>();
+    for (StructField child : field.getChildren()) {
+      childrenList.add(new ColumnarVectorWrapperDirect(Objects.requireNonNull(
+          CarbonVectorBatch.createDirectStreamReader(this.batchSize, child.getDataType(), child))));
+    }
+    setChildrenVector(childrenList);
+    this.builder = type.createBlockBuilder(null, batchSize);
+  }
+
+  Type getType(StructField field) {
+    DataType dataType = field.getDataType();
+    if (dataType == DataTypes.STRING || dataType == DataTypes.VARCHAR) {
+      return VarcharType.VARCHAR;
+    } else if (dataType == DataTypes.SHORT) {
+      return SmallintType.SMALLINT;
+    } else if (dataType == DataTypes.INT) {
+      return IntegerType.INTEGER;
+    } else if (dataType == DataTypes.LONG) {
+      return BigintType.BIGINT;
+    } else if (dataType == DataTypes.DOUBLE) {
+      return DoubleType.DOUBLE;
+    } else if (dataType == DataTypes.FLOAT) {
+      return RealType.REAL;
+    } else if (dataType == DataTypes.BOOLEAN) {
+      return BooleanType.BOOLEAN;
+    } else if (dataType == DataTypes.BINARY) {
+      return VarbinaryType.VARBINARY;
+    } else if (dataType == DataTypes.DATE) {
+      return DateType.DATE;
+    } else if (dataType == DataTypes.TIMESTAMP) {
+      return TimestampType.TIMESTAMP;
+    } else if (dataType == DataTypes.BYTE) {
+      return TinyintType.TINYINT;
+    } else if (DataTypes.isDecimal(dataType)) {
+      org.apache.carbondata.core.metadata.datatype.DecimalType decimal =
+          (org.apache.carbondata.core.metadata.datatype.DecimalType) dataType;
+      return DecimalType.createDecimalType(decimal.getPrecision(), decimal.getScale());
+    } else if (DataTypes.isArrayType(dataType)) {
+      return new ArrayType(getType(field.getChildren().get(0)));
+    } else if (DataTypes.isStructType(dataType)) {
+      List<RowType.Field> children = new ArrayList<>();
+      for (StructField child : field.getChildren()) {
+        children.add(new RowType.Field(Optional.of(child.getFieldName()), getType(child)));
+      }
+      return RowType.from(children);
+    } else {
+      throw new UnsupportedOperationException("Unsupported type: " + dataType);
+    }
+  }
+
+  @Override public Block buildBlock() {
+    return builder.build();
+  }
+
+  @Override public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  public void putComplexObject(List<Integer> offsetVector) {
+    if (type instanceof ArrayType) {
+      // build child block
+      Block childBlock = buildChildBlock(getChildrenVector().get(0));
+      // prepare an offset vector with 0 as initial offset
+      int[] offsetVectorArray = new int[offsetVector.size() + 1];
+      for (int i = 1; i <= offsetVector.size(); i++) {
+        offsetVectorArray[i] = offsetVectorArray[i - 1] + offsetVector.get(i - 1);
+      }
+      // prepare Array block
+      Block arrayBlock = ArrayBlock
+          .fromElementBlock(offsetVector.size(), Optional.empty(), offsetVectorArray,
+              childBlock);
+      for (int position = 0; position < offsetVector.size(); position++) {
+        type.writeObject(builder, arrayBlock.getObject(position, Block.class));
+      }
+      getChildrenVector().get(0).getColumnVector().reset();
+    } else {
+      // build child blocks
+      List<Block> childBlocks = new ArrayList<>(getChildrenVector().size());
+      for (CarbonColumnVector child : getChildrenVector()) {
+        childBlocks.add(buildChildBlock(child));
+      }
+      // prepare ROW block
+      Block rowBlock = RowBlock
+          .fromFieldBlocks(childBlocks.get(0).getPositionCount(), Optional.empty(),
+              childBlocks.toArray(new Block[0]));
+      for (int position = 0; position < childBlocks.get(0).getPositionCount(); position++) {
+        type.writeObject(builder, rowBlock.getObject(position, Block.class));
+      }
+      for (CarbonColumnVector child : getChildrenVector()) {
+        child.getColumnVector().reset();
+      }
+    }
+  }
+
+  private Block buildChildBlock(CarbonColumnVector carbonColumnVector) {
+    DataType dataType = carbonColumnVector.getType();
+    carbonColumnVector = carbonColumnVector.getColumnVector();
+    if (dataType == DataTypes.STRING || dataType == DataTypes.BINARY
+        || dataType == DataTypes.VARCHAR) {
+      return ((SliceStreamReader) carbonColumnVector).buildBlock();
+    } else if (dataType == DataTypes.SHORT) {
+      return ((ShortStreamReader) carbonColumnVector).buildBlock();
+    } else if (dataType == DataTypes.INT || dataType == DataTypes.DATE) {
+      return ((IntegerStreamReader) carbonColumnVector).buildBlock();
+    } else if (dataType == DataTypes.LONG) {
+      return ((LongStreamReader) carbonColumnVector).buildBlock();
+    } else if (dataType == DataTypes.DOUBLE) {
+      return ((DoubleStreamReader) carbonColumnVector).buildBlock();
+    } else if (dataType == DataTypes.FLOAT) {
+      return ((FloatStreamReader) carbonColumnVector).buildBlock();
+    } else if (dataType == DataTypes.TIMESTAMP) {
+      return ((TimestampStreamReader) carbonColumnVector).buildBlock();
+    } else if (dataType == DataTypes.BOOLEAN) {
+      return ((BooleanStreamReader) carbonColumnVector).buildBlock();
+    } else if (DataTypes.isDecimal(dataType)) {
+      return ((DecimalSliceStreamReader) carbonColumnVector).buildBlock();
+    } else if (dataType == DataTypes.BYTE) {
+      return ((ByteStreamReader) carbonColumnVector).buildBlock();
+    } else if (DataTypes.isArrayType(dataType) || (DataTypes.isStructType(dataType))) {
+      return ((ComplexTypeStreamReader) carbonColumnVector).buildBlock();
+    } else {
+      throw new UnsupportedOperationException("Unsupported type: " + dataType);
+    }
+  }
+
+  @Override public void putNull(int rowId) {
+    builder.appendNull();
+  }
+
+  @Override public void reset() {
+    builder = type.createBlockBuilder(null, batchSize);
+  }
+
+  @Override public void putNulls(int rowId, int count) {
+    for (int i = 0; i < count; i++) {
+      builder.appendNull();
+    }
+  }
+}
+
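
A minimal sketch of the offset computation in putComplexObject() above: per-row child counts become the cumulative offsets expected by ArrayBlock.fromElementBlock, so rows with [2, 0, 3] elements produce offsets [0, 2, 2, 5]. Names are hypothetical.

import java.util.List;

final class ArrayOffsetsSketch {
  static int[] toOffsets(List<Integer> childCountPerRow) {
    int[] offsets = new int[childCountPerRow.size() + 1];   // first offset is always 0
    for (int i = 1; i <= childCountPerRow.size(); i++) {
      offsets[i] = offsets[i - 1] + childCountPerRow.get(i - 1);
    }
    return offsets;
  }
}
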
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/SliceStreamReader.java
index e103b4a..a2d3319 100644
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -151,7 +151,7 @@
       putNull(rowId);
     } else {
       if (dictionaryBlock == null) {
-        putByteArray(rowId, ByteUtil.toBytes((String) value));
+        putByteArray(rowId, (byte []) value);
       } else {
         putInt(rowId, (int) value);
       }
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
index fd4ac35..f5c5629 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
@@ -24,19 +24,20 @@
 import org.apache.commons.codec.binary.Hex
 import org.apache.commons.io.FileUtils
 import org.apache.commons.lang.RandomStringUtils
-import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuiteLike}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.datatype.{DataTypes, Field}
+import org.apache.carbondata.core.metadata.datatype.{DataTypes, Field, StructField}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.presto.server.PrestoServer
 import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
 
+import io.prestosql.jdbc.PrestoArray
 
-class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAfterAll {
+class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAfterAll with BeforeAndAfterEach {
 
   private val logger = LogServiceFactory
     .getLogService(classOf[PrestoTestNonTransactionalTableFiles].getCanonicalName)
@@ -55,8 +56,9 @@
     val map = new util.HashMap[String, String]()
     map.put("hive.metastore", "file")
     map.put("hive.metastore.catalog.dir", s"file://$storePath")
-
     prestoServer.startServer("sdk_output", map)
+    prestoServer.execute("drop schema if exists sdk_output")
+    prestoServer.execute("create schema sdk_output")
   }
 
   override def afterAll(): Unit = {
@@ -71,6 +73,12 @@
     buildTestData(3, null, true)
   }
 
+  def buildStructData(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    createTable
+    buildTestData(3, null, true)
+  }
+
   def buildTestDataMultipleFiles(): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
     createTable
@@ -79,8 +87,6 @@
 
   private def createTable = {
     prestoServer.execute("drop table if exists sdk_output.files")
-    prestoServer.execute("drop schema if exists sdk_output")
-    prestoServer.execute("create schema sdk_output")
     prestoServer
       .execute(
         "create table sdk_output.files(name varchar, age int, id tinyint, height double, salary " +
@@ -90,8 +96,6 @@
 
   private def createTableBinary = {
     prestoServer.execute("drop table if exists sdk_output.files1")
-    prestoServer.execute("drop schema if exists sdk_output")
-    prestoServer.execute("create schema sdk_output")
     prestoServer
       .execute(
         "create table sdk_output.files1(name boolean, age int, id varbinary, height double, salary " +
@@ -356,4 +360,340 @@
     val expectedRes: List[Map[String, Any]] = List(Map("_col0" -> 32001))
     assert(actualRes.toString() equals expectedRes.toString())
   }
-}
\ No newline at end of file
+
+  test("test struct of primitive type") {
+    import scala.collection.JavaConverters._
+    val writerPathComplex = storePath + "/sdk_output/files4"
+    FileUtils.deleteDirectory(new File(writerPathComplex))
+    prestoServer.execute("drop table if exists sdk_output.files4")
+    prestoServer
+      .execute(
+        "create table sdk_output.files4(stringField varchar, structField ROW(byteField tinyint, shortField SMALLINT, intField Integer, " +
+        "longField BIGINT, floatField real, doubleField DOUBLE, binaryField varbinary, dateField date, timeStampField timestamp, " +
+        "booleanField boolean, longStringField varchar, decimalField decimal(8,2), stringChildField varchar)) with(format='CARBON') ")
+
+    val imagePath = rootPath + "/sdk/sdk/src/test/resources/image/carbondatalogo.jpg"
+    val bis = new BufferedInputStream(new FileInputStream(imagePath))
+    var hexValue: Array[Char] = null
+    val originBinary = new Array[Byte](bis.available)
+    while (bis.read(originBinary) != -1) {
+      hexValue = Hex.encodeHex(originBinary)
+    }
+    bis.close()
+    val binaryValue = String.valueOf(hexValue)
+
+    val longChar = RandomStringUtils.randomAlphabetic(33000)
+
+    val fields = List(new StructField("byteField", DataTypes.BYTE),
+      new StructField("shortField", DataTypes.SHORT),
+      new StructField("intField", DataTypes.INT),
+      new StructField("longField", DataTypes.LONG),
+      new StructField("floatField", DataTypes.FLOAT),
+      new StructField("doubleField", DataTypes.DOUBLE),
+      new StructField("binaryField", DataTypes.BINARY),
+      new StructField("dateField", DataTypes.DATE),
+      new StructField("timeStampField", DataTypes.TIMESTAMP),
+      new StructField("booleanField", DataTypes.BOOLEAN),
+      new StructField("longStringField", DataTypes.VARCHAR),
+      new StructField("decimalField", DataTypes.createDecimalType(8, 2)),
+      new StructField("stringChildField", DataTypes.STRING))
+    val structType = Array(new Field("stringField", DataTypes.STRING), new Field
+    ("structField", "struct", fields.asJava))
+
+    try {
+      val options: util.Map[String, String] = Map("bAd_RECords_action" -> "FORCE", "quotechar" -> "\"").asJava
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.outputPath(writerPathComplex)
+          .uniqueIdentifier(System.nanoTime()).withLoadOptions(options).withBlockSize(2).enableLocalDictionary(false)
+          .withCsvInput(new Schema(structType)).writtenBy("presto").build()
+
+      val array1 = Array[String]("row1",
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null)
+
+      val array2 = Array[String]("row2", "5"
+                                           + "\001" + "5"
+                                           + "\001" + "5"
+                                           + "\001" + "5"
+                                           + "\001" + "5.512"
+                                           + "\001" + "5.512"
+                                           + "\001" + binaryValue
+                                           + "\001" + "2019-03-02"
+                                           + "\001" + "2019-02-12 03:03:34"
+                                           + "\001" + "true"
+                                           + "\001" + longChar
+                                           + "\001" + "1.234567"
+                                           + "\001" + "stringName")
+      writer.write(array1)
+      writer.write(array2)
+      writer.close()
+    } catch {
+      case ex: Exception => throw new RuntimeException(ex)
+      case _: Throwable => None
+    }
+    val actualResult: List[Map[String, Any]] = prestoServer
+      .executeQuery("select * from files4 ")
+    assert(actualResult.size == 2)
+
+
+    for(i <- 0 to 1) {
+      val row = actualResult(i)("stringfield")
+      val result = actualResult(i)("structfield").asInstanceOf[java.util.Map[String,Any]]
+      if (row == "row1") {
+        assert(result.get("bytefield") == null)
+        assert(result.get("shortfield") == null)
+        assert(result.get("intfield") == null)
+        assert(result.get("longfield") == null)
+        assert(result.get("floatfield") == null)
+        assert(result.get("doublefield") == null)
+        assert(result.get("binaryfield") == null)
+        assert(result.get("datefield") == null)
+        assert(result.get("timestampfield") == null)
+        assert(result.get("booleanfield") == null)
+        assert(result.get("longstringfield") == null)
+        assert(result.get("decimalfield") == null)
+        assert(result.get("stringchildfield") == null)
+      } else {
+        assert(result.get("bytefield") == 5)
+        assert(result.get("shortfield") == 5)
+        assert(result.get("intfield") == 5)
+        assert(result.get("longfield") == 5L)
+        assert(result.get("floatfield") == 5.512f)
+        assert(result.get("doublefield") == 5.512)
+        assert((result.get("binaryfield").asInstanceOf[Array[Byte]]).length == 118198)
+        assert(result.get("datefield") == "2019-03-02")
+        assert(result.get("timestampfield") == "2019-02-12 03:03:34.000")
+        assert(result.get("booleanfield") == true)
+        assert(result.get("longstringfield") == longChar)
+        assert(result.get("decimalfield") == "1.23")
+        assert(result.get("stringchildfield") == "stringName")
+      }
+    }
+    FileUtils.deleteDirectory(new File(writerPathComplex))
+  }
+
+  test("test struct of date type with huge data") {
+    import scala.collection.JavaConverters._
+    val writerPathComplex = storePath + "/sdk_output/files2"
+    FileUtils.deleteDirectory(new File(writerPathComplex))
+    prestoServer.execute("drop table if exists sdk_output.files2")
+    prestoServer
+      .execute(
+        "create table sdk_output.files2(structField ROW(dateField date)) with(format='CARBON') ")
+    val fields = List(
+      new StructField("dateField", DataTypes.DATE))
+    val structType = Array(new Field("structField", "struct", fields.asJava))
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.outputPath(writerPathComplex)
+          .uniqueIdentifier(System.nanoTime()).withBlockSize(2).enableLocalDictionary(false)
+          .withCsvInput(new Schema(structType)).writtenBy("presto").build()
+      var i = 0
+      while (i < 100000) {
+        val array = Array[String]("2019-03-02")
+        writer.write(array)
+        i += 1
+      }
+      writer.close()
+    } catch {
+      case ex: Exception => throw new RuntimeException(ex)
+      case _: Throwable => None
+    }
+    val actualResult: List[Map[String, Any]] = prestoServer
+      .executeQuery("select structField from files2 ")
+    assert(actualResult.size == 100000)
+    FileUtils.deleteDirectory(new File(writerPathComplex))
+  }
+
+  test("test Array of primitive type") {
+    val writerPathComplex = storePath + "/sdk_output/files5"
+    import scala.collection.JavaConverters._
+    FileUtils.deleteDirectory(new File(writerPathComplex))
+    prestoServer.execute("drop table if exists sdk_output.files5")
+    prestoServer
+      .execute(
+        "create table sdk_output.files5(arrayByte ARRAY(tinyint), arrayShort ARRAY(smallint), arrayInt ARRAY(int), " +
+        "arrayLong ARRAY(bigint), arrayFloat ARRAY(real), arrayDouble ARRAY(double), " +
+        "arrayBinary ARRAY(varbinary), arrayDate ARRAY(date), arrayTimestamp ARRAY(timestamp), arrayBoolean ARRAY(boolean), " +
+        "arrayVarchar ARRAY(varchar), arrayDecimal ARRAY(decimal(8,2)), arrayString ARRAY(varchar), stringField varchar ) with(format='CARBON') ")
+
+    val imagePath = rootPath + "/sdk/sdk/src/test/resources/image/carbondatalogo.jpg"
+    val bis = new BufferedInputStream(new FileInputStream(imagePath))
+    var hexValue: Array[Char] = null
+    val originBinary = new Array[Byte](bis.available)
+    while (bis.read(originBinary) != -1) {
+      hexValue = Hex.encodeHex(originBinary)
+    }
+    bis.close()
+    val binaryValue = String.valueOf(hexValue)
+
+    val longChar = RandomStringUtils.randomAlphabetic(33000)
+
+    val fields1 = List(new StructField("byteField", DataTypes.BYTE))
+    val structType1 = new Field("arrayByte", "array", fields1.asJava)
+    val fields2 = List(new StructField("shortField", DataTypes.SHORT))
+    val structType2 = new Field("arrayShort", "array", fields2.asJava)
+    val fields3 = List(new StructField("intField", DataTypes.INT))
+    val structType3 = new Field("arrayInt", "array", fields3.asJava)
+    val fields4 = List(new StructField("longField", DataTypes.LONG))
+    val structType4 = new Field("arrayLong", "array", fields4.asJava)
+    val fields5 = List(new StructField("floatField", DataTypes.FLOAT))
+    val structType5 = new Field("arrayFloat", "array", fields5.asJava)
+    val fields6 = List(new StructField("DoubleField", DataTypes.DOUBLE))
+    val structType6 = new Field("arrayDouble", "array", fields6.asJava)
+    val fields7 = List(new StructField("binaryField", DataTypes.BINARY))
+    val structType7 = new Field("arrayBinary", "array", fields7.asJava)
+    val fields8 = List(new StructField("dateField", DataTypes.DATE))
+    val structType8 = new Field("arrayDate", "array", fields8.asJava)
+    val fields9 = List(new StructField("timestampField", DataTypes.TIMESTAMP))
+    val structType9 = new Field("arrayTimestamp", "array", fields9.asJava)
+    val fields10 = List(new StructField("booleanField", DataTypes.BOOLEAN))
+    val structType10 = new Field("arrayBoolean", "array", fields10.asJava)
+    val fields11 = List(new StructField("varcharField", DataTypes.VARCHAR))
+    val structType11 = new Field("arrayVarchar", "array", fields11.asJava)
+    val fields12 = List(new StructField("decimalField", DataTypes.createDecimalType(8, 2)))
+    val structType12 = new Field("arrayDecimal", "array", fields12.asJava)
+    val fields13 = List(new StructField("stringField", DataTypes.STRING))
+    val structType13 = new Field("arrayString", "array", fields13.asJava)
+    val structType14 = new Field("stringField", DataTypes.STRING)
+
+    try {
+      val options: util.Map[String, String] = Map("bAd_RECords_action" -> "FORCE", "quotechar" -> "\"").asJava
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.outputPath(writerPathComplex).withLoadOptions(options)
+          .uniqueIdentifier(System.nanoTime()).withBlockSize(2).enableLocalDictionary(false)
+          .withCsvInput(new Schema(Array[Field](structType1,structType2,structType3,structType4,structType5,structType6,
+            structType7,structType8,structType9,structType10,structType11,structType12,structType13,structType14))).writtenBy("presto").build()
+
+      var array = Array[String](null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        "row1")
+      writer.write(array)
+      array = Array[String]("3" + "\001" + "5" + "\001" + "4",
+        "4" + "\001" + "5" + "\001" + "6",
+        "4",
+        "2" + "\001" + "59999999" + "\001" + "99999999999" ,
+        "5.4646" + "\001" + "5.55" + "\001" + "0.055",
+        "5.46464646464" + "\001" + "5.55" + "\001" + "0.055",
+        binaryValue,
+        "2019-03-02" + "\001" + "2020-03-02" + "\001" + "2021-04-02",
+        "2019-02-12 03:03:34" + "\001" +"2020-02-12 03:03:34" + "\001" + "2021-03-12 03:03:34",
+        "true" + "\001" + "false",
+        longChar,
+        "999.232323" + "\001" + "0.1234",
+        "japan" + "\001" + "china" + "\001" + "iceland",
+        "row2")
+      writer.write(array)
+      writer.close()
+    } catch {
+      case e: Exception =>
+        assert(false, e.getMessage)
+    }
+    val actualResult: List[Map[String, Any]] = prestoServer
+      .executeQuery("select * from files5 ")
+
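+    // Each ARRAY column is returned by Presto as a PrestoArray; getArray() unwraps it
+    // to the underlying Object[] so the elements can be compared directly.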
+    assert(actualResult.size == 2)
+    for (row <- 0 to 1) {
+      var column1 = actualResult(row)("stringfield")
+      if(column1 == "row1") {
+        var column2 =  actualResult(row)("arraybyte").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
+        var column3 =  actualResult(row)("arrayshort").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
+        var column4 =  actualResult(row)("arrayint").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
+        assert(column2(0) == null)
+        assert(column3(0) == null)
+        assert(column4(0) == null)
+      } else if(column1 == "row2") {
+        var column2 =  actualResult(row)("arrayint").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
+        if (column2.sameElements(Array(4))) {
+          var column3 =  actualResult(row)("arraybyte").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
+          var column4 =  actualResult(row)("arrayshort").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
+          var column5 =  actualResult(row)("arraylong").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
+          var column6 =  actualResult(row)("arrayfloat").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
+          var column7 =  actualResult(row)("arraydouble").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
+          var column8 =  actualResult(row)("arraybinary").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
+          var column9 =  actualResult(row)("arraydate").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
+          var column10 =  actualResult(row)("arraytimestamp").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
+          var column11 =  actualResult(row)("arrayboolean").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
+          var column12 =  actualResult(row)("arrayvarchar").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
+          var column13 =  actualResult(row)("arraydecimal").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
+          var column14 =  actualResult(row)("arraystring").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
+
+          assert(column3.sameElements(Array(3,5,4)))
+          assert(column4.sameElements(Array(4,5,6)))
+          assert(column5.sameElements(Array(2L,59999999L, 99999999999L)))
+          assert(column6.sameElements(Array(5.4646f,5.55f,0.055f)))
+          assert(column7.sameElements(Array(5.46464646464,5.55,0.055)))
+          assert(column8(0).asInstanceOf[Array[Byte]].size == 118198)
+          assert(column9.sameElements(Array("2019-03-02","2020-03-02","2021-04-02")))
+          assert(column10.sameElements(Array("2019-02-12 03:03:34.000","2020-02-12 03:03:34.000","2021-03-12 03:03:34.000")))
+          assert(column11.sameElements(Array(true,false)))
+          assert(column12.sameElements(Array(longChar)))
+          assert(column13.sameElements(Array("999.23","0.12")))
+          assert(column14.sameElements(Array("japan","china","iceland")))
+        }
+      }
+    }
+    FileUtils.deleteDirectory(new File(writerPathComplex))
+  }
+
+  test("test Array of date type with huge data") {
+    val writerPathComplex = storePath + "/sdk_output/files6"
+    import scala.collection.JavaConverters._
+    FileUtils.deleteDirectory(new File(writerPathComplex))
+    prestoServer.execute("drop table if exists sdk_output.files6")
+    prestoServer
+      .execute(
+        "create table sdk_output.files6(arrayDate ARRAY(date)) with(format='CARBON') ")
+    val fields8 = List(new StructField("dateField", DataTypes.DATE))
+    val structType8 = new Field("arrayDate", "array", fields8.asJava)
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.outputPath(writerPathComplex)
+          .uniqueIdentifier(System.nanoTime()).withBlockSize(2).enableLocalDictionary(false)
+          .withCsvInput(new Schema(Array[Field](structType8))).writtenBy("presto").build()
+
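+      // 50,000 iterations x 2 rows = 100,000 rows, so the data spans more than one
+      // column page (32,000 rows per page by default) and exercises multi-page
+      // vector filling for the array of date type.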
+      var i = 0
+      while (i < 50000) {
+        val array = Array[String]("2019-03-02" + "\001" + "2020-03-02" + "\001" + "2021-04-02")
+        writer.write(array)
+        val array1 = Array[String]("2021-04-02")
+        writer.write(array1)
+        i += 1
+      }
+      writer.close()
+    } catch {
+      case e: Exception =>
+        assert(false, e.getMessage)
+    }
+    val actualResult: List[Map[String, Any]] = prestoServer
+      .executeQuery("select * from files6 ")
+    assert(actualResult.size == 100 * 1000)
+    FileUtils.deleteDirectory(new File(writerPathComplex))
+  }
+
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 5bfc61e..d9f3e8c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -240,16 +240,17 @@
       BadRecordLogHolder logHolder, Boolean isWithoutConverter) throws IOException {
     String parsedValue = null;
     // write null value
-    if (null == input ||
-        (this.carbonDimension.getDataType() == DataTypes.STRING && input.equals(nullFormat))) {
+    if (null == input || ((this.carbonDimension.getDataType() == DataTypes.STRING
+        || this.carbonDimension.getDataType() == DataTypes.VARCHAR) && input.equals(nullFormat))) {
       updateNullValue(dataOutputStream, logHolder);
       return;
     }
     // write null value after converter
     if (!isWithoutConverter) {
       parsedValue = DataTypeUtil.parseValue(input.toString(), carbonDimension);
-      if (null == parsedValue || (this.carbonDimension.getDataType() == DataTypes.STRING
-          && parsedValue.equals(nullFormat))) {
+      if (null == parsedValue || ((this.carbonDimension.getDataType() == DataTypes.STRING
+          || this.carbonDimension.getDataType() == DataTypes.VARCHAR) && parsedValue
+          .equals(nullFormat))) {
         updateNullValue(dataOutputStream, logHolder);
         return;
       }
@@ -413,7 +414,8 @@
 
   private void updateNullValue(DataOutputStream dataOutputStream, BadRecordLogHolder logHolder)
       throws IOException {
-    if (this.carbonDimension.getDataType() == DataTypes.STRING) {
+    if (this.carbonDimension.getDataType() == DataTypes.STRING
+        || this.carbonDimension.getDataType() == DataTypes.VARCHAR) {
       if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
         dataOutputStream.writeInt(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
       } else {