/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.rdd

import scala.collection.JavaConverters._

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.Partition
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.CarbonMergerMapping

import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
import org.apache.carbondata.hadoop.api.CarbonInputFormat
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
import org.apache.carbondata.spark.MergeResult

/**
 * RDD that builds the partitions for an IUD (insert/update/delete) compaction.
 * It reuses the merge logic of [[CarbonMergerRDD]] and only overrides how the
 * input blocks are grouped into Spark partitions.
 */
class CarbonIUDMergerRDD[K, V](
    @transient private val ss: SparkSession,
    result: MergeResult[K, V],
    carbonLoadModel: CarbonLoadModel,
    carbonMergerMapping: CarbonMergerMapping)
  extends CarbonMergerRDD[K, V](ss,
    result,
    carbonLoadModel,
    carbonMergerMapping) {
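
  // Builds one Spark partition per segment. Only blocks that still have
  // update/delete delta files to merge are kept; segments without any
  // matching delta are skipped entirely.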
  override def internalGetPartitions: Array[Partition] = {
    val startTime = System.currentTimeMillis()
    val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
      tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
    )
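    // Propagate the current user's Hadoop credentials (e.g. delegation tokens)
    // into the job configuration so split planning also works on secure clusters.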
    val jobConf: JobConf = new JobConf(FileFactory.getConfiguration)
    SparkHadoopUtil.get.addCredentials(jobConf)
    val job: Job = Job.getInstance(jobConf)
    val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
    val defaultParallelism = sparkContext.defaultParallelism
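    // Restrict the scan to the valid segments chosen for this compaction and
    // hand the table schema to the input format.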
    CarbonInputFormat.setSegmentsToAccess(
      job.getConfiguration, carbonMergerMapping.validSegments.toList.asJava)
    CarbonInputFormat.setTableInfo(
      job.getConfiguration,
      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
    // get one input split per carbon data block in the segments under compaction
    val splits = format.getSplits(job)
    val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
    val noOfBlocks = carbonInputSplits.size
    // group the blocks by segment
    val splitsGroupedBySegment = carbonInputSplits.groupBy(_.getSegmentId)
    var i = -1
    // No need to create a new SegmentUpdateStatusManager here: the object is
    // already carried in the CarbonLoadModel.
    val updateStatusManager = carbonLoadModel.getSegmentUpdateStatusManager
    // make one spark partition per segment
    val resultSplits = splitsGroupedBySegment.map { case (segName, segmentSplits) =>
      // keep only the blocks whose update/delete delta files match this segment
      val validSplits = segmentSplits.filter { inputSplit =>
        CarbonDataMergerUtil
          .checkUpdateDeltaMatchBlock(segName, inputSplit.getBlockPath, updateStatusManager)
      }
      if (validSplits.nonEmpty) {
        val locations = validSplits.head.getLocations
        i += 1
        new CarbonSparkPartition(id, i,
          new CarbonMultiBlockSplit(validSplits.asJava, locations))
      } else {
        null
      }
    }.filter(_ != null)
    // the max segment cardinality is calculated on the executor for each
    // segment, so the driver-side value is reset here
    carbonMergerMapping.maxSegmentColumnSchemaList = null
    // log the distribution
    val noOfTasks = resultSplits.size
    logInfo(s"Identified no. of blocks: $noOfBlocks, " +
      s"parallelism: $defaultParallelism, no. of nodes: unknown, no. of tasks: $noOfTasks")
    logInfo("Time taken to identify blocks to scan: " +
      (System.currentTimeMillis() - startTime) + " ms")
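    // record the preferred locations and block count of every partition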
    resultSplits.foreach { partition =>
      logInfo("Node: " + partition.multiBlockSplit.getLocations.toSeq.mkString(",") +
        ", no. of blocks: " + partition.multiBlockSplit.getAllSplits.size())
    }
    resultSplits.toArray
  }
}