blob: 65f90db8d17d4e29476eeb3a0107ef1c0f7aac89 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark.utils.streaming;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.cassandra.spark.data.SSTable;
import org.apache.cassandra.spark.stats.Stats;
import org.apache.cassandra.spark.utils.ThrowableUtils;
import org.jetbrains.annotations.NotNull;
/**
 * A blocking {@link java.io.InputStream} over an SSTable component whose bytes are fetched asynchronously,
 * chunk by chunk, from an {@link SSTableSource}.
 * <p>
 * The InputStream handed to the CompactionIterator must be blocking, but we do not want to block on network calls,
 * nor buffer so much data in memory that large Data.db files cause OOMs.
 * <p>
 * This helper requests byte ranges from the supplied {@link SSTableSource} on demand and applies flow control:
 * it stops requesting while {@link SSTableSource#maxBufferSize()} or more bytes are buffered, and resumes as the
 * consumer drains the buffer, so the whole file is never held in memory at once.
 * <p>
 * Because {@link SSTableSource} is pluggable, users can pass in their own implementations to read from any source.
 * <p>
 * This enables the Bulk Reader library to scale to reading many SSTables without OOMing.
 * <p>
 * Threading: the InputStream (consumer) side must be used by a single thread, and the {@link StreamConsumer}
 * (producer) callbacks must come from a single thread or otherwise be serialized to preserve the ordering of events.
 *
 * @param <T> SSTable type
 */
@SuppressWarnings({"WeakerAccess", "unused"})
public class SSTableInputStream<T extends SSTable> extends InputStream implements StreamConsumer
{
    // Zero-length sentinel buffers put on the queue to signal events to the consumer thread.
    // They are recognised by identity in checkState().
    // END_MARKER: the in-flight request has completed.
    private static final StreamBuffer.ByteArrayWrapper END_MARKER = StreamBuffer.wrap(new byte[0]);
    // FINISHED_MARKER: every byte of the source has been written to the queue.
    private static final StreamBuffer.ByteArrayWrapper FINISHED_MARKER = StreamBuffer.wrap(new byte[0]);
    // ERROR_MARKER: the producer reported an error, stored in `throwable`.
    private static final StreamBuffer.ByteArrayWrapper ERROR_MARKER = StreamBuffer.wrap(new byte[0]);

    private enum StreamState
    {
        Init,        // Nothing requested yet
        Reading,     // currentBuffer holds bytes in [position, length)
        NextBuffer,  // The next read must pop a buffer from the queue
        End,         // End of stream reached, reads return -1
        Closed       // close() has been called
    }

    private final BlockingQueue<StreamBuffer> queue;
    private final SSTableSource<T> source;
    private final Stats stats;
    private final long startTimeNanos;

    // Variables accessed by both producer, consumer & timeout thread so must be volatile or atomic
    private volatile Throwable throwable = null;
    private volatile boolean activeRequest = false;
    private volatile boolean closed = false;
    private final AtomicLong bytesWritten = new AtomicLong(0L);

    // Variables only used by the InputStream consumer thread so do not need to be volatile or atomic
    private long rangeStart = 0L;
    private long bytesRead = 0L;
    private long timeBlockedNanos = 0L;
    private boolean skipping = false;
    private StreamState state = StreamState.Init;
    private StreamBuffer currentBuffer = null;
    private int position;
    private int length;

    /**
     * @param source SSTableSource to async provide the bytes after {@link SSTableSource#request(long, long, StreamConsumer)} is called
     *
     * @param stats {@link Stats} implementation for recording instrumentation
     */
    public SSTableInputStream(SSTableSource<T> source, Stats stats)
    {
        this.source = source;
        this.queue = new LinkedBlockingQueue<>();
        this.startTimeNanos = System.nanoTime();
        this.stats = stats;
    }

    public long startTimeNanos()
    {
        return startTimeNanos;
    }

    public long timeBlockedNanos()
    {
        return timeBlockedNanos;
    }

    public long bytesWritten()
    {
        return bytesWritten.get();
    }

    public long bytesRead()
    {
        return bytesRead;
    }

    public long bytesBuffered()
    {
        return bytesWritten() - bytesRead();
    }

    public boolean isFinished()
    {
        return bytesWritten() >= source.size();
    }

    private boolean isClosed()
    {
        return state == StreamState.Closed;
    }

    /**
     * Can request more bytes if:
     * 1. a request is not already in-flight
     * 2. not in the middle of a skip method call
     * 3. the queue buffer is not full i.e. bytes in memory are less than maxBufferSize
     * 4. the InputStream is not closed
     *
     * @return true if can request more bytes
     */
    private boolean canRequestMore()
    {
        return !(activeRequest || skipping || isBufferFull() || isClosed());
    }

    /**
     * Maybe request more bytes if possible
     */
    private void maybeRequestMore()
    {
        if (canRequestMore())
        {
            requestMore();
        }
    }

    /**
     * Request more bytes using {@link SSTableSource#request(long, long, StreamConsumer)} for the next range.
     * The first request uses {@link SSTableSource#headerChunkSize()}; later ones use {@link SSTableSource#chunkBufferSize()}.
     */
    private void requestMore()
    {
        if (rangeStart >= source.size())
        {
            if (isFinished())
            {
                // If user skips to end of stream we still need to complete,
                // otherwise read() blocks waiting for FINISHED_MARKER
                queue.add(FINISHED_MARKER);
            }
            // NOTE(review): if the source delivered fewer bytes than requested (not finished), nothing is
            // enqueued here and the consumer waits on the queue until the timeout (if one is configured)
            return; // Finished
        }

        long chunkSize = rangeStart == 0 ? source.headerChunkSize() : source.chunkBufferSize();
        long rangeEnd = Math.min(source.size(), rangeStart + chunkSize);
        if (rangeEnd >= rangeStart)
        {
            activeRequest = true;
            source.request(rangeStart, rangeEnd, this);
            rangeStart += chunkSize + 1; // Increment range start pointer for next request
        }
        else
        {
            throw new IllegalStateException(String.format("Tried to request invalid range start=%d end=%d",
                                                          rangeStart, rangeEnd));
        }
    }

    /**
     * The number of bytes buffered is greater than or equal to {@link SSTableSource#maxBufferSize()}
     * so wait for queue to drain before requesting more
     *
     * @return true if queue is full
     */
    public boolean isBufferFull()
    {
        return bytesBuffered() >= source.maxBufferSize();
    }

    // Timeout

    /**
     * @param timeout           duration timeout
     * @param nowNanos          current time now in nanoseconds
     * @param lastActivityNanos last activity time in nanoseconds
     * @return the timeout remaining in nanoseconds, or less than or equal to 0 if timeout already expired
     */
    public static long timeoutLeftNanos(Duration timeout, long nowNanos, long lastActivityNanos)
    {
        return Math.min(timeout.toNanos(), timeout.toNanos() - (nowNanos - lastActivityNanos));
    }

    private Throwable timeoutException(Duration timeout)
    {
        return new TimeoutException(String.format("No activity on SSTableInputStream for %d seconds",
                                                  timeout.getSeconds()));
    }

    private void timeoutError(Duration timeout)
    {
        onError(timeoutException(timeout));
    }

    /**
     * Converts the source timeout to milliseconds for waiting on the queue.
     *
     * @param timeout the configured timeout, may be null
     * @return 0 if no timeout applies (null, zero or negative). Otherwise the timeout in milliseconds, at least 1
     *         so that sub-millisecond timeouts are not silently disabled, and saturating at Long.MAX_VALUE on overflow
     */
    private static long timeoutMillis(Duration timeout)
    {
        if (timeout == null || timeout.isZero() || timeout.isNegative())
        {
            return 0L;
        }
        try
        {
            return Math.max(1L, timeout.toMillis());
        }
        catch (ArithmeticException overflow)
        {
            // Durations too large to express in millis (e.g. "forever") effectively mean no timeout
            return Long.MAX_VALUE;
        }
    }

    /**
     * {@link StreamConsumer} method implementations
     */

    @Override
    public void onRead(StreamBuffer buffer)
    {
        int length = buffer.readableBytes();
        if (length <= 0 || closed)
        {
            // The buffer is dropped instead of queued. Queued buffers are released by the consumer after
            // reading, so release this one here to avoid leaking it.
            buffer.release();
            return;
        }
        bytesWritten.addAndGet(length);
        queue.add(buffer);
        stats.inputStreamBytesWritten(source, length);
    }

    @Override
    public void onEnd()
    {
        activeRequest = false;
        if (isFinished())
        {
            queue.add(FINISHED_MARKER);
        }
        else
        {
            queue.add(END_MARKER);
        }
    }

    @Override
    public void onError(@NotNull Throwable throwable)
    {
        // Store the root cause before enqueueing the marker so the consumer sees it when it pops ERROR_MARKER
        this.throwable = ThrowableUtils.rootCause(throwable);
        activeRequest = false;
        queue.add(ERROR_MARKER);
        stats.inputStreamFailure(source, throwable);
    }

    /**
     * {@link java.io.InputStream} method implementations
     */

    /**
     * @return an estimate of the bytes buffered in memory. This is 0 once closed, and is clamped to
     *         [0, Integer.MAX_VALUE] rather than throwing when more than Integer.MAX_VALUE bytes are buffered
     */
    @Override
    public int available()
    {
        if (closed)
        {
            return 0;
        }
        long buffered = bytesBuffered();
        return (int) Math.max(0L, Math.min(Integer.MAX_VALUE, buffered));
    }

    @Override
    public boolean markSupported()
    {
        return false;
    }

    /**
     * If the schema contains large blobs that can be filtered, then we can more efficiently
     * skip bytes by advancing rangeStart and avoid wastefully streaming bytes across the network
     *
     * @param count number of bytes
     * @return number of bytes actually skipped
     * @throws IOException IOException
     */
    @Override
    public long skip(long count) throws IOException
    {
        if (count <= 0)
        {
            return 0;
        }
        else if (count <= bytesBuffered())
        {
            long actual = super.skip(count);
            stats.inputStreamBytesSkipped(source, actual, 0);
            return actual;
        }

        skipping = true;
        long remaining = count;
        try
        {
            while (activeRequest || !queue.isEmpty())
            {
                // Drain any buffered bytes and block until active request completes and the queue is empty
                remaining -= super.skip(remaining);
                if (remaining <= 0)
                {
                    break;
                }
            }

            // Increment range start pointer to efficiently skip without reading bytes across the network unnecessarily
            if (remaining > 0)
            {
                rangeStart += remaining;
                bytesWritten.addAndGet(remaining);
                bytesRead += remaining;
            }
        }
        finally
        {
            // Always remove the skip marker, otherwise a failure while draining would block all future requests
            skipping = false;
        }

        // Resume requesting bytes
        switch (state)
        {
            case Reading:
            case NextBuffer:
                // Stream is active so request more bytes if queue is not full
                maybeRequestMore();
                break;
            default:
                // If skip() is called before calling read() the Stream will be in StreamState.Init,
                // in this case we need to initialize the stream before request more bytes
                checkState();
        }

        stats.inputStreamBytesSkipped(source, count - remaining, remaining);
        return count;
    }

    /**
     * Allows directly reading into ByteBuffer without intermediate copy
     *
     * @param buffer the ByteBuffer
     * @throws EOFException if attempts to read beyond the end of the file
     * @throws IOException  for failure during I/O
     */
    public void read(ByteBuffer buffer) throws IOException
    {
        for (int remainingLength = buffer.remaining(); 0 < remainingLength; remainingLength = buffer.remaining())
        {
            if (checkState() < 0)
            {
                throw new EOFException();
            }
            int readLength = Math.min(length - position, remainingLength);
            if (0 < readLength)
            {
                currentBuffer.getBytes(position, buffer, readLength);
                position += readLength;
                bytesRead += readLength;
            }
            maybeReleaseBuffer();
        }
    }

    // Copied from JDK11 jdk.internal.util.Preconditions.checkFromIndexSize()
    private static void checkFromIndexSize(int fromIndex, int size, int length)
    {
        if ((length | fromIndex | size) < 0 || size > length - fromIndex)
        {
            throw new IndexOutOfBoundsException(String.format("Index out of bounds fromIndex=%d, size=%d, length=%d",
                                                              fromIndex, size, length));
        }
    }

    /**
     * Reads up to {@code length} bytes. Per the {@link InputStream} contract, this returns -1 at end of stream and
     * otherwise reads at least one byte when {@code length > 0}.
     */
    @Override
    public int read(byte[] buffer, int offset, int length) throws IOException
    {
        SSTableInputStream.checkFromIndexSize(offset, length, buffer.length);
        if (length == 0)
        {
            return 0;
        }

        int readable;
        while (true)
        {
            if (checkState() < 0)
            {
                return -1;
            }
            readable = this.length - position;
            if (readable > 0)
            {
                break;
            }
            // Current buffer is empty, e.g. an END_MARKER left current by skip(): release it and fetch the next one
            maybeReleaseBuffer();
        }

        int readLength = Math.min(readable, length);
        currentBuffer.getBytes(position, buffer, offset, readLength);
        position += readLength;
        bytesRead += readLength;
        maybeReleaseBuffer();
        return readLength;
    }

    @Override
    public int read() throws IOException
    {
        do
        {
            if (checkState() < 0)
            {
                return -1;
            }

            if (currentBuffer.readableBytes() == 0)
            {
                // Current buffer might be empty, normally if it is a marker buffer e.g. END_MARKER
                maybeReleaseBuffer();
            }
        } while (currentBuffer == null);

        // Convert to unsigned byte
        int unsigned = currentBuffer.getByte(position++) & 0xFF;
        bytesRead++;
        maybeReleaseBuffer();
        return unsigned;
    }

    @Override
    public void close()
    {
        if (state == StreamState.Closed)
        {
            return;
        }
        else if (state != StreamState.End)
        {
            end();
        }
        // Flag first so the producer stops enqueueing new buffers (onRead checks `closed`)
        closed = true;
        state = StreamState.Closed;
        releaseBuffer();

        // Release, rather than just discard, any buffers still queued: they will never be read
        StreamBuffer queued;
        while ((queued = queue.poll()) != null)
        {
            queued.release();
        }
    }

    @Override
    public void reset() throws IOException
    {
        throw new IOException("reset not supported");
    }

    // Internal Methods for java.io.InputStream

    /**
     * If position >= length, we have finished with this {@link SSTableInputStream#currentBuffer} so release and
     * move to the State {@link StreamState#NextBuffer} so next buffer is popped from the {@link LinkedBlockingQueue}
     * when {@link InputStream#read()} or {@link InputStream#read(byte[], int, int)} is next called
     */
    private void maybeReleaseBuffer()
    {
        maybeRequestMore();
        if (position < length)
        {
            // Still bytes remaining in the currentBuffer so keep reading
            return;
        }

        releaseBuffer();
        state = StreamState.NextBuffer;
        stats.inputStreamByteRead(source, position, queue.size(), percentComplete());
    }

    /**
     * @return overall progress through the source as a percentage in [0, 100], based on total bytes read (including
     *         skipped bytes) rather than the offset within the current buffer; 100 for an empty source
     */
    private int percentComplete()
    {
        long size = source.size();
        if (size <= 0)
        {
            return 100;
        }
        return (int) Math.min(100.0, bytesRead * 100.0 / (double) size);
    }

    /**
     * Release current buffer
     */
    private void releaseBuffer()
    {
        if (currentBuffer != null)
        {
            currentBuffer.release();
            currentBuffer = null;
        }
    }

    /**
     * Pop next buffer from the queue, block on {@link LinkedBlockingQueue} until bytes are available
     * or the source timeout (if any) expires
     *
     * @throws IOException exception on error or timeout
     */
    private void nextBuffer() throws IOException
    {
        long startNanos = System.nanoTime();
        StreamBuffer next;
        try
        {
            // Block on queue until next buffer available
            Duration timeout = source.timeout();
            long waitMillis = timeoutMillis(timeout);
            if (waitMillis > 0)
            {
                next = queue.poll(waitMillis, TimeUnit.MILLISECONDS);
                if (next == null)
                {
                    throw new IOException(timeoutException(timeout));
                }
            }
            else
            {
                next = queue.take();
            }
        }
        catch (InterruptedException exception)
        {
            Thread.currentThread().interrupt();
            throw new RuntimeException(exception);
        }

        long nanosBlocked = System.nanoTime() - startNanos;
        timeBlockedNanos += nanosBlocked; // Measure time spent blocking for monitoring
        stats.inputStreamTimeBlocked(source, nanosBlocked);

        // Validate before dereferencing the buffer
        if (next == null)
        {
            throw new IOException("Obtained a null buffer from the queue");
        }
        currentBuffer = next;
        length = currentBuffer.readableBytes();
        state = StreamState.Reading;
        position = 0;
    }

    /**
     * When reading from the InputStream first check the state, if stream is already closed or we need to
     * pop off the next buffer from the {@link LinkedBlockingQueue}
     *
     * @return -1 if we have reached the end of the InputStream or 0 if still open
     * @throws IOException throw IOException if stream is already closed
     */
    private int checkState() throws IOException
    {
        switch (state)
        {
            case Closed:
                throw new IOException("Stream is closed");
            case End:
                return -1;
            case Init:
                // First request: start requesting bytes & schedule timeout
                requestMore();
                state = StreamState.NextBuffer;
                // Intentional fall-through: pop the first buffer
            case NextBuffer:
                nextBuffer();
                if (currentBuffer == END_MARKER)
                {
                    return handleEndMarker();
                }
                else if (currentBuffer == FINISHED_MARKER)
                {
                    return handleFinishedMarker();
                }
                else if (currentBuffer == ERROR_MARKER)
                {
                    throw new IOException(throwable);
                }
                // Intentional fall-through: a data buffer is now current
            default:
                // Do nothing
        }
        return 0;
    }

    /**
     * Handle finished marker returned in the queue, indicating all bytes from source have been requested
     * and input stream can close
     *
     * @return always return -1 as stream is closed
     */
    private int handleFinishedMarker()
    {
        releaseBuffer();
        end();
        stats.inputStreamEndBuffer(source);
        return -1;
    }

    /**
     * Handle end marker returned in the queue, indicating previous request has finished
     *
     * @return -1 if we have reached the end of the InputStream or 0 if still open
     * @throws IOException throw IOException if stream is already closed
     */
    private int handleEndMarker() throws IOException
    {
        if (skipping)
        {
            // Leave the marker current; skip() stops draining and the next read releases it
            return -1;
        }
        releaseBuffer();
        maybeRequestMore();
        state = StreamState.NextBuffer;
        return checkState();
    }

    /**
     * Reached the end of the InputStream and all bytes have been read
     */
    private void end()
    {
        state = StreamState.End;
        stats.inputStreamEnd(source, System.nanoTime() - startTimeNanos, timeBlockedNanos);
    }
}