/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.indexserver

import java.util.concurrent.Executors

import scala.collection.JavaConverters._
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration.Duration

import org.apache.hadoop.mapred.{RecordReader, TaskAttemptID}
import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.DistributionUtil

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.datamap.{DataMapStoreManager, DistributableDataMapFormat}
import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.{ExtendedBlocklet, ExtendedBlockletWrapper}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonThreadFactory}
import org.apache.carbondata.spark.rdd.CarbonRDD
import org.apache.carbondata.spark.util.CarbonScalaUtil
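
/**
 * Partition of the distributed pruning RDD. It carries the input splits that one task has to
 * prune together with the preferred executor locations for that task.
 */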
class DataMapRDDPartition(rddId: Int,
idx: Int,
val inputSplit: Seq[InputSplit],
location: Array[String])
extends Partition {
override def index: Int = idx
override def hashCode(): Int = 41 * (41 + rddId) + idx
def getLocations: Array[String] = {
location
}
}
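
/**
 * RDD used by the index server to prune blocklets on the executors. Each task prunes the splits
 * assigned to its partition and returns the pruned blocklets wrapped in an ExtendedBlockletWrapper,
 * keyed by the executor identity and its current cache size.
 */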
private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkSession,
dataMapFormat: DistributableDataMapFormat)
extends CarbonRDD[(String, ExtendedBlockletWrapper)](ss, Nil) {
@transient private val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD]
.getName)
var readers: scala.collection.Iterator[RecordReader[Void, ExtendedBlocklet]] = _
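
  /**
   * Prunes the splits of the given partition and returns a single (key, blocklets) pair, where
   * the key has the form "<host>_<executorId>_<cacheSize>" of the executor serving the request.
   */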
override def internalCompute(split: Partition,
context: TaskContext): Iterator[(String, ExtendedBlockletWrapper)] = {
val attemptId = new TaskAttemptID(DistributedRDDUtils.generateTrackerId,
id, TaskType.MAP, split.index, 0)
val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
val inputSplits = split.asInstanceOf[DataMapRDDPartition].inputSplit
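    // identity of this executor in the form "<host>_<executorId>"; it is combined with the
    // current cache size to form the key of the result returned to the driver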
val executorIP = s"${ SparkEnv.get.blockManager.blockManagerId.host }_${
SparkEnv.get.blockManager.blockManagerId.executorId
}"
if (dataMapFormat.isJobToClearDataMaps) {
      // if the job is only meant to clear datamaps, clear the invalid datamaps from the executor
      // cache and return an empty blocklet wrapper tagged with this executor's cache size
      DataMapStoreManager.getInstance().clearInvalidDataMaps(
        dataMapFormat.getCarbonTable,
        inputSplits.map(_.asInstanceOf[DataMapDistributableWrapper]
          .getDistributable.getSegment.getSegmentNo).toList.asJava,
        dataMapFormat.getDataMapToClear)
val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
CacheProvider.getInstance().getCarbonCache.getCurrentSize
} else {
0L
}
Iterator((executorIP + "_" + cacheSize, new ExtendedBlockletWrapper()))
} else {
if (dataMapFormat.getInvalidSegments.size > 0) {
        // clear the invalid segments from the segment map and from the executor cache
DataMapStoreManager.getInstance().clearInvalidSegments(dataMapFormat.getCarbonTable,
dataMapFormat.getInvalidSegments)
}
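      // prune the assigned splits in parallel on this executor using a dedicated thread pool
      // whose size is controlled by the executor pruning thread property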
val startTime = System.currentTimeMillis()
val numOfThreads = CarbonProperties.getInstance().getNumOfThreadsForExecutorPruning
dataMapFormat.createDataMapChooser()
val service = Executors
.newFixedThreadPool(numOfThreads, new CarbonThreadFactory("IndexPruningPool", true))
implicit val ec: ExecutionContextExecutor = ExecutionContext
.fromExecutor(service)
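      // launch one future per split when the pool is large enough, otherwise group the splits
      // so that at most numOfThreads futures run concurrently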
val futures = if (inputSplits.length <= numOfThreads) {
inputSplits.map {
split => generateFuture(Seq(split), attemptContext)
}
} else {
DistributedRDDUtils.groupSplits(inputSplits, numOfThreads).map {
splits => generateFuture(splits, attemptContext)
}
}
// scalastyle:off
val f = Await.result(Future.sequence(futures), Duration.Inf).flatten
// scalastyle:on
service.shutdownNow()
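      // the class level LOGGER is @transient and hence not available on the executor side,
      // so a local logger is created here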
val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD].getName)
      LOGGER.info(s"Time taken to prune ${ inputSplits.size } splits: " +
        s"${ System.currentTimeMillis() - startTime } ms")
val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
CacheProvider.getInstance().getCarbonCache.getCurrentSize
} else {
0L
}
      val value = (executorIP + "_" + cacheSize.toString,
        new ExtendedBlockletWrapper(f.toList.asJava, dataMapFormat.getCarbonTable.getTablePath,
          dataMapFormat.getQueryId, dataMapFormat.isWriteToFile, dataMapFormat.isCountStarJob))
Iterator(value)
}
}
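
  /**
   * Runs the record readers for the given group of splits asynchronously and collects the
   * pruned blocklets they produce.
   */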
private def generateFuture(split: Seq[InputSplit], attemptContextImpl: TaskAttemptContextImpl)
(implicit executionContext: ExecutionContext): Future[Seq[ExtendedBlocklet]] = {
Future {
split.flatMap { inputSplit =>
val blocklets = new java.util.ArrayList[ExtendedBlocklet]()
        val reader = dataMapFormat.createRecordReader(inputSplit, attemptContextImpl)
        try {
          reader.initialize(inputSplit, attemptContextImpl)
          while (reader.nextKeyValue()) {
            blocklets.add(reader.getCurrentValue)
          }
        } finally {
          // close the reader so that it does not leak resources if pruning fails midway
          reader.close()
        }
        blocklets.asScala
}
}
}
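
  /**
   * Preferred locations are the executors chosen for the partition's splits, so that each task
   * runs where the corresponding datamaps are (or will be) cached.
   */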
override protected def getPreferredLocations(split: Partition): Seq[String] = {
if (split.asInstanceOf[DataMapRDDPartition].getLocations != null) {
split.asInstanceOf[DataMapRDDPartition].getLocations.toSeq
} else {
Seq()
}
}
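
  /**
   * Builds the partitions of this RDD. For fallback jobs, or when distributed pruning is
   * disabled, every split becomes its own partition with the split's native locations; otherwise
   * the splits are assigned to the currently available executors so that pruning is served from
   * the executors holding the cached datamaps.
   */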
override protected def internalGetPartitions: Array[Partition] = {
val job = Job.getInstance(FileFactory.getConfiguration)
val splits = dataMapFormat.getSplits(job).asScala
val isDistributedPruningEnabled = CarbonProperties.getInstance()
.isDistributedPruningEnabled(dataMapFormat.getCarbonTable.getDatabaseName,
dataMapFormat.getCarbonTable.getTableName)
if (!isDistributedPruningEnabled || dataMapFormat.isFallbackJob || splits.isEmpty) {
splits.zipWithIndex.map {
f => new DataMapRDDPartition(id, f._2, List(f._1), f._1.getLocations)
}.toArray
} else {
val executorsList: Map[String, Seq[String]] = DistributionUtil
.getExecutors(ss.sparkContext)
val (response, time) = CarbonScalaUtil.logTime {
DistributedRDDUtils.getExecutors(splits.toArray, executorsList, dataMapFormat
.getCarbonTable.getTableUniqueName, id)
}
      LOGGER.debug(s"Time taken to assign executors to ${ splits.length } splits is $time ms")
response.toArray
}
}
}