blob: 284911baed229bd9c22fe5d2e9816483f89de3c1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.jdbc.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.drill.exec.rpc.user.BlockingResultsListener;
import org.apache.drill.jdbc.DrillStatement;
import com.google.common.base.Stopwatch;
import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.Meta.Signature;
import org.apache.calcite.avatica.util.ArrayImpl.Factory;
import org.apache.calcite.avatica.util.Cursor;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.proto.UserProtos.PreparedStatement;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
import org.apache.drill.jdbc.SchemaChangeListener;
import org.slf4j.Logger;
public class DrillCursor implements Cursor {
private static final Logger logger = getLogger(DrillCursor.class);
/** JDBC-specified string for unknown catalog, schema, and table names. */
private static final String UNKNOWN_NAME_STRING = "";
private final DrillConnectionImpl connection;
private final AvaticaStatement statement;
private final Meta.Signature signature;
/** Holds current batch of records (none before first load). */
private final RecordBatchLoader currentBatchHolder;
private final BlockingResultsListener resultsListener;
private SchemaChangeListener changeListener;
private final DrillAccessorList accessors = new DrillAccessorList();
/** Schema of current batch (null before first load). */
private BatchSchema schema;
/** ... corresponds to current schema. */
private DrillColumnMetaDataList columnMetaDataList;
/** Whether loadInitialSchema() has been called. */
private boolean initialSchemaLoaded;
/** Whether after first batch. (Re skipping spurious empty batches.) */
private boolean afterFirstBatch;
/**
* Whether the next call to {@code this.}{@link #next()} should just return
* {@code true} rather than calling {@link #nextRowInternally()} to try to
* advance to the next record.
* <p>
* Currently, can be true only for first call to {@link #next()}.
* </p>
* <p>
* (Relates to {@link #loadInitialSchema()}'s calling
* {@link #nextRowInternally()} one "extra" time (extra relative to number
* of {@link java.sql.ResultSet#next()} calls) at the beginning to get first batch
* and schema before {@code Statement.execute...(...)} even returns.)
* </p>
*/
private boolean returnTrueForNextCallToNext;
/** Whether cursor is after the end of the sequence of records/rows. */
private boolean afterLastRow;
private int currentRowNumber = -1;
/** Zero-based offset of current record in record batch.
* (Not <i>row</i> number.) */
private int currentRecordNumber = -1;
//Track timeout period
private long timeoutInMilliseconds;
private Stopwatch elapsedTimer;
/**
*
* @param statement
* @param signature
* @throws SQLException
*/
DrillCursor(DrillConnectionImpl connection, AvaticaStatement statement, Signature signature) throws SQLException {
this.connection = connection;
this.statement = statement;
this.signature = signature;
DrillClient client = connection.getClient();
final int batchQueueThrottlingThreshold =
client.getConfig().getInt(
ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD);
resultsListener = new BlockingResultsListener(this::getElapsedTimer,
this::getTimeoutInMilliseconds,
batchQueueThrottlingThreshold);
currentBatchHolder = new RecordBatchLoader(client.getAllocator());
// Set Query Timeout
logger.debug("Setting timeout as {}", this.statement.getQueryTimeout());
setTimeout(this.statement.getQueryTimeout());
}
protected int getCurrentRecordNumber() {
return currentRecordNumber;
}
public String getQueryId() {
if (resultsListener.getQueryId() != null) {
return QueryIdHelper.getQueryId(resultsListener.getQueryId());
} else {
return null;
}
}
public boolean isBeforeFirst() {
return currentRowNumber < 0;
}
public boolean isAfterLast() {
return afterLastRow;
}
// (Overly restrictive Avatica uses List<Accessor> instead of List<? extends
// Accessor>, so accessors/DrillAccessorList can't be of type
// List<AvaticaDrillSqlAccessor>, and we have to cast from Accessor to
// AvaticaDrillSqlAccessor in updateColumns().)
@Override
public List<Accessor> createAccessors(List<ColumnMetaData> types,
Calendar localCalendar, Factory factory) {
columnMetaDataList = (DrillColumnMetaDataList) types;
return accessors;
}
synchronized void cleanup() {
if (resultsListener.getQueryId() != null && ! resultsListener.isCompleted()) {
connection.getClient().cancelQuery(resultsListener.getQueryId());
}
resultsListener.close();
currentBatchHolder.clear();
}
long getTimeoutInMilliseconds() {
return timeoutInMilliseconds;
}
//Set the cursor's timeout in seconds
void setTimeout(int timeoutDurationInSeconds) {
this.timeoutInMilliseconds = TimeUnit.SECONDS.toMillis(timeoutDurationInSeconds);
//Starting the timer, since this is invoked via the ResultSet.execute() call
if (timeoutInMilliseconds > 0) {
elapsedTimer = Stopwatch.createStarted();
}
}
/**
* Updates column accessors and metadata from current record batch.
*/
private void updateColumns() {
// First update accessors and schema from batch:
accessors.generateAccessors(this, currentBatchHolder);
// Extract Java types from accessors for metadata's getColumnClassName:
final List<Class<?>> getObjectClasses = new ArrayList<>();
// (Can't use modern for loop because, for some incompletely clear reason,
// DrillAccessorList blocks iterator() (throwing exception).)
for (int ax = 0; ax < accessors.size(); ax++) {
final AvaticaDrillSqlAccessor accessor =
accessors.get(ax);
getObjectClasses.add(accessor.getObjectClass());
}
// Update metadata for result set.
columnMetaDataList.updateColumnMetaData(
InfoSchemaConstants.IS_CATALOG_NAME,
UNKNOWN_NAME_STRING, // schema name
UNKNOWN_NAME_STRING, // table name
schema,
getObjectClasses);
if (changeListener != null) {
changeListener.schemaChanged(schema);
}
}
/**
* ...
* <p>
* Is to be called (once) from {@link #loadInitialSchema} for
* {@link DrillResultSetImpl#execute()}, and then (repeatedly) from
* {@link #next()} for {@link org.apache.calcite.avatica.AvaticaResultSet#next()}.
* </p>
*
* @return whether cursor is positioned at a row (false when after end of
* results)
*/
private boolean nextRowInternally() throws SQLException {
if (currentRecordNumber + 1 < currentBatchHolder.getRecordCount()) {
// Have next row in current batch--just advance index and report "at a row."
currentRecordNumber++;
return true;
} else {
// No (more) records in any current batch--try to get first or next batch.
// (First call always takes this branch.)
try {
QueryDataBatch qrb = resultsListener.getNext();
// (Apparently:) Skip any spurious empty batches (batches that have
// zero rows and null data, other than the first batch (which carries
// the (initial) schema but no rows)).
if (afterFirstBatch) {
while (qrb != null
&& (qrb.getHeader().getRowCount() == 0 && qrb.getData() == null)) {
// Empty message--dispose of and try to get another.
logger.warn("Spurious batch read: {}", qrb);
qrb.release();
qrb = resultsListener.getNext();
}
}
afterFirstBatch = true;
if (qrb == null) {
// End of batches--clean up, set state to done, report after last row.
currentBatchHolder.clear(); // (We load it so we clear it.)
afterLastRow = true;
return false;
} else {
// Got next (or first) batch--reset record offset to beginning;
// assimilate schema if changed; set up return value for first call
// to next().
currentRecordNumber = 0;
if (qrb.getHeader().hasAffectedRowsCount()) {
int updateCount = qrb.getHeader().getAffectedRowsCount();
int currentUpdateCount = statement.getUpdateCount() == -1 ? 0 : statement.getUpdateCount();
((DrillStatement) statement).setUpdateCount(updateCount + currentUpdateCount);
((DrillStatement) statement).setResultSet(null);
}
final boolean schemaChanged;
try {
schemaChanged = currentBatchHolder.load(qrb.getHeader().getDef(), qrb.getData());
}
finally {
qrb.release();
}
schema = currentBatchHolder.getSchema();
if (schemaChanged) {
updateColumns();
}
if (returnTrueForNextCallToNext
&& currentBatchHolder.getRecordCount() == 0) {
returnTrueForNextCallToNext = false;
}
return true;
}
}
catch (UserException e) {
// A normally expected case--for any server-side error (e.g., syntax
// error in SQL statement).
// Construct SQLException with message text from the UserException.
// TODO: Map UserException error type to SQLException subclass (once
// error type is accessible, of course. :-()
throw new SQLException(e.getMessage(), e);
}
catch (InterruptedException e) {
// Not normally expected--Drill doesn't interrupt in this area (right?)--
// but JDBC client certainly could.
throw new SQLException("Interrupted.", e);
}
catch (RuntimeException e) {
throw new SQLException("Unexpected RuntimeException: " + e.toString(), e);
}
}
}
/**
* Advances to first batch to load schema data into result set metadata.
* <p>
* To be called once from {@link DrillResultSetImpl#execute()} before
* {@link #next()} is called from {@link org.apache.calcite.avatica.AvaticaResultSet#next()}.
* <p>
*/
void loadInitialSchema() throws SQLException {
if (initialSchemaLoaded) {
throw new IllegalStateException(
"loadInitialSchema() called a second time");
}
assert ! afterLastRow : "afterLastRow already true in loadInitialSchema()";
assert ! afterFirstBatch : "afterLastRow already true in loadInitialSchema()";
assert -1 == currentRecordNumber
: "currentRecordNumber not -1 (is " + currentRecordNumber
+ ") in loadInitialSchema()";
assert 0 == currentBatchHolder.getRecordCount()
: "currentBatchHolder.getRecordCount() not 0 (is "
+ currentBatchHolder.getRecordCount() + " in loadInitialSchema()";
final PreparedStatement preparedStatement;
if (statement instanceof DrillPreparedStatementImpl) {
DrillPreparedStatementImpl drillPreparedStatement = (DrillPreparedStatementImpl) statement;
preparedStatement = drillPreparedStatement.getPreparedStatementHandle();
} else {
preparedStatement = null;
}
if (preparedStatement != null) {
connection.getClient().executePreparedStatement(preparedStatement.getServerHandle(), resultsListener);
}
else {
connection.getClient().runQuery(QueryType.SQL, signature.sql, resultsListener);
}
try {
resultsListener.awaitFirstMessage();
} catch (InterruptedException e) {
// Preserve evidence that the interruption occurred so that code higher up
// on the call stack can learn of the interruption and respond to it if it
// wants to.
Thread.currentThread().interrupt();
// Not normally expected--Drill doesn't interrupt in this area (right?)--
// but JDBC client certainly could.
throw new SQLException("Interrupted", e);
}
returnTrueForNextCallToNext = true;
nextRowInternally();
initialSchemaLoaded = true;
}
/**
* Advances this cursor to the next row, if any, or to after the sequence of
* rows if no next row.
*
* @return whether cursor is positioned at a row (false when after end of
* results)
*/
@Override
public boolean next() throws SQLException {
if (! initialSchemaLoaded) {
throw new IllegalStateException(
"next() called but loadInitialSchema() was not called");
}
assert afterFirstBatch : "afterFirstBatch still false in next()";
if (afterLastRow) {
// We're already after end of rows/records--just report that after end.
return false;
}
else if (returnTrueForNextCallToNext) {
++currentRowNumber;
// We have a deferred "not after end" to report--reset and report that.
returnTrueForNextCallToNext = false;
return true;
}
else {
accessors.clearLastColumnIndexedInRow();
boolean res = nextRowInternally();
if (res) { ++ currentRowNumber; }
return res;
}
}
public void cancel() {
close();
}
@Override
public void close() {
// Clean up result set (to deallocate any buffers).
cleanup();
// TODO: CHECK: Something might need to set statement.openResultSet to
// null. Also, AvaticaResultSet.close() doesn't check whether already
// closed and skip calls to cursor.close(), statement.onResultSetClose()
}
@Override
public boolean wasNull() throws SQLException {
return accessors.wasNull();
}
public Stopwatch getElapsedTimer() {
return elapsedTimer;
}
}