| |
| /* |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.tajo.storage.thirdparty.orc; |
| |
| import com.facebook.presto.orc.DiskRange; |
| import com.facebook.presto.orc.OrcDataSource; |
| import com.google.common.collect.ImmutableMap; |
| import io.airlift.slice.BasicSliceInput; |
| import io.airlift.slice.FixedLengthSliceInput; |
| import io.airlift.units.DataSize; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| |
| import java.io.IOException; |
| import java.util.LinkedHashMap; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| |
| import static com.facebook.presto.orc.OrcDataSourceUtils.getDiskRangeSlice; |
| import static com.facebook.presto.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges; |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| |
| /** |
| * HDFS File data source class for Orc Reader |
| * |
| * Most of code is from Presto |
| */ |
| public class HdfsOrcDataSource |
| implements OrcDataSource |
| { |
| private final FSDataInputStream inputStream; |
| private final String path; |
| private final long size; |
| private final DataSize maxMergeDistance; |
| private final DataSize maxReadSize; |
| private long readTimeNanos; |
| |
| public HdfsOrcDataSource(String path, FSDataInputStream inputStream, long size, |
| DataSize maxMergeDistance, DataSize maxReadSize) |
| { |
| this.path = checkNotNull(path, "path is null"); |
| this.inputStream = checkNotNull(inputStream, "inputStream is null"); |
| this.size = size; |
| checkArgument(size >= 0, "size is negative"); |
| |
| this.maxMergeDistance = checkNotNull(maxMergeDistance, "maxMergeDistance is null"); |
| this.maxReadSize = checkNotNull(maxReadSize, "maxMergeDistance is null"); |
| } |
| |
| @Override |
| public void close() |
| throws IOException |
| { |
| inputStream.close(); |
| } |
| |
| @Override |
| public long getReadTimeNanos() |
| { |
| return readTimeNanos; |
| } |
| |
| @Override |
| public long getSize() |
| { |
| return size; |
| } |
| |
| @Override |
| public void readFully(long position, byte[] buffer) |
| throws IOException |
| { |
| readFully(position, buffer, 0, buffer.length); |
| } |
| |
| @Override |
| public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength) |
| throws IOException |
| { |
| long start = System.nanoTime(); |
| |
| inputStream.readFully(position, buffer, bufferOffset, bufferLength); |
| readTimeNanos += System.nanoTime() - start; |
| } |
| |
| @Override |
| public <K> Map<K, FixedLengthSliceInput> readFully(Map<K, DiskRange> diskRanges) |
| throws IOException |
| { |
| checkNotNull(diskRanges, "diskRanges is null"); |
| |
| if (diskRanges.isEmpty()) { |
| return ImmutableMap.of(); |
| } |
| |
| Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance, maxReadSize); |
| |
| // read ranges |
| Map<DiskRange, byte[]> buffers = new LinkedHashMap<>(); |
| for (DiskRange mergedRange : mergedRanges) { |
| // read full range in one request |
| byte[] buffer = new byte[mergedRange.getLength()]; |
| readFully(mergedRange.getOffset(), buffer); |
| buffers.put(mergedRange, buffer); |
| } |
| |
| ImmutableMap.Builder<K, FixedLengthSliceInput> slices = ImmutableMap.builder(); |
| diskRanges.forEach((K key, DiskRange range) -> |
| slices.put(key, new BasicSliceInput(getDiskRangeSlice(range, buffers)))); |
| |
| return slices.build(); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return path; |
| } |
| } |
| |
| |