/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming

import java.util
import java.util.Date

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.TaskContext
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{SerializableConfiguration, Utils}

import org.apache.carbondata.common.CarbonIterator
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.metadata.datatype.DataType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.stats.QueryStatistic
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.spark.rdd.StreamHandoffRDD
import org.apache.carbondata.streaming.{CarbonStreamException, CarbonStreamOutputFormat}
import org.apache.carbondata.streaming.index.StreamFileIndex
import org.apache.carbondata.streaming.parser.CarbonStreamParser
import org.apache.carbondata.streaming.segment.StreamSegment

/**
 * An implementation of a stream sink. It persists each batch to disk by appending
 * the batch data to data files.
 */
class CarbonAppendableStreamSink(
    sparkSession: SparkSession,
    val carbonTable: CarbonTable,
    var currentSegmentId: String,
    parameters: Map[String, String],
    carbonLoadModel: CarbonLoadModel,
    server: Option[DictionaryServer],
    operationContext: OperationContext) extends Sink {

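  // the file log records which batch ids have already been committed; addBatch uses
  // it to detect and skip batches that Spark replays after a failure or restart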
  private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)
  private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)

  // prepare configuration
  private val hadoopConf = {
    val conf = sparkSession.sessionState.newHadoopConf()
    // put all parameters into hadoopConf
    parameters.foreach { entry =>
      conf.set(entry._1, entry._2)
    }
    // the properties below will be used by the default CarbonStreamParser
    val complexDelimiters = carbonLoadModel.getComplexDelimiters
    conf.set("carbon_complex_delimiter_level_1", complexDelimiters.get(0))
    conf.set("carbon_complex_delimiter_level_2", complexDelimiters.get(1))
    conf.set("carbon_complex_delimiter_level_3", complexDelimiters.get(2))
    conf.set(
      DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
      carbonLoadModel.getSerializationNullFormat().split(",")(1))
    conf.set(
      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
      carbonLoadModel.getTimestampformat())
    conf.set(
      CarbonCommonConstants.CARBON_DATE_FORMAT,
      carbonLoadModel.getDateFormat())
    conf
  }

  // segment max size (bytes)
  private val segmentMaxSize = hadoopConf.getLong(
    CarbonCommonConstants.HANDOFF_SIZE,
    CarbonProperties.getInstance().getHandoffSize
  )

  // auto handoff
  private val enableAutoHandoff = hadoopConf.getBoolean(
    CarbonCommonConstants.ENABLE_AUTO_HANDOFF,
    CarbonProperties.getInstance().isEnableAutoHandoff
  )

  // measure data type array
  private lazy val msrDataTypes = {
    carbonLoadModel
      .getCarbonDataLoadSchema
      .getCarbonTable
      .getMeasures
      .asScala
      .map(_.getDataType)
      .toArray
  }
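
  /**
   * Appends one micro-batch to the current stream segment. A batch whose id is not
   * greater than the latest id recorded in the file log was already committed by a
   * previous run, so it is skipped; this keeps replayed batches idempotent.
   */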
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
      CarbonAppendableStreamSink.LOGGER.info(s"Skipping already committed batch $batchId")
    } else {
      val statistic = new QueryStatistic()

      // fire pre event on every batch add
      // in the streaming case, options and optionsFinal can be the same
      val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
        carbonTable.getCarbonTableIdentifier,
        carbonLoadModel)
      OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)

      checkOrHandOffSegment()

      // the committer records how this Spark job commits its output
      val committer = FileCommitProtocol.instantiate(
        className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
        jobId = batchId.toString,
        outputPath = fileLogPath,
        false)

      committer match {
        case manifestCommitter: ManifestFileCommitProtocol =>
          manifestCommitter.setupManifestOptions(fileLog, batchId)
        case _ => // do nothing
      }

      CarbonAppendableStreamSink.writeDataFileJob(
        sparkSession,
        carbonTable,
        batchId,
        currentSegmentId,
        data.queryExecution,
        committer,
        hadoopConf,
        carbonLoadModel,
        server,
        msrDataTypes)

      // fire post event on every batch add
      val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
        carbonTable.getCarbonTableIdentifier,
        carbonLoadModel
      )
      OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext)

      statistic.addStatistics(s"add batch: $batchId", System.currentTimeMillis())
      CarbonAppendableStreamSink.LOGGER.info(
        s"${statistic.getMessage}, taken time(ms): ${statistic.getTimeTaken}")
    }
  }

  /**
   * If the directory size of the current segment exceeds the threshold, close it and
   * hand off to a new segment.
   */
  private def checkOrHandOffSegment(): Unit = {
    // get the current streaming segment; if it does not exist, create a new one
    val segmentId = StreamSegment.open(carbonTable)
    if (segmentId.equals(currentSegmentId)) {
      val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
      val fileType = FileFactory.getFileType(segmentDir)
      if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
        val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
        currentSegmentId = newSegmentId
        val newSegmentDir =
          CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
        FileFactory.mkdirs(newSegmentDir, fileType)
        // trigger the handoff operation
        if (enableAutoHandoff) {
          StreamHandoffRDD.startStreamingHandoffThread(
            carbonLoadModel,
            operationContext,
            sparkSession,
            false)
        }
      }
    } else {
      currentSegmentId = segmentId
      val newSegmentDir =
        CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
      val fileType = FileFactory.getFileType(newSegmentDir)
      FileFactory.mkdirs(newSegmentDir, fileType)
    }
  }
}
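
// A minimal sketch of how a query typically reaches this sink from user code; the
// option names follow the CarbonData streaming guide and are assumptions here, since
// the sink itself is instantiated by the CarbonData source factory, not directly:
//
//   val query = df.writeStream
//     .format("carbondata")
//     .option("checkpointLocation", checkpointDir)
//     .option("dbName", "default")
//     .option("tableName", "stream_table")
//     .start()
//
// Spark's micro-batch engine then invokes addBatch(batchId, batchDf) once per trigger.
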
object CarbonAppendableStreamSink {
  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  /**
   * Packages the Hadoop configuration so that it can be shipped from the driver side
   * to the executor side.
   */
  case class WriteDataFileJobDescription(
      serializableHadoopConf: SerializableConfiguration,
      batchId: Long,
      segmentId: String)

  /**
   * Run a Spark job to append the newly arrived data directly to the existing
   * row-format files.
   * If a task fails, Spark will retry it and carbon will recover by truncating the
   * HDFS file. (see StreamSegment.tryRecoverFromTaskFault)
   * If the job fails, every file in the stream segment will be truncated if
   * necessary. (see StreamSegment.tryRecoverFromJobFault)
   */
  def writeDataFileJob(
      sparkSession: SparkSession,
      carbonTable: CarbonTable,
      batchId: Long,
      segmentId: String,
      queryExecution: QueryExecution,
      committer: FileCommitProtocol,
      hadoopConf: Configuration,
      carbonLoadModel: CarbonLoadModel,
      server: Option[DictionaryServer],
      msrDataTypes: Array[DataType]): Unit = {
    // create job
    val job = Job.getInstance(hadoopConf)
    job.setOutputKeyClass(classOf[Void])
    job.setOutputValueClass(classOf[InternalRow])
    val jobId = CarbonInputFormatUtil.getJobId(new Date, batchId.toInt)
    job.setJobID(jobId)

    val description = WriteDataFileJobDescription(
      serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
      batchId,
      segmentId
    )
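    // the description (with its serializable Hadoop conf) is captured by the task
    // closure below and shipped to every executor
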
    // run write data file job
    SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
      var result: Array[(TaskCommitMessage, StreamFileIndex)] = null
      try {
        committer.setupJob(job)
        // initialize dictionary server
        if (server.isDefined) {
          server.get.initializeDictionaryGenerator(carbonTable)
        }
        val rowSchema = queryExecution.analyzed.schema
        // write data file
        result = sparkSession.sparkContext.runJob(queryExecution.toRdd,
          (taskContext: TaskContext, iterator: Iterator[InternalRow]) => {
            writeDataFileTask(
              description,
              carbonLoadModel,
              sparkStageId = taskContext.stageId(),
              sparkPartitionId = taskContext.partitionId(),
              sparkAttemptNumber = taskContext.attemptNumber(),
              committer,
              iterator,
              rowSchema
            )
          })

        // write dictionary
        if (server.isDefined) {
          try {
            server.get.writeTableDictionary(carbonTable.getCarbonTableIdentifier.getTableId)
          } catch {
            case _: Exception =>
              LOGGER.error(
                s"Error while writing dictionary file for ${carbonTable.getTableUniqueName}")
              throw new Exception(
                "Streaming ingest failed due to error while writing dictionary file")
          }
        }

        // update data file info in index file
        StreamSegment.updateIndexFile(
          CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId),
          result.map(_._2), msrDataTypes)
      } catch {
        // catch fault of executor side
        case t: Throwable =>
          val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
          StreamSegment.recoverSegmentIfRequired(segmentDir)
          LOGGER.error(s"Aborting job ${ job.getJobID }.", t)
          committer.abortJob(job)
          throw new CarbonStreamException("Job failed to write data file", t)
      }
      committer.commitJob(job, result.map(_._1))
      LOGGER.info(s"Job ${ job.getJobID } committed.")
    }
  }

  /**
   * Execute a task for each partition to write a data file.
   */
  def writeDataFileTask(
      description: WriteDataFileJobDescription,
      carbonLoadModel: CarbonLoadModel,
      sparkStageId: Int,
      sparkPartitionId: Int,
      sparkAttemptNumber: Int,
      committer: FileCommitProtocol,
      iterator: Iterator[InternalRow],
      rowSchema: StructType): (TaskCommitMessage, StreamFileIndex) = {
    val jobId = CarbonInputFormatUtil.getJobId(new Date, sparkStageId)
    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)

    // Set up the attempt context required by the output committer.
    val taskAttemptContext: TaskAttemptContext = {
      // Set up the configuration object
      val hadoopConf = description.serializableHadoopConf.value
      CarbonStreamOutputFormat.setSegmentId(hadoopConf, description.segmentId)
      hadoopConf.set("mapred.job.id", jobId.toString)
      hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
      hadoopConf.set("mapred.task.id", taskAttemptId.toString)
      hadoopConf.setBoolean("mapred.task.is.map", true)
      hadoopConf.setInt("mapred.task.partition", 0)
      new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
    }
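    // note: the mapred.* entries set above mirror what Hadoop MapReduce committers
    // expect to find in the configuration for a map task attempt
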
    committer.setupTask(taskAttemptContext)

    try {
      var blockIndex: StreamFileIndex = null
      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
        val parserName = taskAttemptContext.getConfiguration.get(
          CarbonStreamParser.CARBON_STREAM_PARSER,
          CarbonStreamParser.CARBON_STREAM_PARSER_DEFAULT)
        val streamParser =
          Class.forName(parserName).newInstance.asInstanceOf[CarbonStreamParser]
        streamParser.initialize(taskAttemptContext.getConfiguration, rowSchema)
        blockIndex = StreamSegment.appendBatchData(new InputIterator(iterator, streamParser),
          taskAttemptContext, carbonLoadModel)
      })(catchBlock = {
        committer.abortTask(taskAttemptContext)
        LOGGER.error(s"Job $jobId aborted.")
      })
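      // on success, return the commit message for the driver together with the index
      // of the stream file this task appended to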
      (committer.commitTask(taskAttemptContext), blockIndex)
    } catch {
      case t: Throwable =>
        throw new CarbonStreamException("Task failed while writing rows", t)
    }
  }

  /**
   * Convert a Spark iterator to a Carbon iterator so that the Java modules can
   * consume it.
   */
  class InputIterator(rddIter: Iterator[InternalRow], streamParser: CarbonStreamParser)
    extends CarbonIterator[Array[Object]] {

    override def hasNext: Boolean = rddIter.hasNext

    override def next: Array[Object] = {
      streamParser.parserRow(rddIter.next())
    }

    override def close(): Unit = {
      streamParser.close()
    }
  }
}
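
// A hedged sketch of a custom parser that could be plugged in through the
// CARBON_STREAM_PARSER configuration. The interface is inferred from the calls in
// this file (initialize / parserRow / close); the field conversion is illustrative
// only, not the behavior of the bundled default parser:
//
//   class PassThroughStreamParser extends CarbonStreamParser {
//     private var schema: StructType = _
//     override def initialize(configuration: Configuration,
//         structType: StructType): Unit = {
//       schema = structType
//     }
//     override def parserRow(row: InternalRow): Array[Object] = {
//       // extract each field using its declared data type
//       (0 until schema.length)
//         .map(i => row.get(i, schema(i).dataType).asInstanceOf[Object])
//         .toArray
//     }
//     override def close(): Unit = {}
//   }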