[CARBONDATA-3653] Support huge data for complex child columns

Why is this PR needed?

Currently, complex child columns of string and binary type are stored with a SHORT (2-byte) length prefix. So, if a value is longer than 32000 characters, data load fails for binary and long string columns.

What changes were proposed in this PR?

Complex child columns of string, binary, decimal and date type are stored as a byte_array page with a SHORT length prefix. Changed it to an INT length prefix. [Separating out just string and binary is hard now; to be done in the future.]
Handled backward compatibility by introducing a new encoding type (INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY) for complex child columns.
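
Illustration only: a minimal, self-contained sketch of the LV (length-value) flattening before and after this change. The class and method names (LvEncodingSketch, flattenShortLV, flattenIntLV) are hypothetical, not actual CarbonData APIs; the real write path is in SafeVarLengthColumnPage and VarLengthColumnPageBase.

  import java.io.ByteArrayOutputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;

  public class LvEncodingSketch {
    // Legacy layout: 2-byte length prefix per value; a value longer than
    // Short.MAX_VALUE (32767) overflows the prefix and corrupts the page.
    static byte[] flattenShortLV(byte[][] values) throws IOException {
      ByteArrayOutputStream stream = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(stream);
      for (byte[] value : values) {
        out.writeShort((short) value.length);
        out.write(value);
      }
      return stream.toByteArray();
    }

    // New layout: 4-byte length prefix per value, so complex child values
    // (binary, long strings, decimals, dates) can exceed 32000 bytes.
    static byte[] flattenIntLV(byte[][] values) throws IOException {
      ByteArrayOutputStream stream = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(stream);
      for (byte[] value : values) {
        out.writeInt(value.length);
        out.write(value);
      }
      return stream.toByteArray();
    }
  }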

Does this PR introduce any user interface change?

No

Is any new testcase added?

Yes

This closes #3562
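
For context, the backward-compatibility decision on the read path reduces to the sketch below: pages written before this change were never tagged with the new encoding, so readers fall back to the legacy SHORT length when the flag is absent. The readValue helper is hypothetical; the real logic lives in EncodingFactory and ColumnPage.decompress.

  import java.nio.ByteBuffer;

  public class LvDecodeSketch {
    // intLengthEncoding is true when the page metadata contains
    // Encoding.INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY, false for legacy pages.
    static byte[] readValue(ByteBuffer page, boolean intLengthEncoding) {
      int length = intLengthEncoding ? page.getInt() : page.getShort();
      byte[] value = new byte[length];
      page.get(value);
      return value;
    }
  }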
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 338f0b2..b3463f4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -698,7 +698,8 @@
    * @return
    * @throws IOException
    */
-  public abstract byte[] getComplexChildrenLVFlattenedBytePage() throws IOException;
+  public abstract byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType)
+      throws IOException;
 
   /**
    * For complex type columns
@@ -746,7 +747,8 @@
       return getDecimalPage().length;
     } else if (dataType == BYTE_ARRAY
         && columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
-      return getComplexChildrenLVFlattenedBytePage().length;
+      return getComplexChildrenLVFlattenedBytePage(
+          columnPageEncoderMeta.getColumnSpec().getSchemaDataType()).length;
     } else if (dataType == BYTE_ARRAY
         && (columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_STRUCT
         || columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_ARRAY
@@ -785,7 +787,8 @@
       return compressor.compressByte(getDecimalPage());
     } else if (dataType == BYTE_ARRAY
         && columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
-      return compressor.compressByte(getComplexChildrenLVFlattenedBytePage());
+      return compressor.compressByte(getComplexChildrenLVFlattenedBytePage(
+          columnPageEncoderMeta.getColumnSpec().getSchemaDataType()));
     } else if (dataType == BYTE_ARRAY
         && (columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_STRUCT
         || columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_ARRAY
@@ -805,8 +808,8 @@
    * Decompress data and create a column page using the decompressed data,
    * except for decimal page
    */
-  public static ColumnPage decompress(ColumnPageEncoderMeta meta, byte[] compressedData,
-      int offset, int length, boolean isLVEncoded)
+  public static ColumnPage decompress(ColumnPageEncoderMeta meta, byte[] compressedData, int offset,
+      int length, boolean isLVEncoded, boolean isComplexPrimitiveIntLengthEncoding)
       throws MemoryException {
     Compressor compressor = CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
     TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
@@ -836,8 +839,14 @@
         columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE
             || columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
-      return newComplexLVBytesPage(columnSpec, lvVarBytes,
-          CarbonCommonConstants.SHORT_SIZE_IN_BYTE, meta.getCompressorName());
+      if (isComplexPrimitiveIntLengthEncoding) {
+        // decode as int length
+        return newComplexLVBytesPage(columnSpec, lvVarBytes,
+            CarbonCommonConstants.INT_SIZE_IN_BYTE, meta.getCompressorName());
+      } else {
+        return newComplexLVBytesPage(columnSpec, lvVarBytes,
+            CarbonCommonConstants.SHORT_SIZE_IN_BYTE, meta.getCompressorName());
+      }
     } else if (isLVEncoded && storeDataType == BYTE_ARRAY &&
         columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index d0389d3..b789e00 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -179,7 +179,7 @@
   }
 
   @Override
-  public byte[] getComplexChildrenLVFlattenedBytePage() {
+  public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) {
     throw new UnsupportedOperationException("internal error");
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
index f428c60..2e3a330 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
@@ -28,6 +28,7 @@
 import org.apache.carbondata.core.localdictionary.PageLevelDictionary;
 import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException;
 import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 import org.apache.log4j.Logger;
 
@@ -364,11 +365,11 @@
   }
 
   @Override
-  public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
+  public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) throws IOException {
     if (null != encodedDataColumnPage) {
-      return encodedDataColumnPage.getComplexChildrenLVFlattenedBytePage();
+      return encodedDataColumnPage.getComplexChildrenLVFlattenedBytePage(dataType);
     } else {
-      return actualDataColumnPage.getComplexChildrenLVFlattenedBytePage();
+      return actualDataColumnPage.getComplexChildrenLVFlattenedBytePage(dataType);
     }
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
index f45f482..a579bdd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -296,7 +296,7 @@
   }
 
   @Override
-  public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
+  public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) throws IOException {
     ByteArrayOutputStream stream = new ByteArrayOutputStream();
     DataOutputStream out = new DataOutputStream(stream);
     for (int i = 0; i < arrayElementCount; i++) {
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
index b105239..0d34b06 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -25,6 +25,8 @@
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
@@ -88,11 +90,15 @@
   }
 
   @Override
-  public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
+  public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) throws IOException {
     ByteArrayOutputStream stream = new ByteArrayOutputStream();
     DataOutputStream out = new DataOutputStream(stream);
     for (byte[] byteArrayDatum : byteArrayData) {
-      out.writeShort((short)byteArrayDatum.length);
+      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+        out.writeInt(byteArrayDatum.length);
+      } else {
+        out.writeShort((short) byteArrayDatum.length);
+      }
       out.write(byteArrayDatum);
     }
     return stream.toByteArray();
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index 4ef5e5d..00c8ca0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -24,6 +24,7 @@
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
@@ -396,7 +397,7 @@
   }
 
   @Override
-  public byte[] getComplexChildrenLVFlattenedBytePage() {
+  public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) {
     byte[] data = new byte[totalLength];
     CarbonUnsafe.getUnsafe()
         .copyMemory(baseAddress, baseOffset, data, CarbonUnsafe.BYTE_ARRAY_OFFSET, totalLength);
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 01f1d55..1381dc6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -243,9 +243,15 @@
     int counter = 0;
     // extract Length field in input and calculate total length
     for (offset = 0; lvEncodedOffset < lvEncodedBytes.length; offset += length) {
-      length = ByteUtil.toShort(lvEncodedBytes, lvEncodedOffset);
-      rowOffset.putInt(counter, offset);
-      lvEncodedOffset += lvLength + length;
+      if (lvLength == CarbonCommonConstants.INT_SIZE_IN_BYTE) {
+        length = ByteUtil.toInt(lvEncodedBytes, lvEncodedOffset);
+        rowOffset.putInt(counter, offset);
+        lvEncodedOffset += lvLength + length;
+      } else {
+        length = ByteUtil.toShort(lvEncodedBytes, lvEncodedOffset);
+        rowOffset.putInt(counter, offset);
+        lvEncodedOffset += lvLength + length;
+      }
       rowId++;
       counter++;
     }
@@ -465,15 +471,30 @@
   }
 
   @Override
-  public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
+  public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) throws IOException {
     // output LV encoded byte array
     int offset = 0;
-    byte[] data = new byte[totalLength + ((rowOffset.getActualRowCount() - 1) * 2)];
+    int outputLength;
+    if (dataType == DataTypes.BYTE_ARRAY) {
+      outputLength = totalLength + ((rowOffset.getActualRowCount() - 1)
+          * CarbonCommonConstants.INT_SIZE_IN_BYTE);
+    } else {
+      outputLength = totalLength + ((rowOffset.getActualRowCount() - 1)
+          * CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+    }
+    byte[] data = new byte[outputLength];
     for (int rowId = 0; rowId < rowOffset.getActualRowCount() - 1; rowId++) {
-      short length = (short) (rowOffset.getInt(rowId + 1) - rowOffset.getInt(rowId));
-      ByteUtil.setShort(data, offset, length);
-      copyBytes(rowId, data, offset + 2, length);
-      offset += 2 + length;
+      if (dataType == DataTypes.BYTE_ARRAY) {
+        int length = rowOffset.getInt(rowId + 1) - rowOffset.getInt(rowId);
+        ByteUtil.setInt(data, offset, length);
+        copyBytes(rowId, data, offset + CarbonCommonConstants.INT_SIZE_IN_BYTE, length);
+        offset += CarbonCommonConstants.INT_SIZE_IN_BYTE + length;
+      } else {
+        short length = (short) (rowOffset.getInt(rowId + 1) - rowOffset.getInt(rowId));
+        ByteUtil.setShort(data, offset, length);
+        copyBytes(rowId, data, offset + CarbonCommonConstants.SHORT_SIZE_IN_BYTE, length);
+        offset += CarbonCommonConstants.SHORT_SIZE_IN_BYTE + length;
+      }
     }
     return data;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
index b7b2529..e63dd82 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
@@ -194,6 +194,11 @@
     while (index < input.getComplexColumnIndex()) {
       ColumnPage subColumnPage = input.getColumnPage(index);
       encodedPages[index] = encodedColumn(subColumnPage);
+      // always add this encoding for complex child pages; at read time its
+      // presence tells the decoder that complex child byte array columns
+      // store an INT length prefix (absence implies the legacy SHORT length)
+      encodedPages[index].getPageMetadata().getEncoders()
+          .add(Encoding.INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY);
       index++;
     }
     return encodedPages;
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index cb39bb9..36e2bc6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -83,6 +83,8 @@
       String compressor, boolean fullVectorFill) throws IOException {
     assert (encodings.size() >= 1);
     assert (encoderMetas.size() == 1);
+    boolean isComplexPrimitiveIntLengthEncoding =
+        encodings.contains(Encoding.INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY);
     Encoding encoding = encodings.get(0);
     byte[] encoderMeta = encoderMetas.get(0).array();
     ByteArrayInputStream stream = new ByteArrayInputStream(encoderMeta);
@@ -91,7 +93,10 @@
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
       metadata.setFillCompleteVector(fullVectorFill);
       metadata.readFields(in);
-      return new DirectCompressCodec(metadata.getStoreDataType()).createDecoder(metadata);
+      DirectCompressCodec directCompressCodec =
+          new DirectCompressCodec(metadata.getStoreDataType());
+      directCompressCodec.setComplexPrimitiveIntLengthEncoding(isComplexPrimitiveIntLengthEncoding);
+      return directCompressCodec.createDecoder(metadata);
     } else if (encoding == ADAPTIVE_INTEGRAL) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
       metadata.setFillCompleteVector(fullVectorFill);
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index 36f1e64..1400f25 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -128,7 +128,7 @@
       @Override
       public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
-        ColumnPage page = ColumnPage.decompress(meta, input, offset, length, false);
+        ColumnPage page = ColumnPage.decompress(meta, input, offset, length, false, false);
         return LazyColumnPage.newPage(page, converter);
       }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index d0bbedb..7b1c12d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -139,7 +139,7 @@
         if (DataTypes.isDecimal(meta.getSchemaDataType())) {
           page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
         } else {
-          page = ColumnPage.decompress(meta, input, offset, length, false);
+          page = ColumnPage.decompress(meta, input, offset, length, false, false);
         }
         return LazyColumnPage.newPage(page, converter);
       }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 64a0ebf..cf9b739 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -116,7 +116,7 @@
       @Override
       public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
-        ColumnPage page = ColumnPage.decompress(meta, input, offset, length, false);
+        ColumnPage page = ColumnPage.decompress(meta, input, offset, length, false, false);
         return LazyColumnPage.newPage(page, converter);
       }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index 5651368..4638652 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -116,7 +116,7 @@
         if (DataTypes.isDecimal(meta.getSchemaDataType())) {
           page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
         } else {
-          page = ColumnPage.decompress(meta, input, offset, length, false);
+          page = ColumnPage.decompress(meta, input, offset, length, false, false);
         }
         return LazyColumnPage.newPage(page, converter);
       }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index a378988..9683cb8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -58,6 +58,12 @@
     this.dataType = dataType;
   }
 
+  boolean isComplexPrimitiveIntLengthEncoding = false;
+
+  public void setComplexPrimitiveIntLengthEncoding(boolean complexPrimitiveIntLengthEncoding) {
+    isComplexPrimitiveIntLengthEncoding = complexPrimitiveIntLengthEncoding;
+  }
+
   @Override
   public String getName() {
     return "DirectCompressCodec";
@@ -102,7 +108,8 @@
         if (DataTypes.isDecimal(dataType)) {
           decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
         } else {
-          decodedPage = ColumnPage.decompress(meta, input, offset, length, false);
+          decodedPage = ColumnPage
+              .decompress(meta, input, offset, length, false, isComplexPrimitiveIntLengthEncoding);
         }
         return LazyColumnPage.newPage(decodedPage, converter);
       }
@@ -150,8 +157,9 @@
       @Override
       public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
           throws MemoryException, IOException {
-        return LazyColumnPage
-            .newPage(ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter);
+        return LazyColumnPage.newPage(ColumnPage
+            .decompress(meta, input, offset, length, isLVEncoded,
+                isComplexPrimitiveIntLengthEncoding), converter);
       }
     };
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 9fd25d6..b7fabe2 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -120,6 +120,8 @@
         return org.apache.carbondata.format.Encoding.BIT_PACKED;
       case DIRECT_DICTIONARY:
         return org.apache.carbondata.format.Encoding.DIRECT_DICTIONARY;
+      case INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY:
+        return org.apache.carbondata.format.Encoding.INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY;
       default:
         return org.apache.carbondata.format.Encoding.DICTIONARY;
     }
@@ -457,6 +459,8 @@
         return Encoding.DIRECT_COMPRESS_VARCHAR;
       case BIT_PACKED:
         return Encoding.BIT_PACKED;
+      case INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY:
+        return Encoding.INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY;
       case DIRECT_DICTIONARY:
         return Encoding.DIRECT_DICTIONARY;
       default:
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java b/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
index 6e32c89..30f83a1 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
@@ -38,7 +38,8 @@
   ADAPTIVE_FLOATING,
   BOOL_BYTE,
   ADAPTIVE_DELTA_FLOATING,
-  DIRECT_COMPRESS_VARCHAR;
+  DIRECT_COMPRESS_VARCHAR,
+  INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY;
 
   public static Encoding valueOf(int ordinal) {
     if (ordinal == DICTIONARY.ordinal()) {
@@ -73,6 +74,8 @@
       return ADAPTIVE_DELTA_FLOATING;
     } else if (ordinal == DIRECT_COMPRESS_VARCHAR.ordinal()) {
       return DIRECT_COMPRESS_VARCHAR;
+    } else if (ordinal == INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY.ordinal()) {
+      return INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY;
     } else {
       throw new RuntimeException("create Encoding with invalid ordinal: " + ordinal);
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index b275a27..7d25fe7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -107,7 +107,11 @@
     byte[] currentVal =
         copyBlockDataChunk(rawColumnChunks, dimensionColumnPages, rowNumber, pageNumber);
     if (!this.isDictionary && !this.isDirectDictionary) {
-      dataOutputStream.writeShort(currentVal.length);
+      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+        dataOutputStream.writeInt(currentVal.length);
+      } else {
+        dataOutputStream.writeShort(currentVal.length);
+      }
     }
     dataOutputStream.write(currentVal);
   }
@@ -158,7 +162,12 @@
       actualData = directDictionaryGenerator.getValueFromSurrogate(surrgateValue);
     } else if (!isDictionary) {
       if (size == -1) {
-        size = dataBuffer.getShort();
+        if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+          size = dataBuffer.getInt();
+        } else {
+          size = dataBuffer.getShort();
+        }
+
       }
       byte[] value = new byte[size];
       dataBuffer.get(value, 0, size);
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b3c7093..babbafa 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3225,6 +3225,7 @@
         case ADAPTIVE_DELTA_INTEGRAL:
         case ADAPTIVE_FLOATING:
         case ADAPTIVE_DELTA_FLOATING:
+        case INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY:
           return true;
       }
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 8422c78..0f8f5ab 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -1125,4 +1125,18 @@
     return false;
   }
 
+  /**
+   * Utility method to check whether a complex child column is stored as a
+   * byte array page and hence its values can exceed 32000 bytes in length
+   * @param dataType data type of the complex child column
+   * @return true if the child column is stored as a byte array page
+   */
+  public static boolean isByteArrayComplexChildColumn(DataType dataType) {
+    return ((dataType == DataTypes.STRING) ||
+        (dataType == DataTypes.VARCHAR) ||
+        (dataType == DataTypes.BINARY) ||
+        (dataType == DataTypes.DATE) ||
+        DataTypes.isDecimal(dataType) ||
+        (dataType == DataTypes.BYTE_ARRAY));
+  }
 }
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index ca4bbad..f4aa9b7 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -62,6 +62,7 @@
 	BOOL_BYTE = 12;   // Identifies that a column is encoded using BooleanPageCodec
 	ADAPTIVE_DELTA_FLOATING = 13; // Identifies that a column is encoded using AdaptiveDeltaFloatingCodec
 	DIRECT_COMPRESS_VARCHAR = 14;  // Identifies that a columm is encoded using DirectCompressCodec, it is used for long string columns
+	INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY = 15;  // Identifies that a complex column child byte array is stored with INT length instead of the default SHORT length
 }
 
 // Only NATIVE_HIVE is supported, others are deprecated since CarbonData 2.0
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
index 32a5d92..c673e56 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
@@ -21,6 +21,7 @@
 
 import scala.collection.mutable
 
+import org.apache.commons.lang3.RandomStringUtils
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
@@ -39,6 +40,8 @@
   val badRecordAction = CarbonProperties.getInstance()
     .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION)
 
+  val hugeBinary = RandomStringUtils.randomAlphabetic(33000)
+
   override def beforeAll(): Unit = {
     sql("DROP TABLE IF EXISTS table1")
     sql("DROP TABLE IF EXISTS test")
@@ -969,6 +972,16 @@
     sql("drop table if exists hive_table")
   }
 
+  test("test array of huge binary data type") {
+    sql("drop table if exists carbon_table")
+    sql("create table if not exists carbon_table(id int, label boolean, name string," +
+        "binaryField array<binary>, autoLabel boolean) stored by 'carbondata'")
+    sql(s"insert into carbon_table values(1,true,'abc',array('$hugeBinary'),false)")
+    val result = sql("SELECT binaryField[0] FROM carbon_table").collect()
+    assert(hugeBinary.equals(new String(result(0).get(0).asInstanceOf[Array[Byte]])))
+    sql("drop table if exists carbon_table")
+  }
+
   test("test struct of binary data type") {
     sql("drop table if exists carbon_table")
     sql("drop table if exists parquet_table")
@@ -982,7 +995,17 @@
     checkAnswer(sql("SELECT binaryField.b FROM carbon_table"),
       sql("SELECT binaryField.b FROM parquet_table"))
     sql("drop table if exists carbon_table")
-    sql("drop table if exists hive_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test struct of huge binary data type") {
+    sql("drop table if exists carbon_table")
+    sql("create table if not exists carbon_table(id int, label boolean, name string," +
+        "binaryField struct<b:binary>, autoLabel boolean) stored as carbondata ")
+    sql(s"insert into carbon_table values(1,true,'abc',named_struct('b','$hugeBinary'),false)")
+    val result = sql("SELECT binaryField.b FROM carbon_table").collect()
+    assert(hugeBinary.equals(new String(result(0).get(0).asInstanceOf[Array[Byte]])))
+    sql("drop table if exists carbon_table")
   }
 
   test("test map of binary data type") {
@@ -1000,6 +1023,16 @@
     sql("drop table if exists hive_table")
   }
 
+  test("test map of huge binary data type") {
+    sql("drop table if exists carbon_table")
+    sql("create table if not exists carbon_table(id int, label boolean, name string," +
+        "binaryField map<int, binary>, autoLabel boolean) stored by 'carbondata'")
+    sql(s"insert into carbon_table values(1,true,'abc',map(1,'$hugeBinary'),false)")
+    val result = sql("SELECT binaryField[1] FROM carbon_table").collect()
+    assert(hugeBinary.equals(new String(result(0).get(0).asInstanceOf[Array[Byte]])))
+    sql("drop table if exists carbon_table")
+  }
+
   test("test map of array and struct binary data type") {
     sql("drop table if exists carbon_table")
     sql("drop table if exists parquet_table")
@@ -1017,7 +1050,7 @@
       sql("SELECT binaryField1[1][1] FROM parquet_table"))
     checkAnswer(sql("SELECT binaryField2[1].b FROM carbon_table"),
       sql("SELECT binaryField2[1].b FROM parquet_table"))
-    sql("drop table if exists hive_table")
+    sql("drop table if exists parquet_table")
     sql("drop table if exists carbon_table")
   }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index b7a4508..fdfe9e9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -387,17 +387,29 @@
 
   private void updateValueToByteStream(DataOutputStream dataOutputStream, byte[] value)
       throws IOException {
-    dataOutputStream.writeShort(value.length);
+    if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+      dataOutputStream.writeInt(value.length);
+    } else {
+      dataOutputStream.writeShort(value.length);
+    }
     dataOutputStream.write(value);
   }
 
   private void updateNullValue(DataOutputStream dataOutputStream, BadRecordLogHolder logHolder)
       throws IOException {
     if (this.carbonDimension.getDataType() == DataTypes.STRING) {
-      dataOutputStream.writeShort(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
+      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+        dataOutputStream.writeInt(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
+      } else {
+        dataOutputStream.writeShort(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
+      }
       dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
     } else {
-      dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
+      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+        dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
+      } else {
+        dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
+      }
       dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
     }
     String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName());
@@ -422,8 +434,14 @@
       KeyGenerator[] generator)
       throws IOException, KeyGenException {
     if (!this.isDictionary) {
-      int sizeOfData = byteArrayInput.getShort();
-      dataOutputStream.writeShort(sizeOfData);
+      int sizeOfData;
+      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+        sizeOfData = byteArrayInput.getInt();
+        dataOutputStream.writeInt(sizeOfData);
+      } else {
+        sizeOfData = byteArrayInput.getShort();
+        dataOutputStream.writeShort(sizeOfData);
+      }
       byte[] bb = new byte[sizeOfData];
       byteArrayInput.get(bb, 0, sizeOfData);
       dataOutputStream.write(bb);
@@ -465,7 +483,13 @@
   public void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray,
       ByteBuffer inputArray) {
     if (!isDictionary) {
-      byte[] key = new byte[inputArray.getShort()];
+      int length;
+      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+        length = inputArray.getInt();
+      } else {
+        length = inputArray.getShort();
+      }
+      byte[] key = new byte[length];
       inputArray.get(key);
       columnsArray.get(outputArrayIndex).add(key);
     } else {
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
index 6f90155..3a64377 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
@@ -33,6 +33,7 @@
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.junit.Assert;
 import org.junit.Test;
@@ -1168,4 +1169,44 @@
     reader.close();
   }
 
+  @Test public void testHugeBinaryWithComplexType()
+      throws IOException, InvalidLoadOptionException, InterruptedException {
+    int num = 1;
+    int rows = 1;
+    String path = "./target/binary";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    Field[] fields = new Field[2];
+    fields[0] = new Field("arrayField", DataTypes.createArrayType(DataTypes.BINARY));
+    ArrayList<StructField> structFields = new ArrayList<>();
+    structFields.add(new StructField("b", DataTypes.BINARY));
+    fields[1] = new Field("structField", DataTypes.createStructType(structFields));
+
+    String description = RandomStringUtils.randomAlphabetic(33000);
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+      CarbonWriter writer = CarbonWriter.builder().outputPath(path).withCsvInput(new Schema(fields))
+          .writtenBy("BinaryExample").withPageSizeInMb(5).build();
+
+      for (int i = 0; i < rows; i++) {
+        // write data
+        writer.write(new String[] { description, description });
+      }
+      writer.close();
+    }
+    CarbonReader reader = CarbonReader.builder(path, "_temp").build();
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      Object[] arrayResult = (Object[]) row[0];
+      Object[] structResult = (Object[]) row[1];
+      assert (new String((byte[]) arrayResult[0]).equalsIgnoreCase(description));
+      assert (new String((byte[]) structResult[0]).equalsIgnoreCase(description));
+    }
+    reader.close();
+  }
+
 }