blob: 6813c228904bab43a45b861703f65f4c15b355c3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.rpc.user;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.ConnectionThrottle;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Queues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLTimeoutException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
/**
 * A {@link UserResultsListener} that buffers incoming query data batches in a
 * blocking queue and hands them out one at a time through {@link #getNext()}.
 *
 * <p>When the number of queued batches exceeds the configured threshold, the
 * listener asks the connection's {@link ConnectionThrottle} to stop
 * auto-reading; reading is re-enabled once the consumer has drained the queue
 * far enough, or when the listener is closed.</p>
 *
 * <p>NOTE(review): the listener callbacks and the consumer-side methods
 * ({@link #getNext()}, {@link #close()}) appear to be invoked from different
 * threads (state is held in volatile/atomic fields and a blocking queue), so
 * state shared between the two sides is treated as concurrently accessed.</p>
 */
public class BlockingResultsListener implements UserResultsListener {
  private static final Logger logger = LoggerFactory.getLogger(BlockingResultsListener.class);

  private static final AtomicInteger NEXT_INSTANCE_ID = new AtomicInteger(1);

  /** How long a single queue poll in {@link #getNext()} waits before re-checking state. */
  private static final long POLL_INTERVAL_MS = 50;

  private final int instanceId;

  private final int batchQueueThrottlingThreshold;

  private volatile UserBitShared.QueryId queryId;

  private int lastReceivedBatchNumber;

  private int lastDequeuedBatchNumber;

  private volatile UserException executionFailureException;

  private volatile boolean completed;

  /**
   * Guards every throttling transition so that the {@link #throttled} flag,
   * the {@link #throttle} reference and the matching {@code setAutoRead} call
   * change together as one step.
   */
  private final Object throttleLock = new Object();

  /**
   * Whether throttling of incoming data is active.
   */
  private final AtomicBoolean throttled = new AtomicBoolean(false);

  /** The throttle currently switched off for auto-read; {@code null} when not throttling. */
  private volatile ConnectionThrottle throttle;

  private volatile boolean closed;

  private final CountDownLatch firstMessageReceived = new CountDownLatch(1);

  private final LinkedBlockingDeque<QueryDataBatch> batchQueue =
      Queues.newLinkedBlockingDeque();

  private final Supplier<Stopwatch> elapsedTimer;

  private final Supplier<Long> timeoutInMilliseconds;

  /**
   * Creates a listener.
   *
   * @param elapsedTimer supplies the stopwatch whose elapsed time is compared
   *        against the timeout
   * @param timeoutInMilliseconds supplies the query timeout in milliseconds;
   *        a value of zero or less disables the timeout checks
   * @param batchQueueThrottlingThreshold queue size above which throttling of
   *        incoming data is started
   */
  public BlockingResultsListener(Supplier<Stopwatch> elapsedTimer, Supplier<Long> timeoutInMilliseconds,
                                 int batchQueueThrottlingThreshold) {
    this.elapsedTimer = elapsedTimer;
    this.timeoutInMilliseconds = timeoutInMilliseconds;
    this.instanceId = NEXT_INSTANCE_ID.getAndIncrement();
    this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
    logger.debug("[#{}] Query listener created.", instanceId);
  }

  /**
   * Starts throttling if not currently throttling and the listener is still open.
   *
   * @param throttle the "throttlable" object to throttle
   * @return true if actually started (wasn't throttling already and not closed)
   */
  private boolean startThrottlingIfNot(ConnectionThrottle throttle) {
    synchronized (throttleLock) {
      // close() sets 'closed' before it stops throttling under this same lock,
      // so once it has run we must not switch auto-read off again: nothing
      // would be left to switch it back on.
      if (closed || !throttled.compareAndSet(false, true)) {
        return false;
      }
      this.throttle = throttle;
      throttle.setAutoRead(false);
      return true;
    }
  }

  /**
   * Stops throttling if currently throttling.
   *
   * @return true if actually stopped (was throttling)
   */
  private boolean stopThrottlingIfSo() {
    synchronized (throttleLock) {
      if (!throttled.compareAndSet(true, false)) {
        return false;
      }
      // Under the lock the reference is always set whenever the flag is true.
      throttle.setAutoRead(true);
      throttle = null;
      return true;
    }
  }

  /**
   * Builds the exception thrown when the query timeout has been exceeded.
   *
   * @param timeoutMs the configured timeout in milliseconds
   * @return the exception to throw
   */
  private static SQLTimeoutException newTimeoutException(long timeoutMs) {
    return new SQLTimeoutException(
        "Query timed out in " + TimeUnit.MILLISECONDS.toSeconds(timeoutMs) + " seconds");
  }

  /**
   * Releases every batch still sitting in the queue. Each batch is obtained
   * through {@code poll()}, so concurrent callers never release the same batch twice.
   */
  private void releaseQueuedBatches() {
    QueryDataBatch qdb;
    while ((qdb = batchQueue.poll()) != null) {
      if (qdb.getData() != null) {
        qdb.getData().release();
      }
    }
  }

  /**
   * Blocks until the first data batch has arrived, the query has completed,
   * or the listener has been closed.
   *
   * @throws InterruptedException if the wait was interrupted
   * @throws SQLTimeoutException if a positive timeout is configured and it has
   *         already elapsed or elapses while waiting
   */
  public void awaitFirstMessage() throws InterruptedException, SQLTimeoutException {
    // Read the timeout once so the check, the wait and the message all agree.
    final long timeoutMs = timeoutInMilliseconds.get();
    if (timeoutMs <= 0) {
      // No timeout configured: wait indefinitely.
      firstMessageReceived.await();
      return;
    }
    // Work in milliseconds to keep the granularity close to the configured timeout.
    final long remainingMs = timeoutMs - elapsedTimer.get().elapsed(TimeUnit.MILLISECONDS);
    if (remainingMs <= 0 || !firstMessageReceived.await(remainingMs, TimeUnit.MILLISECONDS)) {
      throw newTimeoutException(timeoutMs);
    }
  }

  /** Unblocks {@link #awaitFirstMessage()}; has no further effect after the first call. */
  private void releaseIfFirst() {
    firstMessageReceived.countDown();
  }

  /**
   * Records the ID assigned to the query.
   *
   * @param queryId the query ID
   */
  @Override
  public void queryIdArrived(UserBitShared.QueryId queryId) {
    logger.debug("[#{}] Received query ID: {}.",
        instanceId, QueryIdHelper.getQueryId(queryId));
    this.queryId = queryId;
  }

  /**
   * Records the failure so that {@link #getNext()} rethrows it, marks the
   * query completed and closes this listener (discarding queued batches).
   *
   * @param ex the failure
   */
  @Override
  public void submissionFailed(UserException ex) {
    logger.debug("[#{}] Received query failure: {}", instanceId, ex);
    this.executionFailureException = ex;
    this.completed = true;
    close();
    logger.info("[#{}] Query failed: ", instanceId, ex);
  }

  /**
   * Queues an incoming batch for the consumer, or releases it right away if
   * the listener has been closed, and starts throttling when the queue has
   * grown beyond the threshold.
   *
   * @param result the incoming batch
   * @param throttle the throttle to use if reading must be slowed down
   */
  @Override
  public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
    lastReceivedBatchNumber++;
    logger.debug("[#{}] Received query data batch #{}: {}.",
        instanceId, lastReceivedBatchNumber, result);

    // If we're in a closed state, just release the message.
    if (closed) {
      result.release();
      completed = true;
      return;
    }

    // We're active; let's add to the queue.
    batchQueue.add(result);

    // close() may have run between the check above and the add, in which case
    // its drain loop may already have finished and would never see this batch.
    // Re-check and release whatever is queued so the batch is not leaked.
    if (closed) {
      releaseQueuedBatches();
      completed = true;
      return;
    }

    // Throttle server if queue size has exceeded threshold.
    if (batchQueue.size() > batchQueueThrottlingThreshold && startThrottlingIfNot(throttle)) {
      logger.debug("[#{}] Throttling started at queue size {}.",
          instanceId, batchQueue.size());
    }

    releaseIfFirst();
  }

  /**
   * Marks the query as completed and unblocks {@link #awaitFirstMessage()}.
   *
   * @param state the final query state (only logged here)
   */
  @Override
  public void queryCompleted(UserBitShared.QueryResult.QueryState state) {
    logger.debug("[#{}] Received query completion: {}.", instanceId, state);
    releaseIfFirst();
    completed = true;
  }

  /**
   * @return the query ID, or {@code null} if it has not arrived yet
   */
  public UserBitShared.QueryId getQueryId() {
    return queryId;
  }

  /**
   * Gets the next batch of query results from the queue.
   *
   * @return the next batch, or {@code null} after last batch has been returned
   * @throws UserException if the query failed
   * @throws InterruptedException if waiting on the queue was interrupted
   * @throws SQLTimeoutException if a positive timeout is configured and has
   *         elapsed while no batch was available
   */
  public QueryDataBatch getNext() throws UserException, InterruptedException, SQLTimeoutException {
    while (true) {
      final UserException failure = executionFailureException;
      if (failure != null) {
        logger.debug("[#{}] Dequeued query failure exception: {}.",
            instanceId, failure);
        throw failure;
      }

      if (completed && batchQueue.isEmpty()) {
        return null;
      }

      // Poll with a short timeout so failure, completion and query timeout
      // are re-checked regularly while no data is available.
      final QueryDataBatch qdb = batchQueue.poll(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
      if (qdb != null) {
        lastDequeuedBatchNumber++;
        logger.debug("[#{}] Dequeued query data batch #{}: {}.",
            instanceId, lastDequeuedBatchNumber, qdb);

        // Unthrottle server if queue size has dropped enough below threshold
        // (the empty-queue test covers thresholds below 2).
        final int queued = batchQueue.size();
        if ((queued < batchQueueThrottlingThreshold / 2 || queued == 0)
            && stopThrottlingIfSo()) {
          logger.debug("[#{}] Throttling stopped at queue size {}.",
              instanceId, batchQueue.size());
        }
        return qdb;
      }

      // Nothing arrived within the poll interval: check the query timeout.
      final long timeoutMs = timeoutInMilliseconds.get();
      if (timeoutMs > 0 && elapsedTimer.get().elapsed(TimeUnit.MILLISECONDS) >= timeoutMs) {
        throw newTimeoutException(timeoutMs);
      }
    }
  }

  /**
   * Closes this listener: stops throttling, releases all queued batches,
   * unblocks {@link #awaitFirstMessage()} and marks the query completed.
   * Batches arriving afterwards are released instead of queued.
   */
  public void close() {
    logger.debug("[#{}] Query listener closing.", instanceId);
    // Must be set before throttling is stopped so that a concurrent
    // dataArrived() cannot start throttling again afterwards.
    closed = true;
    if (stopThrottlingIfSo()) {
      logger.debug("[#{}] Throttling stopped at close() (at queue size {}).",
          instanceId, batchQueue.size());
    }

    // Don't bother with query timeout, we're closing the cursor.
    releaseQueuedBatches();

    // Close may be called before the first result is received and therefore
    // when the main thread is blocked waiting for the result. In that case
    // we want to unblock the main thread.
    releaseIfFirst();
    completed = true;
  }

  /**
   * @return whether the query has completed, failed, or this listener has been closed
   */
  public boolean isCompleted() {
    return completed;
  }
}