blob: db25fd480c223a96e9fcdf0c38793cda2a096d30 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.work.batch;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RawFragmentBatch;
/**
 * Abstract skeleton of a {@link RawBatchBuffer}. It tracks the buffer's lifecycle state, counts how many incoming
 * streams have not yet delivered their last batch, and drains/releases queued batches on kill or close. Subclasses
 * provide the storage-specific pieces through {@code enqueueInner} and {@code upkeep}.
 *
 * @param <T> type of element accepted by the backing {@code BufferQueue}'s {@code add} method
 */
public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRawBatchBuffer.class);
/** Lifecycle states of the buffer. */
private enum BufferState {
// Initial value of the state field.
INIT,
// Set by allStreamsFinished() when the remaining-stream counter reaches zero and the buffer was not killed.
STREAMS_FINISHED,
// Set by kill().
KILLED
}
/**
 * Queue abstraction used by this buffer to hold incoming batches.
 *
 * @param <T> type of element accepted by {@code add}
 */
protected interface BufferQueue<T> {

  // ---- insertion ----

  /** Adds an element to the queue. */
  void add(T obj);

  /** Adds a batch via the out-of-memory path (exact semantics are implementation-defined; not visible here). */
  void addOomBatch(RawFragmentBatch batch);

  // ---- retrieval ----

  /**
   * Retrieves the next batch without a timeout argument. Callers in this class treat a {@code null} result as
   * "nothing available right now".
   */
  RawFragmentBatch poll() throws IOException, InterruptedException;

  /**
   * Retrieves the next batch, given a timeout. Callers in this class treat a {@code null} result as
   * "nothing arrived" and poll again.
   */
  RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws InterruptedException, IOException;

  /** Retrieves the next batch (presumably blocking until one is available; confirm against implementations). */
  RawFragmentBatch take() throws IOException, InterruptedException;

  // ---- inspection ----

  /** @return {@code true} when the queue holds no batches */
  boolean isEmpty();

  /** @return the number of batches currently queued */
  int size();

  /** Implementation-defined out-of-memory check. */
  boolean checkForOutOfMemory();
}
// Backing queue. Not assigned anywhere in this class; presumably set by subclasses (TODO confirm).
protected BufferQueue<T> bufferQueue;
// Current lifecycle state; starts as INIT and is written by kill() and allStreamsFinished().
private volatile BufferState state = BufferState.INIT;
// Value of the ExecConstants.INCOMING_BUFFER_SIZE config entry, read once in the constructor.
protected final int bufferSizePerSocket;
// Set in getNext() when the allocator reports it is over its limit; cleared in getNext() once the queue holds
// fewer than 10 batches.
protected final AtomicBoolean outOfMemory = new AtomicBoolean(false);
// Number of streams that have not yet delivered their last batch; starts at fragmentCount.
// NOTE(review): plain int, updated from getNext() without synchronization.
private int streamCounter;
// Fragment count supplied at construction time.
private final int fragmentCount;
// Fragment context supplied at construction time; used to query/fail the executor state and to reach the allocator.
protected final FragmentContext context;
/**
 * Creates a buffer expecting {@code fragmentCount} incoming streams.
 *
 * @param context fragment context; its config supplies the per-socket buffer size
 * @param fragmentCount number of fragments; also the initial value of the remaining-stream counter
 */
public BaseRawBatchBuffer(final FragmentContext context, final int fragmentCount) {
  this.context = context;
  this.fragmentCount = fragmentCount;
  this.streamCounter = fragmentCount;
  this.bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
}
/**
 * Exposes the number of fragments this buffer was constructed with.
 *
 * @return the fragment count passed to the constructor
 */
protected int getFragmentCount() {
  return this.fragmentCount;
}
/**
 * Accepts an incoming batch. If the fragment should no longer continue, the buffer is killed first. A batch
 * arriving at a killed buffer is released and acknowledged without being stored; a batch arriving after all
 * streams finished is an error; otherwise the batch is handed to {@code enqueueInner}.
 *
 * @param batch the incoming batch
 * @throws IOException if the buffer already finished all its streams, or if {@code enqueueInner} fails
 */
@Override
public synchronized void enqueue(final RawFragmentBatch batch) throws IOException {
  // A canceled or failed fragment needs no more data. The null check on context exists so that tests can run.
  final boolean fragmentStopped = context != null && !context.getExecutorState().shouldContinue();
  if (fragmentStopped) {
    kill(context);
  }

  if (!isTerminated()) {
    enqueueInner(batch);
    return;
  }

  if (state != BufferState.KILLED) {
    throw new IOException("Attempted to enqueue batch after finished");
  }

  // Killed: do not store the batch, just free it and acknowledge the sender.
  batch.release();
  batch.sendOk();
}
/**
 * Implementation-specific hook that stores an incoming batch. It is invoked from
 * {@link #enqueue(RawFragmentBatch)}, which is synchronized, and only when the buffer is not terminated.
 *
 * @param batch the batch to store
 * @throws IOException if the implementation cannot enqueue the batch
 */
protected abstract void enqueueInner(final RawFragmentBatch batch) throws IOException;
// TODO: Add an assertion that all acks have been sent.
/**
 * Cleans up the buffer. Closing a buffer that is not terminated while the fragment is still supposed to continue
 * is rejected. Any batches still queued are drained and their bodies released; if the fragment was still supposed
 * to continue at that point, the executor state is failed and the leftover count is logged.
 *
 * @throws IllegalStateException if the buffer is not terminated and the executor state says to continue
 */
@Override
public void close() {
  if (!isTerminated()) {
    if (context.getExecutorState().shouldContinue()) {
      throw new IllegalStateException(String.format(
          "Cleanup before finished. %d out of %d streams have finished", completedStreams(), fragmentCount));
    }
  }

  if (bufferQueue.isEmpty()) {
    return;
  }

  if (context.getExecutorState().shouldContinue()) {
    context.getExecutorState().fail(new IllegalStateException("Batches still in queue during cleanup"));
    logger.error("{} Batches in queue.", bufferQueue.size());
  }
  clearBufferWithBody();
}
/**
 * Moves the buffer to the KILLED state and then drains the queue, releasing the body of every queued batch.
 *
 * @param context not read by this implementation; the drain uses the context stored at construction
 */
@Override
public synchronized void kill(final FragmentContext context) {
  this.state = BufferState.KILLED;
  this.clearBufferWithBody();
}
/**
 * Drains the queue until it reports empty. Each polled batch is checked (via assertion) to have had its ack sent,
 * and its body, if any, is released. A failure while polling is reported to the executor state and the drain
 * carries on with the next iteration.
 */
private void clearBufferWithBody() {
  while (!bufferQueue.isEmpty()) {
    RawFragmentBatch drained = null;
    try {
      drained = bufferQueue.poll();
      assertAckSent(drained);
    } catch (InterruptedException e) {
      context.getExecutorState().fail(e);
      // keep the state that the thread is interrupted
      Thread.currentThread().interrupt();
    } catch (IOException e) {
      context.getExecutorState().fail(e);
    } finally {
      // Runs on every path, so a polled batch is released even when the ack assertion fails.
      if (drained != null) {
        if (drained.getBody() != null) {
          drained.getBody().release();
        }
      }
    }
  }
}
/**
 * Records that every stream has delivered its last batch. A killed buffer stays killed; otherwise the state
 * becomes STREAMS_FINISHED.
 *
 * @throws IllegalStateException if batches are still queued at this point
 */
private void allStreamsFinished() {
  final boolean alreadyKilled = (state == BufferState.KILLED);
  if (!alreadyKilled) {
    state = BufferState.STREAMS_FINISHED;
  }

  final boolean leftovers = !bufferQueue.isEmpty();
  if (leftovers) {
    throw new IllegalStateException("buffer not empty when finished");
  }
}
/**
 * Returns the next batch from the queue, waiting for one if the buffer is still live.
 *
 * <p>A non-blocking poll is tried first. If it yields nothing and the buffer is not terminated (or the queue is
 * non-empty), the queue is polled in 5-second slices until either a batch arrives or the executor state reports
 * that the fragment should stop. In the latter case the buffer is killed, any batch just polled is released, and
 * waiting ends.
 *
 * <p>For each batch returned, {@code upkeep} is invoked, and a batch flagged as the last one of its stream
 * decrements the remaining-stream counter; when that reaches zero the buffer is marked finished.
 *
 * @return the next batch, or {@code null} when the buffer is terminated with nothing left to deliver, or when the
 *         waiting thread was interrupted after the fragment stopped
 * @throws IOException if polling the queue fails
 * @throws DrillRuntimeException if the thread is interrupted while the executor state still says to continue
 * @throws IllegalStateException if {@code null} is about to be returned while batches remain queued or while the
 *         buffer is not terminated
 */
@Override
public RawFragmentBatch getNext() throws IOException {
  // Leave out-of-memory mode once the backlog has shrunk below 10 batches.
  if (outOfMemory.get() && bufferQueue.size() < 10) {
    outOfMemory.set(false);
  }

  RawFragmentBatch b;
  try {
    b = bufferQueue.poll();

    // if we didn't get a batch, block on waiting for queue.
    if (b == null && (!isTerminated() || !bufferQueue.isEmpty())) {
      // We shouldn't block infinitely here. There can be a condition such that due to a failure FragmentExecutor
      // state is changed to FAILED and queue is empty. Because of this the minor fragment main thread will block
      // here waiting for next batch to arrive. Meanwhile when next batch arrived and was enqueued it sees
      // FragmentExecutor failure state and doesn't enqueue the batch and cleans up the buffer queue. Hence this
      // thread will stuck forever. So we poll for 5 seconds until we get a batch or FragmentExecutor state is in
      // error condition.
      while (b == null) {
        b = bufferQueue.poll(5, TimeUnit.SECONDS);
        if (!context.getExecutorState().shouldContinue()) {
          kill(context);
          if (b != null) {
            assertAckSent(b);
            if (b.getBody() != null) {
              b.getBody().release();
            }
            b = null;
          }
          // The fragment is stopping: leave the wait loop. Without this exit, b stays null and the loop would
          // keep polling forever.
          break;
        } // else b will be assigned a valid batch
      }
    }
  } catch (final InterruptedException e) {
    // We expect that the interrupt means the fragment is canceled or failed, so we should kill this buffer
    if (!context.getExecutorState().shouldContinue()) {
      kill(context);
    } else {
      throw new DrillRuntimeException("Interrupted but context.shouldContinue() is true", e);
    }

    // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
    // interruption and respond to it if it wants to.
    Thread.currentThread().interrupt();
    return null;
  }

  if (context.getAllocator().isOverLimit()) {
    outOfMemory.set(true);
  }

  if (b != null) {
    upkeep(b);

    if (b.getHeader().getIsLastBatch()) {
      logger.debug("Got last batch from {}:{}", b.getHeader().getSendingMajorFragmentId(), b.getHeader()
          .getSendingMinorFragmentId());
      final int remainingStreams = decrementStreamCounter();
      if (remainingStreams == 0) {
        logger.debug("Streams finished");
        allStreamsFinished();
      }
    }
  } else {
    // Returning null is only legal when nothing is queued and the buffer has terminated.
    if (!bufferQueue.isEmpty()) {
      throw new IllegalStateException("Returning null when there are batches left in queue");
    }
    if (!isTerminated()) {
      throw new IllegalStateException("Returning null when not finished");
    }
  }

  assertAckSent(b);
  return b;
}
/**
 * Asserts that the given batch has had its ack sent. A {@code null} batch passes trivially, and the check is a
 * no-op when assertions are disabled.
 *
 * @param batch batch to check; may be {@code null}
 */
private void assertAckSent(RawFragmentBatch batch) {
  if (batch == null) {
    return;
  }
  assert batch.isAckSent() : "Ack not sent for batch";
}
/**
 * Lowers the remaining-stream counter by one.
 *
 * @return the counter value after the decrement
 */
private int decrementStreamCounter() {
  return --streamCounter;
}
/**
 * Computes how many streams have already delivered their last batch.
 *
 * @return the fragment count minus the remaining-stream counter
 */
private int completedStreams() {
  final int stillOpen = streamCounter;
  return fragmentCount - stillOpen;
}
/**
 * Handle miscellaneous tasks after batch retrieval. Invoked by {@link #getNext()} for every non-null batch it is
 * about to return, before the last-batch bookkeeping.
 *
 * @param batch the batch that was just retrieved
 */
protected abstract void upkeep(RawFragmentBatch batch);
/**
 * Tells whether the buffer has reached a terminal state.
 *
 * @return {@code true} if the state is KILLED or STREAMS_FINISHED
 */
protected boolean isTerminated() {
  // Read the volatile field once. With two separate reads, a concurrent kill() that moves the state from
  // STREAMS_FINISHED to KILLED between them would make both comparisons fail and report a terminated buffer
  // as live.
  final BufferState snapshot = state;
  return snapshot == BufferState.KILLED || snapshot == BufferState.STREAMS_FINISHED;
}
}