/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.spark.utils

import org.apache.druid.java.util.common.{FileUtils, ISE, StringUtils}
import org.apache.druid.segment.{IndexIO, QueryableIndexSegment, Segment, SegmentLazyLoadFailCallback}
import org.apache.druid.segment.loading.SegmentLoader
import org.apache.druid.spark.mixins.Logging
import org.apache.druid.spark.registries.SegmentReaderRegistry
import org.apache.druid.timeline.DataSegment
import org.apache.hadoop.conf.{Configuration => HConf}

import java.io.File

import scala.collection.mutable.{HashSet => MHashSet}

/**
 * A SegmentLoader to manage loading segment files for a partition reader. For now, a segment loader is created per
 * PartitionReader, which means it will only ever load a single segment. This is slightly wasteful, but avoids needing
 * to manage the SegmentLoader's life cycle outside of a PartitionReader. If the input partition planning logic ever
 * gets smarter than just assigning each segment to a partition, this design decision should be revisited.
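 *
 * A minimal usage sketch (illustrative only; `dataSegment` and the constructor arguments are assumed
 * to be in scope, and `SegmentLazyLoadFailCallback.NOOP` is Druid's no-op load-failure callback):
 *
 * {{{
 * val loader = new PartitionReaderSegmentLoader(tmpDir, hadoopConf, indexIO)
 * val segment = loader.getSegment(dataSegment, false, SegmentLazyLoadFailCallback.NOOP)
 * // ... read from the returned segment ...
 * loader.cleanup(dataSegment)
 * }}}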
 */
class PartitionReaderSegmentLoader(
    val tmpDir: File,
    val hadoopConf: HConf,
    val indexIO: IndexIO
) extends SegmentLoader with Logging {
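  // Segments whose files this loader has fetched into tmpDir, so cleanup only removes directories we created.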
  private val loadedSegments = new MHashSet[DataSegment]

  override def isSegmentLoaded(segment: DataSegment): Boolean = loadedSegments.contains(segment)
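
  /**
   * Fetches the segment's files if they aren't already local, then opens them as a
   * QueryableIndexSegment backed by the local directory.
   */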
  override def getSegment(
      segment: DataSegment,
      `lazy`: Boolean,
      loadFailed: SegmentLazyLoadFailCallback
  ): Segment = {
    val segmentDir = getSegmentFiles(segment)
    val index = indexIO.loadIndex(segmentDir, `lazy`, loadFailed)
    logInfo(s"Loaded segment [${segment.getId}].")
    new QueryableIndexSegment(index, segment.getId)
  }
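
  /**
   * Returns the local directory holding this segment's files, pulling them down via the
   * SegmentReaderRegistry on first access. Note that only the per-segment directory is created
   * here; tmpDir itself is expected to exist already.
   */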
  override def getSegmentFiles(segment: DataSegment): File = {
    val segmentDir = new File(tmpDir, segment.getId.toString)
    if (!segmentDir.exists) {
      logInfo(
        StringUtils.format(
          "Fetching segment [%s] to [%s].", segment.getId, segmentDir
        )
      )
      if (!segmentDir.mkdir) throw new ISE("Failed to make directory [%s]", segmentDir)
      SegmentReaderRegistry.load(segment.getLoadSpec, segmentDir, hadoopConf)
      loadedSegments += segment
    }
    segmentDir
  }
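
  /**
   * Deletes the segment's local files if this loader fetched them and drops it from the loaded set.
   */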
  override def cleanup(segment: DataSegment): Unit = {
    if (isSegmentLoaded(segment)) {
      loadedSegments -= segment
      FileUtils.deleteDirectory(new File(tmpDir, segment.getId.toString))
    }
  }
}