blob: 461d427302172b755f1faf48f61aeb3d74530d5d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.rpc.user;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.user.UserClient.UserToBitConnection;
import org.apache.drill.exec.rpc.ConnectionThrottle;
import org.apache.drill.exec.rpc.RpcBus;
import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Encapsulates the future management of query submissions. This entails a
 * potential race condition. Normal ordering is:
 * <ul>
 *   <li>1. Submit query to be executed. </li>
 *   <li>2. Receive QueryHandle for buffer management. </li>
 *   <li>3. Start receiving results batches for query. </li>
 * </ul>
 * However, 3 could potentially occur before 2. Because of that, messages
 * that arrive for a query id with no registered listener are parked in a
 * {@link BufferingResultsListener}; once the query id response arrives, the
 * buffered messages are replayed to the user's listener, which then takes
 * the buffering listener's place in the map.
 */
public class QueryResultHandler {
  private static final Logger logger =
      LoggerFactory.getLogger(QueryResultHandler.class);

  /**
   * Current listener for results, for each active query.
   * <p>
   * Concurrency: Access by SubmissionListener for query-ID message vs.
   * access by batchArrived/resultArrived is not otherwise synchronized, so
   * all updates go through the atomic {@link ConcurrentMap} operations
   * (putIfAbsent / conditional remove / replace).
   * </p>
   */
  private final ConcurrentMap<QueryId, UserResultsListener> queryIdToResultsListenersMap =
      Maps.newConcurrentMap();

  /**
   * Wraps a user listener in the RPC outcome listener that handles the
   * response to the query submission request.
   *
   * @param resultsListener listener that should receive the query's events
   * @return outcome listener to pass along with the submission request
   */
  public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener resultsListener) {
    return new SubmissionListener(resultsListener);
  }

  /**
   * Wraps a connection handler so that, in addition to the wrapped handler's
   * own behavior, all registered listeners are failed when the connection's
   * channel closes.
   *
   * @param handler handler to delegate connection events to
   * @return the wrapping handler
   */
  public RpcConnectionHandler<UserToBitConnection> getWrappedConnectionHandler(
      RpcConnectionHandler<UserToBitConnection> handler) {
    return new ChannelClosedHandler(handler);
  }

  /**
   * Maps internal low-level API protocol to {@link UserResultsListener}-level API protocol.
   * handles data result messages
   *
   * @param pBody serialized {@link QueryResult} message
   * @throws RpcException if the message cannot be parsed
   */
  public void resultArrived(ByteBuf pBody) throws RpcException {
    final QueryResult queryResult = RpcBus.get(pBody, QueryResult.PARSER);
    final QueryId queryId = queryResult.getQueryId();
    final QueryState queryState = queryResult.getQueryState();

    if (logger.isDebugEnabled()) {
      logger.debug("resultArrived: queryState: {}, queryId = {}", queryState,
          QueryIdHelper.getQueryId(queryId));
    }

    assert queryResult.hasQueryState() : "received query result without QueryState";

    final boolean isFailureResult = QueryState.FAILED == queryState;
    // CANCELED queries are handled the same way as COMPLETED
    final boolean isTerminalResult;
    switch (queryState) {
      case FAILED:
      case CANCELED:
      case COMPLETED:
        isTerminalResult = true;
        break;
      default:
        logger.error("Unexpected/unhandled QueryState {} (for query {})", queryState, queryId);
        isTerminalResult = false;
        break;
    }

    assert isFailureResult || queryResult.getErrorCount() == 0
        : "Error count for the query batch is non-zero but QueryState != FAILED";

    final UserResultsListener resultsListener = newUserResultsListener(queryId);

    try {
      if (isFailureResult) {
        // Failure case--pass on via submissionFailed(...).
        resultsListener.submissionFailed(toUserException(queryResult));
        // Note: Listener is removed in finally below.
      } else if (isTerminalResult) {
        // A successful completion/canceled case--pass on via queryCompleted(...).
        try {
          resultsListener.queryCompleted(queryState);
        } catch (Exception e) {
          resultsListener.submissionFailed(UserException.systemError(e).build(logger));
        }
      } else {
        logger.warn("queryState {} was ignored", queryState);
      }
    } finally {
      if (isTerminalResult) {
        removeFinishedListener(queryId, resultsListener);
      }
    }
  }

  /**
   * Builds the exception handed to the listener for a FAILED result. Uses the
   * first error carried by the result; if the result carries none, a system
   * error is synthesized so the listener is still notified instead of the
   * handler dying on an out-of-range index.
   */
  private static UserException toUserException(QueryResult queryResult) {
    if (queryResult.getErrorCount() > 0) {
      return new UserRemoteException(queryResult.getError(0));
    }
    final String description = "Query " + QueryIdHelper.getQueryId(queryResult.getQueryId())
        + " was reported as FAILED without any error details.";
    return UserException.systemError(new IllegalStateException(description)).build(logger);
  }

  /**
   * Drops the map entry of a query whose terminal result has been handled.
   * <p>
   * A buffering listener that has not been transferred yet (no output) must
   * stay registered: it holds the buffered terminal state that
   * {@link SubmissionListener#success} replays later. A buffering listener
   * that has been transferred forwarded the terminal event to its output; in
   * that case the map may already hold the output instead of the buffering
   * listener, so both candidates are removed conditionally.
   * </p>
   */
  private void removeFinishedListener(QueryId queryId, UserResultsListener resultsListener) {
    if (!(resultsListener instanceof BufferingResultsListener)) {
      queryIdToResultsListenersMap.remove(queryId, resultsListener);
      return;
    }

    final UserResultsListener target = ((BufferingResultsListener) resultsListener).output;
    if (target == null) {
      return;
    }
    if (!queryIdToResultsListenersMap.remove(queryId, resultsListener)) {
      // The submission listener already swapped the real listener in.
      queryIdToResultsListenersMap.remove(queryId, target);
    }
  }

  /**
   * Maps internal low-level API protocol to {@link UserResultsListener}-level API protocol.
   * handles query data messages
   *
   * @param throttle throttle handed through to the listener
   * @param pBody serialized {@link QueryData} message
   * @param dBody data buffer of the batch; must be a {@link DrillBuf}
   * @throws RpcException if the message cannot be parsed
   */
  public void batchArrived(ConnectionThrottle throttle,
      ByteBuf pBody, ByteBuf dBody) throws RpcException {
    final QueryData queryData = RpcBus.get(pBody, QueryData.PARSER);
    // Current batch coming in.
    final DrillBuf drillBuf = (DrillBuf) dBody;
    final QueryDataBatch batch = new QueryDataBatch(queryData, drillBuf);
    final QueryId queryId = queryData.getQueryId();

    if (logger.isDebugEnabled()) {
      logger.debug("batchArrived: queryId = {}", QueryIdHelper.getQueryId(queryId));
    }
    logger.trace("batchArrived: batch = {}", batch);

    final UserResultsListener resultsListener = newUserResultsListener(queryId);

    // A data case--pass on via dataArrived
    try {
      resultsListener.dataArrived(batch, throttle);
      // That releases batch if successful.
    } catch (Exception e) {
      try {
        batch.release();
      } catch (IllegalStateException ignored) {
        // Ignore, released twice
      }
      resultsListener.submissionFailed(UserException.systemError(e).build(logger));
    }
  }

  /**
   * Return {@link UserResultsListener} associated with queryId. Will create a
   * new {@link BufferingResultsListener} if no listener found.
   *
   * @param queryId
   *          queryId we are getting the listener for
   * @return {@link UserResultsListener} associated with queryId
   */
  private UserResultsListener newUserResultsListener(QueryId queryId) {
    UserResultsListener resultsListener = queryIdToResultsListenersMap.get(queryId);
    logger.trace("For QueryId [{}], retrieved results listener {}", queryId, resultsListener);
    if (null == resultsListener) {
      // WHO?? didn't get query ID response and set submission listener yet,
      // so install a buffering listener for now
      final BufferingResultsListener bl = new BufferingResultsListener();
      resultsListener = queryIdToResultsListenersMap.putIfAbsent(queryId, bl);
      // If we had a successful insertion, use that reference. Otherwise, just
      // throw away the new buffering listener.
      if (null == resultsListener) {
        resultsListener = bl;
      }
    }
    return resultsListener;
  }

  /**
   * Stand-in listener used while the user's listener is not registered yet.
   * Until {@link #transferTo} is called it records batches and the terminal
   * event; afterwards it forwards everything to the transfer target.
   * <p>
   * All state transitions happen while holding this object's monitor so that
   * a terminal event is delivered to the target exactly once: either replayed
   * by {@link #transferTo} or forwarded by the notifying thread, never both.
   * </p>
   */
  private static class BufferingResultsListener implements UserResultsListener {
    private final ConcurrentLinkedQueue<QueryDataBatch> results = Queues.newConcurrentLinkedQueue();
    private volatile UserException ex;
    private volatile QueryState queryState;
    // Read without the monitor by the enclosing class, hence volatile.
    private volatile UserResultsListener output;
    private volatile ConnectionThrottle throttle;

    /**
     * Replays everything buffered so far to {@code l} and forwards all later
     * events to it.
     *
     * @param l listener that takes over
     * @return true if a terminal event (failure or completion) was replayed,
     *         i.e. no further events are expected for the query
     */
    public boolean transferTo(UserResultsListener l) {
      synchronized (this) {
        output = l;
        for (QueryDataBatch r : results) {
          l.dataArrived(r, throttle);
        }
        if (ex != null) {
          l.submissionFailed(ex);
          return true;
        } else if (queryState != null) {
          l.queryCompleted(queryState);
          return true;
        }

        return false;
      }
    }

    @Override
    public void queryCompleted(QueryState state) {
      synchronized (this) {
        assert queryState == null;
        // Recorded under the monitor: a concurrent transferTo() must either
        // see the state and replay it, or not see it and let us forward it.
        this.queryState = state;
        if (output != null) {
          output.queryCompleted(state);
        }
      }
    }

    @Override
    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
      synchronized (this) {
        this.throttle = throttle;
        if (output == null) {
          this.results.add(result);
        } else {
          output.dataArrived(result, throttle);
        }
      }
    }

    @Override
    public void submissionFailed(UserException ex) {
      synchronized (this) {
        assert queryState == null;
        // there is one case when submissionFailed() is called even though the
        // query didn't fail on the server side:
        // it happens when UserResultsListener.dataArrived() throws an exception
        // that is then passed to submissionFailed() by
        // QueryResultHandler.batchArrived()
        queryState = QueryState.FAILED;
        if (output == null) {
          this.ex = ex;
        } else {
          output.submissionFailed(ex);
        }
      }
    }

    @Override
    public void queryIdArrived(QueryId queryId) { }
  }

  /**
   * Handles the response to a query submission request on behalf of a user
   * listener. Only the first response (success, failure or interruption) is
   * acted upon; later ones are logged and ignored.
   */
  private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {
    private final UserResultsListener resultsListener;
    private final AtomicBoolean isTerminal = new AtomicBoolean(false);

    public SubmissionListener(UserResultsListener resultsListener) {
      this.resultsListener = resultsListener;
    }

    /**
     * Claims the right to handle the submission response.
     *
     * @return true for the first caller; false (after logging a warning) for
     *         every later one
     */
    private boolean claimResponse() {
      if (isTerminal.compareAndSet(false, true)) {
        return true;
      }
      logger.warn("Received multiple responses to run query request.");
      return false;
    }

    @Override
    public void failed(RpcException ex) {
      if (!claimResponse()) {
        return;
      }

      // Although query submission failed, results might have arrived for this query.
      // However, the results could not be transferred to this resultListener because
      // there is no query id mapped to this resultListener. Look out for the warning
      // message from ChannelClosedHandler in the client logs.
      // TODO(DRILL-4586)
      resultsListener.submissionFailed(UserException.systemError(ex)
          .addContext("Query submission to Drillbit failed.")
          .build(logger));
    }

    @Override
    public void success(QueryId queryId, ByteBuf buf) {
      if (!claimResponse()) {
        return;
      }

      resultsListener.queryIdArrived(queryId);
      if (logger.isDebugEnabled()) {
        logger.debug("Received QueryId {} successfully. Adding results listener {}.",
            QueryIdHelper.getQueryId(queryId), resultsListener);
      }

      final UserResultsListener oldListener =
          queryIdToResultsListenersMap.putIfAbsent(queryId, resultsListener);
      if (oldListener == null) {
        return;
      }

      // We need to deal with the situation where we already received results by
      // the time we got the query id back. In that case, we'll need to
      // transfer the buffering listener over, grabbing a lock against reception
      // of additional results during the transition.
      logger.debug("Unable to place user results listener, buffering listener was already in place.");
      if (!(oldListener instanceof BufferingResultsListener)) {
        throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
      }

      final boolean all = ((BufferingResultsListener) oldListener).transferTo(resultsListener);
      if (all) {
        // simply remove the buffering listener if we already have the last response.
        queryIdToResultsListenersMap.remove(queryId, oldListener);
        return;
      }

      if (queryIdToResultsListenersMap.replace(queryId, oldListener, resultsListener)) {
        return;
      }
      if (queryIdToResultsListenersMap.containsKey(queryId)) {
        throw new IllegalStateException(
            "Buffering listener for query " + QueryIdHelper.getQueryId(queryId)
                + " was replaced by another listener during the transfer.");
      }
      // The entry is gone: a terminal result was forwarded through the
      // buffering listener right after the transfer and resultArrived() has
      // already cleaned up, so there is nothing left to register.
      logger.debug("Query {} reached a terminal state during listener transfer.",
          QueryIdHelper.getQueryId(queryId));
    }

    @Override
    public void interrupted(InterruptedException ex) {
      if (!claimResponse()) {
        return;
      }

      // TODO(DRILL-4586)
      resultsListener.submissionFailed(UserException.systemError(ex)
          .addContext("The client had been asked to wait as the Drillbit is potentially being over-utilized." +
              " But the client was interrupted while waiting.")
          .build(logger));
    }
  }

  /**
   * When a {@link UserToBitConnection connection} to a server is successfully
   * created, this handler adds a listener to that connection that listens to
   * connection closure. If the connection is closed, all active
   * {@link UserResultsListener result listeners} are failed.
   */
  private class ChannelClosedHandler implements RpcConnectionHandler<UserToBitConnection> {
    private final RpcConnectionHandler<UserToBitConnection> parentHandler;

    public ChannelClosedHandler(RpcConnectionHandler<UserToBitConnection> parentHandler) {
      this.parentHandler = parentHandler;
    }

    @Override
    public void connectionSucceeded(UserToBitConnection connection) {
      // Register the close hook before delegating to the wrapped handler.
      connection.getChannel().closeFuture().addListener(
          new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future)
                throws Exception {
              for (UserResultsListener listener : queryIdToResultsListenersMap.values()) {
                listener.submissionFailed(UserException.connectionError()
                    .message("Connection %s closed unexpectedly. Drillbit down?",
                        connection.getName())
                    .build(logger));
                if (listener instanceof BufferingResultsListener) {
                  // the appropriate listener will be failed by SubmissionListener#failed
                  logger.warn("Buffering listener failed before results were transferred to the actual listener.");
                }
              }
            }
          });
      parentHandler.connectionSucceeded(connection);
    }

    @Override
    public void connectionFailed(FailureType type, Throwable t) {
      parentHandler.connectionFailed(type, t);
    }
  }
}