/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.load

import scala.util.Random

import com.univocity.parsers.common.TextParsingException
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{Accumulator, SparkEnv, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations}
import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl
import org.apache.carbondata.processing.loading.sort.SortStepRowHandler
import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl
import org.apache.carbondata.processing.sort.sortdata.SortParameters
import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil}
import org.apache.carbondata.spark.rdd.{NewRddIterator, SerializableConfiguration, StringArrayRow}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
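
/**
 * Functions executed as Spark tasks for each step of the CarbonData load pipeline:
 * input parsing, row conversion, sort-row preparation, and data writing.
 */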
object DataLoadProcessorStepOnSpark {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
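
  /**
   * Copies the string values of an InternalRow into a StringArrayRow. Note that
   * StringArrayRow.setValues returns the row itself, which is what this method returns.
   */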
def toStringArrayRow(row: InternalRow, columnCount: Int): StringArrayRow = {
val outRow = new StringArrayRow(new Array[String](columnCount))
outRow.setValues(row.asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[String]])
}
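
  /**
   * Adapts an iterator of Spark Rows into the Array[AnyRef] iterator consumed by the
   * load steps, delegating the per-row conversion to NewRddIterator.
   */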
def toRDDIterator(
rows: Iterator[Row],
modelBroadcast: Broadcast[CarbonLoadModel]): Iterator[Array[AnyRef]] = {
new Iterator[Array[AnyRef]] {
val iter = new NewRddIterator(rows, modelBroadcast.value, TaskContext.get())
override def hasNext: Boolean = iter.hasNext
override def next(): Array[AnyRef] = iter.next
}
}
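
  /**
   * Input step: parses raw Array[AnyRef] rows into CarbonRows with RowParserImpl,
   * keeping the raw data alongside the parsed row when the configuration requires it.
   */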
def inputFunc(
rows: Iterator[Array[AnyRef]],
index: Int,
modelBroadcast: Broadcast[CarbonLoadModel],
rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
val conf = DataLoadProcessBuilder.createConfiguration(model)
val rowParser = new RowParserImpl(conf.getDataFields, conf)
val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
wrapException(e, model)
}
new Iterator[CarbonRow] {
override def hasNext: Boolean = rows.hasNext
override def next(): CarbonRow = {
        var row: CarbonRow = null
        if (isRawDataRequired) {
val rawRow = rows.next()
row = new CarbonRow(rowParser.parseRow(rawRow), rawRow)
} else {
row = new CarbonRow(rowParser.parseRow(rows.next()))
}
rowCounter.add(1)
row
}
}
}
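
  /**
   * Combined input and convert step: parses each raw row and converts it in one pass,
   * logging bad records and reporting partial success through the given accumulator.
   */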
def inputAndconvertFunc(
rows: Iterator[Array[AnyRef]],
index: Int,
modelBroadcast: Broadcast[CarbonLoadModel],
partialSuccessAccum: Accumulator[Int],
rowCounter: Accumulator[Int],
keepActualData: Boolean = false): Iterator[CarbonRow] = {
val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
val conf = DataLoadProcessBuilder.createConfiguration(model)
val rowParser = new RowParserImpl(conf.getDataFields, conf)
val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
val badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(conf)
if (keepActualData) {
conf.getDataFields.foreach(_.setUseActualData(keepActualData))
}
val rowConverter = new RowConverterImpl(conf.getDataFields, conf, badRecordLogger)
rowConverter.initialize()
TaskContext.get().addTaskCompletionListener { context =>
val hasBadRecord: Boolean = CarbonBadRecordUtil.hasBadRecord(model)
close(conf, badRecordLogger, rowConverter)
GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum, hasBadRecord)
}
TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
      val hasBadRecord: Boolean = CarbonBadRecordUtil.hasBadRecord(model)
close(conf, badRecordLogger, rowConverter)
GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum, hasBadRecord)
wrapException(e, model)
}
new Iterator[CarbonRow] {
override def hasNext: Boolean = rows.hasNext
override def next(): CarbonRow = {
        var row: CarbonRow = null
        if (isRawDataRequired) {
val rawRow = rows.next()
row = new CarbonRow(rowParser.parseRow(rawRow), rawRow)
} else {
row = new CarbonRow(rowParser.parseRow(rows.next()))
}
row = rowConverter.convert(row)
rowCounter.add(1)
row
}
}
}
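
  /**
   * Convert step: runs RowConverterImpl over already-parsed CarbonRows, logging bad
   * records and reporting partial success through the given accumulator.
   */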
def convertFunc(
rows: Iterator[CarbonRow],
index: Int,
modelBroadcast: Broadcast[CarbonLoadModel],
partialSuccessAccum: Accumulator[Int],
rowCounter: Accumulator[Int],
keepActualData: Boolean = false): Iterator[CarbonRow] = {
val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
val conf = DataLoadProcessBuilder.createConfiguration(model)
val badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(conf)
if (keepActualData) {
conf.getDataFields.foreach(_.setUseActualData(keepActualData))
}
val rowConverter = new RowConverterImpl(conf.getDataFields, conf, badRecordLogger)
rowConverter.initialize()
TaskContext.get().addTaskCompletionListener { context =>
val hasBadRecord: Boolean = CarbonBadRecordUtil.hasBadRecord(model)
close(conf, badRecordLogger, rowConverter)
GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum, hasBadRecord)
}
TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
      val hasBadRecord: Boolean = CarbonBadRecordUtil.hasBadRecord(model)
close(conf, badRecordLogger, rowConverter)
GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum, hasBadRecord)
wrapException(e, model)
}
new Iterator[CarbonRow] {
override def hasNext: Boolean = rows.hasNext
override def next(): CarbonRow = {
val row = rowConverter.convert(rows.next())
rowCounter.add(1)
row
}
}
}
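
  /**
   * Closes the bad record logger streams, renames the bad record files, and finishes
   * the row converter.
   */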
def close(conf: CarbonDataLoadConfiguration,
badRecordLogger: BadRecordsLogger,
rowConverter: RowConverterImpl): Unit = {
if (badRecordLogger != null) {
badRecordLogger.closeStreams()
CarbonBadRecordUtil.renameBadRecord(conf)
}
if (rowConverter != null) {
rowConverter.finish()
}
}
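
  /**
   * Sort preparation step: converts each row into the three-part layout produced by
   * SortStepRowHandler for consumption by the sort and writer steps.
   */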
def convertTo3Parts(
rows: Iterator[CarbonRow],
index: Int,
modelBroadcast: Broadcast[CarbonLoadModel],
rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
val conf = DataLoadProcessBuilder.createConfiguration(model)
val sortParameters = SortParameters.createSortParameters(conf)
val sortStepRowHandler = new SortStepRowHandler(sortParameters)
TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
wrapException(e, model)
}
new Iterator[CarbonRow] {
override def hasNext: Boolean = rows.hasNext
override def next(): CarbonRow = {
val row =
new CarbonRow(sortStepRowHandler.convertRawRowTo3Parts(rows.next().getData))
rowCounter.add(1)
row
}
}
}
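
  /**
   * Writer step: re-initializes a row converter to restore the cardinality finder
   * (see the comment below), writes all rows through a CarbonFactHandler, and cleans
   * up the local data load folders whether the write succeeds or fails.
   */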
def writeFunc(
rows: Iterator[CarbonRow],
index: Int,
modelBroadcast: Broadcast[CarbonLoadModel],
rowCounter: Accumulator[Int],
      conf: Broadcast[SerializableConfiguration]): Unit = {
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
var model: CarbonLoadModel = null
var tableName: String = null
var rowConverter: RowConverterImpl = null
var dataWriter: DataWriterProcessorStepImpl = null
try {
model = modelBroadcast.value.getCopyWithTaskNo(index.toString)
val storeLocation = Array(getTempStoreLocation(index))
val conf = DataLoadProcessBuilder.createConfiguration(model, storeLocation)
tableName = model.getTableName
      // When sortBy is used the load runs in two stages. Stage 1 cannot get the cardinality
      // finder from Stage 2 directly because the stages run in different processes, so the
      // cardinality finder has to be set again in Stage 1.
rowConverter = new RowConverterImpl(conf.getDataFields, conf, null)
rowConverter.initialize()
conf.setCardinalityFinder(rowConverter)
dataWriter = new DataWriterProcessorStepImpl(conf)
val dataHandlerModel = dataWriter.getDataHandlerModel
var dataHandler: CarbonFactHandler = null
var rowsNotExist = true
while (rows.hasNext) {
if (rowsNotExist) {
rowsNotExist = false
dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(dataHandlerModel)
dataHandler.initialise()
}
        dataWriter.processRow(rows.next(), dataHandler)
        rowCounter.add(1)
}
if (!rowsNotExist) {
dataWriter.finish(dataHandler)
}
} catch {
case e: CarbonDataWriterException =>
LOGGER.error(e, "Failed for table: " + tableName + " in Data Writer Step")
throw new CarbonDataLoadingException("Error while initializing data handler : " +
e.getMessage)
case e: Exception =>
LOGGER.error(e, "Failed for table: " + tableName + " in Data Writer Step")
throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage, e)
} finally {
if (rowConverter != null) {
rowConverter.finish()
}
      // Close the dataWriter once the write is done, whether it succeeded or failed. If it
      // is not closed, the thread that prints the rows processed by each step every
      // 10 seconds will never exit.
      if (null != dataWriter) {
dataWriter.close()
}
// clean up the folders and files created locally for data load operation
TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
}
}
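
  /**
   * Picks a temp store location for this task: a random configured local dir when
   * carbon.use.local.dir is enabled, falling back to java.io.tmpdir, suffixed with
   * System.nanoTime and the task index to keep it unique per task.
   */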
private def getTempStoreLocation(index: Int): String = {
var storeLocation = ""
    // this property determines whether the temp location for carbon is inside the container
    // temp dir or the yarn application directory
val carbonUseLocalDir = CarbonProperties.getInstance()
.getProperty("carbon.use.local.dir", "false")
if (carbonUseLocalDir.equalsIgnoreCase("true")) {
val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
if (null != storeLocations && storeLocations.nonEmpty) {
storeLocation = storeLocations(Random.nextInt(storeLocations.length))
}
      if (storeLocation == null || storeLocation.isEmpty) {
storeLocation = System.getProperty("java.io.tmpdir")
}
} else {
storeLocation = System.getProperty("java.io.tmpdir")
}
storeLocation = storeLocation + '/' + System.nanoTime() + '_' + index
storeLocation
}
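
  /**
   * Rethrows a task failure wrapped in a CarbonDataLoadingException after logging it;
   * an exception that is already a CarbonDataLoadingException is rethrown as-is.
   */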
private def wrapException(e: Throwable, model: CarbonLoadModel): Unit = {
e match {
case e: CarbonDataLoadingException => throw e
case e: TextParsingException =>
LOGGER.error(e, "Data Loading failed for table " + model.getTableName)
throw new CarbonDataLoadingException("Data Loading failed for table " + model.getTableName,
e)
case e: Exception =>
LOGGER.error(e, "Data Loading failed for table " + model.getTableName)
throw new CarbonDataLoadingException("Data Loading failed for table " + model.getTableName,
e)
}
}
}