blob: 8ce1fdb44f4ce9db20a4c26307d29ccc5c60b682 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
/**
* Implementation detail of {@link TextIO.Read}.
*
* <p>A {@link FileBasedSource} which can decode records delimited by newline characters.
*
* <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or {@code
* \r\n} as the delimiter, or a caller-supplied delimiter byte sequence when one is given. This
* source is not strict and supports decoding the last record even if it is not delimited. Finally,
* no records are decoded if the stream is empty.
*
* <p>This source supports reading from any arbitrary byte position within the stream. If the
* starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
* representing the beginning of the first record to be decoded.
*/
@VisibleForTesting
class TextSource extends FileBasedSource<String> {
/**
 * Record delimiter as raw bytes. It is handed unchanged to every sub-range source and to every
 * reader created from this source; the reader treats {@code null} as "use the default newline
 * delimiters".
 */
byte[] delimiter;
/**
 * Creates a source from a file specification.
 *
 * @param fileSpec forwarded to the {@link FileBasedSource} constructor
 * @param emptyMatchTreatment forwarded to the {@link FileBasedSource} constructor
 * @param delimiter record delimiter bytes to store on this source
 */
TextSource(
ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) {
// NOTE(review): the literal 1L is presumably the minimum bundle size - confirm against the
// FileBasedSource constructor.
super(fileSpec, emptyMatchTreatment, 1L);
this.delimiter = delimiter;
}
/**
 * Creates a source for a sub-range of a single file; used by {@link #createForSubrangeOfFile}.
 *
 * @param metadata forwarded to the {@link FileBasedSource} constructor
 * @param start forwarded to the {@link FileBasedSource} constructor
 * @param end forwarded to the {@link FileBasedSource} constructor
 * @param delimiter record delimiter bytes to store on this source
 */
private TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter) {
// NOTE(review): the literal 1L is presumably the minimum bundle size - confirm against the
// FileBasedSource constructor.
super(metadata, 1L, start, end);
this.delimiter = delimiter;
}
/**
 * Returns a new {@link TextSource} built from the given metadata and offsets that carries the
 * same delimiter as this source.
 */
@Override
protected FileBasedSource<String> createForSubrangeOfFile(
MatchResult.Metadata metadata, long start, long end) {
return new TextSource(metadata, start, end, delimiter);
}
/**
 * Returns a new {@link TextBasedReader} over this source, configured with this source's
 * delimiter. The {@code options} argument is not used.
 */
@Override
protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {
return new TextBasedReader(this, delimiter);
}
/** Returns {@link StringUtf8Coder#of()}, the coder used for the decoded records. */
@Override
public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
/**
* A {@link FileBasedReader FileBasedReader} which can decode records delimited by delimiter
* characters.
*
* <p>See {@link TextSource} for further details.
*/
@VisibleForTesting
static class TextBasedReader extends FileBasedReader<String> {
// Capacity, in bytes, of the scratch buffer handed to the channel on each read.
private static final int READ_BUFFER_SIZE = 8192;
// Scratch buffer the channel is read into; its contents are copied onto the end of `buffer`
// and it is cleared again after every read.
private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
// Bytes read from the channel that have not yet been consumed by a decoded record.
private ByteString buffer;
// Index in `buffer` of the first byte of the delimiter located by findDelimiterBounds().
private int startOfDelimiterInBuffer;
// Index in `buffer` one past the last byte of the delimiter located by findDelimiterBounds().
private int endOfDelimiterInBuffer;
// Offset of the record currently being read; returned by getCurrentOffset().
private long startOfRecord;
// Offset at which the next record starts. Also read by getSplitPointsRemaining().
// NOTE(review): volatile presumably because that method may run on another thread - confirm.
private volatile long startOfNextRecord;
// Set once a read from the channel has returned -1.
private volatile boolean eof;
// True after a record has been decoded; reset to false when readNextRecord() finds no more data.
private volatile boolean elementIsPresent;
// The most recently decoded record.
private @Nullable String currentValue;
// The channel being read; assigned in startReading().
private @Nullable ReadableByteChannel inChannel;
// Custom delimiter bytes, or null to split on \n, \r or \r\n.
private @Nullable byte[] delimiter;
/**
 * Creates a reader over {@code source} that starts with an empty buffer.
 *
 * @param source the source to read, passed to the superclass constructor
 * @param delimiter custom delimiter bytes, or {@code null} to use the default newline delimiters
 */
private TextBasedReader(TextSource source, byte[] delimiter) {
super(source);
buffer = ByteString.EMPTY;
this.delimiter = delimiter;
}
/**
 * Returns the offset at which the current record starts.
 *
 * @throws NoSuchElementException if no record is currently available
 */
@Override
protected long getCurrentOffset() throws NoSuchElementException {
  if (elementIsPresent) {
    return startOfRecord;
  }
  throw new NoSuchElementException();
}
/**
 * Reports how many split points are left.
 *
 * <p>Once reading has started and the next record would begin at or beyond the end offset of the
 * current source, the answer is {@code 0} if the reader is done and {@code 1} otherwise. In every
 * other situation the superclass estimate is returned.
 */
@Override
public long getSplitPointsRemaining() {
  boolean nextRecordOutsideRange =
      isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset();
  if (!nextRecordOutsideRange) {
    return super.getSplitPointsRemaining();
  }
  if (isDone()) {
    return 0;
  }
  return 1;
}
/**
 * Returns the most recently decoded record.
 *
 * @throws NoSuchElementException if no record is currently available
 */
@Override
public String getCurrent() throws NoSuchElementException {
  if (elementIsPresent) {
    return currentValue;
  }
  throw new NoSuchElementException();
}
/**
 * Remembers the channel and, when this source does not start at the beginning of the file, skips
 * ahead to the first record that starts at or after the source's start offset.
 *
 * <p>To do so the channel is positioned a little before the start offset, the next delimiter is
 * located, and everything up to and including that delimiter is discarded.
 *
 * @param channel the channel to read from; must be a {@link SeekableByteChannel} when the start
 *     offset is greater than 0
 * @throws IllegalStateException if the start offset is greater than 0 and {@code channel} is not
 *     a {@link SeekableByteChannel}
 * @throws IOException if positioning or reading the channel fails
 */
@Override
protected void startReading(ReadableByteChannel channel) throws IOException {
  this.inChannel = channel;
  // If the first offset is greater than zero, we need to skip bytes until we see our
  // first delimiter.
  long startOffset = getCurrentSource().getStartOffset();
  if (startOffset > 0) {
    checkState(
        channel instanceof SeekableByteChannel,
        "%s only supports reading from a SeekableByteChannel when given a start offset"
            + " greater than 0.",
        TextSource.class.getSimpleName());
    // Step back far enough that a delimiter ending exactly at startOffset is seen in full by
    // findDelimiterBounds(): one byte for the default delimiters, delimiter.length bytes for a
    // custom one. Any delimiter found from there ends at or after startOffset, so the record
    // that follows it belongs to this source.
    //
    // The position is clamped to the start of the file. When a custom delimiter is longer than
    // startOffset, scanning must begin at offset 0; starting at startOffset - 1 would miss a
    // delimiter that begins earlier, and the record following it would be read by no source.
    long lookBehind = (delimiter == null) ? 1L : delimiter.length;
    long requiredPosition = Math.max(0L, startOffset - lookBehind);
    ((SeekableByteChannel) channel).position(requiredPosition);
    findDelimiterBounds();
    // Drop the tail of the previous record together with its delimiter.
    buffer = buffer.substring(endOfDelimiterInBuffer);
    startOfNextRecord = requiredPosition + endOfDelimiterInBuffer;
    endOfDelimiterInBuffer = 0;
    startOfDelimiterInBuffer = 0;
  }
}
/**
 * Locates the start position and end position of the next delimiter. Will consume the channel
 * till either EOF or the delimiter bounds are found.
 *
 * <p>When {@code delimiter} is {@code null} the delimiter is {@code \n}, {@code \r} or {@code
 * \r\n}; otherwise it is the exact byte sequence held in {@code delimiter}. If the channel ends
 * before a delimiter is found, both positions are set to the current scan position, i.e. all
 * buffered bytes are element bytes and the delimiter is empty.
 *
 * <p>This fills the buffer and updates the positions as follows:
 *
 * <pre>{@code
 * ------------------------------------------------------
 * | element bytes | delimiter bytes | unconsumed bytes |
 * ------------------------------------------------------
 * 0            start of          end of              buffer
 *              delimiter         delimiter           size
 *              in buffer         in buffer
 * }</pre>
 *
 * @throws IOException if reading from the channel fails
 */
private void findDelimiterBounds() throws IOException {
int bytePositionInBuffer = 0;
while (true) {
// No byte available at the scan position: the channel is exhausted without a delimiter.
if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) {
startOfDelimiterInBuffer = endOfDelimiterInBuffer = bytePositionInBuffer;
break;
}
byte currentByte = buffer.byteAt(bytePositionInBuffer);
if (delimiter == null) {
// default delimiter
if (currentByte == '\n') {
startOfDelimiterInBuffer = bytePositionInBuffer;
endOfDelimiterInBuffer = startOfDelimiterInBuffer + 1;
break;
} else if (currentByte == '\r') {
startOfDelimiterInBuffer = bytePositionInBuffer;
endOfDelimiterInBuffer = startOfDelimiterInBuffer + 1;
// Peek at the following byte, if there is one, so that \r\n counts as a single delimiter.
if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) {
currentByte = buffer.byteAt(bytePositionInBuffer + 1);
if (currentByte == '\n') {
endOfDelimiterInBuffer += 1;
}
}
break;
}
} else {
// user defined delimiter: try to match all of its bytes starting at the scan position
int i = 0;
// initialize delimiter not found
startOfDelimiterInBuffer = endOfDelimiterInBuffer = bytePositionInBuffer;
while ((i <= delimiter.length - 1) && (currentByte == delimiter[i])) {
// byte i matched; read next byte
i++;
if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + i + 1)) {
currentByte = buffer.byteAt(bytePositionInBuffer + i);
} else {
// The channel ended. Either the delimiter is truncated at the end of the file
// (i < delimiter.length) or the file ends exactly with the delimiter
// (i == delimiter.length), which the check after this loop still reports as found.
startOfDelimiterInBuffer = endOfDelimiterInBuffer = bytePositionInBuffer;
break;
}
}
if (i == delimiter.length) {
// all bytes of delimiter found
endOfDelimiterInBuffer = bytePositionInBuffer + i;
break;
}
}
// No delimiter starts here. Move to the next byte in buffer; matching restarts from the
// first delimiter byte at that position.
bytePositionInBuffer += 1;
}
}
/**
 * Advances to the next record.
 *
 * <p>The new record starts where the previous one ended. After the next delimiter has been
 * located, the record is decoded unless the channel is exhausted and nothing is left in the
 * buffer.
 *
 * @return {@code true} if a record was decoded, {@code false} if there are no more records
 * @throws IOException if reading from the channel fails
 */
@Override
protected boolean readNextRecord() throws IOException {
  startOfRecord = startOfNextRecord;
  findDelimiterBounds();

  // EOF with an empty buffer means there is nothing left to turn into a record.
  boolean exhausted = eof && buffer.isEmpty();
  if (exhausted) {
    elementIsPresent = false;
  } else {
    decodeCurrentElement();
    startOfNextRecord = startOfRecord + endOfDelimiterInBuffer;
  }
  return !exhausted;
}
/**
 * Turns the bytes in front of the located delimiter into the current value, marks an element as
 * present, and trims the buffer down to the bytes that follow the delimiter.
 *
 * <p>After this call {@code startOfDelimiterInBuffer} and {@code endOfDelimiterInBuffer} no
 * longer describe positions in the (now shortened) buffer.
 */
private void decodeCurrentElement() throws IOException {
  currentValue = buffer.substring(0, startOfDelimiterInBuffer).toStringUtf8();
  elementIsPresent = true;
  ByteString unconsumed = buffer.substring(endOfDelimiterInBuffer);
  buffer = unconsumed;
}
/**
 * Reads from the channel, appending to the buffer, until the buffer holds more than {@code
 * minCapacity} bytes or the channel reports EOF.
 *
 * @param minCapacity the number of buffered bytes the caller needs
 * @return {@code true} if the buffer holds at least {@code minCapacity} bytes afterwards, {@code
 *     false} if the channel ended before that many bytes were available
 * @throws IOException if reading from the channel fails
 */
private boolean tryToEnsureNumberOfBytesInBuffer(int minCapacity) throws IOException {
  while (!eof && buffer.size() <= minCapacity) {
    int bytesRead = inChannel.read(readBuffer);
    eof = (bytesRead == -1);
    // Switch the scratch buffer to draining mode, append what was read, then reset it.
    readBuffer.flip();
    ByteString chunk = ByteString.copyFrom(readBuffer);
    buffer = buffer.concat(chunk);
    readBuffer.clear();
  }
  return buffer.size() >= minCapacity;
}
}
}