// blob: 0ca2cfec05cd92f573725d710e2d7b96a4f54d61 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.easy.text.reader;
import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.CompressionInputStream;
import io.netty.buffer.DrillBuf;
import io.netty.util.internal.PlatformDependent;
/**
* Class that fronts an InputStream to provide a byte consumption interface.
* It also makes sure that only the lines belonging to the current split
* (the range described by the start and end positions) are read.
*/
final class TextInput {
/**
* Byte sequence that marks the end of a line in the input; obtained from
* {@link TextParsingSettings#getNewLineDelimiter()} in the constructor.
*/
private final byte[] lineSeparator;
/**
* Single byte returned by {@code nextChar()} in place of a complete
* {@link #lineSeparator} sequence; obtained from
* {@link TextParsingSettings#getNormalizedNewLine()}.
*/
private final byte normalizedLineSeparator;
/**
* Parsing settings; consulted by {@code start()} to decide whether the first line is skipped.
*/
private final TextParsingSettings settings;
/**
* Number of line separators recognized by {@code nextChar()} since {@code start()} reset it to zero.
*/
private long lineCount;
/**
* Sum of the {@link #bufferPtr} values accumulated at each buffer refill;
* {@code charCount()} adds the current {@link #bufferPtr} to it.
*/
private long charCount;
/**
* The starting position in the file. {@code start()} seeks to it when it is greater than zero.
*/
private final long startPos;
/**
* The end position of the split. The constructor replaces it with {@code Long.MAX_VALUE}
* when the stream is compressed and the supplied value is positive.
*/
private final long endPos;
/**
* Stream position reported by {@code seekable.getPos()} at the most recent buffer refill.
*/
private long streamPos;
/**
* The input stream viewed as a {@link Seekable}; same object as {@link #input}.
*/
private final Seekable seekable;
/**
* The input stream viewed as an {@link FSDataInputStream}, or {@code null} when it is not one.
*/
private final FSDataInputStream inputFS;
/**
* The stream that data is read from and that {@code close()} closes.
*/
private final InputStream input;
/**
* Read buffer supplied by the caller of the constructor.
*/
private final DrillBuf buffer;
/**
* NIO view of {@link #buffer} covering index 0 up to its capacity; the target of stream reads.
*/
private final ByteBuffer underlyingBuffer;
/**
* Value of {@code buffer.memoryAddress()}; base address for raw byte reads.
*/
private final long bStart;
/**
* {@code bStart - 1}; adding the one-based {@link #bufferPtr} yields the address of the next byte.
*/
private final long bStartMinus1;
/**
* True when the stream wrapped by {@link #inputFS} implements {@link ByteBufferReadable},
* in which case {@code read()} reads straight into {@link #underlyingBuffer}.
*/
private final boolean bufferReadable;
/**
* Marks a possible partial line separator that was held back at the end of the
* previous read. The value is -1 when nothing is pending; otherwise the next
* {@code read()} copies {@code lineSeparator[0..remByte]} (inclusive) to the
* front of the buffer before reading new data.
*/
private int remByte = -1;
/**
* The current position in the buffer. The value is one-based: the next byte is
* read from address {@code bStartMinus1 + bufferPtr}, and a refill sets it to 1.
*/
private int bufferPtr;
/**
* The quantity of valid data in the buffer. The value -1 means that no more data
* is available; {@code nextCharNoNewLineCheck()} then throws
* {@code StreamFinishedPseudoException}.
*/
private int length = -1;
/**
* Set by {@code updateLengthBasedOnConstraint()} once the line separator that ends
* this split has been located; later refills then report end of data without reading.
*/
private boolean endFound = false;
/**
 * Creates a reader over {@code input} for the split described by
 * {@code startPos} and {@code endPos}.
 *
 * <p>The line separator sequence and its normalized single-byte replacement
 * are taken from {@link TextParsingSettings#getNewLineDelimiter()} and
 * {@link TextParsingSettings#getNormalizedNewLine()}.
 *
 * @param settings parsing settings supplying the line separator bytes
 * @param input stream to read from; must implement {@link Seekable}
 * @param readBuffer buffer that receives the bytes read from {@code input}
 * @param startPos position in the stream at which this split starts
 * @param endPos position in the stream at which this split ends; replaced by
 *        {@code Long.MAX_VALUE} for a compressed stream when positive
 * @throws IllegalArgumentException if {@code input} is not {@link Seekable}, or
 *         if it is a compressed stream and {@code startPos} is not zero
 */
TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
  this.lineSeparator = settings.getNewLineDelimiter();
  this.normalizedLineSeparator = settings.getNormalizedNewLine();

  Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
  final boolean compressed = input instanceof CompressionInputStream;
  Preconditions.checkArgument(!compressed || startPos == 0, "Cannot use split on compressed stream.");

  this.settings = settings;
  this.input = input;
  this.seekable = (Seekable) input;

  final FSDataInputStream fsStream =
      (input instanceof FSDataInputStream) ? (FSDataInputStream) input : null;
  this.inputFS = fsStream;
  this.bufferReadable = fsStream != null && fsStream.getWrappedStream() instanceof ByteBufferReadable;

  this.startPos = startPos;
  // Splits aren't allowed with compressed data: the split length would be the
  // compressed size, which means reading would normally end prematurely.
  this.endPos = (compressed && endPos > 0) ? Long.MAX_VALUE : endPos;

  this.buffer = readBuffer;
  this.bStart = readBuffer.memoryAddress();
  this.bStartMinus1 = this.bStart - 1;
  this.underlyingBuffer = readBuffer.nioBuffer(0, readBuffer.capacity());
}
/**
 * Positions the input for reading. Resets the line count, seeks to the start
 * position when it is greater than zero, and fills the buffer. When data is
 * available and either the split does not start at the beginning of the
 * stream or {@code settings.isSkipFirstLine()} is set, the input is advanced
 * past the next line separator so that reading begins on a complete line.
 *
 * @throws IOException for input file read errors
 */
final void start() throws IOException {
  lineCount = 0;
  if (startPos > 0) {
    seekable.seek(startPos);
  }
  updateBuffer();

  if (length <= 0) {
    // nothing was read, so there is no line to skip
    return;
  }
  if (startPos <= 0 && !settings.isSkipFirstLine()) {
    // reading starts at the beginning and the first line is wanted
    return;
  }

  // move to next full record.
  try {
    skipLines(1);
  } catch (StreamFinishedPseudoException e) {
    // file does not have any more lines, ignore
  }
}
/**
 * Helper intended to report the characters consumed since the last record
 * started, for use in error messages. Stream rewind is not supported, so for
 * now a fixed placeholder consisting of a single space is returned.
 *
 * @return placeholder string containing one space character
 */
public String getStringSinceMarkForError() {
  final String placeholder = " ";
  return placeholder;
}
/**
 * Reports the current read position.
 *
 * @return the stream position recorded at the last buffer refill plus the
 *         current (one-based) buffer position
 */
long getPos() {
  final long refillPosition = streamPos;
  return refillPosition + bufferPtr;
}
/**
 * Does nothing; this input keeps no mark state.
 */
public void mark() {
  // intentionally empty
}
/**
 * Reads more bytes from the stream into the buffer and records in
 * {@code length} how many bytes of the buffer are valid. Reads directly into
 * the NIO buffer when the wrapped stream supports it; otherwise reads into a
 * temporary array and copies it.
 *
 * <p>When a partial line separator was held back by the previous read
 * ({@code remByte != -1}), the bytes {@code lineSeparator[0..remByte]} are
 * placed at the front of the buffer ahead of the new data. Those bytes are
 * counted in {@code length}; previously only the stream read count was
 * stored, so the same number of bytes at the end of the new data were
 * ignored, and the carried bytes were lost when the stream was at its end.
 *
 * <p>{@code length} is -1 only when the stream reports end of data and no
 * bytes were carried over.
 *
 * @throws IOException for input file read errors
 */
private void read() throws IOException {
  // Number of separator bytes carried over from the previous read (0 when remByte is -1).
  final int carried = remByte + 1;
  remByte = -1;

  final int bytesRead;
  if (bufferReadable) {
    for (int i = 0; i < carried; i++) {
      underlyingBuffer.put(lineSeparator[i]);
    }
    bytesRead = inputFS.read(underlyingBuffer);
  } else {
    final byte[] b = new byte[underlyingBuffer.capacity()];
    System.arraycopy(lineSeparator, 0, b, 0, carried);
    bytesRead = input.read(b, carried, b.length - carried);
    underlyingBuffer.put(b);
  }

  if (carried == 0) {
    length = bytesRead;
  } else {
    // The carried bytes are valid data even if the stream had nothing more to give.
    length = carried + Math.max(bytesRead, 0);
  }
}
/**
 * Refills the buffer and applies the split end condition.
 *
 * <p>The stream position is recorded and the NIO buffer cleared on every
 * call. Once the end of the split has been found, no further data is read and
 * {@code length} is set to -1; the buffer position and character count are
 * left untouched in that case. Otherwise new data is read, the valid length
 * is trimmed if the data reaches the end position, the character count is
 * advanced by the old buffer position, and the buffer position is reset to 1.
 *
 * @throws IOException for input file read errors
 */
private void updateBuffer() throws IOException {
  streamPos = seekable.getPos();
  underlyingBuffer.clear();

  if (!endFound) {
    read();

    // check our data read allowance.
    final boolean reachesSplitEnd = streamPos + length >= this.endPos;
    if (reachesSplitEnd) {
      updateLengthBasedOnConstraint();
    }

    charCount += bufferPtr;
    bufferPtr = 1;

    buffer.writerIndex(underlyingBuffer.limit());
    buffer.readerIndex(underlyingBuffer.position());
  } else {
    length = -1;
  }
}
/**
 * Called when the data in the buffer reaches the end position of the split.
 * Adjusts {@code length} so that reading stops after the first line separator
 * that starts at or beyond the end position.
 *
 * <ul>
 * <li>If a complete line separator is found, {@code length} is set to the
 * index just past it and {@code endFound} is set.</li>
 * <li>If the buffer ends with a proper prefix of the line separator, those
 * bytes are removed from {@code length} and {@code remByte} is set to the
 * index of the last such byte, so that {@code read()} puts
 * {@code lineSeparator[0..remByte]} at the front of the next buffer.</li>
 * <li>Otherwise nothing changes and the search continues in the next
 * buffer.</li>
 * </ul>
 *
 * <p>Compared with the earlier implementation: the scan never starts before
 * the beginning of the buffer (the end position may lie before the current
 * stream position when the last line spans several buffers), every byte of a
 * multi-byte separator is compared rather than only its last byte, and bytes
 * are only held back when they actually match a separator prefix.
 */
private void updateLengthBasedOnConstraint() {
  final int separatorLength = lineSeparator.length;
  if (separatorLength == 0) {
    // nothing can be matched against an empty separator
    return;
  }

  final long max = bStart + length;
  // Clamp to the buffer start: endPos - streamPos is negative for any refill
  // that begins beyond the end position.
  final long scanStart = bStart + Math.max(0L, endPos - streamPos);

  for (long m = scanStart; m < max; m++) {
    // Number of separator bytes that can be compared without leaving the valid data.
    final int comparable = (int) Math.min(separatorLength, max - m);
    int matched = 0;
    while (matched < comparable && lineSeparator[matched] == PlatformDependent.getByte(m + matched)) {
      matched++;
    }

    if (matched == separatorLength) {
      // complete line separator: the split ends right after it.
      length = (int) (m - bStart) + separatorLength;
      endFound = true;
      return;
    }
    if (matched == comparable) {
      // The buffer ends inside a possible separator. Hold these bytes back
      // until the next read, where they can be examined together with the
      // bytes that follow.
      remByte = matched - 1;
      length -= matched;
      return;
    }
  }
}
/**
 * Get next byte from stream. Also maintains the current line count: when the
 * byte starts a complete line separator sequence, the whole sequence is
 * consumed, the line count is incremented and the normalized line separator
 * is returned instead. Will throw a {@link StreamFinishedPseudoException}
 * when the stream has run out of bytes.
 *
 * <p>Only bytes inside the valid part of the buffer are compared with the
 * separator, so stale bytes beyond {@code length} can no longer complete a
 * match. After a multi-byte separator has been skipped, the buffer is
 * refilled only when no unread byte remains; previously the last byte of the
 * buffer was discarded when the separator ended just before it.
 *
 * @return next byte from stream.
 * @throws IOException for input file read errors
 */
public final byte nextChar() throws IOException {
  final byte first = nextCharNoNewLineCheck();
  if (first != lineSeparator[0]) {
    return first;
  }

  // bufferPtr is one-based, so bufferPtr - 1 is the zero-based index of the
  // next unread byte.
  int lookahead = bufferPtr - 1;
  for (int i = 1; i < lineSeparator.length; i++, lookahead++) {
    if (lookahead >= length || lineSeparator[i] != buffer.getByte(lookahead)) {
      return first;
    }
  }

  lineCount++;

  // we don't need to update buffer position if line separator is one byte long
  if (lineSeparator.length > 1) {
    bufferPtr += (lineSeparator.length - 1);
    // The next unread byte is at index bufferPtr - 1; data is exhausted only
    // once that index reaches length.
    if (bufferPtr > length) {
      if (length != -1) {
        updateBuffer();
      } else {
        throw StreamFinishedPseudoException.INSTANCE;
      }
    }
  }
  return normalizedLineSeparator;
}
/**
 * Get next byte from stream without looking for line separators, so the line
 * count is not maintained. When the byte returned is the last valid byte of
 * the buffer, the buffer is refilled before returning. Will throw a
 * StreamFinishedPseudoException when the stream has run out of bytes.
 *
 * @return next byte from stream.
 * @throws IOException for input file read errors
 */
public final byte nextCharNoNewLineCheck() throws IOException {
  if (length == -1) {
    throw StreamFinishedPseudoException.INSTANCE;
  }

  rangeCheck(buffer, bufferPtr - 1, bufferPtr);
  final byte current = PlatformDependent.getByte(bStartMinus1 + bufferPtr);

  if (bufferPtr < length) {
    // more data remains in this buffer: just advance
    bufferPtr++;
    return current;
  }

  if (length == -1) {
    throw StreamFinishedPseudoException.INSTANCE;
  }
  // Last valid byte consumed: refill. The buffer position is left exactly as
  // the refill set it.
  updateBuffer();
  return current;
}
/**
 * Reports how many line separators have been consumed since reading of this
 * split started.
 *
 * @return current line count
 */
public final long lineCount() {
  final long linesSoFar = lineCount;
  return linesSoFar;
}
/**
 * Skip forward the number of line delimiters. If you are in the middle of a line,
 * a value of 1 will skip to the start of the next record. Values below 1 are ignored.
 *
 * <p>Bytes are consumed through {@code nextChar()} until the line count has
 * grown by {@code lines}. An {@link EOFException} raised while reading is
 * reported as an {@link IllegalArgumentException} that keeps the original
 * exception as its cause. A {@code StreamFinishedPseudoException} thrown by
 * {@code nextChar()} is not handled here.
 * NOTE(review): whether that exception type is an {@code EOFException} is not
 * visible in this file; {@code start()} catches it separately.
 *
 * @param lines Number of lines to skip.
 * @throws IOException for input file read errors
 * @throws IllegalArgumentException if unable to skip the requested number
 * of lines
 */
public final void skipLines(int lines) throws IOException {
  if (lines < 1) {
    return;
  }
  final long firstLine = this.lineCount;
  final long expectedLineCount = firstLine + lines;

  try {
    // The loop only ends once the expected count is reached, so no further
    // check of the line count is needed afterwards.
    do {
      nextChar();
    } while (lineCount < expectedLineCount);
  } catch (EOFException ex) {
    throw new IllegalArgumentException(
        "Unable to skip " + lines + " lines from line " + firstLine + ". End of input reached", ex);
  }
}
/**
 * Reports the character count: the total accumulated at buffer refills plus
 * the current (one-based) buffer position.
 *
 * @return current character count
 */
public final long charCount() {
  final long countedAtRefills = charCount;
  return countedAtRefills + bufferPtr;
}
/**
 * Closes the underlying input stream.
 *
 * @throws IOException if closing the stream fails
 */
public void close() throws IOException {
  final InputStream source = input;
  source.close();
}
}