/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.indexserver

import scala.collection.JavaConverters._

import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.sql.SparkSession

import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.datamap.{DataMapStoreManager, DistributableDataMapFormat}
import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
import org.apache.carbondata.core.indexstore.SegmentWrapper
import org.apache.carbondata.spark.rdd.CarbonRDD
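
/**
 * RDD used by the index server to prune segments on the executors. Each task prunes the
 * segments of its split with the table's default (blocklet) datamap and emits a single
 * (executorKey, SegmentWrapper) pair, where the key encodes the executor's location and its
 * current cache size. Illustrative use (driver side, names assumed):
 * new SegmentPruneRDD(session, dataMapFormat).collect().
 */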
class SegmentPruneRDD(@transient private val ss: SparkSession,
    dataMapFormat: DistributableDataMapFormat)
  extends CarbonRDD[(String, SegmentWrapper)](ss, Nil) {
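
  // Reuse the partition layout of DistributedPruneRDD, which maps the datamap splits to the
  // index server executors.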
  override protected def internalGetPartitions: Array[Partition] = {
    new DistributedPruneRDD(ss, dataMapFormat).partitions
  }
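
  // Runs on the executor: extracts the segments for this partition, drops any invalidated
  // entries from the local datamap cache, prunes with the pushed-down filter, and reports
  // the result together with this executor's identity and cache size.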
  override def internalCompute(split: Partition,
      context: TaskContext): Iterator[(String, SegmentWrapper)] = {
    val inputSplits = split.asInstanceOf[DataMapRDDPartition].inputSplit
    val segments = inputSplits.map(_
      .asInstanceOf[DataMapDistributableWrapper].getDistributable.getSegment)
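    // Attach the read-committed scope under which each segment resolves its committed files.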
    segments.foreach(_.setReadCommittedScope(dataMapFormat.getReadCommittedScope))
    if (dataMapFormat.getInvalidSegments.size > 0) {
      // clear the invalid segments from the segment map and from the executor's cache
      DataMapStoreManager.getInstance().clearInvalidSegments(dataMapFormat.getCarbonTable,
        dataMapFormat.getInvalidSegments)
    }
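    // Prune at segment granularity with the table's default (blocklet) datamap, using the
    // filter resolver pushed down from the driver.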
    val blockletMap = DataMapStoreManager.getInstance
      .getDefaultDataMap(dataMapFormat.getCarbonTable)
    val prunedSegments = blockletMap
      .pruneSegments(segments.toList.asJava, dataMapFormat.getFilterResolverIntf)
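    // Key the result as "<host>_<executorId>_<cacheSize>" so the driver can map the pruned
    // segments back to this executor and track its cache usage; cache size falls back to 0
    // when the carbon cache has not been initialized on this executor.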
    val executorIP = s"${ SparkEnv.get.blockManager.blockManagerId.host }_${
      SparkEnv.get.blockManager.blockManagerId.executorId
    }"
    val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
      CacheProvider.getInstance().getCarbonCache.getCurrentSize
    } else {
      0L
    }
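    // Emit a single pair per task; the driver aggregates these per-executor results.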
    val value = (executorIP + "_" + cacheSize.toString, new SegmentWrapper(prunedSegments))
    Iterator(value)
  }
}