/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.indexserver

import java.util.concurrent.Executors

import scala.collection.JavaConverters._
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration.Duration

import org.apache.hadoop.mapred.TaskAttemptID
import org.apache.hadoop.mapreduce.{InputSplit, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.sql.SparkSession

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.datamap.{DataMapStoreManager, DistributableDataMapFormat}
import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.{CarbonProperties, CarbonThreadFactory}
import org.apache.carbondata.spark.rdd.CarbonRDD

/**
 * An RDD which computes the row count of the table on the index server executors.
 */
class DistributedCountRDD(@transient ss: SparkSession, dataMapFormat: DistributableDataMapFormat)
  extends CarbonRDD[(String, String)](ss, Nil) {

  @transient private val LOGGER = LogServiceFactory
    .getLogService(classOf[DistributedCountRDD].getName)
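
  /**
   * Schedule each partition on the locations recorded in its DataMapRDDPartition, if any.
   */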
  override protected def getPreferredLocations(split: Partition): Seq[String] = {
    if (split.asInstanceOf[DataMapRDDPartition].getLocations != null) {
      split.asInstanceOf[DataMapRDDPartition].getLocations.toSeq
    } else {
      Seq()
    }
  }
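
  /**
   * Computes, on an executor, the block row count for all splits of this partition using a
   * local thread pool, and returns a single ("host_executorId_cacheSize", totalRowCount) pair.
   */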
  override def internalCompute(split: Partition,
      context: TaskContext): Iterator[(String, String)] = {
    val attemptId = new TaskAttemptID(DistributedRDDUtils.generateTrackerId,
      id, TaskType.MAP, split.index, 0)
    val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
    val inputSplits = split.asInstanceOf[DataMapRDDPartition].inputSplit
    val numOfThreads = CarbonProperties.getInstance().getNumOfThreadsForExecutorPruning
    val service = Executors
      .newFixedThreadPool(numOfThreads, new CarbonThreadFactory("IndexPruningPool", true))
    implicit val ec: ExecutionContextExecutor = ExecutionContext
      .fromExecutor(service)
    if (dataMapFormat.ifAsyncCall()) {
      // to clear cache of invalid segments during pre-priming in index server
      DataMapStoreManager.getInstance().clearInvalidSegments(dataMapFormat.getCarbonTable,
        dataMapFormat.getInvalidSegments)
    }
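    // Fire one future per split when the splits fit within the thread pool; otherwise group
    // the splits so that each thread works on a batch.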
    val futures = if (inputSplits.length <= numOfThreads) {
      inputSplits.map {
        split => generateFuture(Seq(split))
      }
    } else {
      DistributedRDDUtils.groupSplits(inputSplits, numOfThreads).map {
        splits => generateFuture(splits)
      }
    }
    // scalastyle:off awaitresult
    val results = Await.result(Future.sequence(futures), Duration.Inf).flatten
    // scalastyle:on awaitresult
    // all futures have completed, so release the pruning thread pool
    service.shutdownNow()
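    // The result key combines this executor's host and id with its current carbon cache size;
    // the value is the total row count across all splits of this partition.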
    val executorIP = s"${ SparkEnv.get.blockManager.blockManagerId.host }_${
      SparkEnv.get.blockManager.blockManagerId.executorId
    }"
    val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
      CacheProvider.getInstance().getCarbonCache.getCurrentSize
    } else {
      0L
    }
    Iterator((executorIP + "_" + cacheSize.toString, results.map(_._2.toLong).sum.toString))
  }
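
  /**
   * Partition creation is delegated to DistributedPruneRDD so that the same splits and
   * location assignments are reused for the count job.
   */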
  override protected def internalGetPartitions: Array[Partition] = {
    new DistributedPruneRDD(ss, dataMapFormat).partitions
  }
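
  /**
   * Builds a future that sets the read committed scope on each segment of the given splits and
   * then asks the table's default datamap for the block row count of those segments.
   */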
  private def generateFuture(split: Seq[InputSplit])
    (implicit executionContext: ExecutionContext) = {
    Future {
      val segments = split.map { inputSplit =>
        val distributable = inputSplit.asInstanceOf[DataMapDistributableWrapper]
        distributable.getDistributable.getSegment
          .setReadCommittedScope(dataMapFormat.getReadCommittedScope)
        distributable.getDistributable.getSegment
      }
      val defaultDataMap = DataMapStoreManager.getInstance
        .getDataMap(dataMapFormat.getCarbonTable, split.head
          .asInstanceOf[DataMapDistributableWrapper].getDistributable.getDataMapSchema)
      defaultDataMap.getBlockRowCount(segments.toList.asJava, dataMapFormat
        .getPartitions, defaultDataMap).asScala
    }
  }
}