/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.input

import com.google.common.io.{ByteStreams, Closeables}

import org.apache.hadoop.conf.{Configurable => HConfigurable, Configuration}
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.input.{CombineFileRecordReader, CombineFileSplit}

/**
 * A trait to implement the [[org.apache.hadoop.conf.Configurable Configurable]] interface.
 */
private[spark] trait Configurable extends HConfigurable {
  private var conf: Configuration = _

  def setConf(c: Configuration): Unit = {
    conf = c
  }

  def getConf: Configuration = conf
}
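
// Illustrative sketch (not part of the original file): the caller injects the job
// Configuration through setConf, and the reader retrieves it lazily via getConf
// once it actually needs it:
//
//   val reader: RecordReader[Text, Text] with Configurable = ...
//   reader.setConf(context.getConfiguration)  // injected by the framework/caller
//   // ... later, inside the reader's own code:
//   val conf = reader.getConf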

/**
 * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text
 * file as one key-value pair, where the key is the file path and the value is the entire
 * content of the file.
 */
private[spark] class WholeTextFileRecordReader(
    split: CombineFileSplit,
    context: TaskAttemptContext,
    index: Integer)
  extends RecordReader[Text, Text] with Configurable {

  private[this] val path = split.getPath(index)
  private[this] val fs = path.getFileSystem(context.getConfiguration)

  // Once `processed` is true, the single file backing this reader has been consumed
  // and nextKeyValue() returns false.
  private[this] var processed = false

  private[this] val key: Text = new Text(path.toString)
  private[this] var value: Text = null

  override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {}

  override def close(): Unit = {}

  override def getProgress: Float = if (processed) 1.0f else 0.0f

  override def getCurrentKey: Text = key

  override def getCurrentValue: Text = value

  override def nextKeyValue(): Boolean = {
    if (!processed) {
      // Use the Configuration injected through setConf (see the Configurable trait)
      // rather than constructing a fresh default one, so job-level settings are honored.
      val conf = getConf
      val factory = new CompressionCodecFactory(conf)
      val codec = factory.getCodec(path)  // inferred from the file extension
      val fileIn = fs.open(path)
      val innerBuffer = if (codec != null) {
        // Decompress the stream before buffering the whole file into memory.
        ByteStreams.toByteArray(codec.createInputStream(fileIn))
      } else {
        ByteStreams.toByteArray(fileIn)
      }

      value = new Text(innerBuffer)
      Closeables.close(fileIn, false)

      processed = true
      true
    } else {
      false
    }
  }
}
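
// Illustrative sketch (assumption, not in the original file): driving the reader by
// hand follows the standard Hadoop RecordReader protocol; `split` and `context` are
// assumed to be supplied by the enclosing job.
//
//   val reader = new WholeTextFileRecordReader(split, context, 0)
//   reader.setConf(context.getConfiguration)
//   while (reader.nextKeyValue()) {
//     val path = reader.getCurrentKey        // file path, as Text
//     val contents = reader.getCurrentValue  // whole file contents, as Text
//   }
//   reader.close()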

/**
 * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader]]
 * that can pass a Hadoop Configuration to [[org.apache.hadoop.conf.Configurable Configurable]]
 * RecordReaders.
 */
private[spark] class ConfigurableCombineFileRecordReader[K, V](
    split: InputSplit,
    context: TaskAttemptContext,
    recordReaderClass: Class[_ <: RecordReader[K, V] with HConfigurable])
  extends CombineFileRecordReader[K, V](
    split.asInstanceOf[CombineFileSplit],
    context,
    recordReaderClass
  ) with Configurable {

  override def initNextRecordReader(): Boolean = {
    val r = super.initNextRecordReader()
    if (r) {
      // Propagate the injected Configuration to each newly created child reader.
      this.curReader.asInstanceOf[HConfigurable].setConf(getConf)
    }
    r
  }
}
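
// Illustrative sketch (hypothetical class name, not part of this file): an InputFormat
// can tie the two classes above together by returning the configurable wrapper and
// injecting its Configuration, so every per-file reader sees the job settings:
//
//   private[spark] class WholeTextFileDemoInputFormat
//     extends CombineFileInputFormat[Text, Text] with Configurable {
//
//     override def createRecordReader(
//         split: InputSplit,
//         context: TaskAttemptContext): RecordReader[Text, Text] = {
//       val reader = new ConfigurableCombineFileRecordReader(
//         split, context, classOf[WholeTextFileRecordReader])
//       reader.setConf(getConf)
//       reader
//     }
//   }
//
// (CombineFileInputFormat lives in org.apache.hadoop.mapreduce.lib.input and would
// need to be imported.)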