blob: d9e8b2e6687cd3668507ce014070f58d01c4222c [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.PositionedReadable;
/**
 * This controls how an {@link AbstractBlockReader} reads a {@link BlockMetadata}.
 *
 * @param <STREAM> type of stream
 * @since 2.1.0
 */
public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
/**
 * Initializes the reader context.
 *
 * @param stream input stream
 * @param blockMetadata block-metadata
 * @param consecutiveBlock true if the current block directly follows the previously read block in the source;
 *                         implementations may then continue reading from the last offset instead of restarting
 *                         at the block's start offset.
 */
void initialize(STREAM stream, BlockMetadata blockMetadata, boolean consecutiveBlock);
/**
 * Reads an entity. Returns null when the block is processed.
 *
 * @return {@link Entity}. null when the work is done.
 * @throws IOException if reading from the underlying stream fails
 */
Entity next() throws IOException;
/**
 * One unit produced by a reader context: the record bytes plus the number of source bytes consumed to build it.<br/>
 * The consumed count can differ from the record length; for example, a line reader counts the line-terminator
 * characters it consumed even though they are not part of the record.
 */
class Entity
{
  private byte[] record;
  private long usedBytes;

  /**
   * Resets this entity: the record becomes null and the used-byte count becomes -1.
   */
  public void clear()
  {
    this.usedBytes = -1;
    this.record = null;
  }

  /**
   * @return the record bytes (the internal array, not a copy); may be null
   */
  public byte[] getRecord()
  {
    return this.record;
  }

  /**
   * @param record the record bytes; stored as-is without copying
   */
  public void setRecord(byte[] record)
  {
    this.record = record;
  }

  /**
   * @return the number of source bytes consumed to construct the record
   */
  public long getUsedBytes()
  {
    return this.usedBytes;
  }

  /**
   * @param usedBytes the number of source bytes consumed to construct the record
   */
  public void setUsedBytes(long usedBytes)
  {
    this.usedBytes = usedBytes;
  }
}
* An Abstract reader context which assumes that a block boundary is never crossed if the last entity read is fully
* contained in the block.
* @param <STREAM> type of stream.
abstract class AbstractReaderContext<STREAM extends InputStream & PositionedReadable> implements ReaderContext<STREAM>
protected transient long offset;
protected transient STREAM stream;
protected transient BlockMetadata blockMetadata;
protected final transient Entity entity;
protected AbstractReaderContext()
offset = -1;
entity = new Entity();
public void initialize(STREAM stream, BlockMetadata blockMetadata, boolean consecutiveBlock)
{ = stream;
this.blockMetadata = blockMetadata;
if (!consecutiveBlock) {
offset = blockMetadata.getOffset();
public Entity next() throws IOException
if (offset < blockMetadata.getLength()) {
Entity entity = readEntity();
offset += entity.usedBytes;
return entity;
return null;
protected abstract Entity readEntity() throws IOException;
/**
 * This reader context splits the block into entities on '\n' or '\r'.<br/>
 * It will not read ahead of the block boundary if the last entity was completely contained in the block.<br/>
 * Any records formed using this context will need a way to validate the start of the record.
 *
 * @param <STREAM> type of stream.
 */
class LineReaderContext<STREAM extends InputStream & PositionedReadable> extends AbstractReaderContext<STREAM>
// Number of bytes requested by the first fetch of a block (see calculateBytesToFetch()).
protected int bufferSize;
/**
 * overflowBufferSize is the number of bytes fetched when a record overflows
 * to consecutive block
 */
protected int overflowBufferSize;
// Builds the record of the current entity; its bytes become entity.record in readEntity().
private final transient ByteArrayOutputStream lineBuilder;
// Its size is added to usedBytes after a line terminator is found (presumably the consumed '\r'/'\n' run).
private final transient ByteArrayOutputStream emptyBuilder;
// Bytes of the line segment scanned from the current buffer; their count is added to usedBytes.
private final transient ByteArrayOutputStream tmpBuilder;
// Read buffer filled by readData(); allocated lazily on first use.
protected transient byte[] buffer;
// Contents of buffer decoded as a String and scanned character by character.
private transient String bufferStr;
// Index of the next character of bufferStr to scan; 0 means readEntity() must fetch fresh data.
private transient boolean overflowBlockRead;
/**
 * Creates a line reader context whose main and overflow fetch sizes are both 8192 bytes.
 */
public LineReaderContext()
{
  super();
  this.lineBuilder = new ByteArrayOutputStream();
  this.emptyBuilder = new ByteArrayOutputStream();
  this.tmpBuilder = new ByteArrayOutputStream();
  this.bufferSize = 8192;
  this.overflowBufferSize = 8192;
}
public void initialize(STREAM stream, BlockMetadata blockMetadata, boolean consecutiveBlock)
overflowBlockRead = false;
posInStr = 0;
offset = blockMetadata.getOffset();
super.initialize(stream, blockMetadata, consecutiveBlock);
* Reads bytes from the stream starting from the offset into the buffer
* @param bytesFromCurrentOffset
* bytes read till now from current block
* @param bytesToFetch
* the number of bytes to be read from stream
* @return the number of bytes actually read, -1 if 0 bytes read
* @throws IOException
protected int readData(final long bytesFromCurrentOffset, final int bytesToFetch) throws IOException
if (buffer == null) {
buffer = new byte[bytesToFetch];
return + bytesFromCurrentOffset, buffer, 0, bytesToFetch);
* @param usedBytesFromOffset
* number of bytes the pointer is ahead of the offset
* @return true if end of stream reached, false otherwise
protected boolean checkEndOfStream(final long usedBytesFromOffset)
if (!overflowBlockRead) {
return (offset - blockMetadata.getOffset() + usedBytesFromOffset < bufferSize);
} else {
return (offset - blockMetadata.getOffset() + usedBytesFromOffset < overflowBufferSize);
* Gives the number of bytes to be fetched from the stream
* @param overflowBlockRead
* indicates whether we are reading main block or overflow block
* @return bytes to be fetched from stream
protected int calculateBytesToFetch()
return (overflowBlockRead ? overflowBufferSize : (bufferSize));
/**
 * Reads the next line starting at {@code offset}, splitting on '\r' or '\n'.
 * <p>
 * The returned entity's record is the content of lineBuilder. Its usedBytes counts the line bytes plus the
 * terminator characters consumed after the line, so that next() advances offset past both.
 *
 * @return the shared {@link #entity}, or null once readData() reports end of stream (-1)
 * @throws IOException if reading from the stream fails
 *
 * NOTE(review): several statements (loop breaks, builder writes/resets, closing braces) appear to be missing from
 * this copy of the method. For example, nothing visible handles {@code bytesRead == -1} before
 * {@code new String(...)}. Confirm against the canonical source before relying on these comments.
 */
protected Entity readEntity() throws IOException
//Implemented a buffered reader instead of using java's BufferedReader because it was reading much ahead of block
// boundary and faced issues with duplicate records. Controlling the buffer size didn't help either.
boolean foundEOL = false;
int bytesRead = 0;
// Bytes of the stream consumed for this entity, relative to offset; also the read position passed to readData().
long usedBytes = 0;
while (!foundEOL) {
// posInStr == 0: no unscanned data is buffered, so fetch more from the stream.
if (posInStr == 0) {
int bytesToFetch = calculateBytesToFetch();
// Every later fetch for this block uses overflowBufferSize (reset only by initialize()).
overflowBlockRead = true;
bytesRead = readData(usedBytes, bytesToFetch);
if (bytesRead == -1) {
// NOTE(review): decodes with the platform default charset while usedBytes counts builder bytes; multi-byte
// encodings may make the two disagree - confirm the expected input encoding.
bufferStr = new String(buffer,0, bytesRead);
// Scan forward until the first '\r' or '\n' in the buffered text.
while (posInStr < bufferStr.length()) {
char c = bufferStr.charAt(posInStr);
if (c != '\r' && c != '\n') {
} else {
foundEOL = true;
// Count the line segment gathered from this buffer.
byte[] subLine = tmpBuilder.toByteArray();
usedBytes += subLine.length;
if (foundEOL) {
// Skip over the run of consecutive '\r'/'\n' characters that follows the line.
while (posInStr < bufferStr.length()) {
char c = bufferStr.charAt(posInStr);
if (c == '\r' || c == '\n') {
} else {
// Terminator bytes are consumed (counted in usedBytes) but are not part of the record.
usedBytes += emptyBuilder.toByteArray().length;
} else {
//end of stream reached
if (checkEndOfStream(usedBytes)) {
//read more bytes from the input stream
posInStr = 0;
//when end of stream is reached then bytesRead is -1
if (bytesRead == -1) {
return null;
// Publish the result through the shared entity instance.
entity.record = lineBuilder.toByteArray();
entity.usedBytes = usedBytes;
return entity;
* Sets the buffer size of read.
* @param bufferSize size of the buffer
public void setBufferSize(int bufferSize)
this.bufferSize = bufferSize;
* @return the buffer size of read.
public int getBufferSize()
return this.bufferSize;
* Sets the overflow buffer size of read.
* @param overflowBufferSize
* size of the overflow buffer
public void setOverflowBufferSize(int overflowBufferSize)
this.overflowBufferSize = overflowBufferSize;
* @param buffer
* the bytes read from the source
protected void setBuffer(byte[] buffer)
this.buffer = buffer;
* Sets whether to read overflow block during next fetch.
* @param overflowBlockRead
* boolean indicating whether to read overflow block during next read
public void setOverflowBlockRead(boolean overflowBlockRead)
this.overflowBlockRead = overflowBlockRead;
* Returns a boolean indicating whether to read overflow block during next read
* @returnoverflowBlockRead
protected boolean isOverflowBlockRead()
return overflowBlockRead;
* Another reader context that splits the block into records on '\n' or '\r'.<br/>
* This implementation doesn't need a way to validate the start of a record.<br/>
* <p/>
* This starts parsing the block (except the first block of the file) from the first eol character.
* It is a less optimized version of an {@link LineReaderContext} which always reads beyond the block
* boundary.
* @param <STREAM>
class ReadAheadLineReaderContext<STREAM extends InputStream & PositionedReadable> extends LineReaderContext<STREAM>
public void initialize(STREAM stream, BlockMetadata blockMetadata, boolean consecutiveBlock)
super.initialize(stream, blockMetadata, consecutiveBlock);
//ignore first entity of all the blocks except the first one because those bytes
//were used during the parsing of the previous block.
if (blockMetadata.getPreviousBlockId() != -1 && blockMetadata.getOffset() != 0) {
try {
Entity entity = readEntity();
offset += entity.usedBytes;
} catch (IOException e) {
throw new RuntimeException("when reading first entity", e);
public Entity next() throws IOException
if (offset < blockMetadata.getLength() || (offset == blockMetadata.getLength() && !blockMetadata.isLastBlock())) {
Entity entity = readEntity();
offset += entity.usedBytes;
return entity;
return null;
protected int calculateBytesToFetch()
* With readAheadLineReaderContext, we always read at least one overflowBlock. Hence, fetch it in advance
return (this.isOverflowBlockRead() ? overflowBufferSize : (bufferSize + overflowBufferSize));
* This creates fixed sized entities.<br/>
* It doesn't read beyond the block boundary therefore the last byte-array could be smaller.<br/>
* @param <STREAM> type of stream.
class FixedBytesReaderContext<STREAM extends InputStream & PositionedReadable> extends AbstractReaderContext<STREAM>
//When this field is null, it is initialized to default fs block size in setup.
protected Integer length;
public void initialize(STREAM stream, BlockMetadata blockMetadata, boolean consecutiveBlock)
if (length == null) {
length = (int)new Configuration().getLong("fs.local.block.size", 32 * 1024 * 1024);
LOG.debug("length init {}", length);
super.initialize(stream, blockMetadata, consecutiveBlock);
protected Entity readEntity() throws IOException
int bytesToRead = length;
if (offset + length >= blockMetadata.getLength()) {
bytesToRead = (int)(blockMetadata.getLength() - offset);
byte[] record = new byte[bytesToRead];
stream.readFully(offset, record, 0, bytesToRead);
entity.usedBytes = bytesToRead;
entity.record = record;
return entity;
* Sets the length of each record.
* @param length fixed length of each record.
public void setLength(Integer length)
this.length = length;
* @return the length of record.
public Integer getLength()
return this.length;
private static final Logger LOG = LoggerFactory.getLogger(FixedBytesReaderContext.class);