[CARBONDATA-3655] Support setting a base64 string as a struct<binary> field value
This closes #3564
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
index ac39bd0..db88cd4 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
@@ -32,6 +32,7 @@
import org.apache.carbondata.core.statusmanager.StageInput;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
import org.apache.carbon.core.metadata.StageManager;
@@ -51,6 +52,7 @@
) {
super(factory, identifier, table);
final Properties writerProperties = factory.getConfiguration().getWriterProperties();
+ final Properties carbonProperties = factory.getConfiguration().getCarbonProperties();
final String commitThreshold =
writerProperties.getProperty(CarbonLocalProperty.COMMIT_THRESHOLD);
this.writerFactory = new WriterFactory(table, writePath) {
@@ -58,12 +60,22 @@
protected org.apache.carbondata.sdk.file.CarbonWriter newWriter(
final Object[] row) {
try {
- return org.apache.carbondata.sdk.file.CarbonWriter.builder()
+ final CarbonWriterBuilder writerBuilder =
+ org.apache.carbondata.sdk.file.CarbonWriter.builder()
.outputPath(super.getWritePath(row))
.writtenBy("flink")
.withSchemaFile(CarbonTablePath.getSchemaFilePath(table.getTablePath()))
- .withCsvInput()
- .build();
+ .withCsvInput();
+ for (String propertyName : carbonProperties.stringPropertyNames()) {
+ try {
+ writerBuilder.withLoadOption(propertyName,
+ carbonProperties.getProperty(propertyName));
+ } catch (IllegalArgumentException exception) {
+ LOGGER.warn("Fail to set load option [" + propertyName + "], may be unsupported.",
+ exception);
+ }
+ }
+ return writerBuilder.build();
} catch (IOException | InvalidLoadOptionException exception) {
// TODO
throw new UnsupportedOperationException(exception);
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
index 1d3ec6b..ecae32a 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
@@ -33,6 +33,7 @@
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
import org.apache.carbon.core.metadata.StageManager;
@@ -54,6 +55,7 @@
) {
super(factory, identifier, table);
final Properties writerProperties = factory.getConfiguration().getWriterProperties();
+ final Properties carbonProperties = factory.getConfiguration().getCarbonProperties();
final String commitThreshold =
writerProperties.getProperty(CarbonS3Property.COMMIT_THRESHOLD);
this.writerFactory = new WriterFactory(table, writePath) {
@@ -61,13 +63,23 @@
protected org.apache.carbondata.sdk.file.CarbonWriter newWriter(
final Object[] row) {
try {
- return org.apache.carbondata.sdk.file.CarbonWriter.builder()
+ final CarbonWriterBuilder writerBuilder =
+ org.apache.carbondata.sdk.file.CarbonWriter.builder()
.outputPath(super.getWritePath(row))
.writtenBy("flink")
.withSchemaFile(CarbonTablePath.getSchemaFilePath(table.getTablePath()))
.withCsvInput()
- .withHadoopConf(configuration)
- .build();
+ .withHadoopConf(configuration);
+ for (String propertyName : carbonProperties.stringPropertyNames()) {
+ try {
+ writerBuilder.withLoadOption(propertyName,
+ carbonProperties.getProperty(propertyName));
+ } catch (IllegalArgumentException exception) {
+ LOGGER.warn("Fail to set load option [" + propertyName + "], may be unsupported.",
+ exception);
+ }
+ }
+ return writerBuilder.build();
} catch (IOException | InvalidLoadOptionException exception) {
// TODO
throw new UnsupportedOperationException(exception);
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index 447e83e..fe2fa38 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -19,9 +19,10 @@
import java.io.{File, InputStreamReader}
import java.util
-import java.util.{Collections, Properties}
+import java.util.{Base64, Collections, Properties}
import com.google.gson.Gson
+
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.statusmanager.StageInput
@@ -34,9 +35,10 @@
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.junit.Test
-
import scala.collection.JavaConverters._
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+
class TestCarbonPartitionWriter extends QueryTest {
val tableName = "test_flink_partition"
@@ -168,7 +170,7 @@
data(1) = index.asInstanceOf[AnyRef]
data(2) = 12345.asInstanceOf[AnyRef]
data(3) = "test\0011\0012"
- data(4) = "test"
+ data(4) = Base64.getEncoder.encodeToString(Array[Byte](2, 3, 4))
data(5) = Integer.toString(TestSource.randomCache.get().nextInt(24))
data(6) = "20191218"
data
@@ -212,6 +214,10 @@
checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
+ val rows = sql(s"select * from $tableName limit 1").collect()
+ assertResult(1)(rows.length)
+ assertResult(Array[Byte](2, 3, 4))(rows(0).get(rows(0).fieldIndex("binaryfield")).asInstanceOf[GenericRowWithSchema](0))
+
} finally {
sql(s"drop table if exists $tableName").collect()
delDir(new File(dataPath))
@@ -237,6 +243,7 @@
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
properties.setProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
properties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024")
+ properties.setProperty("binary_decoder", "base64")
properties
}
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 9b40a5f..649741f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -325,7 +325,7 @@
try {
sql("DROP TABLE IF EXISTS carbon_table")
val rdd = spark.sparkContext.parallelize(1 to 3)
- .map(x => Row("a" + x % 10, "b", x, "YWJj".getBytes()))
+ .map(x => Row("a" + x % 10, "b", x, "abc".getBytes()))
val customSchema = StructType(Array(
StructField("c1", StringType),
StructField("c2", StringType),
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 3b36bf2..d4781bf 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -44,7 +44,6 @@
import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
-import org.apache.carbondata.core.metadata.ColumnIdentifier
import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
@@ -74,7 +73,8 @@
level: Int = 0): String = {
try {
FieldConverter.objectToString(row.get(idx), serializationNullFormat, complexDelimiters,
- timeStampFormat, dateFormat, isVarcharType, isComplexType, level)
+ timeStampFormat, dateFormat, isVarcharType, isComplexType, level,
+ carbonLoadModel.getBinaryDecoder)
} catch {
case e: Exception =>
if (e.getMessage.startsWith(FieldConverter.stringLengthExceedErrorMsg)) {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index 24e7765..0cbccdd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -325,7 +325,8 @@
tableName = table.getTableName,
factPathFromUser = null,
dimFilesPath = Seq(),
- options = scala.collection.immutable.Map("fileheader" -> header),
+ options = scala.collection.immutable.Map("fileheader" -> header,
+ "binary_decoder" -> "base64"),
isOverwriteTable = false,
inputSqlString = null,
dataFrame = Some(selectedDataFrame),
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index fdfe9e9..23e9322 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -44,6 +44,7 @@
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.converter.impl.binary.BinaryDecoder;
import org.apache.carbondata.processing.loading.dictionary.DirectDictionary;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -105,6 +106,8 @@
private DataType dataType;
+ private BinaryDecoder binaryDecoder;
+
private PrimitiveDataType(int outputArrayIndex, int dataCounter) {
this.outputArrayIndex = outputArrayIndex;
this.dataCounter = dataCounter;
@@ -137,13 +140,14 @@
* @param nullFormat
*/
public PrimitiveDataType(CarbonColumn carbonColumn, String parentName, String columnId,
- CarbonDimension carbonDimension, String nullFormat) {
+ CarbonDimension carbonDimension, String nullFormat, BinaryDecoder binaryDecoder) {
this.name = carbonColumn.getColName();
this.parentName = parentName;
this.columnId = columnId;
this.carbonDimension = carbonDimension;
this.isDictionary = isDictionaryDimension(carbonDimension);
this.nullFormat = nullFormat;
+ this.binaryDecoder = binaryDecoder;
this.dataType = carbonColumn.getDataType();
if (carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
@@ -334,8 +338,12 @@
value = ByteUtil.toXorBytes(Long.parseLong(parsedValue));
}
} else if (this.carbonDimension.getDataType().equals(DataTypes.BINARY)) {
- value = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(input,
- this.carbonDimension.getDataType());
+ if (binaryDecoder == null) {
+ value = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(input,
+ this.carbonDimension.getDataType());
+ } else {
+ value = binaryDecoder.decode(parsedValue);
+ }
} else {
value = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue,
this.carbonDimension.getDataType(), dateFormat);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 0b43a5e..2f089bf 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -76,22 +76,10 @@
return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index,
isEmptyBadRecord);
} else if (dataField.getColumn().isComplex()) {
- return new ComplexFieldConverterImpl(createComplexDataType(dataField, nullFormat), index);
+ return new ComplexFieldConverterImpl(
+ createComplexDataType(dataField, nullFormat, getBinaryDecoder(binaryDecoder)), index);
} else if (dataField.getColumn().getDataType() == DataTypes.BINARY) {
- BinaryDecoder binaryDecoderObject = null;
- if (binaryDecoder.equalsIgnoreCase(
- CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_BASE64)) {
- binaryDecoderObject = new Base64BinaryDecoder();
- } else if (binaryDecoder.equalsIgnoreCase(
- CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_HEX)) {
- binaryDecoderObject = new HexBinaryDecoder();
- } else if (!StringUtils.isBlank(binaryDecoder)) {
- throw new CarbonDataLoadingException("Binary decoder only support Base64, " +
- "Hex or no decode for string, don't support " + binaryDecoder);
- } else {
- binaryDecoderObject = new DefaultBinaryDecoder();
- }
-
+ BinaryDecoder binaryDecoderObject = getBinaryDecoder(binaryDecoder);
return new BinaryFieldConverterImpl(dataField, nullFormat,
index, isEmptyBadRecord, binaryDecoderObject);
} else {
@@ -109,11 +97,33 @@
}
}
+ private BinaryDecoder getBinaryDecoder(String binaryDecoder) {
+ BinaryDecoder binaryDecoderObject;
+ if (binaryDecoder == null) {
+ return null;
+ }
+ if (binaryDecoder.equalsIgnoreCase(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_BASE64)) {
+ binaryDecoderObject = new Base64BinaryDecoder();
+ } else if (binaryDecoder.equalsIgnoreCase(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_HEX)) {
+ binaryDecoderObject = new HexBinaryDecoder();
+ } else if (!StringUtils.isBlank(binaryDecoder)) {
+ throw new CarbonDataLoadingException("Binary decoder only support Base64, " +
+ "Hex or no decode for string, don't support " + binaryDecoder);
+ } else {
+ binaryDecoderObject = new DefaultBinaryDecoder();
+ }
+ return binaryDecoderObject;
+ }
+
/**
* Create parser for the carbon column.
*/
- public static GenericDataType createComplexDataType(DataField dataField, String nullFormat) {
- return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), nullFormat);
+ public static GenericDataType createComplexDataType(
+ DataField dataField, String nullFormat, BinaryDecoder binaryDecoder) {
+ return createComplexType(
+ dataField.getColumn(), dataField.getColumn().getColName(), nullFormat, binaryDecoder);
}
/**
@@ -123,7 +133,7 @@
*/
private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName,
- String nullFormat) {
+ String nullFormat, BinaryDecoder binaryDecoder) {
DataType dataType = carbonColumn.getDataType();
if (DataTypes.isArrayType(dataType) || DataTypes.isMapType(dataType)) {
List<CarbonDimension> listOfChildDimensions =
@@ -134,7 +144,7 @@
carbonColumn.hasEncoding(Encoding.DICTIONARY));
for (CarbonDimension dimension : listOfChildDimensions) {
arrayDataType.addChildren(
- createComplexType(dimension, carbonColumn.getColName(), nullFormat));
+ createComplexType(dimension, carbonColumn.getColName(), nullFormat, binaryDecoder));
}
return arrayDataType;
} else if (DataTypes.isStructType(dataType)) {
@@ -146,12 +156,12 @@
carbonColumn.hasEncoding(Encoding.DICTIONARY));
for (CarbonDimension dimension : dimensions) {
structDataType.addChildren(
- createComplexType(dimension, carbonColumn.getColName(), nullFormat));
+ createComplexType(dimension, carbonColumn.getColName(), nullFormat, binaryDecoder));
}
return structDataType;
} else {
return new PrimitiveDataType(carbonColumn, parentName, carbonColumn.getColumnId(),
- (CarbonDimension) carbonColumn, nullFormat);
+ (CarbonDimension) carbonColumn, nullFormat, binaryDecoder);
}
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index 789d157..3a71845 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -112,7 +112,7 @@
if (srcDataField[i].getColumn().isComplex()) {
// create a ComplexDataType
dataFieldsWithComplexDataType.put(srcDataField[i].getColumn().getOrdinal(),
- FieldEncoderFactory.createComplexDataType(srcDataField[i], nullFormat));
+ FieldEncoderFactory.createComplexDataType(srcDataField[i], nullFormat, null));
}
}
}
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
index e74c191..9393773 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
@@ -20,6 +20,7 @@
import java.nio.charset.Charset
import java.text.SimpleDateFormat
import java.util
+import java.util.Base64
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -44,7 +45,9 @@
dateFormat: SimpleDateFormat,
isVarcharType: Boolean = false,
isComplexType: Boolean = false,
- level: Int = 0): String = {
+ level: Int = 0,
+ binaryCodec: String
+ ): String = {
if (value == null) {
serializationNullFormat
} else {
@@ -64,8 +67,13 @@
case b: java.lang.Boolean => b.toString
case s: java.lang.Short => s.toString
case f: java.lang.Float => f.toString
- case bs: Array[Byte] => new String(bs,
- Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))
+ case bs: Array[Byte] =>
+ if ("base64".equalsIgnoreCase(binaryCodec)) {
+ // Insert flow is inner flow, the inner binary codec fixed with base64 unify.
+ Base64.getEncoder.encodeToString(bs)
+ } else {
+ new String(bs, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))
+ }
case s: scala.collection.Seq[Any] =>
if (s.nonEmpty) {
val delimiter = complexDelimiters.get(level)
@@ -73,7 +81,8 @@
s.foreach { x =>
val nextLevel = level + 1
builder.append(objectToString(x, serializationNullFormat, complexDelimiters,
- timeStampFormat, dateFormat, isVarcharType, level = nextLevel))
+ timeStampFormat, dateFormat, isVarcharType, level = nextLevel,
+ binaryCodec = binaryCodec))
.append(delimiter)
}
builder.substring(0, builder.length - delimiter.length())
@@ -90,10 +99,12 @@
val builder = new StringBuilder()
m.foreach { x =>
builder.append(objectToString(x._1, serializationNullFormat, complexDelimiters,
- timeStampFormat, dateFormat, isVarcharType, level = nextLevel))
+ timeStampFormat, dateFormat, isVarcharType, level = nextLevel,
+ binaryCodec = binaryCodec))
.append(keyValueDelimiter)
builder.append(objectToString(x._2, serializationNullFormat, complexDelimiters,
- timeStampFormat, dateFormat, isVarcharType, level = nextLevel))
+ timeStampFormat, dateFormat, isVarcharType, level = nextLevel,
+ binaryCodec = binaryCodec))
.append(delimiter)
}
builder.substring(0, builder.length - delimiter.length())
@@ -108,7 +119,8 @@
while (i < len) {
val nextLevel = level + 1
builder.append(objectToString(r(i), serializationNullFormat, complexDelimiters,
- timeStampFormat, dateFormat, isVarcharType, level = nextLevel))
+ timeStampFormat, dateFormat, isVarcharType, level = nextLevel,
+ binaryCodec = binaryCodec))
.append(delimiter)
i += 1
}
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
index 16e7258..03ca09e 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
@@ -69,7 +69,8 @@
FieldConverter.objectToString(
x, serializationNullFormat, complexDelimiters,
timeStampFormat, dateFormat,
- isVarcharType = i < this.isVarcharTypeMapping.length && this.isVarcharTypeMapping(i))
+ isVarcharType = i < this.isVarcharTypeMapping.length && this.isVarcharTypeMapping(i),
+ binaryCodec = null)
} }.toArray
override def close(): Unit = {