[CARBONDATA-3655] Support setting a base64 string as a struct<binary> field value

This closes #3564
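
This change lets a binary field nested inside a complex column (struct<binary>,
array<binary>) be loaded from a base64-encoded string. Three pieces cooperate:
the Flink writers forward the configured carbon properties to the SDK writer as
load options, the "binary_decoder" load option is threaded through
FieldEncoderFactory into PrimitiveDataType so complex children can decode, and
the internal insert flow serializes Array[Byte] values as base64 so they survive
string transport.

A minimal sketch of driving the decoder through the SDK writer; the schema,
output path, and data below are illustrative, not part of this patch:

```java
import java.util.Base64;

import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;

public class Base64BinaryLoadExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical schema: one string column and one binary column.
    Field[] fields = new Field[] {
        new Field("name", "string"),
        new Field("image", "binary")
    };
    CarbonWriterBuilder builder = CarbonWriter.builder()
        .outputPath("/tmp/carbon_binary_demo") // illustrative path
        .writtenBy("example")
        .withCsvInput(new Schema(fields));
    // The option this patch plumbs through: string input for the binary
    // column is decoded from base64 instead of taken as raw bytes.
    builder.withLoadOption("binary_decoder", "base64");
    CarbonWriter writer = builder.build();
    String encoded = Base64.getEncoder().encodeToString(new byte[] {2, 3, 4});
    writer.write(new String[] {"row1", encoded});
    writer.close();
  }
}
```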
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
index ac39bd0..db88cd4 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
@@ -32,6 +32,7 @@
 import org.apache.carbondata.core.statusmanager.StageInput;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
 
 import org.apache.carbon.core.metadata.StageManager;
 
@@ -51,6 +52,7 @@
   ) {
     super(factory, identifier, table);
     final Properties writerProperties = factory.getConfiguration().getWriterProperties();
+    final Properties carbonProperties = factory.getConfiguration().getCarbonProperties();
     final String commitThreshold =
         writerProperties.getProperty(CarbonLocalProperty.COMMIT_THRESHOLD);
     this.writerFactory = new WriterFactory(table, writePath) {
@@ -58,12 +60,22 @@
       protected org.apache.carbondata.sdk.file.CarbonWriter newWriter(
           final Object[] row) {
         try {
-          return org.apache.carbondata.sdk.file.CarbonWriter.builder()
+          final CarbonWriterBuilder writerBuilder =
+              org.apache.carbondata.sdk.file.CarbonWriter.builder()
               .outputPath(super.getWritePath(row))
               .writtenBy("flink")
               .withSchemaFile(CarbonTablePath.getSchemaFilePath(table.getTablePath()))
-              .withCsvInput()
-              .build();
+              .withCsvInput();
+          for (String propertyName : carbonProperties.stringPropertyNames()) {
+            try {
+              writerBuilder.withLoadOption(propertyName,
+                  carbonProperties.getProperty(propertyName));
+            } catch (IllegalArgumentException exception) {
+              LOGGER.warn("Fail to set load option [" + propertyName + "], may be unsupported.",
+                  exception);
+            }
+          }
+          return writerBuilder.build();
         } catch (IOException | InvalidLoadOptionException exception) {
           // TODO
           throw new UnsupportedOperationException(exception);
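
Both the local and the S3 writer use the same tolerant-forwarding idiom: every
configured carbon property is offered to the SDK builder, and keys the builder
rejects are logged and skipped rather than failing the job. A self-contained
sketch of that control flow, with a stand-in for the builder method (in the
patch the IllegalArgumentException comes from CarbonWriterBuilder.withLoadOption):

```java
import java.util.Properties;
import java.util.logging.Logger;

public class TolerantOptionForwarding {
  private static final Logger LOGGER = Logger.getLogger("demo");

  // Stand-in for CarbonWriterBuilder.withLoadOption, which rejects
  // unknown option keys with IllegalArgumentException.
  static void withLoadOption(String key, String value) {
    if (!"binary_decoder".equals(key)) {
      throw new IllegalArgumentException("Unsupported option: " + key);
    }
  }

  public static void main(String[] args) {
    Properties carbonProperties = new Properties();
    carbonProperties.setProperty("binary_decoder", "base64");
    carbonProperties.setProperty("some.unrelated.key", "x");
    for (String propertyName : carbonProperties.stringPropertyNames()) {
      try {
        withLoadOption(propertyName, carbonProperties.getProperty(propertyName));
      } catch (IllegalArgumentException exception) {
        // Same policy as the patch: warn and continue, never fail the load.
        LOGGER.warning("Failed to set load option [" + propertyName + "]");
      }
    }
  }
}
```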
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
index 1d3ec6b..ecae32a 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
@@ -33,6 +33,7 @@
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
 
 import org.apache.carbon.core.metadata.StageManager;
 
@@ -54,6 +55,7 @@
   ) {
     super(factory, identifier, table);
     final Properties writerProperties = factory.getConfiguration().getWriterProperties();
+    final Properties carbonProperties = factory.getConfiguration().getCarbonProperties();
     final String commitThreshold =
         writerProperties.getProperty(CarbonS3Property.COMMIT_THRESHOLD);
     this.writerFactory = new WriterFactory(table, writePath) {
@@ -61,13 +63,23 @@
       protected org.apache.carbondata.sdk.file.CarbonWriter newWriter(
           final Object[] row) {
         try {
-          return org.apache.carbondata.sdk.file.CarbonWriter.builder()
+          final CarbonWriterBuilder writerBuilder =
+              org.apache.carbondata.sdk.file.CarbonWriter.builder()
               .outputPath(super.getWritePath(row))
               .writtenBy("flink")
               .withSchemaFile(CarbonTablePath.getSchemaFilePath(table.getTablePath()))
               .withCsvInput()
-              .withHadoopConf(configuration)
-              .build();
+              .withHadoopConf(configuration);
+          for (String propertyName : carbonProperties.stringPropertyNames()) {
+            try {
+              writerBuilder.withLoadOption(propertyName,
+                  carbonProperties.getProperty(propertyName));
+            } catch (IllegalArgumentException exception) {
+              LOGGER.warn("Fail to set load option [" + propertyName + "], may be unsupported.",
+                  exception);
+            }
+          }
+          return writerBuilder.build();
         } catch (IOException | InvalidLoadOptionException exception) {
           // TODO
           throw new UnsupportedOperationException(exception);
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index 447e83e..fe2fa38 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -19,9 +19,10 @@
 
 import java.io.{File, InputStreamReader}
 import java.util
-import java.util.{Collections, Properties}
+import java.util.{Base64, Collections, Properties}
 
 import com.google.gson.Gson
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.statusmanager.StageInput
@@ -34,9 +35,10 @@
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.junit.Test
-
 import scala.collection.JavaConverters._
 
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+
 class TestCarbonPartitionWriter extends QueryTest {
 
   val tableName = "test_flink_partition"
@@ -168,7 +170,7 @@
           data(1) = index.asInstanceOf[AnyRef]
           data(2) = 12345.asInstanceOf[AnyRef]
           data(3) = "test\0011\0012"
-          data(4) = "test"
+          data(4) = Base64.getEncoder.encodeToString(Array[Byte](2, 3, 4))
           data(5) = Integer.toString(TestSource.randomCache.get().nextInt(24))
           data(6) = "20191218"
           data
@@ -212,6 +214,10 @@
 
       checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
 
+      val rows = sql(s"select * from $tableName limit 1").collect()
+      assertResult(1)(rows.length)
+      assertResult(Array[Byte](2, 3, 4))(rows(0).get(rows(0).fieldIndex("binaryfield")).asInstanceOf[GenericRowWithSchema](0))
+
     } finally {
       sql(s"drop table if exists $tableName").collect()
       delDir(new File(dataPath))
@@ -237,6 +243,7 @@
       CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
     properties.setProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
     properties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024")
+    properties.setProperty("binary_decoder", "base64")
     properties
   }
 
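
The partition-writer test now feeds the struct's binary child a base64 string
and asserts that the original bytes come back. Base64 matters because the load
path transports every value as a string, and arbitrary bytes do not survive a
round trip through a character set, as this JDK-only sketch shows:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

public class WhyBase64ForBinary {
  public static void main(String[] args) {
    byte[] original = new byte[] {(byte) 0xC3, 0x28, 2, 3, 4}; // invalid UTF-8
    // Lossy: decoding invalid UTF-8 substitutes replacement characters,
    // so encoding the string again does not reproduce the input bytes.
    byte[] viaString = new String(original, StandardCharsets.UTF_8)
        .getBytes(StandardCharsets.UTF_8);
    System.out.println(Arrays.equals(original, viaString)); // false
    // Lossless: base64 survives string-based transport unchanged.
    byte[] viaBase64 = Base64.getDecoder()
        .decode(Base64.getEncoder().encodeToString(original));
    System.out.println(Arrays.equals(original, viaBase64)); // true
  }
}
```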
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 9b40a5f..649741f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -325,7 +325,7 @@
     try {
       sql("DROP TABLE IF EXISTS carbon_table")
       val rdd = spark.sparkContext.parallelize(1 to 3)
-              .map(x => Row("a" + x % 10, "b", x, "YWJj".getBytes()))
+              .map(x => Row("a" + x % 10, "b", x, "abc".getBytes()))
       val customSchema = StructType(Array(
         StructField("c1", StringType),
         StructField("c2", StringType),
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 3b36bf2..d4781bf 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -44,7 +44,6 @@
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
-import org.apache.carbondata.core.metadata.ColumnIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
@@ -74,7 +73,8 @@
       level: Int = 0): String = {
     try {
       FieldConverter.objectToString(row.get(idx), serializationNullFormat, complexDelimiters,
-        timeStampFormat, dateFormat, isVarcharType, isComplexType, level)
+        timeStampFormat, dateFormat, isVarcharType, isComplexType, level,
+        carbonLoadModel.getBinaryDecoder)
     } catch {
       case e: Exception =>
         if (e.getMessage.startsWith(FieldConverter.stringLengthExceedErrorMsg)) {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index 24e7765..0cbccdd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -325,7 +325,8 @@
           tableName = table.getTableName,
           factPathFromUser = null,
           dimFilesPath = Seq(),
-          options = scala.collection.immutable.Map("fileheader" -> header),
+          options = scala.collection.immutable.Map("fileheader" -> header,
+            "binary_decoder" -> "base64"),
           isOverwriteTable = false,
           inputSqlString = null,
           dataFrame = Some(selectedDataFrame),
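
Insert-from-stage pins "binary_decoder" to "base64" so the load side inverts
exactly what the insert side produces: FieldConverter (changed below)
base64-encodes Array[Byte] values, and the base64 decoder turns them back into
bytes. The round trip in miniature:

```java
import java.util.Arrays;
import java.util.Base64;

public class StageInsertRoundTrip {
  public static void main(String[] args) {
    byte[] stored = new byte[] {2, 3, 4};
    // Insert side (FieldConverter with binaryCodec == "base64"):
    String transported = Base64.getEncoder().encodeToString(stored);
    // Load side (base64 decoder selected via "binary_decoder" -> "base64"):
    byte[] reloaded = Base64.getDecoder().decode(transported);
    System.out.println(Arrays.equals(stored, reloaded)); // true
  }
}
```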
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index fdfe9e9..23e9322 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -44,6 +44,7 @@
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.converter.impl.binary.BinaryDecoder;
 import org.apache.carbondata.processing.loading.dictionary.DirectDictionary;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -105,6 +106,8 @@
 
   private DataType dataType;
 
+  private BinaryDecoder binaryDecoder;
+
   private PrimitiveDataType(int outputArrayIndex, int dataCounter) {
     this.outputArrayIndex = outputArrayIndex;
     this.dataCounter = dataCounter;
@@ -137,13 +140,14 @@
    * @param nullFormat
    */
   public PrimitiveDataType(CarbonColumn carbonColumn, String parentName, String columnId,
-      CarbonDimension carbonDimension, String nullFormat) {
+      CarbonDimension carbonDimension, String nullFormat, BinaryDecoder binaryDecoder) {
     this.name = carbonColumn.getColName();
     this.parentName = parentName;
     this.columnId = columnId;
     this.carbonDimension = carbonDimension;
     this.isDictionary = isDictionaryDimension(carbonDimension);
     this.nullFormat = nullFormat;
+    this.binaryDecoder = binaryDecoder;
     this.dataType = carbonColumn.getDataType();
 
     if (carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
@@ -334,8 +338,12 @@
                   value = ByteUtil.toXorBytes(Long.parseLong(parsedValue));
                 }
               } else if (this.carbonDimension.getDataType().equals(DataTypes.BINARY)) {
-                value = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(input,
-                    this.carbonDimension.getDataType());
+                if (binaryDecoder == null) {
+                  value = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(input,
+                      this.carbonDimension.getDataType());
+                } else {
+                  value = binaryDecoder.decode(parsedValue);
+                }
               } else {
                 value = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue,
                     this.carbonDimension.getDataType(), dateFormat);
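
For a binary child of a complex column, the converter now defers to the
configured decoder when one was supplied and keeps the old byte-extraction path
when it is null (the no-converter step below passes null explicitly). A
simplified sketch of that dispatch, with an illustrative stand-in for the
BinaryDecoder interface:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BinaryDispatchSketch {
  // Simplified stand-in for the BinaryDecoder interface used in the patch.
  interface BinaryDecoder {
    byte[] decode(String input);
  }

  static byte[] convertBinary(String parsedValue, BinaryDecoder binaryDecoder) {
    if (binaryDecoder == null) {
      // Legacy path: take the string's own bytes as the binary value.
      return parsedValue.getBytes(StandardCharsets.UTF_8);
    }
    return binaryDecoder.decode(parsedValue);
  }

  public static void main(String[] args) {
    BinaryDecoder base64 = input -> Base64.getDecoder().decode(input);
    byte[] decoded = convertBinary("AgME", base64); // base64 of {2, 3, 4}
    System.out.println(decoded.length); // 3
  }
}
```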
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 0b43a5e..2f089bf 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -76,22 +76,10 @@
         return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index,
             isEmptyBadRecord);
       } else if (dataField.getColumn().isComplex()) {
-        return new ComplexFieldConverterImpl(createComplexDataType(dataField, nullFormat), index);
+        return new ComplexFieldConverterImpl(
+            createComplexDataType(dataField, nullFormat, getBinaryDecoder(binaryDecoder)), index);
       } else if (dataField.getColumn().getDataType() == DataTypes.BINARY) {
-        BinaryDecoder binaryDecoderObject = null;
-        if (binaryDecoder.equalsIgnoreCase(
-            CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_BASE64)) {
-          binaryDecoderObject = new Base64BinaryDecoder();
-        } else if (binaryDecoder.equalsIgnoreCase(
-            CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_HEX)) {
-          binaryDecoderObject = new HexBinaryDecoder();
-        } else if (!StringUtils.isBlank(binaryDecoder)) {
-          throw new CarbonDataLoadingException("Binary decoder only support Base64, " +
-              "Hex or no decode for string, don't support " + binaryDecoder);
-        } else {
-          binaryDecoderObject = new DefaultBinaryDecoder();
-        }
-
+        BinaryDecoder binaryDecoderObject = getBinaryDecoder(binaryDecoder);
         return new BinaryFieldConverterImpl(dataField, nullFormat,
             index, isEmptyBadRecord, binaryDecoderObject);
       } else {
@@ -109,11 +97,33 @@
     }
   }
 
+  private BinaryDecoder getBinaryDecoder(String binaryDecoder) {
+    BinaryDecoder binaryDecoderObject;
+    if (binaryDecoder == null) {
+      return null;
+    }
+    if (binaryDecoder.equalsIgnoreCase(
+        CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_BASE64)) {
+      binaryDecoderObject = new Base64BinaryDecoder();
+    } else if (binaryDecoder.equalsIgnoreCase(
+        CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_HEX)) {
+      binaryDecoderObject = new HexBinaryDecoder();
+    } else if (!StringUtils.isBlank(binaryDecoder)) {
+      throw new CarbonDataLoadingException("Binary decoder only supports Base64, " +
+          "Hex or no decoding for strings; " + binaryDecoder + " is not supported");
+    } else {
+      binaryDecoderObject = new DefaultBinaryDecoder();
+    }
+    return binaryDecoderObject;
+  }
+
   /**
    * Create parser for the carbon column.
    */
-  public static GenericDataType createComplexDataType(DataField dataField, String nullFormat) {
-    return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), nullFormat);
+  public static GenericDataType createComplexDataType(
+      DataField dataField, String nullFormat, BinaryDecoder binaryDecoder) {
+    return createComplexType(
+        dataField.getColumn(), dataField.getColumn().getColName(), nullFormat, binaryDecoder);
   }
 
   /**
@@ -123,7 +133,7 @@
    */
 
   private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName,
-      String nullFormat) {
+      String nullFormat, BinaryDecoder binaryDecoder) {
     DataType dataType = carbonColumn.getDataType();
     if (DataTypes.isArrayType(dataType) || DataTypes.isMapType(dataType)) {
       List<CarbonDimension> listOfChildDimensions =
@@ -134,7 +144,7 @@
               carbonColumn.hasEncoding(Encoding.DICTIONARY));
       for (CarbonDimension dimension : listOfChildDimensions) {
         arrayDataType.addChildren(
-            createComplexType(dimension, carbonColumn.getColName(), nullFormat));
+            createComplexType(dimension, carbonColumn.getColName(), nullFormat, binaryDecoder));
       }
       return arrayDataType;
     } else if (DataTypes.isStructType(dataType)) {
@@ -146,12 +156,12 @@
               carbonColumn.hasEncoding(Encoding.DICTIONARY));
       for (CarbonDimension dimension : dimensions) {
         structDataType.addChildren(
-            createComplexType(dimension, carbonColumn.getColName(), nullFormat));
+            createComplexType(dimension, carbonColumn.getColName(), nullFormat, binaryDecoder));
       }
       return structDataType;
     } else {
       return new PrimitiveDataType(carbonColumn, parentName, carbonColumn.getColumnId(),
-          (CarbonDimension) carbonColumn, nullFormat);
+          (CarbonDimension) carbonColumn, nullFormat, binaryDecoder);
     }
   }
 
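
The extracted getBinaryDecoder centralizes the selection rules, and its new
null guard is what lets callers without a configured decoder opt out entirely:
null stays null, base64 and hex map to their decoders, any other non-blank name
is rejected, and a blank name falls back to the default decoder. The same
decision table as a compact sketch, with illustrative lambda decoders standing
in for the real classes:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class DecoderSelectionSketch {
  interface BinaryDecoder {
    byte[] decode(String s);
  }

  // Mirrors the selection rules of getBinaryDecoder; the decoders are stubs.
  static BinaryDecoder select(String name) {
    if (name == null) {
      return null; // caller never configured a decoder
    }
    if ("base64".equalsIgnoreCase(name)) {
      return s -> Base64.getDecoder().decode(s);
    }
    if ("hex".equalsIgnoreCase(name)) {
      return s -> {
        byte[] out = new byte[s.length() / 2];
        for (int i = 0; i < out.length; i++) {
          out[i] = (byte) Integer.parseInt(s.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
      };
    }
    if (!name.trim().isEmpty()) {
      throw new IllegalArgumentException("unsupported binary decoder: " + name);
    }
    // Blank name: default decoder, the string's own bytes.
    return s -> s.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    System.out.println(select("base64").decode("AgME").length); // 3
    System.out.println(select("hex").decode("020304").length);  // 3
    System.out.println(select(null));                           // null
  }
}
```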
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index 789d157..3a71845 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -112,7 +112,7 @@
       if (srcDataField[i].getColumn().isComplex()) {
         // create a ComplexDataType
         dataFieldsWithComplexDataType.put(srcDataField[i].getColumn().getOrdinal(),
-            FieldEncoderFactory.createComplexDataType(srcDataField[i], nullFormat));
+            FieldEncoderFactory.createComplexDataType(srcDataField[i], nullFormat, null));
       }
     }
   }
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
index e74c191..9393773 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
@@ -20,6 +20,7 @@
 import java.nio.charset.Charset
 import java.text.SimpleDateFormat
 import java.util
+import java.util.Base64
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 
@@ -44,7 +45,9 @@
       dateFormat: SimpleDateFormat,
       isVarcharType: Boolean = false,
       isComplexType: Boolean = false,
-      level: Int = 0): String = {
+      level: Int = 0,
+      binaryCodec: String
+  ): String = {
     if (value == null) {
       serializationNullFormat
     } else {
@@ -64,8 +67,13 @@
         case b: java.lang.Boolean => b.toString
         case s: java.lang.Short => s.toString
         case f: java.lang.Float => f.toString
-        case bs: Array[Byte] => new String(bs,
-          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))
+        case bs: Array[Byte] =>
+          if ("base64".equalsIgnoreCase(binaryCodec)) {
+            // The insert flow is internal, so its binary codec is fixed to base64.
+            Base64.getEncoder.encodeToString(bs)
+          } else {
+            new String(bs, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))
+          }
         case s: scala.collection.Seq[Any] =>
           if (s.nonEmpty) {
             val delimiter = complexDelimiters.get(level)
@@ -73,7 +81,8 @@
             s.foreach { x =>
               val nextLevel = level + 1
               builder.append(objectToString(x, serializationNullFormat, complexDelimiters,
-                timeStampFormat, dateFormat, isVarcharType, level = nextLevel))
+                timeStampFormat, dateFormat, isVarcharType, level = nextLevel,
+                binaryCodec = binaryCodec))
                 .append(delimiter)
             }
             builder.substring(0, builder.length - delimiter.length())
@@ -90,10 +99,12 @@
             val builder = new StringBuilder()
             m.foreach { x =>
               builder.append(objectToString(x._1, serializationNullFormat, complexDelimiters,
-                timeStampFormat, dateFormat, isVarcharType, level = nextLevel))
+                timeStampFormat, dateFormat, isVarcharType, level = nextLevel,
+                binaryCodec = binaryCodec))
                 .append(keyValueDelimiter)
               builder.append(objectToString(x._2, serializationNullFormat, complexDelimiters,
-                timeStampFormat, dateFormat, isVarcharType, level = nextLevel))
+                timeStampFormat, dateFormat, isVarcharType, level = nextLevel,
+                binaryCodec = binaryCodec))
                 .append(delimiter)
             }
             builder.substring(0, builder.length - delimiter.length())
@@ -108,7 +119,8 @@
           while (i < len) {
             val nextLevel = level + 1
             builder.append(objectToString(r(i), serializationNullFormat, complexDelimiters,
-              timeStampFormat, dateFormat, isVarcharType, level = nextLevel))
+              timeStampFormat, dateFormat, isVarcharType, level = nextLevel,
+              binaryCodec = binaryCodec))
               .append(delimiter)
             i += 1
           }
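
Base64 also keeps the flattened representation of complex values parseable:
complex types are serialized into delimiter-joined strings, so raw bytes that
happen to contain the delimiter would corrupt the split, while the base64
alphabet cannot collide with it. A small demonstration (the '\001' delimiter
here is illustrative):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class DelimiterSafety {
  public static void main(String[] args) {
    char delimiter = '\001'; // a typical complex-type delimiter
    byte[] payload = new byte[] {1, 2, 1}; // raw bytes contain \001 itself
    String raw = new String(payload, StandardCharsets.ISO_8859_1);
    // Splitting the flattened struct on the delimiter corrupts raw bytes...
    System.out.println(raw.split(String.valueOf(delimiter), -1).length); // 3 pieces
    // ...but never touches a base64 string, whose alphabet excludes \001.
    String encoded = Base64.getEncoder().encodeToString(payload);
    System.out.println(encoded.split(String.valueOf(delimiter), -1).length); // 1 piece
  }
}
```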
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
index 16e7258..03ca09e 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
@@ -69,7 +69,8 @@
       FieldConverter.objectToString(
         x, serializationNullFormat, complexDelimiters,
         timeStampFormat, dateFormat,
-        isVarcharType = i < this.isVarcharTypeMapping.length && this.isVarcharTypeMapping(i))
+        isVarcharType = i < this.isVarcharTypeMapping.length && this.isVarcharTypeMapping(i),
+        binaryCodec = null)
     } }.toArray
 
   override def close(): Unit = {