/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.tajo.storage.thirdparty.orc;

import com.facebook.presto.orc.DiskRange;
import com.facebook.presto.orc.OrcDataSource;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.BasicSliceInput;
import io.airlift.slice.FixedLengthSliceInput;
import io.airlift.units.DataSize;
import org.apache.hadoop.fs.FSDataInputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import static com.facebook.presto.orc.OrcDataSourceUtils.getDiskRangeSlice;
import static com.facebook.presto.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
 * HDFS file data source for the ORC reader.
 *
 * Most of this code is adapted from Presto.
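 *
 * <p>Illustrative construction (a sketch, not prescribed by this class; the stream,
 * file length, and size values below are assumptions):
 * <pre>
 *   OrcDataSource dataSource = new HdfsOrcDataSource(
 *       orcPath.toString(), fsInputStream, fileLength,
 *       new DataSize(1, DataSize.Unit.MEGABYTE),   // maxMergeDistance
 *       new DataSize(8, DataSize.Unit.MEGABYTE));  // maxReadSize
 * </pre>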
*/
public class HdfsOrcDataSource
implements OrcDataSource
{
private final FSDataInputStream inputStream;
private final String path;
private final long size;
private final DataSize maxMergeDistance;
private final DataSize maxReadSize;
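    // cumulative nanoseconds spent in readFully calls, exposed via getReadTimeNanos()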
    private long readTimeNanos;

public HdfsOrcDataSource(String path, FSDataInputStream inputStream, long size,
DataSize maxMergeDistance, DataSize maxReadSize)
{
this.path = checkNotNull(path, "path is null");
this.inputStream = checkNotNull(inputStream, "inputStream is null");
this.size = size;
checkArgument(size >= 0, "size is negative");
this.maxMergeDistance = checkNotNull(maxMergeDistance, "maxMergeDistance is null");
        this.maxReadSize = checkNotNull(maxReadSize, "maxReadSize is null");
    }

@Override
public void close()
throws IOException
{
inputStream.close();
    }

@Override
public long getReadTimeNanos()
{
return readTimeNanos;
    }

@Override
public long getSize()
{
return size;
    }

@Override
public void readFully(long position, byte[] buffer)
throws IOException
{
readFully(position, buffer, 0, buffer.length);
    }

@Override
public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength)
throws IOException
{
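        // time the positional read so getReadTimeNanos() reflects total time spent reading from HDFS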
long start = System.nanoTime();
inputStream.readFully(position, buffer, bufferOffset, bufferLength);
readTimeNanos += System.nanoTime() - start;
    }

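    /**
     * Reads each requested disk range, merging nearby ranges into larger reads,
     * and returns the bytes for every key as a {@link FixedLengthSliceInput}.
     */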
@Override
public <K> Map<K, FixedLengthSliceInput> readFully(Map<K, DiskRange> diskRanges)
throws IOException
{
checkNotNull(diskRanges, "diskRanges is null");
if (diskRanges.isEmpty()) {
return ImmutableMap.of();
}
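        // merge ranges that fall within maxMergeDistance of each other into larger reads
        // (each capped at maxReadSize) to cut down the number of positional reads against HDFS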
Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance, maxReadSize);
// read ranges
Map<DiskRange, byte[]> buffers = new LinkedHashMap<>();
for (DiskRange mergedRange : mergedRanges) {
// read full range in one request
byte[] buffer = new byte[mergedRange.getLength()];
readFully(mergedRange.getOffset(), buffer);
buffers.put(mergedRange, buffer);
}
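        // slice each originally requested range back out of the merged buffer that covers it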
ImmutableMap.Builder<K, FixedLengthSliceInput> slices = ImmutableMap.builder();
diskRanges.forEach((K key, DiskRange range) ->
slices.put(key, new BasicSliceInput(getDiskRangeSlice(range, buffers))));
return slices.build();
    }

@Override
public String toString()
{
return path;
}
}