blob: 97ce20ac30cf478cdb3e697622834d4eab7e8bac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.odbc.jdbc;
import java.sql.BatchUpdateException;
import java.sql.ParameterMetaData;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.configuration.Factory;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.query.BulkLoadContextCursor;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteVersionUtils;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.processors.authentication.AuthorizationContext;
import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
import org.apache.ignite.internal.processors.odbc.odbc.OdbcQueryGetColumnsMetaRequest;
import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.NestedTxMode;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.SqlClientContext;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_CONTINUE;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_3_0;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_4_0;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_7_0;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC_ORDERED;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BULK_LOAD_BATCH;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_COLUMNS;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_INDEXES;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_PARAMS;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_PRIMARY_KEYS;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_SCHEMAS;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_TABLES;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_CLOSE;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_EXEC;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_FETCH;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_META;
/**
* JDBC request handler.
*/
public class JdbcRequestHandler implements ClientListenerRequestHandler {
/** Query ID sequence; static, so IDs are drawn from one counter shared by all handler instances. */
private static final AtomicLong QRY_ID_GEN = new AtomicLong();
/** Kernel context. */
private final GridKernalContext ctx;
/** Client context holding the per-connection SQL settings and streaming state. */
private final SqlClientContext cliCtx;
/** Logger. */
private final IgniteLogger log;
/** Busy lock entered around request handling and disconnect cleanup. */
private final GridSpinBusyLock busyLock;
/** Worker that {@link #handle(ClientListenerRequest)} delegates to when MVCC is enabled. */
private final JdbcRequestHandlerWorker worker;
/** Maximum allowed cursors; the limit is enforced only when the value is positive. */
private final int maxCursors;
/** Current queries cursors, keyed by query ID. */
private final ConcurrentHashMap<Long, JdbcQueryCursor> qryCursors = new ConcurrentHashMap<>();
/** Current bulk load processors, keyed by query ID. */
private final ConcurrentHashMap<Long, JdbcBulkLoadProcessor> bulkLoadRequests = new ConcurrentHashMap<>();
/** Ordered batches queue; accessed only while holding {@link #orderedBatchesMux}. */
private final PriorityQueue<JdbcOrderedBatchExecuteRequest> orderedBatchesQueue = new PriorityQueue<>();
/** Ordered batches mutex; also used to wait for and signal newly queued batches. */
private final Object orderedBatchesMux = new Object();
/** Response sender used for ordered batch responses. */
private final JdbcResponseSender sender;
/** Automatic close of cursors. */
private final boolean autoCloseCursors;
/** Nested transactions handling mode. */
private final NestedTxMode nestedTxMode;
/** Protocol version. */
private ClientListenerProtocolVersion protocolVer;
/** Authorization context installed for the duration of each handled request; may be {@code null}. */
private AuthorizationContext actx;
/**
* Constructor.
* @param ctx Context.
* @param busyLock Shutdown latch.
* @param sender Results sender.
* @param maxCursors Maximum allowed cursors.
* @param distributedJoins Distributed joins flag.
* @param enforceJoinOrder Enforce join order flag.
* @param collocated Collocated flag.
* @param replicatedOnly Replicated only flag.
* @param autoCloseCursors Flag to automatically close server cursors.
* @param lazy Lazy query execution flag.
* @param skipReducerOnUpdate Skip reducer on update flag.
* @param nestedTxMode Nested transactions handling mode.
* @param actx Authentication context.
* @param protocolVer Protocol version.
*/
public JdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock,
JdbcResponseSender sender, int maxCursors,
boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, boolean replicatedOnly,
boolean autoCloseCursors, boolean lazy, boolean skipReducerOnUpdate, NestedTxMode nestedTxMode,
AuthorizationContext actx, ClientListenerProtocolVersion protocolVer) {
this.ctx = ctx;
this.sender = sender;
// Factory handed to the client context; every call produces a new ordered batch worker.
Factory<GridWorker> orderedFactory = new Factory<GridWorker>() {
@Override public GridWorker create() {
return new OrderedBatchWorker();
}
};
this.cliCtx = new SqlClientContext(
ctx,
orderedFactory,
distributedJoins,
enforceJoinOrder,
collocated,
replicatedOnly,
lazy,
skipReducerOnUpdate
);
this.busyLock = busyLock;
this.maxCursors = maxCursors;
this.autoCloseCursors = autoCloseCursors;
this.nestedTxMode = nestedTxMode;
this.protocolVer = protocolVer;
this.actx = actx;
// The logger must be initialized before the worker, which receives it as an argument.
log = ctx.log(getClass());
// TODO IGNITE-9484 Do not create worker if there is a possibility to unbind TX from threads.
worker = new JdbcRequestHandlerWorker(ctx.igniteInstanceName(), log, this, ctx);
}
/**
 * {@inheritDoc}
 * <p>
 * Without MVCC the request is processed directly on the calling thread. With MVCC enabled it is
 * submitted to the handler's worker and the caller blocks until the worker's future completes.
 */
@Override public ClientListenerResponse handle(ClientListenerRequest req0) {
    assert req0 != null;
    assert req0 instanceof JdbcRequest;

    JdbcRequest jdbcReq = (JdbcRequest)req0;

    if (MvccUtils.mvccEnabled(ctx)) {
        GridFutureAdapter<ClientListenerResponse> pending = worker.process(jdbcReq);

        try {
            return pending.get();
        }
        catch (IgniteCheckedException e) {
            // A failed future is reported to the client as an error response.
            return exceptionToResult(e);
        }
    }

    return doHandle(jdbcReq);
}
/**
 * Starts the request worker; does nothing when no worker exists.
 */
void start() {
    if (worker == null)
        return;

    worker.start();
}
/**
 * Performs the actual request processing: enters the busy lock, installs the authorization
 * context (when one is set), dispatches on the request type and finally clears the
 * authorization context and leaves the busy lock.
 *
 * @param req Request.
 * @return Request handling result; an error response when the node is stopping or the
 *      request type is not supported.
 */
ClientListenerResponse doHandle(JdbcRequest req) {
    if (!busyLock.enterBusy()) {
        return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN,
            "Failed to handle JDBC request because node is stopping.");
    }

    if (actx != null)
        AuthorizationContext.context(actx);

    try {
        switch (req.type()) {
            case QRY_EXEC:
                return executeQuery((JdbcQueryExecuteRequest)req);

            case QRY_FETCH:
                return fetchQuery((JdbcQueryFetchRequest)req);

            case QRY_CLOSE:
                return closeQuery((JdbcQueryCloseRequest)req);

            case QRY_META:
                return getQueryMeta((JdbcQueryMetadataRequest)req);

            case BATCH_EXEC:
                return executeBatch((JdbcBatchExecuteRequest)req);

            case BATCH_EXEC_ORDERED:
                return dispatchBatchOrdered((JdbcOrderedBatchExecuteRequest)req);

            case META_TABLES:
                return getTablesMeta((JdbcMetaTablesRequest)req);

            case META_COLUMNS:
                return getColumnsMeta((JdbcMetaColumnsRequest)req);

            case META_INDEXES:
                return getIndexesMeta((JdbcMetaIndexesRequest)req);

            case META_PARAMS:
                return getParametersMeta((JdbcMetaParamsRequest)req);

            case META_PRIMARY_KEYS:
                return getPrimaryKeys((JdbcMetaPrimaryKeysRequest)req);

            case META_SCHEMAS:
                return getSchemas((JdbcMetaSchemasRequest)req);

            case BULK_LOAD_BATCH:
                return processBulkLoadFileBatch((JdbcBulkLoadBatchRequest)req);

            default:
                // Any request type without a dedicated handler ends up here.
                return new JdbcResponse(IgniteQueryErrorCode.UNSUPPORTED_OPERATION,
                    "Unsupported JDBC request [req=" + req + ']');
        }
    }
    finally {
        AuthorizationContext.clear();

        busyLock.leaveBusy();
    }
}
/**
 * Queues an ordered batch request and signals a thread waiting on the ordered batches mutex.
 * When the client context is not in ordered streaming mode the batch is executed right away
 * on the calling thread.
 *
 * @param req Ordered batch request.
 * @return Always {@code null}; the response is delivered through the response sender.
 */
private ClientListenerResponse dispatchBatchOrdered(JdbcOrderedBatchExecuteRequest req) {
    synchronized (orderedBatchesMux) {
        orderedBatchesQueue.add(req);

        orderedBatchesMux.notify();
    }

    boolean orderedStream = cliCtx.isStreamOrdered();

    if (!orderedStream)
        executeBatchOrdered(req);

    return null;
}
/**
 * Executes an ordered batch request and delivers the outcome through the response sender.
 * <p>
 * For the last batch of a stream the method first waits until the client context reports
 * that the preceding ordered requests have been processed. A {@link JdbcBatchExecuteResult}
 * is re-wrapped into a {@link JdbcOrderedBatchExecuteResult} carrying the request order.
 * Any failure is logged and reported to the client as an error response. Afterwards the head
 * of the ordered batches queue is removed and the client context is notified that one more
 * ordered request has been processed.
 *
 * @param req Ordered batch request.
 * @return Always {@code null}; the response is sent via the response sender.
 */
private ClientListenerResponse executeBatchOrdered(JdbcOrderedBatchExecuteRequest req) {
    try {
        if (req.isLastStreamBatch())
            cliCtx.waitTotalProcessedOrderedRequests(req.order());

        JdbcResponse resp = (JdbcResponse)executeBatch(req);

        if (resp.response() instanceof JdbcBatchExecuteResult) {
            resp = new JdbcResponse(
                new JdbcOrderedBatchExecuteResult((JdbcBatchExecuteResult)resp.response(), req.order()));
        }

        sender.send(resp);
    }
    catch (Exception e) {
        // Log through the handler's logger (not a null logger) and name the operation correctly.
        U.error(log, "Error processing ordered batch [order=" + req.order() + ']', e);

        sender.send(new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Server error: " + e));
    }

    synchronized (orderedBatchesMux) {
        orderedBatchesQueue.poll();
    }

    cliCtx.orderedRequestProcessed();

    return null;
}
/**
 * Processes a file batch sent from client as part of bulk load COPY command.
 * <p>
 * The bulk load processor is looked up by the query ID of the request. When the batch carries
 * one of the "finished" commands the processor is unregistered and closed after the batch has
 * been processed.
 *
 * @param req Request object with a batch of a file received from client.
 * @return Response to send to the client: the current update count on success, or an error
 *      response when the query ID is unknown or processing fails.
 */
private ClientListenerResponse processBulkLoadFileBatch(JdbcBulkLoadBatchRequest req) {
    JdbcBulkLoadProcessor processor = bulkLoadRequests.get(req.queryId());

    // The lookup result (not the kernal context) is what may legitimately be missing here.
    if (processor == null) {
        return new JdbcResponse(IgniteQueryErrorCode.UNEXPECTED_OPERATION, "Unknown query ID: "
            + req.queryId() + ". Bulk load session may have been reclaimed due to timeout.");
    }

    try {
        processor.processBatch(req);

        switch (req.cmd()) {
            case CMD_FINISHED_ERROR:
            case CMD_FINISHED_EOF:
                bulkLoadRequests.remove(req.queryId());

                processor.close();

                break;

            case CMD_CONTINUE:
                break;

            default:
                throw new IllegalArgumentException("Unknown bulk load command: " + req.cmd());
        }

        return new JdbcResponse(new JdbcQueryExecuteResult(req.queryId(), processor.updateCnt()));
    }
    catch (Exception e) {
        U.error(log, "Error processing file batch", e);

        return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Server error: " + e);
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Converts the exception into an error response via {@link #exceptionToResult(Exception)};
 * the request argument is not consulted.
 */
@Override public ClientListenerResponse handleException(Exception e, ClientListenerRequest req) {
return exceptionToResult(e);
}
/**
 * {@inheritDoc}
 * <p>
 * Writes a success flag followed by the server version taken from {@link IgniteVersionUtils#VER}:
 * major, minor and maintenance as bytes, then stage (string), revision timestamp (long) and
 * revision hash (byte array), in that order.
 */
@Override public void writeHandshake(BinaryWriterExImpl writer) {
// Handshake OK.
writer.writeBoolean(true);
// Write server version.
writer.writeByte(IgniteVersionUtils.VER.major());
writer.writeByte(IgniteVersionUtils.VER.minor());
writer.writeByte(IgniteVersionUtils.VER.maintenance());
writer.writeString(IgniteVersionUtils.VER.stage());
writer.writeLong(IgniteVersionUtils.VER.revisionTimestamp());
writer.writeByteArray(IgniteVersionUtils.VER.revisionHash());
}
/**
 * Called whenever client is disconnected due to correct connection close
 * or due to {@code IOException} during network operations.
 * <p>
 * Stops the request worker, closes every open query cursor and bulk load processor and then
 * closes the client context. Nothing is done when the busy lock cannot be entered. Cleanup is
 * best-effort: a failure to close one cursor or processor is logged and does not prevent the
 * remaining resources from being released. If the calling thread is interrupted while waiting
 * for the worker, its interrupt status is restored once cleanup has finished.
 */
public void onDisconnect() {
    if (!busyLock.enterBusy())
        return;

    boolean interrupted = false;

    try {
        if (worker != null) {
            worker.cancel();

            try {
                worker.join();
            }
            catch (InterruptedException ignored) {
                // Remember the interruption; it is re-asserted after cleanup so that
                // resource release below is not disturbed by the interrupt flag.
                interrupted = true;
            }
        }

        for (JdbcQueryCursor cursor : qryCursors.values()) {
            try {
                cursor.close();
            }
            catch (Exception e) {
                U.error(log, "Error closing JDBC query cursor.", e);
            }
        }

        qryCursors.clear();

        for (JdbcBulkLoadProcessor processor : bulkLoadRequests.values()) {
            try {
                processor.close();
            }
            catch (Exception e) {
                U.error(log, "Error closing JDBC bulk load processor.", e);
            }
        }

        bulkLoadRequests.clear();

        U.close(cliCtx, log);
    }
    finally {
        busyLock.leaveBusy();

        if (interrupted)
            Thread.currentThread().interrupt();
    }
}
/**
* {@link JdbcQueryExecuteRequest} command handler.
* <p>
* Rejects the request when the open cursor limit is reached, builds a {@link SqlFieldsQueryEx}
* from the request and the client context settings, executes it and converts the outcome into
* one of: a bulk load acknowledgement, a single statement result, or a multiple statements result.
*
* @param req Execute query request.
* @return Response.
*/
@SuppressWarnings("unchecked")
private JdbcResponse executeQuery(JdbcQueryExecuteRequest req) {
int cursorCnt = qryCursors.size();
// The limit is enforced only when maxCursors is positive.
if (maxCursors > 0 && cursorCnt >= maxCursors)
return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Too many open cursors (either close other " +
"open cursors or increase the limit through " +
"ClientConnectorConfiguration.maxOpenCursorsPerConnection) [maximum=" + maxCursors +
", current=" + cursorCnt + ']');
long qryId = QRY_ID_GEN.getAndIncrement();
assert !cliCtx.isStream();
try {
String sql = req.sqlQuery();
SqlFieldsQueryEx qry;
// The second constructor argument reflects the statement type expected by the client:
// null for any, true for SELECT, false for update.
switch(req.expectedStatementType()) {
case ANY_STATEMENT_TYPE:
qry = new SqlFieldsQueryEx(sql, null);
break;
case SELECT_STATEMENT_TYPE:
qry = new SqlFieldsQueryEx(sql, true);
break;
default:
assert req.expectedStatementType() == JdbcStatementType.UPDATE_STMT_TYPE;
qry = new SqlFieldsQueryEx(sql, false);
if (cliCtx.isSkipReducerOnUpdate())
((SqlFieldsQueryEx)qry).setSkipReducerOnUpdate(true);
}
qry.setArgs(req.arguments());
qry.setDistributedJoins(cliCtx.isDistributedJoins());
qry.setEnforceJoinOrder(cliCtx.isEnforceJoinOrder());
qry.setCollocated(cliCtx.isCollocated());
qry.setReplicatedOnly(cliCtx.isReplicatedOnly());
qry.setLazy(cliCtx.isLazy());
qry.setNestedTxMode(nestedTxMode);
qry.setAutoCommit(req.autoCommit());
if (req.pageSize() <= 0)
return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Invalid fetch size: " + req.pageSize());
qry.setPageSize(req.pageSize());
String schemaName = req.schemaName();
// Fall back to the default schema when the request does not name one.
if (F.isEmpty(schemaName))
schemaName = QueryUtils.DFLT_SCHEMA;
qry.setSchema(schemaName);
// NOTE(review): the last argument is true only for clients older than 2.3.0; presumably it
// makes multi-statement queries fail for them - confirm against querySqlFields.
List<FieldsQueryCursor<List<?>>> results = ctx.query().querySqlFields(null, qry, cliCtx, true,
protocolVer.compareTo(VER_2_3_0) < 0);
FieldsQueryCursor<List<?>> fieldsCur = results.get(0);
// Bulk load (COPY): register a processor under the query ID and acknowledge to the client.
if (fieldsCur instanceof BulkLoadContextCursor) {
BulkLoadContextCursor blCur = (BulkLoadContextCursor) fieldsCur;
BulkLoadProcessor blProcessor = blCur.bulkLoadProcessor();
BulkLoadAckClientParameters clientParams = blCur.clientParams();
bulkLoadRequests.put(qryId, new JdbcBulkLoadProcessor(blProcessor));
return new JdbcResponse(new JdbcBulkLoadAckResult(qryId, clientParams));
}
if (results.size() == 1) {
JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(),
(QueryCursorImpl)fieldsCur);
JdbcQueryExecuteResult res;
if (cur.isQuery())
res = new JdbcQueryExecuteResult(qryId, cur.fetchRows(), !cur.hasNext());
else {
// A non-query statement is expected to produce exactly one row with a single Long (the update count).
List<List<Object>> items = cur.fetchRows();
assert items != null && items.size() == 1 && items.get(0).size() == 1
&& items.get(0).get(0) instanceof Long :
"Invalid result set for not-SELECT query. [qry=" + sql +
", res=" + S.toString(List.class, items) + ']';
res = new JdbcQueryExecuteResult(qryId, (Long)items.get(0).get(0));
}
// Close right away when everything has been delivered and the result is either an update
// or cursors are auto-closed; otherwise keep the cursor registered for later requests.
if (res.last() && (!res.isQuery() || autoCloseCursors))
cur.close();
else
qryCursors.put(qryId, cur);
return new JdbcResponse(res);
}
else {
List<JdbcResultInfo> jdbcResults = new ArrayList<>(results.size());
List<List<Object>> items = null;
boolean last = true;
for (FieldsQueryCursor<List<?>> c : results) {
QueryCursorImpl qryCur = (QueryCursorImpl)c;
JdbcResultInfo jdbcRes;
if (qryCur.isQuery()) {
jdbcRes = new JdbcResultInfo(true, -1, qryId);
JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), qryCur);
qryCursors.put(qryId, cur);
// Each query result gets its own cursor ID; draw a fresh one for the next result.
qryId = QRY_ID_GEN.getAndIncrement();
// Only the first query result has its first page fetched eagerly.
if (items == null) {
items = cur.fetchRows();
// NOTE(review): the single-result branch above passes !cur.hasNext() as the "last" flag,
// while this assigns cur.hasNext() - looks inverted; confirm against the client's handling.
last = cur.hasNext();
}
}
else
jdbcRes = new JdbcResultInfo(false, (Long)((List<?>)qryCur.getAll().get(0)).get(0), -1);
jdbcResults.add(jdbcRes);
}
return new JdbcResponse(new JdbcQueryExecuteMultipleStatementsResult(jdbcResults, items, last));
}
}
catch (Exception e) {
// NOTE(review): only the current value of qryId is removed; cursors registered under earlier
// IDs in the multiple statements branch stay in the map - confirm whether that is intended.
qryCursors.remove(qryId);
U.error(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e);
return exceptionToResult(e);
}
}
/**
 * {@link JdbcQueryCloseRequest} command handler: unregisters the cursor with the requested
 * query ID and closes it.
 *
 * @param req Close query request.
 * @return Empty success response, or an error response when the cursor is unknown or
 *      closing fails.
 */
private JdbcResponse closeQuery(JdbcQueryCloseRequest req) {
    try {
        JdbcQueryCursor removed = qryCursors.remove(req.queryId());

        if (removed != null) {
            removed.close();

            return new JdbcResponse(null);
        }

        return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN,
            "Failed to find query cursor with ID: " + req.queryId());
    }
    catch (Exception e) {
        qryCursors.remove(req.queryId());

        U.error(log, "Failed to close SQL query [reqId=" + req.requestId() + ", req=" + req.queryId() + ']', e);

        return exceptionToResult(e);
    }
}
/**
 * {@link JdbcQueryFetchRequest} command handler: returns the next page of rows of an open
 * cursor. When the page is the last one and the cursor is either not a query or cursors are
 * auto-closed, the cursor is unregistered and closed.
 *
 * @param req Fetch query request.
 * @return Response with the fetched page, or an error response when the cursor is unknown,
 *      the fetch size is not positive or fetching fails.
 */
private JdbcResponse fetchQuery(JdbcQueryFetchRequest req) {
    try {
        JdbcQueryCursor cur = qryCursors.get(req.queryId());

        if (cur == null) {
            return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN,
                "Failed to find query cursor with ID: " + req.queryId());
        }

        if (req.pageSize() <= 0) {
            return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN,
                "Invalid fetch size : [fetchSize=" + req.pageSize() + ']');
        }

        cur.pageSize(req.pageSize());

        // Rows are fetched first; only then is the cursor asked whether more remain.
        List<List<Object>> rows = cur.fetchRows();
        boolean exhausted = !cur.hasNext();

        JdbcQueryFetchResult res = new JdbcQueryFetchResult(rows, exhausted);

        boolean closeNow = res.last() && (!cur.isQuery() || autoCloseCursors);

        if (closeNow) {
            qryCursors.remove(req.queryId());

            cur.close();
        }

        return new JdbcResponse(res);
    }
    catch (Exception e) {
        U.error(log, "Failed to fetch SQL query result [reqId=" + req.requestId() + ", req=" + req + ']', e);

        return exceptionToResult(e);
    }
}
/**
 * {@link JdbcQueryMetadataRequest} command handler: returns the metadata of an open cursor.
 *
 * @param req Request.
 * @return Response with the cursor metadata, or an error response when no cursor is
 *      registered under the requested query ID or the metadata cannot be obtained.
 */
private JdbcResponse getQueryMeta(JdbcQueryMetadataRequest req) {
    try {
        JdbcQueryCursor cur = qryCursors.get(req.queryId());

        if (cur == null) {
            return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN,
                "Failed to find query with ID: " + req.queryId());
        }

        JdbcQueryMetadataResult res = new JdbcQueryMetadataResult(req.queryId(), cur.meta());

        return new JdbcResponse(res);
    }
    catch (Exception e) {
        // The message names this operation (metadata), not the fetch handler it was copied from.
        U.error(log, "Failed to get SQL query metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);

        return exceptionToResult(e);
    }
}
/**
 * Executes a batch request. Consecutive batch entries that share a query string form a
 * sub-batch: an entry carrying SQL text starts a new sub-batch, entries without SQL text only
 * contribute arguments to the current one. Each sub-batch is executed separately and the update
 * counters are accumulated across all of them.
 *
 * @param req Request.
 * @return Response with the accumulated update counters and, if any sub-batch failed, the code
 *      and message of the first error.
 */
private ClientListenerResponse executeBatch(JdbcBatchExecuteRequest req) {
    String schemaName = req.schemaName();

    if (F.isEmpty(schemaName))
        schemaName = QueryUtils.DFLT_SCHEMA;

    List<Integer> updCntsAcc = new ArrayList<>(req.queries().size());

    // Send back only the first error. Others will be written to the log.
    IgniteBiTuple<Integer, String> firstErr = new IgniteBiTuple<>();

    SqlFieldsQueryEx curQry = null;

    for (JdbcQuery batchItem : req.queries()) {
        boolean startsNewSubBatch = batchItem.sql() != null;

        if (startsNewSubBatch) {
            // Flush the sub-batch collected so far before switching to the new query string.
            if (curQry != null)
                executeBatchedQuery(curQry, updCntsAcc, firstErr);

            curQry = new SqlFieldsQueryEx(batchItem.sql(), false);

            curQry.setDistributedJoins(cliCtx.isDistributedJoins());
            curQry.setEnforceJoinOrder(cliCtx.isEnforceJoinOrder());
            curQry.setCollocated(cliCtx.isCollocated());
            curQry.setReplicatedOnly(cliCtx.isReplicatedOnly());
            curQry.setLazy(cliCtx.isLazy());
            curQry.setNestedTxMode(nestedTxMode);
            curQry.setAutoCommit(req.autoCommit());

            curQry.setSchema(schemaName);
        }

        assert curQry != null;

        curQry.addBatchedArgs(batchItem.args());
    }

    // Execute the trailing sub-batch, if there is one.
    if (curQry != null)
        executeBatchedQuery(curQry, updCntsAcc, firstErr);

    if (req.isLastStreamBatch())
        cliCtx.disableStreaming();

    int[] updCnts = U.toIntArray(updCntsAcc);

    JdbcBatchExecuteResult res = firstErr.isEmpty()
        ? new JdbcBatchExecuteResult(updCnts, ClientListenerResponse.STATUS_SUCCESS, null)
        : new JdbcBatchExecuteResult(updCnts, firstErr.getKey(), firstErr.getValue());

    return new JdbcResponse(res);
}
/**
 * Executes one batched query and appends its update counters to the accumulator.
 * <p>
 * In streaming mode the query is passed to the streamer and the returned counters are appended.
 * Otherwise the query is executed as SQL fields query and the first value of every resulting
 * cursor is appended. On failure the accumulator receives either the counters carried by a
 * nested {@link BatchUpdateException} or one {@link Statement#EXECUTE_FAILED} per batched
 * argument set; the first error is stored in {@code firstErr}, subsequent ones are only logged.
 *
 * @param qry Query.
 * @param updCntsAcc Per query rows updates counter.
 * @param firstErr First error data - code and message.
 */
@SuppressWarnings("ForLoopReplaceableByForEach")
private void executeBatchedQuery(SqlFieldsQueryEx qry, List<Integer> updCntsAcc,
    IgniteBiTuple<Integer, String> firstErr) {
    try {
        if (cliCtx.isStream()) {
            List<Long> streamed = ctx.query().streamBatchedUpdateQuery(qry.getSchema(), cliCtx, qry.getSql(),
                qry.batchedArguments());

            for (Long cnt : streamed)
                updCntsAcc.add(cnt.intValue());

            return;
        }

        List<FieldsQueryCursor<List<?>>> cursors = ctx.query().querySqlFields(null, qry, cliCtx, true, true);

        for (FieldsQueryCursor<List<?>> cur : cursors) {
            if (cur instanceof BulkLoadContextCursor)
                throw new IgniteSQLException("COPY command cannot be executed in batch mode.");

            assert !((QueryCursorImpl)cur).isQuery();

            Iterator<List<?>> rows = cur.iterator();

            if (!rows.hasNext())
                continue;

            updCntsAcc.add(((Long)rows.next().get(0)).intValue());
        }
    }
    catch (Exception e) {
        boolean sqlErr = e instanceof IgniteSQLException;

        // A nested batch update exception is looked up only for SQL exceptions.
        BatchUpdateException batchCause = sqlErr ? X.cause(e, BatchUpdateException.class) : null;

        int code;
        String msg;

        if (batchCause != null) {
            for (int cnt : batchCause.getUpdateCounts())
                updCntsAcc.add(cnt);

            msg = batchCause.getMessage();
            code = batchCause.getErrorCode();
        }
        else {
            // No per-statement information: mark every batched argument set as failed.
            updCntsAcc.addAll(Collections.nCopies(qry.batchedArguments().size(), Statement.EXECUTE_FAILED));

            msg = e.getMessage();
            code = sqlErr ? ((IgniteSQLException)e).statusCode() : IgniteQueryErrorCode.UNKNOWN;
        }

        if (!firstErr.isEmpty())
            U.error(log, "Failed to execute batch query [qry=" + qry +']', e);
        else
            firstErr.set(code, msg);
    }
}
/**
 * {@link JdbcMetaTablesRequest} command handler: collects the tables of all public caches whose
 * schema and table names match the patterns of the request. Duplicates are skipped.
 *
 * @param req Get tables metadata request.
 * @return Response.
 */
private JdbcResponse getTablesMeta(JdbcMetaTablesRequest req) {
    try {
        List<JdbcTableMeta> tables = new ArrayList<>();

        for (String cacheName : ctx.cache().publicCacheNames()) {
            for (GridQueryTypeDescriptor desc : ctx.query().types(cacheName)) {
                boolean wanted = matches(desc.schemaName(), req.schemaName())
                    && matches(desc.tableName(), req.tableName());

                if (!wanted)
                    continue;

                JdbcTableMeta candidate = new JdbcTableMeta(desc.schemaName(), desc.tableName(), "TABLE");

                if (!tables.contains(candidate))
                    tables.add(candidate);
            }
        }

        return new JdbcResponse(new JdbcMetaTablesResult(tables));
    }
    catch (Exception e) {
        U.error(log, "Failed to get tables metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);

        return exceptionToResult(e);
    }
}
/**
 * {@link JdbcMetaColumnsRequest} command handler: collects the columns of all public caches
 * whose schema, table and column names match the patterns of the request. The flavour of the
 * column metadata and of the result object depends on the protocol version of the client.
 *
 * @param req Get columns metadata request.
 * @return Response.
 */
@SuppressWarnings("unchecked")
private JdbcResponse getColumnsMeta(JdbcMetaColumnsRequest req) {
    try {
        // The protocol version does not change during the call, so compare it once.
        boolean since27 = protocolVer.compareTo(VER_2_7_0) >= 0;
        boolean since24 = protocolVer.compareTo(VER_2_4_0) >= 0;
        boolean since23 = protocolVer.compareTo(VER_2_3_0) >= 0;

        Collection<JdbcColumnMeta> cols = new LinkedHashSet<>();

        for (String cacheName : ctx.cache().publicCacheNames()) {
            for (GridQueryTypeDescriptor table : ctx.query().types(cacheName)) {
                if (!matches(table.schemaName(), req.schemaName()) || !matches(table.tableName(), req.tableName()))
                    continue;

                for (Map.Entry<String, Class<?>> field : table.fields().entrySet()) {
                    String colName = field.getKey();

                    if (!matches(colName, req.columnName()))
                        continue;

                    JdbcColumnMeta colMeta;

                    if (since27) {
                        GridQueryProperty prop = table.property(colName);

                        colMeta = new JdbcColumnMetaV4(table.schemaName(), table.tableName(),
                            field.getKey(), field.getValue(), !prop.notNull(), prop.defaultValue(),
                            prop.precision(), prop.scale());
                    }
                    else if (since24) {
                        GridQueryProperty prop = table.property(colName);

                        colMeta = new JdbcColumnMetaV3(table.schemaName(), table.tableName(),
                            field.getKey(), field.getValue(), !prop.notNull(), prop.defaultValue());
                    }
                    else if (since23) {
                        GridQueryProperty prop = table.property(colName);

                        colMeta = new JdbcColumnMetaV2(table.schemaName(), table.tableName(),
                            field.getKey(), field.getValue(), !prop.notNull());
                    }
                    else {
                        colMeta = new JdbcColumnMeta(table.schemaName(), table.tableName(),
                            field.getKey(), field.getValue());
                    }

                    if (!cols.contains(colMeta))
                        cols.add(colMeta);
                }
            }
        }

        JdbcMetaColumnsResult res;

        if (since27)
            res = new JdbcMetaColumnsResultV4(cols);
        else if (since24)
            res = new JdbcMetaColumnsResultV3(cols);
        else if (since23)
            res = new JdbcMetaColumnsResultV2(cols);
        else
            res = new JdbcMetaColumnsResult(cols);

        return new JdbcResponse(res);
    }
    catch (Exception e) {
        U.error(log, "Failed to get columns metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);

        return exceptionToResult(e);
    }
}
/**
 * {@link JdbcMetaIndexesRequest} command handler: collects the indexes of all tables of public
 * caches whose schema and table names match the patterns of the request.
 *
 * @param req Request.
 * @return Response.
 */
private ClientListenerResponse getIndexesMeta(JdbcMetaIndexesRequest req) {
    try {
        Collection<JdbcIndexMeta> meta = new HashSet<>();

        for (String cacheName : ctx.cache().publicCacheNames()) {
            for (GridQueryTypeDescriptor table : ctx.query().types(cacheName)) {
                if (!matches(table.schemaName(), req.schemaName()))
                    continue;

                if (!matches(table.tableName(), req.tableName()))
                    continue;

                for (GridQueryIndexDescriptor idxDesc : table.indexes().values())
                    meta.add(new JdbcIndexMeta(table.schemaName(), table.tableName(), idxDesc));
            }
        }

        return new JdbcResponse(new JdbcMetaIndexesResult(meta));
    }
    catch (Exception e) {
        // The message names this operation (indexes), not the parameters handler it was copied from.
        U.error(log, "Failed to get indexes metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);

        return exceptionToResult(e);
    }
}
/**
 * {@link JdbcMetaParamsRequest} command handler: prepares the statement of the request and
 * converts the metadata of each of its parameters.
 *
 * @param req Request.
 * @return Response.
 */
private ClientListenerResponse getParametersMeta(JdbcMetaParamsRequest req) {
    try {
        ParameterMetaData nativeMeta = ctx.query().prepareNativeStatement(req.schemaName(), req.sql())
            .getParameterMetaData();

        int paramCnt = nativeMeta.getParameterCount();

        List<JdbcParameterMeta> params = new ArrayList<>(paramCnt);

        // JDBC parameter indexes are 1-based.
        for (int idx = 1; idx <= paramCnt; idx++)
            params.add(new JdbcParameterMeta(nativeMeta, idx));

        return new JdbcResponse(new JdbcMetaParamsResult(params));
    }
    catch (Exception e) {
        U.error(log, "Failed to get parameters metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);

        return exceptionToResult(e);
    }
}
/**
 * {@link JdbcMetaPrimaryKeysRequest} command handler: collects primary key metadata of all
 * tables of public caches whose schema and table names match the patterns of the request.
 * <p>
 * The key columns are the fields whose property is flagged as key; when a table has none,
 * the single column {@code _KEY} is reported. The key name is the key field name of the table
 * or, when that is absent, {@code PK_<schema>_<table>}.
 *
 * @param req Request.
 * @return Response.
 */
private ClientListenerResponse getPrimaryKeys(JdbcMetaPrimaryKeysRequest req) {
    try {
        Collection<JdbcPrimaryKeyMeta> meta = new HashSet<>();

        for (String cacheName : ctx.cache().publicCacheNames()) {
            for (GridQueryTypeDescriptor table : ctx.query().types(cacheName)) {
                if (!matches(table.schemaName(), req.schemaName()))
                    continue;

                if (!matches(table.tableName(), req.tableName()))
                    continue;

                List<String> fields = new ArrayList<>();

                for (String field : table.fields().keySet()) {
                    if (table.property(field).key())
                        fields.add(field);
                }

                final String keyName = table.keyFieldName() == null ?
                    "PK_" + table.schemaName() + "_" + table.tableName() :
                    table.keyFieldName();

                List<String> keyCols = fields.isEmpty() ? Collections.singletonList("_KEY") : fields;

                meta.add(new JdbcPrimaryKeyMeta(table.schemaName(), table.tableName(), keyName, keyCols));
            }
        }

        return new JdbcResponse(new JdbcMetaPrimaryKeysResult(meta));
    }
    catch (Exception e) {
        // The message names this operation (primary keys), not the parameters handler it was copied from.
        U.error(log, "Failed to get primary keys metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);

        return exceptionToResult(e);
    }
}
/**
 * {@link JdbcMetaSchemasRequest} command handler: collects the distinct schema names of all
 * tables of public caches that match the schema pattern of the request.
 *
 * @param req Request.
 * @return Response.
 */
private ClientListenerResponse getSchemas(JdbcMetaSchemasRequest req) {
    try {
        String ptrn = req.schemaName();

        Set<String> found = new HashSet<>();

        for (String cacheName : ctx.cache().publicCacheNames()) {
            for (GridQueryTypeDescriptor table : ctx.query().types(cacheName)) {
                if (!matches(table.schemaName(), ptrn))
                    continue;

                found.add(table.schemaName());
            }
        }

        return new JdbcResponse(new JdbcMetaSchemasResult(found));
    }
    catch (Exception e) {
        U.error(log, "Failed to get schemas metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);

        return exceptionToResult(e);
    }
}
/**
 * Checks whether string matches SQL pattern.
 * <p>
 * In the pattern {@code %} stands for any sequence of characters and {@code _} for exactly one
 * character. A backslash makes the following character literal, so {@code \_} and {@code \%}
 * match an actual underscore and percent sign. Every other character is matched literally,
 * including characters that have a special meaning in regular expressions. A {@code null} or
 * empty pattern matches any non-null string.
 *
 * @param str String.
 * @param ptrn Pattern.
 * @return Whether string matches pattern; always {@code false} for a {@code null} string.
 */
private static boolean matches(String str, String ptrn) {
    if (str == null)
        return false;

    if (ptrn == null || ptrn.isEmpty())
        return true;

    StringBuilder regex = new StringBuilder(ptrn.length() + 16);
    StringBuilder literal = new StringBuilder();

    boolean escaped = false;

    for (int i = 0; i < ptrn.length(); i++) {
        char c = ptrn.charAt(i);

        if (escaped) {
            // The character after a backslash is always taken literally.
            literal.append(c);

            escaped = false;
        }
        else if (c == '\\')
            escaped = true;
        else if (c == '%' || c == '_') {
            flushLiteral(regex, literal);

            regex.append(c == '%' ? ".*" : ".");
        }
        else
            literal.append(c);
    }

    // A trailing lone backslash has nothing to escape, so it stands for itself.
    if (escaped)
        literal.append('\\');

    flushLiteral(regex, literal);

    return str.matches(regex.toString());
}

/**
 * Appends the collected literal text to the regular expression in quoted form and resets the
 * literal buffer. Does nothing when the buffer is empty.
 *
 * @param regex Regular expression being built.
 * @param literal Buffer with the literal text collected so far.
 */
private static void flushLiteral(StringBuilder regex, StringBuilder literal) {
    if (literal.length() == 0)
        return;

    regex.append(java.util.regex.Pattern.quote(literal.toString()));

    literal.setLength(0);
}
/**
 * Converts an exception into an error {@link JdbcResponse}. An {@link IgniteSQLException}
 * contributes its own status code and message; any other exception is reported with the
 * unknown error code and its string representation.
 *
 * @param e Exception to convert.
 * @return resulting {@link JdbcResponse}.
 */
private JdbcResponse exceptionToResult(Exception e) {
    if (!(e instanceof IgniteSQLException))
        return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, e.toString());

    IgniteSQLException sqlEx = (IgniteSQLException)e;

    return new JdbcResponse(sqlEx.statusCode(), sqlEx.getMessage());
}
/**
* Ordered batch worker: takes requests from the ordered batches queue and executes them one by
* one, starting with order 0 and advancing the expected order by one after each executed batch.
*/
private class OrderedBatchWorker extends GridWorker {
/**
* Constructor.
*/
OrderedBatchWorker() {
super(ctx.igniteInstanceName(), "ordered-batch", JdbcRequestHandler.this.log);
}
/**
* {@inheritDoc}
* <p>
* Loops until the client context is no longer in streaming mode. The head of the queue is only
* inspected (not removed) under the mutex; removal happens inside {@code executeBatchOrdered}.
*/
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
long nextBatchOrder = 0;
while (true) {
// Streaming mode is re-checked at the start of every iteration.
// NOTE(review): a thread parked in wait() below re-checks only after a notify or interrupt - confirm shutdown path.
if (!cliCtx.isStream())
return;
JdbcOrderedBatchExecuteRequest req;
synchronized (orderedBatchesMux) {
req = orderedBatchesQueue.peek();
// Wait while the queue is empty or its head is not the batch expected next.
if (req == null || req.order() != nextBatchOrder) {
orderedBatchesMux.wait();
continue;
}
}
// Execution happens outside of the mutex, so new batches can be queued meanwhile.
executeBatchOrdered(req);
nextBatchOrder++;
}
}
}
}