| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.orc.impl; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.nio.ByteBuffer; |
| import java.security.InvalidAlgorithmParameterException; |
| import java.security.InvalidKeyException; |
| import java.security.Key; |
| import java.util.function.Consumer; |
| |
| import org.apache.hadoop.hive.common.io.DiskRangeList; |
| import org.apache.orc.CompressionCodec; |
| import org.apache.orc.EncryptionAlgorithm; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.protobuf.CodedInputStream; |
| |
| import javax.crypto.Cipher; |
| import javax.crypto.ShortBufferException; |
| import javax.crypto.spec.IvParameterSpec; |
| |
| public abstract class InStream extends InputStream { |
| |
/** Logger shared by all of the stream implementations in this class. */
private static final Logger LOG = LoggerFactory.getLogger(InStream.class);

/** Size limit handed to protobuf coded input streams: 1024 &lt;&lt; 20 bytes (1GB). */
public static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB

// label of the stream; rendered with toString() in messages
protected final Object name;
// offset of the first byte of the stream
protected final long offset;
// number of bytes in the stream
protected final long length;
// head of the list of ranges most recently passed to reset()
protected DiskRangeList bytes;
// position in the stream (0..length)
protected long position;

/**
 * Record the label and the extent of the stream. No data is attached here;
 * that happens when reset is called with a list of ranges.
 * @param name the label used to describe the stream
 * @param offset the offset of the first byte of the stream
 * @param length the number of bytes in the stream
 */
public InStream(Object name, long offset, long length) {
  this.length = length;
  this.offset = offset;
  this.name = name;
}
| |
| @Override |
| public String toString() { |
| return name.toString(); |
| } |
| |
| @Override |
| public abstract void close(); |
| |
| /** |
| * Set the current range |
| * @param newRange the block that is current |
| * @param isJump if this was a seek instead of a natural read |
| */ |
| abstract protected void setCurrent(DiskRangeList newRange, |
| boolean isJump); |
| /** |
| * Reset the input to a new set of data. |
| * @param input the input data |
| */ |
| protected void reset(DiskRangeList input) { |
| bytes = input; |
| while (input != null && |
| (input.getEnd() <= offset || |
| input.getOffset() > offset + length)) { |
| input = input.next; |
| } |
| if (input == null || input.getOffset() <= offset) { |
| position = 0; |
| } else { |
| position = input.getOffset() - offset; |
| } |
| setCurrent(input, true); |
| } |
| |
| public abstract void changeIv(Consumer<byte[]> modifier); |
| |
| static int getRangeNumber(DiskRangeList list, DiskRangeList current) { |
| int result = 0; |
| DiskRangeList range = list; |
| while (range != null && range != current) { |
| result += 1; |
| range = range.next; |
| } |
| return result; |
| } |
| |
| /** |
| * Implements a stream over an uncompressed stream. |
| */ |
| public static class UncompressedStream extends InStream { |
| protected ByteBuffer decrypted; |
| protected DiskRangeList currentRange; |
| protected long currentOffset; |
| |
| /** |
| * Create the stream without calling reset on it. |
| * This is used for the subclass that needs to do more setup. |
| * @param name name of the stream |
| * @param length the number of bytes for the stream |
| */ |
| public UncompressedStream(Object name, long offset, long length) { |
| super(name, offset, length); |
| } |
| |
| public UncompressedStream(Object name, |
| DiskRangeList input, |
| long offset, |
| long length) { |
| super(name, offset, length); |
| reset(input); |
| } |
| |
| @Override |
| public int read() { |
| if (decrypted == null || decrypted.remaining() == 0) { |
| if (position == length) { |
| return -1; |
| } |
| setCurrent(currentRange.next, false); |
| } |
| position += 1; |
| return 0xff & decrypted.get(); |
| } |
| |
| @Override |
| protected void setCurrent(DiskRangeList newRange, boolean isJump) { |
| currentRange = newRange; |
| if (newRange != null) { |
| // copy the buffer so that we don't change the BufferChunk |
| decrypted = newRange.getData().slice(); |
| currentOffset = newRange.getOffset(); |
| // Move the position in the ByteBuffer to match the currentOffset, |
| // which is relative to the stream. |
| int start = (int) (position + offset - currentOffset); |
| decrypted.position(start); |
| // make sure the end of the buffer doesn't go past our stream |
| decrypted.limit(start + (int) Math.min(decrypted.remaining(), |
| length - position)); |
| } |
| } |
| |
| @Override |
| public int read(byte[] data, int offset, int length) { |
| if (decrypted == null || decrypted.remaining() == 0) { |
| if (position == this.length) { |
| return -1; |
| } |
| setCurrent(currentRange.next, false); |
| } |
| int actualLength = Math.min(length, decrypted.remaining()); |
| decrypted.get(data, offset, actualLength); |
| position += actualLength; |
| return actualLength; |
| } |
| |
| @Override |
| public int available() { |
| if (decrypted != null && decrypted.remaining() > 0) { |
| return decrypted.remaining(); |
| } |
| return (int) (length - position); |
| } |
| |
| @Override |
| public void close() { |
| currentRange = null; |
| position = length; |
| // explicit de-ref of bytes[] |
| decrypted = null; |
| bytes = null; |
| } |
| |
| @Override |
| public void changeIv(Consumer<byte[]> modifier) { |
| // nothing to do |
| } |
| |
| @Override |
| public void seek(PositionProvider index) throws IOException { |
| seek(index.getNext()); |
| } |
| |
| public void seek(long desired) throws IOException { |
| if (desired == 0 && bytes == null) { |
| return; |
| } |
| // compute the position of the desired point in file |
| long positionFile = desired + offset; |
| // If we are seeking inside of the current range, just reposition. |
| if (currentRange != null && positionFile >= currentRange.getOffset() && |
| positionFile < currentRange.getEnd()) { |
| decrypted.position((int) (positionFile - currentOffset)); |
| position = desired; |
| } else { |
| for (DiskRangeList curRange = bytes; curRange != null; |
| curRange = curRange.next) { |
| if (curRange.getOffset() <= positionFile && |
| (curRange.next == null ? positionFile <= curRange.getEnd() : |
| positionFile < curRange.getEnd())) { |
| position = desired; |
| setCurrent(curRange, true); |
| return; |
| } |
| } |
| throw new IllegalArgumentException("Seek in " + name + " to " + |
| desired + " is outside of the data"); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "uncompressed stream " + name + " position: " + position + |
| " length: " + length + " range: " + getRangeNumber(bytes, currentRange) + |
| " offset: " + currentRange.getOffset() + |
| " position: " + (decrypted == null ? 0 : decrypted.position()) + |
| " limit: " + (decrypted == null ? 0 : decrypted.limit()); |
| } |
| } |
| |
| private static ByteBuffer allocateBuffer(int size, boolean isDirect) { |
| // TODO: use the same pool as the ORC readers |
| if (isDirect) { |
| return ByteBuffer.allocateDirect(size); |
| } else { |
| return ByteBuffer.allocate(size); |
| } |
| } |
| |
| /** |
| * Manage the state of the decryption, including the ability to seek. |
| */ |
| static class EncryptionState { |
| private final Object name; |
| private final Key key; |
| private final byte[] iv; |
| private final Cipher cipher; |
| private final long offset; |
| private ByteBuffer decrypted; |
| |
| EncryptionState(Object name, long offset, StreamOptions options) { |
| this.name = name; |
| this.offset = offset; |
| EncryptionAlgorithm algorithm = options.getAlgorithm(); |
| key = options.getKey(); |
| iv = options.getIv(); |
| cipher = algorithm.createCipher(); |
| } |
| |
| void changeIv(Consumer<byte[]> modifier) { |
| modifier.accept(iv); |
| updateIv(); |
| OutStream.logKeyAndIv(name, key, iv); |
| } |
| |
| private void updateIv() { |
| try { |
| cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv)); |
| } catch (InvalidKeyException e) { |
| throw new IllegalArgumentException("Invalid key on " + name, e); |
| } catch (InvalidAlgorithmParameterException e) { |
| throw new IllegalArgumentException("Invalid iv on " + name, e); |
| } |
| } |
| |
| /** |
| * We are seeking to a new range, so update the cipher to change the IV |
| * to match. This code assumes that we only support encryption in CTR mode. |
| * @param offset where we are seeking to in the stream |
| */ |
| void changeIv(long offset) { |
| int blockSize = cipher.getBlockSize(); |
| long encryptionBlocks = offset / blockSize; |
| long extra = offset % blockSize; |
| CryptoUtils.clearCounter(iv); |
| if (encryptionBlocks != 0) { |
| // Add the encryption blocks into the initial iv, to compensate for |
| // skipping over decrypting those bytes. |
| int posn = iv.length - 1; |
| while (encryptionBlocks > 0) { |
| long sum = (iv[posn] & 0xff) + encryptionBlocks; |
| iv[posn--] = (byte) sum; |
| encryptionBlocks = sum / 0x100; |
| } |
| } |
| updateIv(); |
| // If the range starts at an offset that doesn't match the encryption |
| // block, we need to advance some bytes within an encryption block. |
| if (extra > 0) { |
| try { |
| byte[] wasted = new byte[(int) extra]; |
| cipher.update(wasted, 0, wasted.length, wasted, 0); |
| } catch (ShortBufferException e) { |
| throw new IllegalArgumentException("Short buffer in " + name, e); |
| } |
| } |
| } |
| |
| /** |
| * Decrypt the given range into the decrypted buffer. It is assumed that |
| * the cipher is correctly initialized by changeIv before this is called. |
| * @param encrypted the bytes to decrypt |
| * @return a reused ByteBuffer, which is used by each call to decrypt |
| */ |
| ByteBuffer decrypt(ByteBuffer encrypted) { |
| int length = encrypted.remaining(); |
| if (decrypted == null || decrypted.capacity() < length) { |
| decrypted = ByteBuffer.allocate(length); |
| } else { |
| decrypted.clear(); |
| } |
| try { |
| int output = cipher.update(encrypted, decrypted); |
| if (output != length) { |
| throw new IllegalArgumentException("Problem decrypting " + name + |
| " at " + offset); |
| } |
| } catch (ShortBufferException e) { |
| throw new IllegalArgumentException("Problem decrypting " + name + |
| " at " + offset, e); |
| } |
| decrypted.flip(); |
| return decrypted; |
| } |
| |
| void close() { |
| decrypted = null; |
| } |
| } |
| |
| /** |
| * Implements a stream over an encrypted, but uncompressed stream. |
| */ |
| public static class EncryptedStream extends UncompressedStream { |
| private final EncryptionState encrypt; |
| |
| public EncryptedStream(Object name, DiskRangeList input, long offset, long length, |
| StreamOptions options) { |
| super(name, offset, length); |
| encrypt = new EncryptionState(name, offset, options); |
| reset(input); |
| } |
| |
| @Override |
| protected void setCurrent(DiskRangeList newRange, boolean isJump) { |
| currentRange = newRange; |
| if (newRange != null) { |
| // what is the position of the start of the newRange? |
| currentOffset = newRange.getOffset(); |
| ByteBuffer encrypted = newRange.getData().slice(); |
| if (currentOffset < offset) { |
| int ignoreBytes = (int) (offset - currentOffset); |
| encrypted.position(ignoreBytes); |
| currentOffset = offset; |
| } |
| if (isJump) { |
| encrypt.changeIv(currentOffset - offset); |
| } |
| if (encrypted.remaining() > length + offset - currentOffset) { |
| encrypted.limit((int) (length + offset - currentOffset)); |
| } |
| decrypted = encrypt.decrypt(encrypted); |
| decrypted.position((int) (position + offset - currentOffset)); |
| } |
| } |
| |
| @Override |
| public void close() { |
| super.close(); |
| encrypt.close(); |
| } |
| |
| @Override |
| public void changeIv(Consumer<byte[]> modifier) { |
| encrypt.changeIv(modifier); |
| } |
| |
| @Override |
| public String toString() { |
| return "encrypted " + super.toString(); |
| } |
| } |
| |
| private static class CompressedStream extends InStream { |
| private final int bufferSize; |
| private ByteBuffer uncompressed; |
| private final CompressionCodec codec; |
| protected ByteBuffer compressed; |
| protected DiskRangeList currentRange; |
| private boolean isUncompressedOriginal; |
| |
| /** |
| * Create the stream without resetting the input stream. |
| * This is used in subclasses so they can finish initializing before |
| * reset is called. |
| * @param name the name of the stream |
| * @param length the total number of bytes in the stream |
| * @param options the options used to read the stream |
| */ |
| public CompressedStream(Object name, |
| long offset, |
| long length, |
| StreamOptions options) { |
| super(name, offset, length); |
| this.codec = options.codec; |
| this.bufferSize = options.bufferSize; |
| } |
| |
| /** |
| * Create the stream and initialize the input for the stream. |
| * @param name the name of the stream |
| * @param input the input data |
| * @param length the total length of the stream |
| * @param options the options to read the data with |
| */ |
| public CompressedStream(Object name, |
| DiskRangeList input, |
| long offset, |
| long length, |
| StreamOptions options) { |
| super(name, offset, length); |
| this.codec = options.codec; |
| this.bufferSize = options.bufferSize; |
| reset(input); |
| } |
| |
| private void allocateForUncompressed(int size, boolean isDirect) { |
| uncompressed = allocateBuffer(size, isDirect); |
| } |
| |
| @Override |
| protected void setCurrent(DiskRangeList newRange, |
| boolean isJump) { |
| currentRange = newRange; |
| if (newRange != null) { |
| compressed = newRange.getData().slice(); |
| int pos = (int) (position + offset - newRange.getOffset()); |
| compressed.position(pos); |
| compressed.limit(pos + (int) Math.min(compressed.remaining(), |
| length - position)); |
| } |
| } |
| |
| private int readHeaderByte() { |
| while (currentRange != null && |
| (compressed == null || compressed.remaining() <= 0)) { |
| setCurrent(currentRange.next, false); |
| } |
| if (compressed != null && compressed.remaining() > 0) { |
| position += 1; |
| return compressed.get() & 0xff; |
| } else { |
| throw new IllegalStateException("Can't read header at " + this); |
| } |
| } |
| |
| private void readHeader() throws IOException { |
| int b0 = readHeaderByte(); |
| int b1 = readHeaderByte(); |
| int b2 = readHeaderByte(); |
| boolean isOriginal = (b0 & 0x01) == 1; |
| int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1); |
| |
| if (chunkLength > bufferSize) { |
| throw new IllegalArgumentException("Buffer size too small. size = " + |
| bufferSize + " needed = " + chunkLength + " in " + name); |
| } |
| ByteBuffer slice = this.slice(chunkLength); |
| |
| if (isOriginal) { |
| uncompressed = slice; |
| isUncompressedOriginal = true; |
| } else { |
| if (isUncompressedOriginal) { |
| allocateForUncompressed(bufferSize, slice.isDirect()); |
| isUncompressedOriginal = false; |
| } else if (uncompressed == null) { |
| allocateForUncompressed(bufferSize, slice.isDirect()); |
| } else { |
| uncompressed.clear(); |
| } |
| codec.decompress(slice, uncompressed); |
| } |
| } |
| |
| @Override |
| public int read() throws IOException { |
| if (!ensureUncompressed()) { |
| return -1; |
| } |
| return 0xff & uncompressed.get(); |
| } |
| |
| @Override |
| public int read(byte[] data, int offset, int length) throws IOException { |
| if (!ensureUncompressed()) { |
| return -1; |
| } |
| int actualLength = Math.min(length, uncompressed.remaining()); |
| uncompressed.get(data, offset, actualLength); |
| return actualLength; |
| } |
| |
| private boolean ensureUncompressed() throws IOException { |
| while (uncompressed == null || uncompressed.remaining() == 0) { |
| if (position == this.length) { |
| return false; |
| } |
| readHeader(); |
| } |
| return true; |
| } |
| |
| @Override |
| public int available() throws IOException { |
| if (!ensureUncompressed()) { |
| return 0; |
| } |
| return uncompressed.remaining(); |
| } |
| |
| @Override |
| public void close() { |
| uncompressed = null; |
| compressed = null; |
| currentRange = null; |
| position = length; |
| bytes = null; |
| } |
| |
| @Override |
| public void changeIv(Consumer<byte[]> modifier) { |
| // nothing to do |
| } |
| |
| @Override |
| public void seek(PositionProvider index) throws IOException { |
| seek(index.getNext()); |
| long uncompressedBytes = index.getNext(); |
| if (uncompressedBytes != 0) { |
| readHeader(); |
| uncompressed.position(uncompressed.position() + |
| (int) uncompressedBytes); |
| } else if (uncompressed != null) { |
| // mark the uncompressed buffer as done |
| uncompressed.position(uncompressed.limit()); |
| } |
| } |
| |
| /* slices a read only contiguous buffer of chunkLength */ |
| private ByteBuffer slice(int chunkLength) throws IOException { |
| int len = chunkLength; |
| final DiskRangeList oldRange = currentRange; |
| final long oldPosition = position; |
| ByteBuffer slice; |
| if (compressed.remaining() >= len) { |
| slice = compressed.slice(); |
| // simple case |
| slice.limit(len); |
| position += len; |
| compressed.position(compressed.position() + len); |
| return slice; |
| } else if (currentRange.next == null) { |
| // nothing has been modified yet |
| throw new IOException("EOF in " + this + " while trying to read " + |
| chunkLength + " bytes"); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(String.format( |
| "Crossing into next BufferChunk because compressed only has %d bytes (needs %d)", |
| compressed.remaining(), len)); |
| } |
| |
| // we need to consolidate 2 or more buffers into 1 |
| // first copy out compressed buffers |
| ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect()); |
| position += compressed.remaining(); |
| len -= compressed.remaining(); |
| copy.put(compressed); |
| |
| while (currentRange.next != null) { |
| setCurrent(currentRange.next, false); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString())); |
| } |
| if (compressed.remaining() >= len) { |
| slice = compressed.slice(); |
| slice.limit(len); |
| copy.put(slice); |
| position += len; |
| compressed.position(compressed.position() + len); |
| copy.flip(); |
| return copy; |
| } |
| position += compressed.remaining(); |
| len -= compressed.remaining(); |
| copy.put(compressed); |
| } |
| |
| // restore offsets for exception clarity |
| position = oldPosition; |
| setCurrent(oldRange, true); |
| throw new IOException("EOF in " + this + " while trying to read " + |
| chunkLength + " bytes"); |
| } |
| |
| void seek(long desired) throws IOException { |
| if (desired == 0 && bytes == null) { |
| return; |
| } |
| long posn = desired + offset; |
| for (DiskRangeList range = bytes; range != null; range = range.next) { |
| if (range.getOffset() <= posn && |
| (range.next == null ? posn <= range.getEnd() : |
| posn < range.getEnd())) { |
| position = desired; |
| setCurrent(range, true); |
| return; |
| } |
| } |
| throw new IOException("Seek outside of data in " + this + " to " + desired); |
| } |
| |
| private String rangeString() { |
| StringBuilder builder = new StringBuilder(); |
| int i = 0; |
| for (DiskRangeList range = bytes; range != null; range = range.next){ |
| if (i != 0) { |
| builder.append("; "); |
| } |
| builder.append(" range "); |
| builder.append(i); |
| builder.append(" = "); |
| builder.append(range.getOffset()); |
| builder.append(" to "); |
| builder.append(range.getEnd()); |
| ++i; |
| } |
| return builder.toString(); |
| } |
| |
| @Override |
| public String toString() { |
| return "compressed stream " + name + " position: " + position + |
| " length: " + length + " range: " + getRangeNumber(bytes, currentRange) + |
| " offset: " + (compressed == null ? 0 : compressed.position()) + |
| " limit: " + (compressed == null ? 0 : compressed.limit()) + |
| rangeString() + |
| (uncompressed == null ? "" : |
| " uncompressed: " + uncompressed.position() + " to " + |
| uncompressed.limit()); |
| } |
| } |
| |
| private static class EncryptedCompressedStream extends CompressedStream { |
| private final EncryptionState encrypt; |
| |
| public EncryptedCompressedStream(Object name, |
| DiskRangeList input, |
| long offset, |
| long length, |
| StreamOptions options) { |
| super(name, offset, length, options); |
| encrypt = new EncryptionState(name, offset, options); |
| reset(input); |
| } |
| |
| @Override |
| protected void setCurrent(DiskRangeList newRange, boolean isJump) { |
| currentRange = newRange; |
| if (newRange != null) { |
| // what is the position of the start of the newRange? |
| long rangeOffset = newRange.getOffset(); |
| int ignoreBytes = 0; |
| ByteBuffer encrypted = newRange.getData().slice(); |
| if (rangeOffset < offset) { |
| ignoreBytes = (int) (offset - rangeOffset); |
| encrypted.position(ignoreBytes); |
| } |
| if (isJump) { |
| encrypt.changeIv(ignoreBytes + rangeOffset - offset); |
| } |
| encrypted.limit(ignoreBytes + |
| (int) Math.min(encrypted.remaining(), length)); |
| compressed = encrypt.decrypt(encrypted); |
| if (position + offset > rangeOffset + ignoreBytes) { |
| compressed.position((int) (position + offset - rangeOffset - ignoreBytes)); |
| } |
| } |
| } |
| |
| @Override |
| public void close() { |
| super.close(); |
| encrypt.close(); |
| } |
| |
| @Override |
| public void changeIv(Consumer<byte[]> modifier) { |
| encrypt.changeIv(modifier); |
| } |
| |
| @Override |
| public String toString() { |
| return "encrypted " + super.toString(); |
| } |
| } |
| |
| public abstract void seek(PositionProvider index) throws IOException; |
| |
| public static class StreamOptions implements Cloneable { |
| private CompressionCodec codec; |
| private int bufferSize; |
| private EncryptionAlgorithm algorithm; |
| private Key key; |
| private byte[] iv; |
| |
| public StreamOptions(StreamOptions other) { |
| codec = other.codec; |
| bufferSize = other.bufferSize; |
| algorithm = other.algorithm; |
| key = other.key; |
| iv = other.iv == null ? null : other.iv.clone(); |
| } |
| |
| public StreamOptions() { |
| } |
| |
| public StreamOptions withCodec(CompressionCodec value) { |
| this.codec = value; |
| return this; |
| } |
| |
| public StreamOptions withBufferSize(int value) { |
| bufferSize = value; |
| return this; |
| } |
| |
| public StreamOptions withEncryption(EncryptionAlgorithm algorithm, |
| Key key, |
| byte[] iv) { |
| this.algorithm = algorithm; |
| this.key = key; |
| this.iv = iv; |
| return this; |
| } |
| |
| public boolean isCompressed() { |
| return codec != null; |
| } |
| |
| public CompressionCodec getCodec() { |
| return codec; |
| } |
| |
| public int getBufferSize() { |
| return bufferSize; |
| } |
| |
| public EncryptionAlgorithm getAlgorithm() { |
| return algorithm; |
| } |
| |
| public Key getKey() { |
| return key; |
| } |
| |
| public byte[] getIv() { |
| return iv; |
| } |
| |
| @Override |
| public StreamOptions clone() { |
| try { |
| StreamOptions clone = (StreamOptions) super.clone(); |
| if (clone.codec != null) { |
| // Make sure we don't share the same codec between two readers. |
| clone.codec = OrcCodecPool.getCodec(codec.getKind()); |
| } |
| return clone; |
| } catch (CloneNotSupportedException e) { |
| throw new UnsupportedOperationException("uncloneable", e); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder buffer = new StringBuilder(); |
| buffer.append("compress: "); |
| buffer.append(codec == null ? "none" : codec.getKind()); |
| buffer.append(", buffer size: "); |
| buffer.append(bufferSize); |
| if (key != null) { |
| buffer.append(", encryption: "); |
| buffer.append(algorithm); |
| } |
| return buffer.toString(); |
| } |
| } |
| |
| public static StreamOptions options() { |
| return new StreamOptions(); |
| } |
| |
| /** |
| * Create an input stream from a list of disk ranges with data. |
| * @param name the name of the stream |
| * @param input the list of ranges of bytes for the stream; from disk or cache |
| * @param offset the first byte offset of the stream |
| * @param length the length in bytes of the stream |
| * @param options the options to read with |
| * @return an input stream |
| */ |
| public static InStream create(Object name, |
| DiskRangeList input, |
| long offset, |
| long length, |
| StreamOptions options) { |
| LOG.debug("Reading {} with {} from {} for {}", name, options, offset, |
| length); |
| if (options == null || options.codec == null) { |
| if (options == null || options.key == null) { |
| return new UncompressedStream(name, input, offset, length); |
| } else { |
| OutStream.logKeyAndIv(name, options.getKey(), options.getIv()); |
| return new EncryptedStream(name, input, offset, length, options); |
| } |
| } else if (options.key == null) { |
| return new CompressedStream(name, input, offset, length, options); |
| } else { |
| OutStream.logKeyAndIv(name, options.getKey(), options.getIv()); |
| return new EncryptedCompressedStream(name, input, offset, length, options); |
| } |
| } |
| |
| /** |
| * Create an input stream from a list of disk ranges with data. |
| * @param name the name of the stream |
| * @param input the list of ranges of bytes for the stream; from disk or cache |
| * @param length the length in bytes of the stream |
| * @return an input stream |
| */ |
| public static InStream create(Object name, |
| DiskRangeList input, |
| long offset, |
| long length) { |
| return create(name, input, offset, length, null); |
| } |
| |
| /** |
| * Creates coded input stream (used for protobuf message parsing) with higher |
| * message size limit. |
| * |
| * @param inStream the stream to wrap. |
| * @return coded input stream |
| */ |
| public static CodedInputStream createCodedInputStream(InStream inStream) { |
| CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream); |
| codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT); |
| return codedInputStream; |
| } |
| } |