blob: 2f22b0bfc17faefec09947a597cd81aa603f4cf5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.rdd
import scala.reflect.ClassTag
import org.apache.spark._
import org.apache.spark.sql.SparkSession
import org.apache.carbondata.spark.rdd.CarbonRDD
case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
class DataLoadCoalescedRDD[T: ClassTag](
@transient private val sparkSession: SparkSession,
@transient private var prev: RDD[T],
nodeList: Array[String])
extends CarbonRDD[DataLoadPartitionWrap[T]](sparkSession, Nil) {
override def internalGetPartitions: Array[Partition] = {
new DataLoadPartitionCoalescer(prev, nodeList).run
}
override def internalCompute(split: Partition,
context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = {
new Iterator[DataLoadPartitionWrap[T]] {
val iter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator
def hasNext = iter.hasNext
def next: DataLoadPartitionWrap[T] = {
DataLoadPartitionWrap(firstParent[T], iter.next())
}
}
}
override def getDependencies: Seq[Dependency[_]] = {
Seq(new NarrowDependency(prev) {
def getParents(id: Int): Seq[Int] =
partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
})
}
override def clearDependencies() {
super.clearDependencies()
prev = null
}
/**
* Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition,
* then the preferred machine will be one which most parent splits prefer too.
* @param partition
* @return the machine most preferred by split
*/
override def getPreferredLocations(partition: Partition): Seq[String] = {
partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
}
}