[HOTFIX] Fix InsertFromStage complex data type issue for partition table
Why is this PR needed?
CarbonInsertFromStageCommand does not work correctly with complex data types.
What changes were proposed in this PR?
For partition tables, the complex data type columns of the target table should be converted to the binary data type.
Does this PR introduce any user interface change?
No
Is any new test case added?
Yes
This closes #3556
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index c07f08b..5471420 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -75,7 +75,7 @@
/**
* DataType converter for different computing engines
*/
- private static final ThreadLocal<DataTypeConverter> converter = new ThreadLocal<>();
+ private static DataTypeConverter converter;
/**
* This method will convert a given value to its specific type
@@ -84,8 +84,8 @@
* @param dataType
* @return
*/
- public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType,
- int scale, int precision) {
+ public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType, int scale,
+ int precision) {
return getMeasureValueBasedOnDataType(msrValue, dataType, scale, precision, false);
}
@@ -105,7 +105,7 @@
new BigDecimal(msrValue).setScale(scale, RoundingMode.HALF_UP);
BigDecimal decimal = normalizeDecimalValue(bigDecimal, precision);
if (useConverter) {
- return getDataTypeConverter().convertFromBigDecimalToDecimal(decimal);
+ return converter.convertFromBigDecimalToDecimal(decimal);
} else {
return decimal;
}
@@ -140,11 +140,10 @@
if (dataType == DataTypes.BOOLEAN) {
return BooleanConvert.parseBoolean(dimValue);
} else if (DataTypes.isDecimal(dataType)) {
- BigDecimal bigDecimal =
- new BigDecimal(dimValue).setScale(scale, RoundingMode.HALF_UP);
+ BigDecimal bigDecimal = new BigDecimal(dimValue).setScale(scale, RoundingMode.HALF_UP);
BigDecimal decimal = normalizeDecimalValue(bigDecimal, precision);
if (useConverter) {
- return getDataTypeConverter().convertFromBigDecimalToDecimal(decimal);
+ return converter.convertFromBigDecimalToDecimal(decimal);
} else {
return decimal;
}
@@ -457,7 +456,7 @@
}
} else {
// Default action for String/Varchar
- return getDataTypeConverter().convertFromStringToUTF8String(dimensionValue);
+ return converter.convertFromStringToUTF8String(dimensionValue);
}
}
@@ -518,7 +517,7 @@
} else if (actualDataType == DataTypes.LONG) {
return ByteUtil.toXorBytes((Long) dimensionValue);
} else if (actualDataType == DataTypes.TIMESTAMP) {
- return ByteUtil.toXorBytes((Long)dimensionValue);
+ return ByteUtil.toXorBytes((Long) dimensionValue);
} else {
// Default action for String/Varchar
return ByteUtil.toBytes(dimensionValue.toString());
@@ -970,11 +969,12 @@
/**
* set the data type converter as per computing engine
+ *
* @param converterLocal
*/
public static void setDataTypeConverter(DataTypeConverter converterLocal) {
if (converterLocal != null) {
- converter.set(converterLocal);
+ converter = converterLocal;
timeStampformatter.remove();
dateformatter.remove();
}
@@ -989,17 +989,10 @@
}
public static DataTypeConverter getDataTypeConverter() {
- DataTypeConverter dataTypeConverter = converter.get();
- if (dataTypeConverter == null) {
- synchronized (converter) {
- dataTypeConverter = converter.get();
- if (dataTypeConverter == null) {
- dataTypeConverter = new DataTypeConverterImpl();
- converter.set(dataTypeConverter);
- }
- }
+ if (converter == null) {
+ converter = new DataTypeConverterImpl();
}
- return dataTypeConverter;
+ return converter;
}
public static DataType valueOf(String name) {
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
index c24c3bf..ac39bd0 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
@@ -113,6 +113,9 @@
synchronized (this) {
if (!this.flushed) {
this.closeWriters();
+ this.commit();
+ this.writerFactory.reset();
+ this.writeCount.set(0);
this.flushed = true;
}
}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
index 0c8ccbd..1d3ec6b 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
@@ -120,6 +120,9 @@
synchronized (this) {
if (!this.flushed) {
this.closeWriters();
+ this.commit();
+ this.writerFactory.reset();
+ this.writeCount.set(0);
this.flushed = true;
}
}
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index cc3c4b4..447e83e 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -71,11 +71,11 @@
environment.enableCheckpointing(2000L)
environment.setRestartStrategy(RestartStrategies.noRestart)
- val dataCount = 10000
+ val dataCount = 1000
val source = new TestSource(dataCount) {
@throws[InterruptedException]
override def get(index: Int): Array[AnyRef] = {
- val data = new Array[AnyRef](5)
+ val data = new Array[AnyRef](7)
data(0) = "test" + index
data(1) = index.asInstanceOf[AnyRef]
data(2) = 12345.asInstanceOf[AnyRef]
@@ -86,7 +86,7 @@
@throws[InterruptedException]
override def onFinish(): Unit = {
- Thread.sleep(30000L)
+ Thread.sleep(5000L)
}
}
val stream = environment.addSource(source)
@@ -118,18 +118,99 @@
assertResult(false)(FileFactory
.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
- // ensure the carbon data file count in data directory
- // is same of the data file count which stage files recorded.
- assertResult(true)(FileFactory.getCarbonFile(dataLocation).listFiles().length ==
- collectStageInputs(CarbonTablePath.getStageDir(tablePath)).map(
- stageInput =>
- stageInput.getLocations.asScala.map(location => location.getFiles.size()).sum
- ).sum
+ sql(s"INSERT INTO $tableName STAGE")
+
+ checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
+
+ } finally {
+ sql(s"drop table if exists $tableName").collect()
+ delDir(new File(dataPath))
+ }
+ }
+
+ @Test
+ def testComplexType(): Unit = {
+ sql(s"drop table if exists $tableName").collect()
+ sql(
+ s"""
+ | CREATE TABLE $tableName (stringField string, intField int, shortField short,
+ | structField struct<value1:string,value2:int,value3:int>, binaryField struct<value1:binary>)
+ | STORED AS carbondata
+ | PARTITIONED BY (hour_ string, date_ string)
+ | TBLPROPERTIES ('SORT_COLUMNS'='hour_,date_,stringField', 'SORT_SCOPE'='GLOBAL_SORT')
+ """.stripMargin
+ ).collect()
+
+ val rootPath = System.getProperty("user.dir") + "/target/test-classes"
+
+ val dataTempPath = rootPath + "/data/temp/"
+ val dataPath = rootPath + "/data/"
+ delDir(new File(dataPath))
+ new File(dataPath).mkdir()
+
+ try {
+ val tablePath = storeLocation + "/" + tableName + "/"
+
+ val writerProperties = newWriterProperties(dataTempPath, dataPath, storeLocation)
+ val carbonProperties = newCarbonProperties(storeLocation)
+
+ val environment = StreamExecutionEnvironment.getExecutionEnvironment
+ environment.setParallelism(6)
+ environment.enableCheckpointing(2000L)
+ environment.setRestartStrategy(RestartStrategies.noRestart)
+
+ val dataCount = 1000
+ val source = new TestSource(dataCount) {
+ @throws[InterruptedException]
+ override def get(index: Int): Array[AnyRef] = {
+ val data = new Array[AnyRef](7)
+ data(0) = "test" + index
+ data(1) = index.asInstanceOf[AnyRef]
+ data(2) = 12345.asInstanceOf[AnyRef]
+ data(3) = "test\0011\0012"
+ data(4) = "test"
+ data(5) = Integer.toString(TestSource.randomCache.get().nextInt(24))
+ data(6) = "20191218"
+ data
+ }
+
+ @throws[InterruptedException]
+ override def onFinish(): Unit = {
+ Thread.sleep(5000L)
+ }
+ }
+ val stream = environment.addSource(source)
+ val factory = CarbonWriterFactory.builder("Local").build(
+ "default",
+ tableName,
+ tablePath,
+ new Properties,
+ writerProperties,
+ carbonProperties
)
+ val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
+
+ stream.keyBy(new KeySelector[Array[AnyRef], AnyRef] {
+ override def getKey(value: Array[AnyRef]): AnyRef = value(3) // return hour_
+ }).addSink(streamSink)
+
+ try environment.execute
+ catch {
+ case exception: Exception =>
+ // TODO
+ throw new UnsupportedOperationException(exception)
+ }
+
+ val dataLocation = dataPath + "default" + CarbonCommonConstants.FILE_SEPARATOR +
+ tableName + CarbonCommonConstants.FILE_SEPARATOR
+
+ assertResult(true)(FileFactory.isFileExist(dataLocation))
+ assertResult(false)(FileFactory
+ .getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
sql(s"INSERT INTO $tableName STAGE")
- checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(10000)))
+ checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
} finally {
sql(s"drop table if exists $tableName").collect()
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index 67c7bab..9195863 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -64,7 +64,7 @@
environment.enableCheckpointing(2000L)
environment.setRestartStrategy(RestartStrategies.noRestart)
- val dataCount = 10000
+ val dataCount = 1000
val source = new TestSource(dataCount) {
@throws[InterruptedException]
override def get(index: Int): Array[AnyRef] = {
@@ -103,7 +103,7 @@
sql(s"INSERT INTO $tableName STAGE")
- checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(10000)))
+ checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
// ensure the stage snapshot file and all stage files are deleted
assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath)))
@@ -116,9 +116,9 @@
}
private def newWriterProperties(
- dataTempPath: String,
- dataPath: String,
- storeLocation: String) = {
+ dataTempPath: String,
+ dataPath: String,
+ storeLocation: String) = {
val properties = new Properties
properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
properties.setProperty(CarbonLocalProperty.DATA_PATH, dataPath)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index f7b8668..bb5e946 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -26,15 +26,17 @@
import org.apache.spark.{Accumulator, CarbonInputMetrics, DataSkewRangePartitioner, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.command.ExecutionErrors
import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.UTF8String
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, StructField, StructType}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -99,7 +101,7 @@
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
convertStepRowCounter)
- }.filter(_ != null)// Filter the bad record
+ }.filter(_ != null) // Filter the bad record
// 3. Sort
val configuration = DataLoadProcessBuilder.createConfiguration(model)
@@ -269,7 +271,7 @@
val configuration = DataLoadProcessBuilder.createConfiguration(model)
val header = configuration.getHeader
val rangeColumn = model.getRangePartitionColumn
- val rangeColumnIndex = (0 until header.length).find{
+ val rangeColumnIndex = (0 until header.length).find {
index =>
header(index).equalsIgnoreCase(rangeColumn.getColName)
}.get
@@ -427,7 +429,7 @@
.map(_.getColName)
.toArray
val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
- val rdd: RDD[Row] = new CarbonScanRDD[CarbonRow](
+ val rdd: RDD[InternalRow] = new CarbonScanRDD[CarbonRow](
sparkSession,
columnProjection = new CarbonProjection(columns),
null,
@@ -436,13 +438,13 @@
carbonTable.getTableInfo,
new CarbonInputMetrics,
null,
- null,
+ classOf[SparkDataTypeConverterImpl],
classOf[CarbonRowReadSupport],
splits.asJava)
.map { row =>
- new GenericRow(row.getData.asInstanceOf[Array[Any]])
+ new GenericInternalRow(row.getData.asInstanceOf[Array[Any]])
}
- sparkSession.createDataFrame(rdd, schema)
+ SparkSQLUtil.execute(rdd, schema, sparkSession)
}
}
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 8f39f9b..13e7c45 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -22,13 +22,17 @@
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.EmptyRule
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, Cast, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil, Utils}
object SparkSQLUtil {
@@ -38,6 +42,10 @@
Dataset.ofRows(sparkSession, logicalPlan)
}
+ def execute(rdd: RDD[InternalRow], schema: StructType, sparkSession: SparkSession): DataFrame = {
+ execute(LogicalRDD(schema.toAttributes, rdd)(sparkSession), sparkSession)
+ }
+
def getSparkSession: SparkSession = {
SparkSession.getDefaultSession.get
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index eb63d03..24e7765 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -32,12 +32,12 @@
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -85,7 +85,6 @@
val tablePath = table.getTablePath
val stagePath = CarbonTablePath.getStageDir(tablePath)
val snapshotFilePath = CarbonTablePath.getStageSnapshotFile(tablePath)
- var loadModel: CarbonLoadModel = null
val lock = acquireIngestLock(table)
try {
@@ -133,44 +132,21 @@
val executorService = Executors.newFixedThreadPool(numThreads)
val stageInputs = collectStageInputs(executorService, stageFiles)
- // 3) add new segment with INSERT_IN_PROGRESS into table status
- loadModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(spark, table)
- CarbonLoaderUtil.recordNewLoadMetadata(loadModel)
-
- // 4) write all existing stage file names and segmentId into a new snapshot file
- // The content of snapshot file is: first line is segmentId, followed by each line is
- // one stage file name
- val content =
- (Seq(loadModel.getSegmentId) ++ stageFiles.map(_._1.getAbsolutePath)).mkString("\n")
- FileFactory.writeFile(content, snapshotFilePath)
-
- // 5) perform data loading
+ // 3) perform data loading
if (table.isHivePartitionTable) {
- startLoadingWithPartition(spark, table, loadModel, stageInputs)
+ startLoadingWithPartition(spark, table, stageInputs, stageFiles, snapshotFilePath)
} else {
- startLoading(spark, table, loadModel, stageInputs)
+ startLoading(spark, table, stageInputs, stageFiles, snapshotFilePath)
}
- // 6) write segment file and update the segment entry to SUCCESS
- val segmentFileName = SegmentFileStore.writeSegmentFile(
- table, loadModel.getSegmentId, loadModel.getFactTimeStamp.toString)
- SegmentFileStore.updateTableStatusFile(
- table, loadModel.getSegmentId, segmentFileName,
- table.getCarbonTableIdentifier.getTableId,
- new SegmentFileStore(table.getTablePath, segmentFileName),
- SegmentStatus.SUCCESS)
-
- // 7) delete stage files
+ // 4) delete stage files
deleteStageFiles(executorService, stageFiles)
- // 8) delete the snapshot file
+ // 5) delete the snapshot file
FileFactory.getCarbonFile(snapshotFilePath).delete()
} catch {
case ex: Throwable =>
LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}", ex)
- if (loadModel != null) {
- CarbonLoaderUtil.updateTableStatusForFailure(loadModel)
- }
throw ex
} finally {
lock.unlock()
@@ -266,24 +242,55 @@
private def startLoading(
spark: SparkSession,
table: CarbonTable,
- loadModel: CarbonLoadModel,
- stageInput: Seq[StageInput]
+ stageInput: Seq[StageInput],
+ stageFiles: Array[(CarbonFile, CarbonFile)],
+ snapshotFilePath: String
): Unit = {
- val splits = stageInput.flatMap(_.createSplits().asScala)
- LOGGER.info(s"start to load ${splits.size} files into " +
- s"${table.getDatabaseName}.${table.getTableName}")
- val start = System.currentTimeMillis()
- val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
- DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
- spark,
- Option(dataFrame),
- loadModel,
- SparkSQLUtil.sessionState(spark).newHadoopConf()
- ).map { row =>
- (row._1, FailureCauses.NONE == row._2._2.failureCauses)
- }
+ var loadModel: CarbonLoadModel = null
+ try {
+ // 1) add new segment with INSERT_IN_PROGRESS into table status
+ loadModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(spark, table)
+ CarbonLoaderUtil.recordNewLoadMetadata(loadModel)
- LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")
+ // 2) write all existing stage file names and segmentId into a new snapshot file
+ // The content of snapshot file is: first line is segmentId, followed by each line is
+ // one stage file name
+ val content =
+ (Seq(loadModel.getSegmentId) ++ stageFiles.map(_._1.getAbsolutePath)).mkString("\n")
+ FileFactory.writeFile(content, snapshotFilePath)
+
+ // 3) do loading.
+ val splits = stageInput.flatMap(_.createSplits().asScala)
+ LOGGER.info(s"start to load ${splits.size} files into " +
+ s"${table.getDatabaseName}.${table.getTableName}")
+ val start = System.currentTimeMillis()
+ val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
+ DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+ spark,
+ Option(dataFrame),
+ loadModel,
+ SparkSQLUtil.sessionState(spark).newHadoopConf()
+ ).map { row =>
+ (row._1, FailureCauses.NONE == row._2._2.failureCauses)
+ }
+ LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")
+
+ // 4) write segment file and update the segment entry to SUCCESS
+ val segmentFileName = SegmentFileStore.writeSegmentFile(
+ table, loadModel.getSegmentId, loadModel.getFactTimeStamp.toString)
+ SegmentFileStore.updateTableStatusFile(
+ table, loadModel.getSegmentId, segmentFileName,
+ table.getCarbonTableIdentifier.getTableId,
+ new SegmentFileStore(table.getTablePath, segmentFileName),
+ SegmentStatus.SUCCESS)
+ } catch {
+ case ex: Throwable =>
+ LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}", ex)
+ if (loadModel != null) {
+ CarbonLoaderUtil.updateTableStatusForFailure(loadModel)
+ }
+ throw ex
+ }
}
/**
@@ -292,15 +299,18 @@
private def startLoadingWithPartition(
spark: SparkSession,
table: CarbonTable,
- loadModel: CarbonLoadModel,
- stageInput: Seq[StageInput]
+ stageInput: Seq[StageInput],
+ stageFiles: Array[(CarbonFile, CarbonFile)],
+ snapshotFilePath: String
): Unit = {
val partitionDataList = listPartitionFiles(stageInput)
+
+ val content = stageFiles.map(_._1.getAbsolutePath).mkString("\n")
+ FileFactory.writeFile(content, snapshotFilePath)
+
val start = System.currentTimeMillis()
- var index = 0
partitionDataList.map {
case (partition, splits) =>
- index = index + 1
LOGGER.info(s"start to load ${splits.size} files into " +
s"${table.getDatabaseName}.${table.getTableName}. " +
s"Partition information: ${partition.mkString(",")}")
@@ -484,22 +494,20 @@
.map(_.getColName)
.toArray
val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
- val rdd: RDD[Row] = new CarbonScanRDD[InternalRow](
- sparkSession,
- columnProjection = new CarbonProjection(columns),
- null,
- carbonTable.getAbsoluteTableIdentifier,
- carbonTable.getTableInfo.serialize,
- carbonTable.getTableInfo,
- new CarbonInputMetrics,
- null,
- null,
- classOf[SparkRowReadSupportImpl],
- splits.asJava
- ).map { row =>
- new GenericRow(row.toSeq(schema).toArray)
- }
- sparkSession.createDataFrame(rdd, schema)
+ val rdd: RDD[InternalRow] = new CarbonScanRDD[InternalRow](
+ sparkSession,
+ columnProjection = new CarbonProjection(columns),
+ null,
+ carbonTable.getAbsoluteTableIdentifier,
+ carbonTable.getTableInfo.serialize,
+ carbonTable.getTableInfo,
+ new CarbonInputMetrics,
+ null,
+ classOf[SparkDataTypeConverterImpl],
+ classOf[SparkRowReadSupportImpl],
+ splits.asJava
+ )
+ SparkSQLUtil.execute(rdd, schema, sparkSession)
}
override protected def opName: String = "INSERT STAGE"
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 0309e91..1334178 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -52,7 +52,6 @@
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.encoder.Encoding
@@ -662,34 +661,14 @@
curAttributes: Seq[AttributeReference],
sortScope: SortScopeOptions.SortScope,
isDataFrame: Boolean): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
+ val catalogAttributes = catalogTable.schema.toAttributes
// Converts the data as per the loading steps before give it to writer or sorter
- val convertedRdd = convertData(
+ val updatedRdd = convertData(
rdd,
sparkSession,
loadModel,
isDataFrame,
partitionValues)
- val updatedRdd = if (isDataFrame) {
- val columnCount = loadModel.getCsvHeaderColumns.length
- convertedRdd.map { row =>
- val array = new Array[AnyRef](columnCount)
- val data = row.getData
- var i = 0
- while (i < columnCount) {
- data(i) match {
- case string: String =>
- array(i) = UTF8String.fromString(string)
- case _ =>
- array(i) = data(i)
- }
- i = i + 1
- }
- array
- }.map(row => InternalRow.fromSeq(row))
- } else {
- convertedRdd.map(row => InternalRow.fromSeq(row.getData))
- }
- val catalogAttributes = catalogTable.schema.toAttributes
var attributes = curAttributes.map(a => {
catalogAttributes.find(_.name.equalsIgnoreCase(a.name)).get
})
@@ -783,7 +762,7 @@
sparkSession: SparkSession,
model: CarbonLoadModel,
isDataFrame: Boolean,
- partitionValues: Array[String]): RDD[CarbonRow] = {
+ partitionValues: Array[String]): RDD[InternalRow] = {
val sc = sparkSession.sparkContext
val info =
model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo
@@ -827,7 +806,7 @@
partialSuccessAccum,
inputStepRowCounter,
keepActualData = true)
- }.filter(_ != null)
+ }.filter(_ != null).map(row => InternalRow.fromSeq(row.getData))
finalRDD
}