/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.parquet.hadoop.mapred;

import static java.lang.Boolean.TRUE;
import static java.util.Arrays.asList;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

import org.apache.parquet.hadoop.Footer;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetInputSplit;
import org.apache.parquet.hadoop.ParquetRecordReader;
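
/**
 * Old-API ({@code org.apache.hadoop.mapred}) wrapper around {@link ParquetInputFormat},
 * for jobs that still run through {@link JobConf}. Keys are always null; each record is
 * handed out inside a {@link Container}, since the old API expects a reusable value object.
 *
 * <p>A minimal configuration sketch; {@code MyReadSupport} and the input path are
 * placeholders, not part of this file:
 *
 * <pre>{@code
 * JobConf conf = new JobConf();
 * conf.setInputFormat(DeprecatedParquetInputFormat.class);
 * // MyReadSupport is a hypothetical ReadSupport implementation
 * conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, MyReadSupport.class.getName());
 * org.apache.hadoop.mapred.FileInputFormat.setInputPaths(conf, new org.apache.hadoop.fs.Path("/data/example"));
 * }</pre>
 */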
public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.FileInputFormat<Void, Container<V>> {

  protected ParquetInputFormat<V> realInputFormat = new ParquetInputFormat<V>();

  @Override
  public RecordReader<Void, Container<V>> getRecordReader(InputSplit split, JobConf job,
      Reporter reporter) throws IOException {
    return new RecordReaderWrapper<V>(split, job, reporter);
  }
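
  /**
   * With task-side metadata enabled (the default), splits are computed from file blocks
   * by the superclass. Otherwise footers are read up front and each resulting
   * {@link ParquetInputSplit} is wrapped so it satisfies the mapred {@link InputSplit}
   * interface.
   */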
  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    if (isTaskSideMetaData(job)) {
      return super.getSplits(job, numSplits);
    }

    List<Footer> footers = getFooters(job);
    List<ParquetInputSplit> splits = realInputFormat.getSplits(job, footers);
    if (splits == null) {
      return null;
    }
    InputSplit[] resultSplits = new InputSplit[splits.size()];
    int i = 0;
    for (ParquetInputSplit split : splits) {
      resultSplits[i++] = new ParquetInputSplitWrapper(split);
    }
    return resultSplits;
  }
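
  /** Lists this job's input files via the superclass and reads their Parquet footers. */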
  public List<Footer> getFooters(JobConf job) throws IOException {
    return realInputFormat.getFooters(job, asList(super.listStatus(job)));
  }
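
  /**
   * Adapts {@link ParquetRecordReader} to the mapred {@link RecordReader} contract.
   * One record is read eagerly in the constructor so that {@link #createValue()} can
   * return a container that already holds a value instance.
   */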
  private static class RecordReaderWrapper<V> implements RecordReader<Void, Container<V>> {

    private ParquetRecordReader<V> realReader;
    private long splitLen; // for getPos()
    private Container<V> valueContainer = null;
    private boolean firstRecord = false;
    private boolean eof = false;

    public RecordReaderWrapper(
        InputSplit oldSplit, JobConf oldJobConf, Reporter reporter) throws IOException {
      splitLen = oldSplit.getLength();

      try {
        realReader = new ParquetRecordReader<V>(
            ParquetInputFormat.<V>getReadSupportInstance(oldJobConf),
            ParquetInputFormat.getFilter(oldJobConf));

        if (oldSplit instanceof ParquetInputSplitWrapper) {
          realReader.initialize(((ParquetInputSplitWrapper) oldSplit).realSplit, oldJobConf, reporter);
        } else if (oldSplit instanceof FileSplit) {
          realReader.initialize((FileSplit) oldSplit, oldJobConf, reporter);
        } else {
          throw new IllegalArgumentException(
              "Invalid split (not a FileSplit or ParquetInputSplitWrapper): " + oldSplit);
        }

        // read once to gain access to key and value objects
        if (realReader.nextKeyValue()) {
          firstRecord = true;
          valueContainer = new Container<V>();
          valueContainer.set(realReader.getCurrentValue());
        } else {
          eof = true;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag instead of clearing it
        throw new IOException(e);
      }
    }

    @Override
    public void close() throws IOException {
      realReader.close();
    }

    @Override
    public Void createKey() {
      return null;
    }

    @Override
    public Container<V> createValue() {
      return valueContainer;
    }

    @Override
    public long getPos() throws IOException {
      return (long) (splitLen * getProgress());
    }

    @Override
    public float getProgress() throws IOException {
      try {
        return realReader.getProgress();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag instead of clearing it
        throw new IOException(e);
      }
    }

    @Override
    public boolean next(Void key, Container<V> value) throws IOException {
      if (eof) {
        return false;
      }

      if (firstRecord) { // key & value are already read.
        value.set(valueContainer.get());
        firstRecord = false;
        return true;
      }

      try {
        if (realReader.nextKeyValue()) {
          if (value != null) value.set(realReader.getCurrentValue());
          return true;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt flag, consistent with the other wrappers
        throw new IOException(e);
      }

      eof = true; // strictly not required, just for consistency
      return false;
    }
  }
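
  /**
   * Returns whether footers are read on the task side (the default) rather than up front
   * while computing splits.
   */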
  public static boolean isTaskSideMetaData(JobConf job) {
    return job.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, TRUE);
  }
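
  /**
   * Makes a {@link ParquetInputSplit} usable through the mapred {@link InputSplit}
   * interface; (de)serialization is delegated to the wrapped split.
   */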
  private static class ParquetInputSplitWrapper implements InputSplit {

    ParquetInputSplit realSplit;

    @SuppressWarnings("unused") // MapReduce instantiates this.
    public ParquetInputSplitWrapper() {}

    public ParquetInputSplitWrapper(ParquetInputSplit realSplit) {
      this.realSplit = realSplit;
    }

    @Override
    public long getLength() throws IOException {
      return realSplit.getLength();
    }

    @Override
    public String[] getLocations() throws IOException {
      return realSplit.getLocations();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      realSplit = new ParquetInputSplit();
      realSplit.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      realSplit.write(out);
    }
  }
}