blob: 520ddf6bdf401cf81e6d6de7f2558c9c5428e324 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.io.Text;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
/**
* A class that provides a line reader from an input stream.
* Depending on the constructor used, lines will either be terminated by:
* <ul>
* <li>one of the following: '\n' (LF) , '\r' (CR),
* or '\r\n' (CR+LF).</li>
* <li><em>or</em>, a custom byte sequence delimiter</li>
* </ul>
* In both cases, EOF also terminates an otherwise unterminated
* line.
*/
@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
public class LineReader implements Closeable, IOStatisticsSource {
private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
private int bufferSize = DEFAULT_BUFFER_SIZE;
private InputStream in;
private byte[] buffer;
// the number of bytes of real data in the buffer
private int bufferLength = 0;
// the current position in the buffer
private int bufferPosn = 0;
private static final byte CR = '\r';
private static final byte LF = '\n';
// The line delimiter
private final byte[] recordDelimiterBytes;
/**
* Create a line reader that reads from the given stream using the
* default buffer-size (64k).
* @param in The input stream
*/
public LineReader(InputStream in) {
this(in, DEFAULT_BUFFER_SIZE);
}
/**
* Create a line reader that reads from the given stream using the
* given buffer-size.
* @param in The input stream
* @param bufferSize Size of the read buffer
*/
public LineReader(InputStream in, int bufferSize) {
this.in = in;
this.bufferSize = bufferSize;
this.buffer = new byte[this.bufferSize];
this.recordDelimiterBytes = null;
}
/**
* Create a line reader that reads from the given stream using the
* <code>io.file.buffer.size</code> specified in the given
* <code>Configuration</code>.
* @param in input stream
* @param conf configuration
* @throws IOException
*/
public LineReader(InputStream in, Configuration conf) throws IOException {
this(in, conf.getInt(IO_FILE_BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE));
}
/**
* Create a line reader that reads from the given stream using the
* default buffer-size, and using a custom delimiter of array of
* bytes.
* @param in The input stream
* @param recordDelimiterBytes The delimiter
*/
public LineReader(InputStream in, byte[] recordDelimiterBytes) {
this.in = in;
this.bufferSize = DEFAULT_BUFFER_SIZE;
this.buffer = new byte[this.bufferSize];
this.recordDelimiterBytes = recordDelimiterBytes;
}
/**
* Create a line reader that reads from the given stream using the
* given buffer-size, and using a custom delimiter of array of
* bytes.
* @param in The input stream
* @param bufferSize Size of the read buffer
* @param recordDelimiterBytes The delimiter
*/
public LineReader(InputStream in, int bufferSize,
byte[] recordDelimiterBytes) {
this.in = in;
this.bufferSize = bufferSize;
this.buffer = new byte[this.bufferSize];
this.recordDelimiterBytes = recordDelimiterBytes;
}
/**
* Create a line reader that reads from the given stream using the
* <code>io.file.buffer.size</code> specified in the given
* <code>Configuration</code>, and using a custom delimiter of array of
* bytes.
* @param in input stream
* @param conf configuration
* @param recordDelimiterBytes The delimiter
* @throws IOException
*/
public LineReader(InputStream in, Configuration conf,
byte[] recordDelimiterBytes) throws IOException {
this.in = in;
this.bufferSize = conf.getInt(IO_FILE_BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
this.buffer = new byte[this.bufferSize];
this.recordDelimiterBytes = recordDelimiterBytes;
}
/**
* Close the underlying stream.
* @throws IOException
*/
public void close() throws IOException {
in.close();
}
/**
* Return any IOStatistics provided by the source.
* @return IO stats from the input stream.
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(in);
}
/**
* Read one line from the InputStream into the given Text.
*
* @param str the object to store the given line (without newline)
* @param maxLineLength the maximum number of bytes to store into str;
* the rest of the line is silently discarded.
* @param maxBytesToConsume the maximum number of bytes to consume
* in this call. This is only a hint, because if the line cross
* this threshold, we allow it to happen. It can overshoot
* potentially by as much as one buffer length.
*
* @return the number of bytes read including the (longest) newline
* found.
*
* @throws IOException if the underlying stream throws
*/
public int readLine(Text str, int maxLineLength,
int maxBytesToConsume) throws IOException {
if (this.recordDelimiterBytes != null) {
return readCustomLine(str, maxLineLength, maxBytesToConsume);
} else {
return readDefaultLine(str, maxLineLength, maxBytesToConsume);
}
}
protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
throws IOException {
return in.read(buffer);
}
/**
* Read a line terminated by one of CR, LF, or CRLF.
*/
private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
/* We're reading data from in, but the head of the stream may be
* already buffered in buffer, so we have several cases:
* 1. No newline characters are in the buffer, so we need to copy
* everything and read another buffer from the stream.
* 2. An unambiguously terminated line is in buffer, so we just
* copy to str.
* 3. Ambiguously terminated line is in buffer, i.e. buffer ends
* in CR. In this case we copy everything up to CR to str, but
* we also need to see what follows CR: if it's LF, then we
* need consume LF as well, so next call to readLine will read
* from after that.
* We use a flag prevCharCR to signal if previous character was CR
* and, if it happens to be at the end of the buffer, delay
* consuming it until we have a chance to look at the char that
* follows.
*/
str.clear();
int txtLength = 0; //tracks str.getLength(), as an optimization
int newlineLength = 0; //length of terminating newline
boolean prevCharCR = false; //true of prev char was CR
long bytesConsumed = 0;
do {
int startPosn = bufferPosn; //starting from where we left off the last time
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
if (prevCharCR) {
++bytesConsumed; //account for CR from previous read
}
bufferLength = fillBuffer(in, buffer, prevCharCR);
if (bufferLength <= 0) {
break; // EOF
}
}
for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
if (buffer[bufferPosn] == LF) {
newlineLength = (prevCharCR) ? 2 : 1;
++bufferPosn; // at next invocation proceed from following byte
break;
}
if (prevCharCR) { //CR + notLF, we are at notLF
newlineLength = 1;
break;
}
prevCharCR = (buffer[bufferPosn] == CR);
}
int readLength = bufferPosn - startPosn;
if (prevCharCR && newlineLength == 0) {
--readLength; //CR at the end of the buffer
}
bytesConsumed += readLength;
int appendLength = readLength - newlineLength;
if (appendLength > maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
if (appendLength > 0) {
str.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}
} while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
if (bytesConsumed > Integer.MAX_VALUE) {
throw new IOException("Too many bytes before newline: " + bytesConsumed);
}
return (int)bytesConsumed;
}
/**
* Read a line terminated by a custom delimiter.
*/
private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
/* We're reading data from inputStream, but the head of the stream may be
* already captured in the previous buffer, so we have several cases:
*
* 1. The buffer tail does not contain any character sequence which
* matches with the head of delimiter. We count it as a
* ambiguous byte count = 0
*
* 2. The buffer tail contains a X number of characters,
* that forms a sequence, which matches with the
* head of delimiter. We count ambiguous byte count = X
*
* // *** eg: A segment of input file is as follows
*
* " record 1792: I found this bug very interesting and
* I have completely read about it. record 1793: This bug
* can be solved easily record 1794: This ."
*
* delimiter = "record";
*
* supposing:- String at the end of buffer =
* "I found this bug very interesting and I have completely re"
* There for next buffer = "ad about it. record 179 ...."
*
* The matching characters in the input
* buffer tail and delimiter head = "re"
* Therefore, ambiguous byte count = 2 **** //
*
* 2.1 If the following bytes are the remaining characters of
* the delimiter, then we have to capture only up to the starting
* position of delimiter. That means, we need not include the
* ambiguous characters in str.
*
* 2.2 If the following bytes are not the remaining characters of
* the delimiter ( as mentioned in the example ),
* then we have to include the ambiguous characters in str.
*/
str.clear();
int txtLength = 0; // tracks str.getLength(), as an optimization
long bytesConsumed = 0;
int delPosn = 0;
int ambiguousByteCount=0; // To capture the ambiguous characters count
do {
int startPosn = bufferPosn; // Start from previous end position
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
if (bufferLength <= 0) {
if (ambiguousByteCount > 0) {
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
bytesConsumed += ambiguousByteCount;
}
break; // EOF
}
}
for (; bufferPosn < bufferLength; ++bufferPosn) {
if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
delPosn++;
if (delPosn >= recordDelimiterBytes.length) {
bufferPosn++;
break;
}
} else if (delPosn != 0) {
bufferPosn -= delPosn;
if(bufferPosn < -1) {
bufferPosn = -1;
}
delPosn = 0;
}
}
int readLength = bufferPosn - startPosn;
bytesConsumed += readLength;
int appendLength = readLength - delPosn;
if (appendLength > maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
bytesConsumed += ambiguousByteCount;
if (appendLength >= 0 && ambiguousByteCount > 0) {
//appending the ambiguous characters (refer case 2.2)
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
ambiguousByteCount = 0;
// since it is now certain that the split did not split a delimiter we
// should not read the next record: clear the flag otherwise duplicate
// records could be generated
unsetNeedAdditionalRecordAfterSplit();
}
if (appendLength > 0) {
str.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}
if (bufferPosn >= bufferLength) {
if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
ambiguousByteCount = delPosn;
bytesConsumed -= ambiguousByteCount; //to be consumed in next
}
}
} while (delPosn < recordDelimiterBytes.length
&& bytesConsumed < maxBytesToConsume);
if (bytesConsumed > Integer.MAX_VALUE) {
throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
}
return (int) bytesConsumed;
}
/**
* Read from the InputStream into the given Text.
* @param str the object to store the given line
* @param maxLineLength the maximum number of bytes to store into str.
* @return the number of bytes read including the newline
* @throws IOException if the underlying stream throws
*/
public int readLine(Text str, int maxLineLength) throws IOException {
return readLine(str, maxLineLength, Integer.MAX_VALUE);
}
/**
* Read from the InputStream into the given Text.
* @param str the object to store the given line
* @return the number of bytes read including the newline
* @throws IOException if the underlying stream throws
*/
public int readLine(Text str) throws IOException {
return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
}
protected int getBufferPosn() {
return bufferPosn;
}
protected int getBufferSize() {
return bufferSize;
}
protected void unsetNeedAdditionalRecordAfterSplit() {
// needed for custom multi byte line delimiters only
// see MAPREDUCE-6549 for details
}
}