/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.odbc.jdbc;
import java.sql.BatchUpdateException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.cache.configuration.Factory;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.query.BulkLoadContextCursor;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteVersionUtils;
import org.apache.ignite.internal.ThinProtocolFeature;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryTypeImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.jdbc.thin.JdbcThinPartitionAwarenessMappingGroup;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponseSender;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.NestedTxMode;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.SqlClientContext;
import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.marshaller.MarshallerContext;
import org.apache.ignite.transactions.TransactionAlreadyCompletedException;
import org.apache.ignite.transactions.TransactionDuplicateKeyException;
import org.apache.ignite.transactions.TransactionMixedModeException;
import org.apache.ignite.transactions.TransactionSerializationException;
import org.apache.ignite.transactions.TransactionUnsupportedConcurrencyException;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_CONTINUE;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_3_0;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_4_0;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_7_0;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_8_0;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_9_0;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC_ORDERED;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BINARY_TYPE_GET;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BINARY_TYPE_NAME_GET;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BINARY_TYPE_NAME_PUT;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BINARY_TYPE_PUT;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BULK_LOAD_BATCH;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.CACHE_PARTITIONS;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_COLUMNS;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_INDEXES;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_PARAMS;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_PRIMARY_KEYS;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_SCHEMAS;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_TABLES;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_CANCEL;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_CLOSE;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_EXEC;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_FETCH;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_META;
/**
* JDBC request handler.
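*
* <p>Dispatches {@link JdbcRequest} commands received from a thin JDBC client and produces
* {@link JdbcResponse} results. A minimal usage sketch (hypothetical caller shown for
* illustration; the handler is normally created and wired by {@link JdbcConnectionContext}):
*
* <pre>{@code
* JdbcRequestHandler handler = new JdbcRequestHandler(busyLock, sender, maxCursors,
*     distributedJoins, enforceJoinOrder, collocated, replicatedOnly, autoCloseCursors,
*     lazy, skipReducerOnUpdate, nestedTxMode, dataPageScanEnabled, updateBatchSize,
*     protocolVer, connCtx);
*
* handler.start();                                   // Start the MVCC worker, if needed.
* ClientListenerResponse resp = handler.handle(req); // Dispatch a single JDBC request.
* }</pre>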
*/
public class JdbcRequestHandler implements ClientListenerRequestHandler {
/** Jdbc query cancelled response. */
private static final JdbcResponse JDBC_QUERY_CANCELLED_RESPONSE =
new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED, QueryCancelledException.ERR_MSG);
/** JDBC connection context. */
private final JdbcConnectionContext connCtx;
/** Client context. */
private final SqlClientContext cliCtx;
/** Logger. */
private final IgniteLogger log;
/** Busy lock. */
private final GridSpinBusyLock busyLock;
/** Worker. */
private final JdbcRequestHandlerWorker worker;
/** Maximum allowed cursors. */
private final int maxCursors;
/** Current JDBC cursors. */
private final ConcurrentHashMap<Long, JdbcCursor> jdbcCursors = new ConcurrentHashMap<>();
/** Ordered batches queue. */
private final PriorityQueue<JdbcOrderedBatchExecuteRequest> orderedBatchesQueue = new PriorityQueue<>();
/** Ordered batches mutex. */
private final Object orderedBatchesMux = new Object();
/** Request mutex. */
private final Object reqMux = new Object();
/** Response sender. */
private final ClientListenerResponseSender sender;
/** Automatic close of cursors. */
private final boolean autoCloseCursors;
/** Nested transactions handling mode. */
private final NestedTxMode nestedTxMode;
/** Protocol version. */
private final ClientListenerProtocolVersion protocolVer;
/** Facade that hides the transformation of internal cache API entities into JDBC metadata. */
private final JdbcMetadataInfo meta;
/** Register that keeps non-cancelled requests. Guarded by {@code reqMux}. */
private Map<Long, JdbcQueryDescriptor> reqRegister = new HashMap<>();
/**
* Constructor.
* @param busyLock Shutdown busy lock.
* @param sender Results sender.
* @param maxCursors Maximum allowed cursors.
* @param distributedJoins Distributed joins flag.
* @param enforceJoinOrder Enforce join order flag.
* @param collocated Collocated flag.
* @param replicatedOnly Replicated only flag.
* @param autoCloseCursors Flag to automatically close server cursors.
* @param lazy Lazy query execution flag.
* @param skipReducerOnUpdate Skip reducer on update flag.
* @param nestedTxMode Nested transactions handling mode.
* @param dataPageScanEnabled Enable scan data page mode.
* @param updateBatchSize Size of internal batch for DML queries.
* @param protocolVer Protocol version.
* @param connCtx Jdbc connection context.
*/
public JdbcRequestHandler(
GridSpinBusyLock busyLock,
ClientListenerResponseSender sender,
int maxCursors,
boolean distributedJoins,
boolean enforceJoinOrder,
boolean collocated,
boolean replicatedOnly,
boolean autoCloseCursors,
boolean lazy,
boolean skipReducerOnUpdate,
NestedTxMode nestedTxMode,
@Nullable Boolean dataPageScanEnabled,
@Nullable Integer updateBatchSize,
ClientListenerProtocolVersion protocolVer,
JdbcConnectionContext connCtx
) {
this.connCtx = connCtx;
this.sender = sender;
meta = new JdbcMetadataInfo(connCtx.kernalContext());
Factory<GridWorker> orderedFactory = new Factory<GridWorker>() {
@Override public GridWorker create() {
return new OrderedBatchWorker();
}
};
cliCtx = new SqlClientContext(
connCtx.kernalContext(),
orderedFactory,
distributedJoins,
enforceJoinOrder,
collocated,
replicatedOnly,
lazy,
skipReducerOnUpdate,
dataPageScanEnabled,
updateBatchSize
);
this.busyLock = busyLock;
this.maxCursors = maxCursors;
this.autoCloseCursors = autoCloseCursors;
this.nestedTxMode = nestedTxMode;
this.protocolVer = protocolVer;
log = connCtx.kernalContext().log(getClass());
// TODO IGNITE-9484 Do not create worker if there is a possibility to unbind TX from threads.
worker = new JdbcRequestHandlerWorker(connCtx.kernalContext().igniteInstanceName(), log, this,
connCtx.kernalContext());
}
/** {@inheritDoc} */
@Override public ClientListenerResponse handle(ClientListenerRequest req0) {
assert req0 != null;
assert req0 instanceof JdbcRequest;
JdbcRequest req = (JdbcRequest)req0;
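// With MVCC enabled, process the request in the dedicated worker thread so that an active
// transaction stays bound to a single thread (see the IGNITE-9484 note in the constructor).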
if (!MvccUtils.mvccEnabled(connCtx.kernalContext()))
return doHandle(req);
else {
GridFutureAdapter<ClientListenerResponse> fut = worker.process(req);
try {
return fut.get();
}
catch (IgniteCheckedException e) {
return exceptionToResult(e);
}
}
}
/** {@inheritDoc} */
@Override public boolean isCancellationCommand(int cmdId) {
return cmdId == JdbcRequest.QRY_CANCEL;
}
/** {@inheritDoc} */
@Override public void registerRequest(long reqId, int cmdType) {
assert reqId != 0;
synchronized (reqMux) {
if (isCancellationSupported() && (cmdType == QRY_EXEC || cmdType == BATCH_EXEC))
reqRegister.put(reqId, new JdbcQueryDescriptor());
}
}
/** {@inheritDoc} */
@Override public void unregisterRequest(long reqId) {
assert reqId != 0;
synchronized (reqMux) {
if (isCancellationSupported())
reqRegister.remove(reqId);
}
}
/**
* Start worker, if it's present.
*/
void start() {
if (worker != null)
worker.start();
}
/**
* Actually handle the request.
* @param req Request.
* @return Request handling result.
*/
JdbcResponse doHandle(JdbcRequest req) {
if (!busyLock.enterBusy())
return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN,
"Failed to handle JDBC request because node is stopping.");
JdbcResponse resp;
try {
switch (req.type()) {
case QRY_EXEC:
resp = executeQuery((JdbcQueryExecuteRequest)req);
break;
case QRY_FETCH:
resp = fetchQuery((JdbcQueryFetchRequest)req);
break;
case QRY_CLOSE:
resp = closeQuery((JdbcQueryCloseRequest)req);
break;
case QRY_META:
resp = getQueryMeta((JdbcQueryMetadataRequest)req);
break;
case BATCH_EXEC:
resp = executeBatch((JdbcBatchExecuteRequest)req);
break;
case BATCH_EXEC_ORDERED:
resp = dispatchBatchOrdered((JdbcOrderedBatchExecuteRequest)req);
break;
case META_TABLES:
resp = getTablesMeta((JdbcMetaTablesRequest)req);
break;
case META_COLUMNS:
resp = getColumnsMeta((JdbcMetaColumnsRequest)req);
break;
case META_INDEXES:
resp = getIndexesMeta((JdbcMetaIndexesRequest)req);
break;
case META_PARAMS:
resp = getParametersMeta((JdbcMetaParamsRequest)req);
break;
case META_PRIMARY_KEYS:
resp = getPrimaryKeys((JdbcMetaPrimaryKeysRequest)req);
break;
case META_SCHEMAS:
resp = getSchemas((JdbcMetaSchemasRequest)req);
break;
case BULK_LOAD_BATCH:
resp = processBulkLoadFileBatch((JdbcBulkLoadBatchRequest)req);
break;
case QRY_CANCEL:
resp = cancelQuery((JdbcQueryCancelRequest)req);
break;
case CACHE_PARTITIONS:
resp = getCachePartitions((JdbcCachePartitionsRequest)req);
break;
case BINARY_TYPE_NAME_PUT:
resp = registerBinaryType((JdbcBinaryTypeNamePutRequest)req);
break;
case BINARY_TYPE_NAME_GET:
resp = getBinaryTypeName((JdbcBinaryTypeNameGetRequest)req);
break;
case BINARY_TYPE_PUT:
resp = putBinaryType((JdbcBinaryTypePutRequest)req);
break;
case BINARY_TYPE_GET:
resp = getBinaryType((JdbcBinaryTypeGetRequest)req);
break;
default:
resp = new JdbcResponse(IgniteQueryErrorCode.UNSUPPORTED_OPERATION,
"Unsupported JDBC request [req=" + req + ']');
}
if (resp != null)
resp.activeTransaction(connCtx.kernalContext().cache().context().tm().inUserTx());
return resp;
}
finally {
busyLock.leaveBusy();
}
}
/**
* @param req Ordered batch request.
* @return Always {@code null}; the response is sent once the batch is executed.
*/
private JdbcResponse dispatchBatchOrdered(JdbcOrderedBatchExecuteRequest req) {
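// Unordered stream batches are executed right away; ordered ones are queued and
// picked up by the OrderedBatchWorker strictly in batch order.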
if (!cliCtx.isStreamOrdered())
executeBatchOrdered(req);
else {
synchronized (orderedBatchesMux) {
orderedBatchesQueue.add(req);
orderedBatchesMux.notifyAll();
}
}
return null;
}
/**
* @param req Ordered batch request.
* @return Always {@code null}; the response is sent asynchronously via the sender.
*/
private ClientListenerResponse executeBatchOrdered(JdbcOrderedBatchExecuteRequest req) {
try {
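// The last stream batch acts as a barrier: wait until all previously submitted
// ordered requests have been fully processed before executing it.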
if (req.isLastStreamBatch())
cliCtx.waitTotalProcessedOrderedRequests(req.order());
JdbcResponse resp = executeBatch(req);
if (resp.response() instanceof JdbcBatchExecuteResult) {
resp = new JdbcResponse(
new JdbcOrderedBatchExecuteResult((JdbcBatchExecuteResult)resp.response(), req.order()));
}
sender.send(resp);
} catch (Exception e) {
U.error(null, "Error processing file batch", e);
sender.send(new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Server error: " + e));
}
cliCtx.orderedRequestProcessed();
return null;
}
/**
* Processes a file batch sent from client as part of bulk load COPY command.
*
* @param req Request object with a batch of a file received from client.
* @return Response to send to the client.
*/
private JdbcResponse processBulkLoadFileBatch(JdbcBulkLoadBatchRequest req) {
if (connCtx.kernalContext() == null)
return new JdbcResponse(IgniteQueryErrorCode.UNEXPECTED_OPERATION, "Unknown query ID: "
+ req.cursorId() + ". Bulk load session may have been reclaimed due to timeout.");
JdbcBulkLoadProcessor processor = (JdbcBulkLoadProcessor)jdbcCursors.get(req.cursorId());
if (!prepareQueryCancellationMeta(processor))
return JDBC_QUERY_CANCELLED_RESPONSE;
boolean unregisterReq = false;
try {
processor.processBatch(req);
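// Depending on the batch command, either finish the bulk load session and release
// its cursor, or keep the session open for more batches.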
switch (req.cmd()) {
case CMD_FINISHED_ERROR:
case CMD_FINISHED_EOF:
jdbcCursors.remove(req.cursorId());
processor.close();
unregisterReq = true;
break;
case CMD_CONTINUE:
break;
default:
throw new IllegalArgumentException("Unknown bulk load batch command: " + req.cmd());
}
return resultToResponse(new JdbcQueryExecuteResult(req.cursorId(), processor.updateCnt(), null));
}
catch (Exception e) {
U.error(null, "Error processing file batch", e);
processor.onFail(e);
if (X.cause(e, QueryCancelledException.class) != null)
return exceptionToResult(new QueryCancelledException());
else
return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Server error: " + e);
}
finally {
cleanupQueryCancellationMeta(unregisterReq, processor.requestId());
}
}
/** {@inheritDoc} */
@Override public ClientListenerResponse handleException(Throwable e, ClientListenerRequest req) {
return exceptionToResult(e);
}
/** {@inheritDoc} */
@Override public void writeHandshake(BinaryWriterExImpl writer) {
// Handshake OK.
writer.writeBoolean(true);
// Write server version.
writer.writeByte(IgniteVersionUtils.VER.major());
writer.writeByte(IgniteVersionUtils.VER.minor());
writer.writeByte(IgniteVersionUtils.VER.maintenance());
writer.writeString(IgniteVersionUtils.VER.stage());
writer.writeLong(IgniteVersionUtils.VER.revisionTimestamp());
writer.writeByteArray(IgniteVersionUtils.VER.revisionHash());
// Write node id.
if (protocolVer.compareTo(VER_2_8_0) >= 0)
writer.writeUuid(connCtx.kernalContext().localNodeId());
// Write all features supported by the node.
if (protocolVer.compareTo(VER_2_9_0) >= 0)
writer.writeByteArray(ThinProtocolFeature.featuresAsBytes(connCtx.protocolContext().features()));
}
/**
* Called whenever the client is disconnected, either due to a graceful connection
* close or an {@code IOException} during network operations.
*/
public void onDisconnect() {
if (worker != null) {
worker.cancel();
try {
worker.join();
}
catch (InterruptedException e) {
// No-op.
}
}
for (JdbcCursor cursor : jdbcCursors.values())
U.close(cursor, log);
jdbcCursors.clear();
synchronized (reqMux) {
reqRegister.clear();
}
U.close(cliCtx, log);
}
/**
* {@link JdbcQueryExecuteRequest} command handler.
*
* @param req Execute query request.
* @return Response.
*/
@SuppressWarnings("unchecked")
private JdbcResponse executeQuery(JdbcQueryExecuteRequest req) {
GridQueryCancel cancel = null;
boolean unregisterReq = false;
if (isCancellationSupported()) {
synchronized (reqMux) {
JdbcQueryDescriptor desc = reqRegister.get(req.requestId());
// Query was already cancelled and unregistered.
if (desc == null)
return null;
cancel = desc.cancelHook();
desc.incrementUsageCount();
}
}
try {
int cursorCnt = jdbcCursors.size();
if (maxCursors > 0 && cursorCnt >= maxCursors)
return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Too many open cursors (either close other " +
"open cursors or increase the limit through " +
"ClientConnectorConfiguration.maxOpenCursorsPerConnection) [maximum=" + maxCursors +
", current=" + cursorCnt + ']');
assert !cliCtx.isStream();
String sql = req.sqlQuery();
SqlFieldsQueryEx qry;
switch (req.expectedStatementType()) {
case ANY_STATEMENT_TYPE:
qry = new SqlFieldsQueryEx(sql, null);
break;
case SELECT_STATEMENT_TYPE:
qry = new SqlFieldsQueryEx(sql, true);
break;
default:
assert req.expectedStatementType() == JdbcStatementType.UPDATE_STMT_TYPE;
qry = new SqlFieldsQueryEx(sql, false);
if (cliCtx.isSkipReducerOnUpdate())
qry.setSkipReducerOnUpdate(true);
}
setupQuery(qry, prepareSchemaName(req.schemaName()));
qry.setArgs(req.arguments());
qry.setAutoCommit(req.autoCommit());
if (req.explicitTimeout()) {
// Timeout is handled on a client side, do not handle it on a server side.
qry.setTimeout(0, TimeUnit.MILLISECONDS);
}
if (req.pageSize() <= 0)
return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Invalid fetch size: " + req.pageSize());
qry.setPageSize(req.pageSize());
List<FieldsQueryCursor<List<?>>> results = connCtx.kernalContext().query().querySqlFields(null, qry,
cliCtx, true, protocolVer.compareTo(VER_2_3_0) < 0, cancel);
FieldsQueryCursor<List<?>> fieldsCur = results.get(0);
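// A COPY command yields a bulk load context instead of a data cursor: register a bulk load
// processor and acknowledge the client so it starts streaming the file in batches.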
if (fieldsCur instanceof BulkLoadContextCursor) {
BulkLoadContextCursor blCur = (BulkLoadContextCursor)fieldsCur;
BulkLoadProcessor blProcessor = blCur.bulkLoadProcessor();
BulkLoadAckClientParameters clientParams = blCur.clientParams();
JdbcBulkLoadProcessor processor = new JdbcBulkLoadProcessor(blProcessor, req.requestId());
jdbcCursors.put(processor.cursorId(), processor);
// Send an acknowledgement with the cursor ID and bulk load parameters so the client can
// stream the file and correlate subsequent batch responses for the same query on the client side.
return resultToResponse(new JdbcBulkLoadAckResult(processor.cursorId(), clientParams));
}
if (results.size() == 1) {
JdbcQueryCursor cur = new JdbcQueryCursor(req.pageSize(), req.maxRows(),
(QueryCursorImpl)fieldsCur, req.requestId());
jdbcCursors.put(cur.cursorId(), cur);
cur.openIterator();
JdbcQueryExecuteResult res;
PartitionResult partRes = ((QueryCursorImpl<List<?>>)fieldsCur).partitionResult();
if (cur.isQuery())
res = new JdbcQueryExecuteResult(cur.cursorId(), cur.fetchRows(), !cur.hasNext(),
isClientPartitionAwarenessApplicable(req.partitionResponseRequest(), partRes) ?
partRes :
null);
else {
List<List<Object>> items = cur.fetchRows();
assert items != null && items.size() == 1 && items.get(0).size() == 1
&& items.get(0).get(0) instanceof Long :
"Invalid result set for non-SELECT query. [qry=" + sql +
", res=" + S.toString(List.class, items) + ']';
res = new JdbcQueryExecuteResult(cur.cursorId(), (Long)items.get(0).get(0),
isClientPartitionAwarenessApplicable(req.partitionResponseRequest(), partRes) ?
partRes :
null);
}
if (res.last() && (!res.isQuery() || autoCloseCursors)) {
jdbcCursors.remove(cur.cursorId());
unregisterReq = true;
cur.close();
}
return resultToResponse(res);
}
else {
List<JdbcResultInfo> jdbcResults = new ArrayList<>(results.size());
List<List<Object>> items = null;
boolean last = true;
for (FieldsQueryCursor<List<?>> c : results) {
QueryCursorImpl qryCur = (QueryCursorImpl)c;
JdbcResultInfo jdbcRes;
if (qryCur.isQuery()) {
JdbcQueryCursor cur = new JdbcQueryCursor(req.pageSize(), req.maxRows(), qryCur, req.requestId());
jdbcCursors.put(cur.cursorId(), cur);
jdbcRes = new JdbcResultInfo(true, -1, cur.cursorId());
cur.openIterator();
if (items == null) {
items = cur.fetchRows();
last = cur.hasNext();
}
}
else
jdbcRes = new JdbcResultInfo(false, (Long)((List<?>)qryCur.getAll().get(0)).get(0), -1);
jdbcResults.add(jdbcRes);
}
return resultToResponse(new JdbcQueryExecuteMultipleStatementsResult(jdbcResults, items, last));
}
}
catch (Exception e) {
// Trying to close all cursors of current request.
clearCursors(req.requestId());
unregisterReq = true;
U.error(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e);
if (X.cause(e, QueryCancelledException.class) != null)
return exceptionToResult(new QueryCancelledException());
else
return exceptionToResult(e);
}
finally {
cleanupQueryCancellationMeta(unregisterReq, req.requestId());
}
}
/**
* {@link JdbcQueryCloseRequest} command handler.
*
* @param req Close query request.
* @return Response.
*/
private JdbcResponse closeQuery(JdbcQueryCloseRequest req) {
JdbcCursor cur = jdbcCursors.get(req.cursorId());
if (!prepareQueryCancellationMeta(cur))
return new JdbcResponse(null);
try {
cur = jdbcCursors.remove(req.cursorId());
if (cur == null)
return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN,
"Failed to find query cursor with ID: " + req.cursorId());
cur.close();
return new JdbcResponse(null);
}
catch (Exception e) {
jdbcCursors.remove(req.cursorId());
U.error(log, "Failed to close SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e);
if (X.cause(e, QueryCancelledException.class) != null)
return new JdbcResponse(null);
else
return exceptionToResult(e);
}
finally {
if (isCancellationSupported()) {
boolean clearCursors = false;
synchronized (reqMux) {
assert cur != null;
JdbcQueryDescriptor desc = reqRegister.get(cur.requestId());
if (desc != null) {
// Query was cancelled during execution.
if (desc.isCanceled()) {
clearCursors = true;
unregisterRequest(req.requestId());
}
else {
tryUnregisterRequest(cur.requestId());
desc.decrementUsageCount();
}
}
}
if (clearCursors)
clearCursors(cur.requestId());
}
}
}
/**
* {@link JdbcQueryFetchRequest} command handler.
*
* @param req Fetch query request.
* @return Response.
*/
private JdbcResponse fetchQuery(JdbcQueryFetchRequest req) {
final JdbcQueryCursor cur = (JdbcQueryCursor)jdbcCursors.get(req.cursorId());
if (!prepareQueryCancellationMeta(cur))
return JDBC_QUERY_CANCELLED_RESPONSE;
boolean unregisterReq = false;
try {
if (cur == null)
return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN,
"Failed to find query cursor with ID: " + req.cursorId());
if (req.pageSize() <= 0)
return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN,
"Invalid fetch size : [fetchSize=" + req.pageSize() + ']');
cur.pageSize(req.pageSize());
JdbcQueryFetchResult res = new JdbcQueryFetchResult(cur.fetchRows(), !cur.hasNext());
if (res.last() && (!cur.isQuery() || autoCloseCursors)) {
jdbcCursors.remove(req.cursorId());
unregisterReq = true;
cur.close();
}
return resultToResponse(res);
}
catch (Exception e) {
U.error(log, "Failed to fetch SQL query result [reqId=" + req.requestId() + ", req=" + req + ']', e);
if (X.cause(e, QueryCancelledException.class) != null)
return exceptionToResult(new QueryCancelledException());
else
return exceptionToResult(e);
}
finally {
// Cursor may be null here when it was already closed and cancellation is not supported.
if (cur != null)
cleanupQueryCancellationMeta(unregisterReq, cur.requestId());
}
}
/**
* @param req Request.
* @return Response.
*/
private JdbcResponse getQueryMeta(JdbcQueryMetadataRequest req) {
final JdbcQueryCursor cur = (JdbcQueryCursor)jdbcCursors.get(req.cursorId());
if (!prepareQueryCancellationMeta(cur))
return JDBC_QUERY_CANCELLED_RESPONSE;
try {
if (cur == null)
return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN,
"Failed to find query cursor with ID: " + req.cursorId());
JdbcQueryMetadataResult res = new JdbcQueryMetadataResult(req.cursorId(),
cur.meta());
return resultToResponse(res);
}
catch (Exception e) {
U.error(log, "Failed to fetch SQL query result [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
finally {
// Cursor may be null here when it was already closed and cancellation is not supported.
if (cur != null)
cleanupQueryCancellationMeta(false, cur.requestId());
}
}
/**
* @param req Request.
* @return Response.
*/
private JdbcResponse executeBatch(JdbcBatchExecuteRequest req) {
GridQueryCancel cancel = null;
// Skip the request register check for ORDERED batches (JDBC streams) because
// ordered batch requests are processed asynchronously in a separate thread.
if (isCancellationSupported() && req.type() == BATCH_EXEC) {
synchronized (reqMux) {
JdbcQueryDescriptor desc = reqRegister.get(req.requestId());
// Query was already cancelled and unregistered.
if (desc == null)
return null;
cancel = desc.cancelHook();
desc.incrementUsageCount();
}
}
try {
String schemaName = prepareSchemaName(req.schemaName());
int qryCnt = req.queries().size();
List<Integer> updCntsAcc = new ArrayList<>(qryCnt);
// Send back only the first error. Others will be written to the log.
IgniteBiTuple<Integer, String> firstErr = new IgniteBiTuple<>();
SqlFieldsQueryEx qry = null;
for (JdbcQuery q : req.queries()) {
if (q.sql() != null) { // If we have a new query string in the batch,
if (qry != null) // then execute the previous sub-batch and create a new SqlFieldsQueryEx.
executeBatchedQuery(qry, updCntsAcc, firstErr, cancel);
qry = new SqlFieldsQueryEx(q.sql(), false);
setupQuery(qry, schemaName);
qry.setAutoCommit(req.autoCommit());
}
assert qry != null;
qry.addBatchedArgs(q.args());
}
if (qry != null)
executeBatchedQuery(qry, updCntsAcc, firstErr, cancel);
if (req.isLastStreamBatch())
cliCtx.disableStreaming();
int[] updCnts = U.toIntArray(updCntsAcc);
return firstErr.isEmpty() ?
resultToResponse(
new JdbcBatchExecuteResult(updCnts, ClientListenerResponse.STATUS_SUCCESS, null)) :
resultToResponse(new JdbcBatchExecuteResult(updCnts, firstErr.getKey(), firstErr.getValue()));
}
catch (QueryCancelledException e) {
return exceptionToResult(e);
}
finally {
cleanupQueryCancellationMeta(true, req.requestId());
}
}
/**
* Normalize schema name.
*
* @param schemaName Schema name.
* @return Normalized schema name.
*/
private static String prepareSchemaName(@Nullable String schemaName) {
if (F.isEmpty(schemaName))
schemaName = QueryUtils.DFLT_SCHEMA;
return schemaName;
}
/**
* Sets up query object with settings from current client context state and handler state.
*
* @param qry Query to setup.
* @param schemaName Schema name.
*/
private void setupQuery(SqlFieldsQueryEx qry, String schemaName) {
qry.setDistributedJoins(cliCtx.isDistributedJoins());
qry.setEnforceJoinOrder(cliCtx.isEnforceJoinOrder());
qry.setCollocated(cliCtx.isCollocated());
qry.setReplicatedOnly(cliCtx.isReplicatedOnly());
qry.setLazy(cliCtx.isLazy());
qry.setNestedTxMode(nestedTxMode);
qry.setSchema(schemaName);
qry.setQueryInitiatorId(connCtx.clientDescriptor());
if (cliCtx.updateBatchSize() != null)
qry.setUpdateBatchSize(cliCtx.updateBatchSize());
}
/**
* Executes query and updates result counters.
*
* @param qry Query.
* @param updCntsAcc Accumulator of per-query update counters.
* @param firstErr First error data - code and message.
* @param cancel Hook for query cancellation.
* @throws QueryCancelledException If query was cancelled during execution.
*/
@SuppressWarnings({"ForLoopReplaceableByForEach"})
private void executeBatchedQuery(SqlFieldsQueryEx qry, List<Integer> updCntsAcc,
IgniteBiTuple<Integer, String> firstErr, GridQueryCancel cancel) throws QueryCancelledException {
try {
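// In streaming mode the batch is funneled through the data streamer instead of
// being executed as individual DML statements.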
if (cliCtx.isStream()) {
List<Long> cnt = connCtx.kernalContext().query().streamBatchedUpdateQuery(
qry.getSchema(),
cliCtx,
qry.getSql(),
qry.batchedArguments(),
connCtx.clientDescriptor()
);
for (int i = 0; i < cnt.size(); i++)
updCntsAcc.add(cnt.get(i).intValue());
return;
}
List<FieldsQueryCursor<List<?>>> qryRes = connCtx.kernalContext().query().querySqlFields(
null, qry, cliCtx, true, true, cancel);
for (FieldsQueryCursor<List<?>> cur : qryRes) {
if (cur instanceof BulkLoadContextCursor)
throw new IgniteSQLException("COPY command cannot be executed in batch mode.");
assert !((QueryCursorImpl)cur).isQuery();
Iterator<List<?>> it = cur.iterator();
if (it.hasNext()) {
int val = ((Long)it.next().get(0)).intValue();
updCntsAcc.add(val);
}
}
}
catch (Exception e) {
int code;
String msg;
if (X.cause(e, QueryCancelledException.class) != null)
throw new QueryCancelledException();
else if (e instanceof IgniteSQLException) {
BatchUpdateException batchCause = X.cause(e, BatchUpdateException.class);
if (batchCause != null) {
int[] updCntsOnErr = batchCause.getUpdateCounts();
for (int i = 0; i < updCntsOnErr.length; i++)
updCntsAcc.add(updCntsOnErr[i]);
msg = batchCause.getMessage();
code = batchCause.getErrorCode();
}
else {
for (int i = 0; i < qry.batchedArguments().size(); i++)
updCntsAcc.add(Statement.EXECUTE_FAILED);
msg = e.getMessage();
code = ((IgniteSQLException)e).statusCode();
}
}
else {
for (int i = 0; i < qry.batchedArguments().size(); i++)
updCntsAcc.add(Statement.EXECUTE_FAILED);
msg = e.getMessage();
code = IgniteQueryErrorCode.UNKNOWN;
}
if (firstErr.isEmpty())
firstErr.set(code, msg);
else
U.error(log, "Failed to execute batch query [qry=" + qry + ']', e);
}
}
/**
* @param req Get tables metadata request.
* @return Response.
*/
private JdbcResponse getTablesMeta(JdbcMetaTablesRequest req) {
try {
List<JdbcTableMeta> tabMetas = meta.getTablesMeta(req.schemaName(), req.tableName(), req.tableTypes());
JdbcMetaTablesResult res = new JdbcMetaTablesResult(tabMetas);
return resultToResponse(res);
}
catch (Exception e) {
U.error(log, "Failed to get tables metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
* @param req Get columns metadata request.
* @return Response.
*/
private JdbcResponse getColumnsMeta(JdbcMetaColumnsRequest req) {
try {
Collection<JdbcColumnMeta> colsMeta =
meta.getColumnsMeta(protocolVer, req.schemaName(), req.tableName(), req.columnName());
JdbcMetaColumnsResult res;
if (protocolVer.compareTo(VER_2_7_0) >= 0)
res = new JdbcMetaColumnsResultV4(colsMeta);
else if (protocolVer.compareTo(VER_2_4_0) >= 0)
res = new JdbcMetaColumnsResultV3(colsMeta);
else if (protocolVer.compareTo(VER_2_3_0) >= 0)
res = new JdbcMetaColumnsResultV2(colsMeta);
else
res = new JdbcMetaColumnsResult(colsMeta);
return resultToResponse(res);
}
catch (Exception e) {
U.error(log, "Failed to get columns metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
* Handler for binary type update requests.
*
* @param req Incoming request.
* @return Acknowledgement in case of a successful update.
*/
private JdbcResponse putBinaryType(JdbcBinaryTypePutRequest req) {
try {
getBinaryCtx().updateMetadata(req.meta().typeId(), req.meta(), false);
return resultToResponse(new JdbcUpdateBinarySchemaResult(req.requestId(), true));
}
catch (Exception e) {
U.error(log, "Failed to update binary schema [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
* Handler for querying binary type requests.
*
* @param req Incoming request.
* @return Response with binary type schema.
*/
private JdbcResponse getBinaryType(JdbcBinaryTypeGetRequest req) {
try {
BinaryTypeImpl type = (BinaryTypeImpl)connCtx.kernalContext().cacheObjects().binary().type(req.typeId());
return resultToResponse(new JdbcBinaryTypeGetResult(req.requestId(), type != null ? type.metadata() : null));
}
catch (Exception e) {
U.error(log, "Failed to get binary type name [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
* Handler for querying binary type name requests.
*
* @param req Incoming request.
* @return Response with binary type name.
*/
private JdbcResponse getBinaryTypeName(JdbcBinaryTypeNameGetRequest req) {
try {
String name = getMarshallerCtx().getClassName(req.platformId(), req.typeId());
return resultToResponse(new JdbcBinaryTypeNameGetResult(req.requestId(), name));
}
catch (Exception e) {
U.error(log, "Failed to get binary type name [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
* Handler for binary type name registration requests.
*
* @param req Incoming request.
* @return Acknowledgement in case of successful registration.
*/
private JdbcResponse registerBinaryType(JdbcBinaryTypeNamePutRequest req) {
try {
boolean res = getMarshallerCtx().registerClassName(req.platformId(), req.typeId(), req.typeName(), false);
return resultToResponse(new JdbcUpdateBinarySchemaResult(req.requestId(), res));
}
catch (Exception e) {
U.error(log, "Failed to register new type [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
* Get marshaller context from connection context.
*
* @return Marshaller context.
*/
private MarshallerContext getMarshallerCtx() {
return connCtx.kernalContext().marshallerContext();
}
/**
* Get binary context from connection context.
*
* @return Binary context.
*/
private BinaryContext getBinaryCtx() {
return ((CacheObjectBinaryProcessorImpl)connCtx.kernalContext().cacheObjects()).binaryContext();
}
/**
* @param req Request.
* @return Response.
*/
private JdbcResponse getIndexesMeta(JdbcMetaIndexesRequest req) {
try {
Collection<JdbcIndexMeta> idxInfos = meta.getIndexesMeta(req.schemaName(), req.tableName());
return resultToResponse(new JdbcMetaIndexesResult(idxInfos));
}
catch (Exception e) {
U.error(log, "Failed to get parameters metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
* @param req Request.
* @return Response.
*/
private JdbcResponse getParametersMeta(JdbcMetaParamsRequest req) {
String schemaName = prepareSchemaName(req.schemaName());
SqlFieldsQueryEx qry = new SqlFieldsQueryEx(req.sql(), null);
setupQuery(qry, schemaName);
try {
List<JdbcParameterMeta> meta = connCtx.kernalContext().query().getIndexing().
parameterMetaData(schemaName, qry);
JdbcMetaParamsResult res = new JdbcMetaParamsResult(meta);
return resultToResponse(res);
}
catch (Exception e) {
U.error(log, "Failed to get parameters metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
* @param req Request.
* @return Response.
*/
private JdbcResponse getPrimaryKeys(JdbcMetaPrimaryKeysRequest req) {
try {
Collection<JdbcPrimaryKeyMeta> pkMeta = meta.getPrimaryKeys(req.schemaName(), req.tableName());
return resultToResponse(new JdbcMetaPrimaryKeysResult(pkMeta));
}
catch (Exception e) {
U.error(log, "Failed to get parameters metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
* @param req Request.
* @return Response.
*/
private JdbcResponse getSchemas(JdbcMetaSchemasRequest req) {
try {
String schemaPtrn = req.schemaName();
SortedSet<String> schemas = meta.getSchemasMeta(schemaPtrn);
return resultToResponse(new JdbcMetaSchemasResult(schemas));
}
catch (Exception e) {
U.error(log, "Failed to get schemas metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
* Create {@link JdbcResponse} bearing an appropriate Ignite-specific error code, if possible,
* for the given {@link Throwable}.
*
* @param e Exception to convert.
* @return Resulting {@link JdbcResponse}.
*/
private JdbcResponse exceptionToResult(Throwable e) {
if (e instanceof QueryCancelledException)
return new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED, e.getMessage());
if (e instanceof TransactionSerializationException)
return new JdbcResponse(IgniteQueryErrorCode.TRANSACTION_SERIALIZATION_ERROR, e.getMessage());
if (e instanceof TransactionAlreadyCompletedException)
return new JdbcResponse(IgniteQueryErrorCode.TRANSACTION_COMPLETED, e.getMessage());
if (e instanceof TransactionDuplicateKeyException)
return new JdbcResponse(IgniteQueryErrorCode.DUPLICATE_KEY, e.getMessage());
if (e instanceof TransactionMixedModeException)
return new JdbcResponse(IgniteQueryErrorCode.TRANSACTION_TYPE_MISMATCH, e.getMessage());
if (e instanceof TransactionUnsupportedConcurrencyException)
return new JdbcResponse(IgniteQueryErrorCode.UNSUPPORTED_OPERATION, e.getMessage());
if (e instanceof IgniteSQLException)
return new JdbcResponse(((IgniteSQLException)e).statusCode(), e.getMessage());
else
return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, e.getMessage());
}
/**
* Ordered batch worker.
*/
private class OrderedBatchWorker extends GridWorker {
/**
* Constructor.
*/
OrderedBatchWorker() {
super(connCtx.kernalContext().igniteInstanceName(), "ordered-batch", JdbcRequestHandler.this.log);
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
long nextBatchOrder = 0;
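// Execute queued batches strictly in order: block until the batch with the next
// expected order number shows up at the head of the queue.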
while (true) {
if (!cliCtx.isStream())
return;
JdbcOrderedBatchExecuteRequest req;
synchronized (orderedBatchesMux) {
req = orderedBatchesQueue.peek();
if (req == null || req.order() != nextBatchOrder) {
orderedBatchesMux.wait();
continue;
}
else
orderedBatchesQueue.poll();
}
executeBatchOrdered(req);
nextBatchOrder++;
}
}
}
/**
* Cancels the query with the specified request ID.
*
* @param req Query cancellation request.
* @return {@code QueryCancelledException} wrapped in a {@code JdbcResponse}, or {@code null}.
*/
private JdbcResponse cancelQuery(JdbcQueryCancelRequest req) {
boolean clearCursors = false;
GridQueryCancel cancelHook;
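// Resolve the query descriptor under reqMux; the cancel hook itself is invoked
// outside of the lock.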
synchronized (reqMux) {
JdbcQueryDescriptor desc = reqRegister.get(req.requestIdToBeCancelled());
// Query was already executed.
if (desc == null)
return null;
// Query was registered, however execution didn't start yet.
else if (!desc.isExecutionStarted()) {
unregisterRequest(req.requestId());
return exceptionToResult(new QueryCancelledException());
}
else {
cancelHook = desc.cancelHook();
desc.markCancelled();
if (desc.usageCount() == 0) {
clearCursors = true;
unregisterRequest(req.requestIdToBeCancelled());
}
}
}
cancelHook.cancel();
if (clearCursors)
clearCursors(req.requestIdToBeCancelled());
return null;
}
/**
* Retrieves cache partition distributions for the given cache IDs.
*
* @param req {@code JdbcCachePartitionsRequest} that encapsulates a set of cache IDs.
* @return Partition distributions.
*/
private JdbcResponse getCachePartitions(JdbcCachePartitionsRequest req) {
List<JdbcThinPartitionAwarenessMappingGroup> mappings = new ArrayList<>();
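// Use the ready affinity topology version so the returned mappings are consistent
// with the latest completed partition map exchange.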
AffinityTopologyVersion topVer = connCtx.kernalContext().cache().context().exchange().readyAffinityVersion();
for (int cacheId : req.cacheIds()) {
Map<UUID, Set<Integer>> partitionsMap = getPartitionsMap(
connCtx.kernalContext().cache().cacheDescriptor(cacheId),
topVer);
mappings.add(new JdbcThinPartitionAwarenessMappingGroup(cacheId, partitionsMap));
}
return new JdbcResponse(new JdbcCachePartitionsResult(mappings), topVer);
}
/**
* Get partition map for a cache.
* @param cacheDesc Cache descriptor.
* @param affVer Affinity topology version.
* @return Partitions mapping for the cache.
*/
private Map<UUID, Set<Integer>> getPartitionsMap(DynamicCacheDescriptor cacheDesc, AffinityTopologyVersion affVer) {
GridCacheContext cacheCtx = connCtx.kernalContext().cache().context().cacheContext(cacheDesc.cacheId());
AffinityAssignment assignment = cacheCtx.affinity().assignment(affVer);
Set<ClusterNode> nodes = assignment.primaryPartitionNodes();
HashMap<UUID, Set<Integer>> res = new HashMap<>(nodes.size());
for (ClusterNode node : nodes) {
UUID nodeId = node.id();
Set<Integer> parts = assignment.primaryPartitions(nodeId);
res.put(nodeId, parts);
}
return res;
}
/**
* Checks whether query cancellation is supported within the given protocol version.
*
* @return True if supported, false otherwise.
*/
@Override public boolean isCancellationSupported() {
return (protocolVer.compareTo(VER_2_8_0) >= 0);
}
/**
* Unregisters the request if there are no cursors bound to it.
*
* @param reqId Request to unregister.
*/
private void tryUnregisterRequest(long reqId) {
assert isCancellationSupported();
boolean unregisterReq = true;
for (JdbcCursor cursor : jdbcCursors.values()) {
if (cursor.requestId() == reqId) {
unregisterReq = false;
break;
}
}
if (unregisterReq)
unregisterRequest(reqId);
}
/**
* Tries to close all cursors of request with given id and removes them from jdbcCursors map.
*
* @param reqId Request ID.
*/
private void clearCursors(long reqId) {
for (Iterator<Map.Entry<Long, JdbcCursor>> it = jdbcCursors.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<Long, JdbcCursor> entry = it.next();
JdbcCursor cursor = entry.getValue();
if (cursor.requestId() == reqId) {
try {
cursor.close();
}
catch (Exception e) {
U.error(log, "Failed to close cursor [reqId=" + reqId + ", cursor=" + cursor + ']', e);
}
it.remove();
}
}
}
/**
* Checks whether the query was cancelled; if not, increments the query descriptor usage count.
*
* @param cur Jdbc cursor.
* @return {@code False} if the query was already cancelled, {@code true} otherwise.
*/
private boolean prepareQueryCancellationMeta(JdbcCursor cur) {
if (isCancellationSupported()) {
// Nothing to do - cursor was already removed.
if (cur == null)
return false;
synchronized (reqMux) {
JdbcQueryDescriptor desc = reqRegister.get(cur.requestId());
// Query was already cancelled and unregistered.
if (desc == null)
return false;
desc.incrementUsageCount();
}
}
return true;
}
/**
* Cleans up cursors or processors and unregisters the request if necessary.
*
* @param unregisterReq Flag indicating whether the request should be unregistered.
* @param reqId Request ID.
*/
private void cleanupQueryCancellationMeta(boolean unregisterReq, long reqId) {
if (isCancellationSupported()) {
boolean clearCursors = false;
synchronized (reqMux) {
JdbcQueryDescriptor desc = reqRegister.get(reqId);
if (desc != null) {
// Query was cancelled during execution.
if (desc.isCanceled()) {
clearCursors = true;
unregisterReq = true;
}
else
desc.decrementUsageCount();
if (unregisterReq)
unregisterRequest(reqId);
}
}
if (clearCursors)
clearCursors(reqId);
}
}
/**
* Create {@link JdbcResponse} wrapping the given result, attaching the affinity topology version if it has changed.
*
* @param res Jdbc result.
* @return Resulting {@link JdbcResponse}.
*/
private JdbcResponse resultToResponse(JdbcResult res) {
return new JdbcResponse(res, connCtx.getAffinityTopologyVersionIfChanged());
}
/**
* @param partResRequested Boolean flag that signals whether the client requested a partition result.
* @param partRes Derived partition result.
* @return True if applicable to JDBC thin client partition awareness:
* 1. Partition result was requested;
* 2. Partition result is either null, or:
* a. A rendezvous affinity function without map filters was used;
* b. The partition result tree is neither PartitionAllNode nor PartitionNoneNode.
*/
private static boolean isClientPartitionAwarenessApplicable(boolean partResRequested, PartitionResult partRes) {
return partResRequested && (partRes == null || partRes.isClientPartitionAwarenessApplicable());
}
/** {@inheritDoc} */
@Override public ClientListenerProtocolVersion protocolVersion() {
return protocolVer;
}
}