/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.indexserver

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

import org.apache.hadoop.mapreduce.InputSplit
import org.apache.spark.Partition

import org.apache.carbondata.core.datamap.{DataMapDistributable, Segment}
import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper

object DistributedRDDUtils {
  // table unique name -> (segment number -> "host_executorId") mapping
  val tableToExecutorMapping: ConcurrentHashMap[String, ConcurrentHashMap[String, String]] =
    new ConcurrentHashMap[String, ConcurrentHashMap[String, String]]()

  // host -> (executor id -> cached index size) mapping
  val executorToCacheSizeMapping: ConcurrentHashMap[String, ConcurrentHashMap[String, Long]] =
    new ConcurrentHashMap[String, ConcurrentHashMap[String, Long]]()
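
  // Illustrative shape of the two mappings (all names and sizes hypothetical):
  //   tableToExecutorMapping:     "default_t1" -> { "0" -> "127.0.0.1_1", "1" -> "127.0.0.1_2" }
  //   executorToCacheSizeMapping: "127.0.0.1"  -> { "1" -> 10024L, "2" -> 2048L }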

  def getExecutors(segment: Array[InputSplit], executorsList: Map[String, Seq[String]],
      tableUniqueName: String, rddId: Int): Seq[Partition] = {
    // separate out the legacy segments (no index size stored) and sort the rest
    // in decreasing order of index size.
    val (segments, legacySegments) = segment.partition(split => split
      .asInstanceOf[DataMapDistributableWrapper].getDistributable.getSegment.getIndexSize > 0)
    val sortedPartitions = segments.sortWith(_.asInstanceOf[DataMapDistributableWrapper]
      .getDistributable.getSegment.getIndexSize >
      _.asInstanceOf[DataMapDistributableWrapper]
        .getDistributable.getSegment.getIndexSize)
    val executorCache = DistributedRDDUtils.executorToCacheSizeMapping
    // check if any host is dead, i.e. present in the cache but not in the alive list.
    val invalidHosts = executorCache.keySet().asScala.diff(executorsList.keySet)
    if (invalidHosts.nonEmpty) {
      // reassign the segments cached on the dead hosts.
      DistributedRDDUtils.invalidateHosts(invalidHosts.toSeq)
    }
    val invalidExecutorIds = executorsList.collect {
      case (host, executors) if executorCache.get(host) != null =>
        val toBeRemovedExecutors = executorCache.get(host).keySet().asScala.diff(executors.toSet)
        if (executors.size == toBeRemovedExecutors.size) {
          // every executor on this host is dead; invalidate the whole host.
          DistributedRDDUtils.invalidateHosts(Seq(host))
          Seq()
        } else {
          toBeRemovedExecutors.map(executor => host + "_" + executor)
        }
    }.flatten
    if (invalidExecutorIds.nonEmpty) {
      DistributedRDDUtils.invalidateExecutors(invalidExecutorIds.toSeq)
    }
    val groupedPartitions = convertToPartition(sortedPartitions,
      legacySegments,
      tableUniqueName,
      executorsList).groupBy {
      partition => partition.getLocations.head
    }
    groupedPartitions.zipWithIndex.map {
      case ((location, splitList), index) =>
        new DataMapRDDPartition(rddId, index, splitList, Array(location))
    }.toArray.sortBy(_.index)
  }
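
  // Example (hypothetical values): with executorsList = Map("127.0.0.1" -> Seq("1", "2")),
  // every returned DataMapRDDPartition carries exactly one location of the form
  // "executor_127.0.0.1_1", so Spark schedules each task on the executor that is
  // expected to hold that segment's index cache.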

  private def convertToPartition(segments: Seq[InputSplit], legacySegments: Seq[InputSplit],
      tableUniqueName: String,
      executorList: Map[String, Seq[String]]): Seq[InputSplit] = {
    val legacySegmentPartitions = if (legacySegments.nonEmpty) {
      // legacy segments carry no index size, so they cannot take part in size-based
      // assignment. Spread them round-robin over the valid executors instead.
      val validExecutorIds = executorList.flatMap {
        case (host, executors) => executors.map {
          executor => s"${host}_$executor"
        }
      }.toSeq
      legacySegments.zipWithIndex.map {
        case (legacySegment, index) =>
          val wrapper: DataMapDistributable = legacySegment
            .asInstanceOf[DataMapDistributableWrapper].getDistributable
          val executor = validExecutorIds(index % validExecutorIds.length)
          wrapper.setLocations(Array("executor_" + executor))
          legacySegment
      }
    } else {
      Seq()
    }
    legacySegmentPartitions ++ segments.map { partition =>
      val wrapper: DataMapDistributable = partition.asInstanceOf[DataMapDistributableWrapper]
        .getDistributable
      wrapper.setLocations(Array(DistributedRDDUtils
        .assignExecutor(tableUniqueName, wrapper.getSegment, executorList)))
      partition
    }
  }
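
  // Placement sketch (hypothetical ids): a legacy segment at index 3 with
  // validExecutorIds = Seq("h1_1", "h1_2") lands on "executor_h1_2" (3 % 2 == 1),
  // while every non-legacy segment goes through assignExecutor and its sticky mapping.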

  /**
   * Update the driver-side mapping with the cache sizes reported by the executors.
   */
  def updateExecutorCacheSize(cacheSizes: Set[String]): Unit = {
    synchronized {
      cacheSizes.foreach {
        executorCacheSize =>
          // executorCacheSize is of the form <host>_<executorId>_<cacheSize>,
          // e.g. 127.0.0.1_1_10024, where the part after the last '_' is the cache
          // size that executor is holding.
          val hostAndExecutor = executorCacheSize.substring(0,
            executorCacheSize.lastIndexOf('_'))
          val (host, executor) = (hostAndExecutor
            .substring(0, hostAndExecutor.lastIndexOf('_')), hostAndExecutor
            .substring(hostAndExecutor.lastIndexOf('_') + 1, hostAndExecutor.length))
          val size = executorCacheSize.substring(executorCacheSize.lastIndexOf('_') + 1,
            executorCacheSize.length)
          val executorMapping = executorToCacheSizeMapping.get(host)
          if (executorMapping != null) {
            executorMapping.put(executor, size.toLong)
            executorToCacheSizeMapping.put(host, executorMapping)
          }
      }
    }
  }
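
  // Worked example (hypothetical entry): "127.0.0.1_1_10024" splits on its last two
  // underscores into host "127.0.0.1", executor "1" and size "10024", after which
  // executorToCacheSizeMapping("127.0.0.1")("1") == 10024L.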

  /**
   * Remove the invalid segment mapping from the index server when segments are compacted
   * or become invalid.
   */
  def invalidateSegmentMapping(tableUniqueName: String,
      invalidSegmentList: Seq[String]): Unit = {
    synchronized {
      if (tableToExecutorMapping.get(tableUniqueName) != null) {
        invalidSegmentList.foreach {
          invalidSegment => tableToExecutorMapping.get(tableUniqueName).remove(invalidSegment)
        }
        if (tableToExecutorMapping.get(tableUniqueName).isEmpty) {
          invalidateTableMapping(tableUniqueName)
        }
      }
    }
  }

  /**
   * Remove the table mapping from the index server when the table is dropped.
   */
  def invalidateTableMapping(tableUniqueName: String): Unit = {
    synchronized {
      tableToExecutorMapping.remove(tableUniqueName)
    }
  }

  /**
   * Invalidate the dead executors from the mapping and assign their segments to some other
   * executor, so that a query can load the segments on the newly assigned executor.
   */
  def invalidateHosts(invalidHosts: Seq[String]): Unit = {
    synchronized {
      val validInvalidExecutors: Map[String, String] = invalidHosts.flatMap {
        host =>
          val invalidExecutorToSizeMapping = executorToCacheSizeMapping.remove(host)
          invalidExecutorToSizeMapping.asScala.map {
            case (invalidExecutor, size) =>
              getLeastLoadedExecutor match {
                case Some((reassignedHost, reassignedExecutorId)) =>
                  val existingExecutorMapping = executorToCacheSizeMapping.get(reassignedHost)
                  if (existingExecutorMapping != null) {
                    val existingSize = existingExecutorMapping.get(reassignedExecutorId)
                    existingExecutorMapping.put(reassignedExecutorId, existingSize + size)
                    executorToCacheSizeMapping.put(reassignedHost, existingExecutorMapping)
                  } else {
                    // the reassigned host has no mapping yet; create one instead of
                    // dereferencing the null lookup above.
                    val newExecutorMapping = new ConcurrentHashMap[String, Long]()
                    newExecutorMapping.put(reassignedExecutorId, size)
                    executorToCacheSizeMapping.put(reassignedHost, newExecutorMapping)
                  }
                  s"${host}_$invalidExecutor" -> s"${reassignedHost}_$reassignedExecutorId"
                case None =>
                  // no alive executor to reassign to; this placeholder entry matches
                  // no real executor name and is ignored downstream.
                  "" -> ""
              }
          }
      }.toMap
      updateTableMappingForInvalidExecutors(validInvalidExecutors)
    }
  }
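
  // Example (hypothetical values): if host "127.0.0.2" dies while caching
  // { "1" -> 512L }, those 512 bytes are re-attributed to the least loaded live
  // executor and every segment mapped to "127.0.0.2_1" is re-pinned to it.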

  private def updateTableMappingForInvalidExecutors(
      validInvalidExecutors: Map[String, String]): Unit = {
    // remap every segment that was pinned to an invalid executor.
    for ((tableName: String, segmentToExecutorMapping) <- tableToExecutorMapping.asScala) {
      val newSegmentToExecutorMapping = new ConcurrentHashMap[String, String]()
      val existingMapping = tableToExecutorMapping.get(tableName)
      segmentToExecutorMapping.asScala.foreach {
        case (segmentNumber, executorUniqueName)
          if validInvalidExecutors.contains(executorUniqueName) =>
          // point the segment at the replacement executor chosen for the invalid one.
          newSegmentToExecutorMapping
            .put(segmentNumber, validInvalidExecutors(executorUniqueName))
        case _ => // the segment is pinned to a live executor; nothing to do.
      }
      existingMapping.putAll(newSegmentToExecutorMapping)
      tableToExecutorMapping.put(tableName, existingMapping)
    }
  }
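
  // Example (hypothetical mapping): given validInvalidExecutors = Map("h1_1" -> "h2_1"),
  // any segment of any table previously pinned to "h1_1" becomes pinned to "h2_1".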

  def invalidateExecutors(invalidExecutors: Seq[String]): Unit = {
    synchronized {
      val validInvalidExecutors: Map[String, String] = invalidExecutors.map {
        invalidExecutor =>
          val executorIdSplits = invalidExecutor.split("_")
          val (host, executor) = (executorIdSplits(0), executorIdSplits(1))
          val invalidExecutorSize = executorToCacheSizeMapping.get(host).remove(executor)
          getLeastLoadedExecutor match {
            case Some((reassignedHost, reassignedExecutorId)) =>
              val existingExecutorMapping = executorToCacheSizeMapping.get(reassignedHost)
              if (existingExecutorMapping != null) {
                val existingSize = existingExecutorMapping.get(reassignedExecutorId)
                existingExecutorMapping
                  .put(reassignedExecutorId, existingSize + invalidExecutorSize)
                executorToCacheSizeMapping.put(reassignedHost, existingExecutorMapping)
              } else {
                // the reassigned host has no mapping yet; create one instead of
                // dereferencing the null lookup above.
                val newExecutorMapping = new ConcurrentHashMap[String, Long]()
                newExecutorMapping.put(reassignedExecutorId, invalidExecutorSize)
                executorToCacheSizeMapping.put(reassignedHost, newExecutorMapping)
              }
              invalidExecutor -> s"${reassignedHost}_$reassignedExecutorId"
            case None =>
              // no alive executor to reassign to; placeholder ignored downstream.
              "" -> ""
          }
      }.toMap
      updateTableMappingForInvalidExecutors(validInvalidExecutors)
    }
  }
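
  // Example (hypothetical id): invalidateExecutors(Seq("127.0.0.1_2")) drops executor
  // "2" of host "127.0.0.1" from the cache-size mapping and remaps its segments onto
  // the least loaded surviving executor.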

  /**
   * Sorts the executors by the cache size each one is handling and returns the least
   * loaded of them.
   *
   * @return the (host, executorId) pair holding the smallest cache, if any executor is alive
   */
  private def getLeastLoadedExecutor: Option[(String, String)] = {
    val leastHostExecutor = executorToCacheSizeMapping.asScala.flatMap {
      case (host, executorToCacheMap) =>
        executorToCacheMap.asScala.map {
          case (executor, size) =>
            (host, executor, size)
        }
    }.toSeq.sortWith(_._3 < _._3).toList
    leastHostExecutor match {
      case (host, executor, _) :: _ =>
        Some((host, executor))
      case _ => None
    }
  }
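
  // Example (hypothetical sizes): with "hostA" -> { "1" -> 100L } and
  // "hostB" -> { "1" -> 10L } in the mapping, this returns Some(("hostB", "1")).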

  private def checkForUnassignedExecutors(validExecutorIds: Seq[String]): Option[String] = {
    val usedExecutorIds = executorToCacheSizeMapping.asScala.flatMap {
      case (host, executorMap) =>
        executorMap.keySet().asScala.map {
          executor => s"${host}_$executor"
        }
    }
    val unassignedExecutor = validExecutorIds.diff(usedExecutorIds.toSeq)
    unassignedExecutor.headOption
  }
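
  // Example (hypothetical ids): validExecutorIds = Seq("hostA_1", "hostA_2") with only
  // "hostA_1" present in executorToCacheSizeMapping yields Some("hostA_2").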

  /**
   * Assign an executor for the current segment. If an executor was previously assigned
   * to the segment then the same executor is returned.
   *
   * @return the location string, in the form executor_<host>_<executorId>
   */
  def assignExecutor(tableUniqueName: String,
      segment: Segment,
      validExecutors: Map[String, Seq[String]]): String = {
    val segmentMapping = tableToExecutorMapping.get(tableUniqueName)
    lazy val executor = segmentMapping.get(segment.getSegmentNo)
    if (segmentMapping != null && executor != null) {
      s"executor_$executor"
    } else {
      // check if any executor is not assigned yet. If yes then give priority to that
      // executor, otherwise pick the executor which is handling the least cache size.
      val validExecutorIds = validExecutors.flatMap {
        case (host, executors) => executors.map {
          executor => s"${host}_$executor"
        }
      }.toSeq
      val unassignedExecutor = checkForUnassignedExecutors(validExecutorIds)
      val (newHost, newExecutor) = if (unassignedExecutor.nonEmpty) {
        val freeExecutor = unassignedExecutor.get.split("_")
        (freeExecutor(0), freeExecutor(1))
      } else {
        getLeastLoadedExecutor match {
          case Some((host, executorID)) => (host, executorID)
          case None => throw new RuntimeException("Could not find any alive executors.")
        }
      }
      val existingExecutorMapping = executorToCacheSizeMapping.get(newHost)
      if (existingExecutorMapping != null) {
        val existingSize = existingExecutorMapping.get(newExecutor)
        if (existingSize != null) {
          existingExecutorMapping.put(newExecutor, existingSize + segment.getIndexSize)
        } else {
          existingExecutorMapping.put(newExecutor, segment.getIndexSize)
        }
      } else {
        val newExecutorMapping = new ConcurrentHashMap[String, Long]()
        newExecutorMapping.put(newExecutor, segment.getIndexSize)
        executorToCacheSizeMapping.put(newHost, newExecutorMapping)
      }
      val existingSegmentMapping = tableToExecutorMapping.get(tableUniqueName)
      if (existingSegmentMapping == null) {
        val newSegmentMapping = new ConcurrentHashMap[String, String]()
        newSegmentMapping.put(segment.getSegmentNo, s"${newHost}_$newExecutor")
        tableToExecutorMapping.putIfAbsent(tableUniqueName, newSegmentMapping)
      } else {
        existingSegmentMapping.putIfAbsent(segment.getSegmentNo, s"${newHost}_$newExecutor")
        tableToExecutorMapping.putIfAbsent(tableUniqueName, existingSegmentMapping)
      }
      s"executor_${newHost}_$newExecutor"
    }
  }
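
  // Example (hypothetical values): a first call
  //   assignExecutor("default_t1", segment0, Map("127.0.0.1" -> Seq("1")))
  // returns "executor_127.0.0.1_1" and records segment0 -> "127.0.0.1_1", so later
  // calls for the same segment return the same executor from the mapping.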
}