/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.spark.rdd

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.spark.Partition
import org.apache.spark.sql.SparkSession

import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.datamap.DataMapFilter
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
import org.apache.carbondata.core.util.DataTypeConverter
import org.apache.carbondata.hadoop.{CarbonMultiBlockSplit, CarbonProjection}
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
import org.apache.carbondata.spark.InitInputMetrics

/**
 * RDD that reads the rows deleted/updated at a particular update (delta) version. It is useful
 * for retrieving the records changed by a particular update transaction.
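 *
 * A minimal construction sketch for illustration; `sparkSession`, `carbonTable`, `projection`,
 * `dataMapFilter`, `inputMetrics` and `updateVersion` are assumed caller-side values (not defined
 * in this file), and the row type parameter must match the configured [[CarbonReadSupport]]:
 * {{{
 *   val rdd = new CarbonDeltaRowScanRDD[Row](
 *     sparkSession,
 *     carbonTable.getTableInfo.serialize(),
 *     carbonTable.getTableInfo,
 *     Seq.empty,
 *     projection,
 *     dataMapFilter,
 *     carbonTable.getAbsoluteTableIdentifier,
 *     inputMetrics,
 *     deltaVersionToRead = updateVersion)
 * }}}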
 */
class CarbonDeltaRowScanRDD[T: ClassTag](
    @transient private val spark: SparkSession,
    @transient private val serializedTableInfo: Array[Byte],
    @transient private val tableInfo: TableInfo,
    @transient override val partitionNames: Seq[PartitionSpec],
    override val columnProjection: CarbonProjection,
    var filter: DataMapFilter,
    identifier: AbsoluteTableIdentifier,
    inputMetricsStats: InitInputMetrics,
    override val dataTypeConverterClz: Class[_ <: DataTypeConverter] =
      classOf[SparkDataTypeConverterImpl],
    override val readSupportClz: Class[_ <: CarbonReadSupport[_]] =
      SparkReadSupport.readSupportClass,
    deltaVersionToRead: String) extends
  CarbonScanRDD[T](
    spark,
    columnProjection,
    filter,
    identifier,
    serializedTableInfo,
    tableInfo,
    inputMetricsStats,
    partitionNames,
    dataTypeConverterClz,
    readSupportClz) {
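
  /**
   * Builds on the parent's partitions but keeps only the splits whose blocks have delete/update
   * delta details recorded for `deltaVersionToRead`; partitions left without any splits are
   * dropped.
   */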
  override def internalGetPartitions: Array[Partition] = {
    val table = CarbonTable.buildFromTableInfo(getTableInfo)
    val updateStatusManager = new SegmentUpdateStatusManager(table, deltaVersionToRead)
    val parts = super.internalGetPartitions
    parts.map { p =>
      val partition = p.asInstanceOf[CarbonSparkPartition]
      val splits = partition.multiBlockSplit.getAllSplits.asScala.filter { s =>
        updateStatusManager.getDetailsForABlock(
          CarbonUpdateUtil.getSegmentBlockNameKey(s.getSegmentId, s.getBlockPath)) != null
      }.asJava
      new CarbonSparkPartition(partition.rddId, partition.index,
        new CarbonMultiBlockSplit(splits, partition.multiBlockSplit.getLocations))
    }.filter(p => p.multiBlockSplit.getAllSplits.size() > 0).asInstanceOf[Array[Partition]]
  }
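
  /**
   * Creates the input format as the parent does, but additionally sets `updateDeltaVersion` and
   * `readDeltaOnly` on the Hadoop configuration so that only the delta records written for the
   * requested update version are read.
   */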
  override def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = {
    val format = super.createInputFormat(conf)
    conf.set("updateDeltaVersion", deltaVersionToRead)
    conf.set("readDeltaOnly", "true")
    format
  }
}