/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.odbc.odbc;
import java.sql.BatchUpdateException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.configuration.Factory;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.processors.authentication.AuthorizationContext;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponseSender;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcParameterMeta;
import org.apache.ignite.internal.processors.odbc.odbc.escape.OdbcEscapeUtils;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.NestedTxMode;
import org.apache.ignite.internal.processors.query.SqlClientContext;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.transactions.TransactionAlreadyCompletedException;
import org.apache.ignite.transactions.TransactionDuplicateKeyException;
import org.apache.ignite.transactions.TransactionMixedModeException;
import org.apache.ignite.transactions.TransactionSerializationException;
import org.apache.ignite.transactions.TransactionUnsupportedConcurrencyException;
import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.META_COLS;
import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.META_PARAMS;
import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.META_TBLS;
import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.MORE_RESULTS;
import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.QRY_CLOSE;
import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.QRY_EXEC;
import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.QRY_EXEC_BATCH;
import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.QRY_FETCH;
import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.STREAMING_BATCH;
/**
* SQL query handler.
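 * <p>
 * Dispatches ODBC protocol commands (query execution, batch and streaming
 * execution, fetching, cursor management and metadata retrieval) to the
 * corresponding handler methods. When MVCC is enabled, requests are processed
 * sequentially by a dedicated worker thread so that transactions stay bound
 * to a single thread.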
*/
public class OdbcRequestHandler implements ClientListenerRequestHandler {
/** Query ID sequence. */
private static final AtomicLong QRY_ID_GEN = new AtomicLong();
/** Kernel context. */
private final GridKernalContext ctx;
/** Client context. */
private final SqlClientContext cliCtx;
/** Logger. */
private final IgniteLogger log;
/** Busy lock. */
private final GridSpinBusyLock busyLock;
/** Worker. */
private final OdbcRequestHandlerWorker worker;
/** Maximum allowed cursors. */
private final int maxCursors;
/** Open query cursors. */
private final ConcurrentHashMap<Long, OdbcQueryResults> qryResults = new ConcurrentHashMap<>();
/** Nested transaction behaviour. */
private final NestedTxMode nestedTxMode;
/** Authentication context. */
private final AuthorizationContext actx;
/** Client version. */
private ClientListenerProtocolVersion ver;
/** Ordered batches queue. */
private final PriorityQueue<OdbcStreamingBatchRequest> orderedBatchesQueue = new PriorityQueue<>();
/** Ordered batches mutex. */
private final Object orderedBatchesMux = new Object();
/** Response sender. */
private final ClientListenerResponseSender sender;
/**
* Constructor.
* @param ctx Context.
* @param busyLock Shutdown latch.
* @param sender Results sender.
* @param maxCursors Maximum allowed cursors.
* @param distributedJoins Distributed joins flag.
* @param enforceJoinOrder Enforce join order flag.
* @param replicatedOnly Replicated only flag.
* @param collocated Collocated flag.
* @param lazy Lazy flag.
* @param skipReducerOnUpdate Skip reducer on update flag.
 * @param actx Authentication context.
 * @param nestedTxMode Nested transaction mode.
* @param ver Client protocol version.
*/
public OdbcRequestHandler(
GridKernalContext ctx,
GridSpinBusyLock busyLock,
ClientListenerResponseSender sender,
int maxCursors,
boolean distributedJoins,
boolean enforceJoinOrder,
boolean replicatedOnly,
boolean collocated,
boolean lazy,
boolean skipReducerOnUpdate,
        AuthorizationContext actx,
        NestedTxMode nestedTxMode,
        ClientListenerProtocolVersion ver) {
this.ctx = ctx;
Factory<GridWorker> orderedFactory = new Factory<GridWorker>() {
@Override public GridWorker create() {
return new OrderedBatchWorker();
}
};
this.cliCtx = new SqlClientContext(
ctx,
orderedFactory,
distributedJoins,
enforceJoinOrder,
collocated,
replicatedOnly,
lazy,
skipReducerOnUpdate,
null,
null
);
this.busyLock = busyLock;
this.sender = sender;
this.maxCursors = maxCursors;
this.actx = actx;
this.nestedTxMode = nestedTxMode;
this.ver = ver;
log = ctx.log(getClass());
// TODO IGNITE-9484 Do not create worker if there is a possibility to unbind TX from threads.
worker = new OdbcRequestHandlerWorker(ctx.igniteInstanceName(), log, this, ctx);
}
/** {@inheritDoc} */
@Override public ClientListenerResponse handle(ClientListenerRequest req0) {
assert req0 != null;
assert req0 instanceof OdbcRequest;
OdbcRequest req = (OdbcRequest)req0;
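        // With MVCC enabled all requests are processed sequentially by a dedicated worker,
        // since a transaction must stay bound to a single thread (see IGNITE-9484).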
if (!MvccUtils.mvccEnabled(ctx))
return doHandle(req);
else {
GridFutureAdapter<ClientListenerResponse> fut = worker.process(req);
try {
return fut.get();
}
catch (IgniteCheckedException e) {
return exceptionToResult(e);
}
}
}
/**
 * Start the worker, if present.
*/
void start() {
if (worker != null)
worker.start();
}
/**
* Handle ODBC request.
* @param req ODBC request.
* @return Response.
*/
public ClientListenerResponse doHandle(OdbcRequest req) {
if (!busyLock.enterBusy())
return new OdbcResponse(IgniteQueryErrorCode.UNKNOWN,
"Failed to handle ODBC request because node is stopping: " + req);
if (actx != null)
AuthorizationContext.context(actx);
try {
switch (req.command()) {
case QRY_EXEC:
return executeQuery((OdbcQueryExecuteRequest)req);
case QRY_EXEC_BATCH:
return executeBatchQuery((OdbcQueryExecuteBatchRequest)req);
case STREAMING_BATCH:
return dispatchBatchOrdered((OdbcStreamingBatchRequest)req);
case QRY_FETCH:
return fetchQuery((OdbcQueryFetchRequest)req);
case QRY_CLOSE:
return closeQuery((OdbcQueryCloseRequest)req);
case META_COLS:
return getColumnsMeta((OdbcQueryGetColumnsMetaRequest)req);
case META_TBLS:
return getTablesMeta((OdbcQueryGetTablesMetaRequest)req);
case META_PARAMS:
return getParamsMeta((OdbcQueryGetParamsMetaRequest)req);
case MORE_RESULTS:
return moreResults((OdbcQueryMoreResultsRequest)req);
}
return new OdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Unsupported ODBC request: " + req);
}
finally {
AuthorizationContext.clear();
busyLock.leaveBusy();
}
}
/** {@inheritDoc} */
@Override public ClientListenerResponse handleException(Exception e, ClientListenerRequest req) {
return exceptionToResult(e);
}
/** {@inheritDoc} */
@Override public void writeHandshake(BinaryWriterExImpl writer) {
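        // The ODBC handshake response carries only a success flag; there is no
        // additional handler-specific payload.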
writer.writeBoolean(true);
}
/**
 * Called whenever a client is disconnected, either because of a normal
 * connection close or an {@code IOException} during network operations.
*/
public void onDisconnect() {
if (busyLock.enterBusy()) {
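            // Cancel the worker and wait for its completion first, so that no request
            // is processed concurrently with the resource cleanup below.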
if (worker != null) {
worker.cancel();
try {
worker.join();
}
catch (InterruptedException e) {
// No-op.
}
}
try {
for (OdbcQueryResults res : qryResults.values())
res.closeAll();
U.close(cliCtx, log);
}
finally {
busyLock.leaveBusy();
}
}
}
/** {@inheritDoc} */
@Override public boolean isCancellationCommand(int cmdId) {
return false;
}
/** {@inheritDoc} */
@Override public boolean isCancellationSupported() {
return false;
}
/** {@inheritDoc} */
@Override public void registerRequest(long reqId, int cmdType) {
// No-op.
}
/** {@inheritDoc} */
@Override public void unregisterRequest(long reqId) {
// No-op.
}
/** {@inheritDoc} */
@Override public ClientListenerProtocolVersion protocolVersion() {
return ver;
}
/**
 * Create a query, applying the handler configuration.
 * @param schema Schema.
 * @param sql SQL request.
 * @param args Arguments.
 * @param timeout Query timeout in seconds.
 * @param autoCommit Autocommit flag.
* @return Query instance.
*/
private SqlFieldsQueryEx makeQuery(String schema, String sql, Object[] args, int timeout, boolean autoCommit) {
SqlFieldsQueryEx qry = makeQuery(schema, sql);
qry.setArgs(args);
qry.setAutoCommit(autoCommit);
if (timeout >= 0)
qry.setTimeout(timeout, TimeUnit.SECONDS);
return qry;
}
/**
 * Create a query, applying the handler configuration.
* @param schema Schema.
* @param sql SQL request.
* @return Query instance.
*/
private SqlFieldsQueryEx makeQuery(String schema, String sql) {
SqlFieldsQueryEx qry = new SqlFieldsQueryEx(sql, null);
qry.setDistributedJoins(cliCtx.isDistributedJoins());
qry.setEnforceJoinOrder(cliCtx.isEnforceJoinOrder());
qry.setReplicatedOnly(cliCtx.isReplicatedOnly());
qry.setCollocated(cliCtx.isCollocated());
qry.setLazy(cliCtx.isLazy());
qry.setSchema(OdbcUtils.prepareSchema(schema));
qry.setSkipReducerOnUpdate(cliCtx.isSkipReducerOnUpdate());
qry.setNestedTxMode(nestedTxMode);
return qry;
}
/**
* {@link OdbcQueryExecuteRequest} command handler.
*
* @param req Execute query request.
* @return Response.
*/
private ClientListenerResponse executeQuery(OdbcQueryExecuteRequest req) {
int cursorCnt = qryResults.size();
if (maxCursors > 0 && cursorCnt >= maxCursors)
return new OdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Too many open cursors (either close " +
"other open cursors or increase the limit through " +
"ClientConnectorConfiguration.maxOpenCursorsPerConnection) [maximum=" + maxCursors +
", current=" + cursorCnt + ']');
long qryId = QRY_ID_GEN.getAndIncrement();
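        // Regular query execution is not allowed while the connection is in streaming mode.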
assert !cliCtx.isStream();
try {
String sql = OdbcEscapeUtils.parse(req.sqlQuery());
if (log.isDebugEnabled())
log.debug("ODBC query parsed [reqId=" + req.requestId() + ", original=" + req.sqlQuery() +
", parsed=" + sql + ']');
SqlFieldsQuery qry = makeQuery(req.schema(), sql, req.arguments(), req.timeout(), req.autoCommit());
List<FieldsQueryCursor<List<?>>> cursors = ctx.query().querySqlFields(null, qry, cliCtx, true, false);
OdbcQueryResults results = new OdbcQueryResults(cursors, ver);
Collection<OdbcColumnMeta> fieldsMeta;
OdbcResultSet set = results.currentResultSet();
if (set == null)
fieldsMeta = new ArrayList<>();
else {
fieldsMeta = results.currentResultSet().fieldsMeta();
if (log.isDebugEnabled()) {
for (OdbcColumnMeta meta : fieldsMeta)
log.debug("Meta - " + meta.toString());
}
}
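            // If all rows fit into the first response, there is nothing left to fetch:
            // release server-side resources right away instead of registering a cursor.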
if (!results.hasUnfetchedRows())
results.closeAll();
else
qryResults.put(qryId, results);
OdbcQueryExecuteResult res = new OdbcQueryExecuteResult(qryId, fieldsMeta, results.rowsAffected());
return new OdbcResponse(res);
}
catch (Exception e) {
qryResults.remove(qryId);
U.error(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
* {@link OdbcQueryExecuteBatchRequest} command handler.
*
 * @param req Batch execute query request.
* @return Response.
*/
private ClientListenerResponse executeBatchQuery(OdbcQueryExecuteBatchRequest req) {
try {
String sql = OdbcEscapeUtils.parse(req.sqlQuery());
if (log.isDebugEnabled())
log.debug("ODBC query parsed [reqId=" + req.requestId() + ", original=" + req.sqlQuery() +
", parsed=" + sql + ']');
SqlFieldsQueryEx qry = makeQuery(req.schema(), sql, null, req.timeout(), req.autoCommit());
Object[][] paramSet = req.arguments();
if (paramSet.length <= 0)
throw new IgniteException("Batch execute request with non-positive batch length. [len="
+ paramSet.length + ']');
            // Gather metadata and perform checks on the first execution.
for (Object[] set : paramSet)
qry.addBatchedArgs(set);
List<FieldsQueryCursor<List<?>>> qryCurs =
ctx.query().querySqlFields(null, qry, cliCtx, true, true);
long[] rowsAffected = new long[req.arguments().length];
for (int i = 0; i < qryCurs.size(); ++i)
rowsAffected[i] = OdbcUtils.rowsAffected(qryCurs.get(i));
OdbcQueryExecuteBatchResult res = new OdbcQueryExecuteBatchResult(rowsAffected);
return new OdbcResponse(res);
}
catch (Exception e) {
U.error(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToBatchResult(e);
}
}
/**
* @param req Ordered batch request.
* @return Response.
*/
private ClientListenerResponse dispatchBatchOrdered(OdbcStreamingBatchRequest req) {
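        // Without ordering guarantees the batch is processed synchronously on the
        // current thread; otherwise it is queued for the ordered batch worker.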
if (!cliCtx.isStreamOrdered())
processStreamingBatchOrdered(req);
else {
synchronized (orderedBatchesMux) {
orderedBatchesQueue.add(req);
orderedBatchesMux.notifyAll();
}
}
return null;
}
/**
* @param req Ordered batch request.
*/
private void processStreamingBatchOrdered(OdbcStreamingBatchRequest req) {
try {
if (req.last())
cliCtx.waitTotalProcessedOrderedRequests(req.order());
sender.send(processStreamingBatch(req));
} catch (Exception e) {
U.error(null, "Error processing file batch", e);
sender.send(new OdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Server error: " + e));
}
cliCtx.orderedRequestProcessed();
}
/**
* @param req Request.
* @return Response.
*/
private ClientListenerResponse processStreamingBatch(OdbcStreamingBatchRequest req) {
assert cliCtx.isStream();
// Send back only the first error. Others will be written to the log.
IgniteBiTuple<Integer, String> firstErr = new IgniteBiTuple<>();
SqlFieldsQueryEx qry = null;
for (OdbcQuery q : req.queries()) {
if (q.sql() != null) { // If we have a new query string in the batch,
if (qry != null) // then execute the previous sub-batch and create a new SqlFieldsQueryEx.
processStreamingBatch(qry, firstErr);
qry = makeQuery(req.schemaName(), q.sql());
}
assert qry != null;
qry.addBatchedArgs(q.args());
}
if (qry != null)
processStreamingBatch(qry, firstErr);
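        // The last batch in the stream switches the connection back to regular,
        // non-streaming query processing.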
if (req.last())
cliCtx.disableStreaming();
if (firstErr.isEmpty())
return new OdbcResponse(new OdbcStreamingBatchResult(req.order()));
        else {
assert firstErr.getKey() != null;
return new OdbcResponse(new OdbcStreamingBatchResult(firstErr.getKey(), firstErr.getValue(), req.order()));
}
}
/**
* Executes query and updates result counters.
*
* @param qry Query.
* @param err First error data - code and message.
*/
private void processStreamingBatch(SqlFieldsQueryEx qry, IgniteBiTuple<Integer, String> err) {
try {
assert cliCtx.isStream();
ctx.query().streamBatchedUpdateQuery(
OdbcUtils.prepareSchema(qry.getSchema()),
cliCtx,
qry.getSql(),
qry.batchedArguments()
);
}
catch (Exception e) {
U.error(log, "Failed to execute batch query [qry=" + qry + ']', e);
extractBatchError(e, null, err);
}
}
/**
* {@link OdbcQueryCloseRequest} command handler.
*
 * @param req Close query request.
* @return Response.
*/
private ClientListenerResponse closeQuery(OdbcQueryCloseRequest req) {
long queryId = req.queryId();
try {
OdbcQueryResults results = qryResults.get(queryId);
if (results == null)
return new OdbcResponse(IgniteQueryErrorCode.UNKNOWN,
"Failed to find query with ID: " + queryId);
            closeCursor(results, queryId);
OdbcQueryCloseResult res = new OdbcQueryCloseResult(queryId);
return new OdbcResponse(res);
}
catch (Exception e) {
qryResults.remove(queryId);
U.error(log, "Failed to close SQL query [reqId=" + req.requestId() + ", req=" + queryId + ']', e);
return exceptionToResult(e);
}
}
/**
* {@link OdbcQueryFetchRequest} command handler.
*
 * @param req Fetch query request.
* @return Response.
*/
private ClientListenerResponse fetchQuery(OdbcQueryFetchRequest req) {
try {
long queryId = req.queryId();
OdbcQueryResults results = qryResults.get(queryId);
if (results == null)
return new OdbcResponse(ClientListenerResponse.STATUS_FAILED,
"Failed to find query with ID: " + queryId);
OdbcResultSet set = results.currentResultSet();
List<Object> items = set.fetch(req.pageSize());
boolean lastPage = !set.hasUnfetchedRows();
            // Automatically close the cursor if no more data is available.
if (!results.hasUnfetchedRows())
                closeCursor(results, queryId);
OdbcQueryFetchResult res = new OdbcQueryFetchResult(queryId, items, lastPage);
return new OdbcResponse(res);
}
catch (Exception e) {
U.error(log, "Failed to fetch SQL query result [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
* {@link OdbcQueryGetColumnsMetaRequest} command handler.
*
* @param req Get columns metadata request.
* @return Response.
*/
private ClientListenerResponse getColumnsMeta(OdbcQueryGetColumnsMetaRequest req) {
try {
List<OdbcColumnMeta> meta = new ArrayList<>();
String schemaPattern;
String tablePattern;
if (req.tablePattern().contains(".")) {
// Parsing two-part table name.
String[] parts = req.tablePattern().split("\\.");
schemaPattern = parts[0];
tablePattern = parts[1];
}
else {
schemaPattern = req.schemaPattern();
tablePattern = req.tablePattern();
}
schemaPattern = OdbcUtils.removeQuotationMarksIfNeeded(schemaPattern);
for (String cacheName : ctx.cache().publicCacheNames()) {
for (GridQueryTypeDescriptor table : ctx.query().types(cacheName)) {
if (!matches(table.schemaName(), schemaPattern) ||
!matches(table.tableName(), tablePattern))
continue;
for (Map.Entry<String, Class<?>> field : table.fields().entrySet()) {
if (!matches(field.getKey(), req.columnPattern()))
continue;
GridQueryProperty prop = table.property(field.getKey());
OdbcColumnMeta columnMeta = new OdbcColumnMeta(table.schemaName(), table.tableName(),
field.getKey(), field.getValue(), prop.precision(), prop.scale(), ver);
if (!meta.contains(columnMeta))
meta.add(columnMeta);
}
}
}
OdbcQueryGetColumnsMetaResult res = new OdbcQueryGetColumnsMetaResult(meta);
return new OdbcResponse(res);
}
catch (Exception e) {
U.error(log, "Failed to get columns metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
* {@link OdbcQueryGetTablesMetaRequest} command handler.
*
* @param req Get tables metadata request.
* @return Response.
*/
private ClientListenerResponse getTablesMeta(OdbcQueryGetTablesMetaRequest req) {
try {
List<OdbcTableMeta> meta = new ArrayList<>();
String schemaPattern = OdbcUtils.removeQuotationMarksIfNeeded(req.schema());
for (String cacheName : ctx.cache().publicCacheNames()) {
for (GridQueryTypeDescriptor table : ctx.query().types(cacheName)) {
if (!matches(table.schemaName(), schemaPattern) ||
!matches(table.tableName(), req.table()) ||
!matchesTableType("TABLE", req.tableType()))
continue;
OdbcTableMeta tableMeta = new OdbcTableMeta(null, table.schemaName(), table.tableName(), "TABLE");
if (!meta.contains(tableMeta))
meta.add(tableMeta);
}
}
OdbcQueryGetTablesMetaResult res = new OdbcQueryGetTablesMetaResult(meta);
return new OdbcResponse(res);
}
catch (Exception e) {
U.error(log, "Failed to get tables metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
* {@link OdbcQueryGetParamsMetaRequest} command handler.
*
* @param req Get params metadata request.
* @return Response.
*/
private ClientListenerResponse getParamsMeta(OdbcQueryGetParamsMetaRequest req) {
try {
String sql = OdbcEscapeUtils.parse(req.query());
String schema = OdbcUtils.prepareSchema(req.schema());
SqlFieldsQueryEx qry = makeQuery(schema, sql);
List<JdbcParameterMeta> params = ctx.query().getIndexing().parameterMetaData(schema, qry);
byte[] typeIds = new byte[params.size()];
for (int i = 0; i < params.size(); ++i) {
int sqlType = params.get(i).type();
typeIds[i] = sqlTypeToBinary(sqlType);
}
OdbcQueryGetParamsMetaResult res = new OdbcQueryGetParamsMetaResult(typeIds);
return new OdbcResponse(res);
}
catch (Exception e) {
U.error(log, "Failed to get params metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
* {@link OdbcQueryMoreResultsRequest} command handler.
*
 * @param req More results request.
* @return Response.
*/
private ClientListenerResponse moreResults(OdbcQueryMoreResultsRequest req) {
try {
long queryId = req.queryId();
OdbcQueryResults results = qryResults.get(queryId);
if (results == null)
return new OdbcResponse(ClientListenerResponse.STATUS_FAILED,
"Failed to find query with ID: " + queryId);
results.nextResultSet();
OdbcResultSet set = results.currentResultSet();
List<Object> items = set.fetch(req.pageSize());
boolean lastPage = !set.hasUnfetchedRows();
            // Automatically close the cursor if no more data is available.
if (!results.hasUnfetchedRows())
                closeCursor(results, queryId);
OdbcQueryMoreResultsResult res = new OdbcQueryMoreResultsResult(queryId, items, lastPage);
return new OdbcResponse(res);
}
catch (Exception e) {
U.error(log, "Failed to get more SQL query results [reqId=" +
req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
 * Close the cursor and deregister the query.
 * @param results Query results.
 * @param queryId Query ID.
 */
private void closeCursor(OdbcQueryResults results, long queryId) {
    assert results != null;
results.closeAll();
qryResults.remove(queryId);
}
/**
* Convert {@link java.sql.Types} to binary type constant (See {@link GridBinaryMarshaller} constants).
*
* @param sqlType SQL type.
* @return Binary type.
*/
private static byte sqlTypeToBinary(int sqlType) {
switch (sqlType) {
case Types.BIGINT:
return GridBinaryMarshaller.LONG;
case Types.BOOLEAN:
return GridBinaryMarshaller.BOOLEAN;
case Types.DATE:
return GridBinaryMarshaller.DATE;
case Types.DOUBLE:
return GridBinaryMarshaller.DOUBLE;
case Types.FLOAT:
case Types.REAL:
return GridBinaryMarshaller.FLOAT;
case Types.NUMERIC:
case Types.DECIMAL:
return GridBinaryMarshaller.DECIMAL;
case Types.INTEGER:
return GridBinaryMarshaller.INT;
case Types.SMALLINT:
return GridBinaryMarshaller.SHORT;
case Types.TIME:
return GridBinaryMarshaller.TIME;
case Types.TIMESTAMP:
return GridBinaryMarshaller.TIMESTAMP;
case Types.TINYINT:
return GridBinaryMarshaller.BYTE;
case Types.CHAR:
case Types.VARCHAR:
case Types.LONGNVARCHAR:
return GridBinaryMarshaller.STRING;
case Types.NULL:
return GridBinaryMarshaller.NULL;
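            // Binary SQL types, as well as any type without a dedicated mapping,
            // are transferred as byte arrays.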
case Types.BINARY:
case Types.VARBINARY:
case Types.LONGVARBINARY:
default:
return GridBinaryMarshaller.BYTE_ARR;
}
}
/**
* Checks whether string matches table type pattern.
*
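     * For example, a type list pattern like {@code "TABLE,'VIEW'"} is split on
     * commas, quotation marks are stripped, and each element is matched as a
     * SQL pattern.
     *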
* @param str String.
* @param ptrn Pattern.
* @return Whether string matches pattern.
*/
private static boolean matchesTableType(String str, String ptrn) {
if (F.isEmpty(ptrn))
return true;
if (str == null)
return false;
String pattern = OdbcUtils.preprocessPattern(ptrn);
String[] types = pattern.split(",");
for (String type0 : types) {
String type = OdbcUtils.removeQuotationMarksIfNeeded(type0.trim());
if (str.toUpperCase().matches(type))
return true;
}
return false;
}
/**
* Checks whether string matches SQL pattern.
*
* @param str String.
* @param ptrn Pattern.
* @return Whether string matches pattern.
*/
private static boolean matches(String str, String ptrn) {
if (F.isEmpty(ptrn))
return true;
if (str == null)
return false;
String pattern = OdbcUtils.preprocessPattern(ptrn);
return str.toUpperCase().matches(pattern);
}
/**
 * Create an {@link OdbcResponse} carrying an Ignite-specific result code,
 * if one can be derived from the given {@link Exception}.
 *
 * @param e Exception to convert.
 * @return Resulting {@link OdbcResponse}.
*/
private static OdbcResponse exceptionToBatchResult(Exception e) {
IgniteBiTuple<Integer, String> err = new IgniteBiTuple<>();
List<Long> rowsAffected = new ArrayList<>();
extractBatchError(e, rowsAffected, err);
OdbcQueryExecuteBatchResult res = new OdbcQueryExecuteBatchResult(
U.toLongArray(rowsAffected), -1, err.get1(), err.get2());
return new OdbcResponse(res);
}
/**
* Extract batching error from general exception.
 * @param e Exception.
* @param rowsAffected List containing the number of affected rows for every query in batch.
* @param err Error tuple containing error code and error message.
*/
private static void extractBatchError(Exception e, List<Long> rowsAffected, IgniteBiTuple<Integer, String> err) {
if (e instanceof IgniteSQLException) {
BatchUpdateException batchCause = X.cause(e, BatchUpdateException.class);
if (batchCause != null) {
if (rowsAffected != null) {
for (long cnt : batchCause.getLargeUpdateCounts())
rowsAffected.add(cnt);
}
err.set(batchCause.getErrorCode(), batchCause.getMessage());
}
else
err.set(((IgniteSQLException)e).statusCode(), OdbcUtils.tryRetrieveH2ErrorMessage(e));
}
else
err.set(IgniteQueryErrorCode.UNKNOWN, e.getMessage());
}
/**
 * Create an {@link OdbcResponse} carrying an Ignite-specific result code,
 * if one can be derived from the given {@link Exception}.
 *
 * @param e Exception to convert.
 * @return Resulting {@link OdbcResponse}.
*/
private static OdbcResponse exceptionToResult(Exception e) {
String msg = OdbcUtils.tryRetrieveH2ErrorMessage(e);
if (e instanceof TransactionSerializationException)
return new OdbcResponse(IgniteQueryErrorCode.TRANSACTION_SERIALIZATION_ERROR, msg);
if (e instanceof TransactionAlreadyCompletedException)
return new OdbcResponse(IgniteQueryErrorCode.TRANSACTION_COMPLETED, msg);
if (e instanceof TransactionMixedModeException)
return new OdbcResponse(IgniteQueryErrorCode.TRANSACTION_TYPE_MISMATCH, msg);
if (e instanceof TransactionUnsupportedConcurrencyException)
return new OdbcResponse(IgniteQueryErrorCode.UNSUPPORTED_OPERATION, msg);
if (e instanceof TransactionDuplicateKeyException)
return new OdbcResponse(IgniteQueryErrorCode.DUPLICATE_KEY, msg);
return new OdbcResponse(OdbcUtils.tryRetrieveSqlErrorCode(e), msg);
}
/**
* Ordered batch worker.
*/
private class OrderedBatchWorker extends GridWorker {
/**
* Constructor.
*/
OrderedBatchWorker() {
super(ctx.igniteInstanceName(), "ordered-batch", OdbcRequestHandler.this.log);
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
long nextBatchOrder = 0;
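            // Batches can arrive out of order; take one from the priority queue only
            // when its order number is the next expected one, otherwise wait.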
while (true) {
if (!cliCtx.isStream())
return;
OdbcStreamingBatchRequest req;
synchronized (orderedBatchesMux) {
req = orderedBatchesQueue.peek();
if (req == null || req.order() != nextBatchOrder) {
orderedBatchesMux.wait();
continue;
}
else
orderedBatchesQueue.poll();
}
processStreamingBatchOrdered(req);
nextBatchOrder++;
}
}
}
}