blob: 591fb0a32cd21a8238e412d23cf34ceea1f2a3b5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.jdbc2;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
import org.apache.ignite.internal.util.typedef.F;
import static java.sql.ResultSet.CONCUR_READ_ONLY;
import static java.sql.ResultSet.FETCH_FORWARD;
import static java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT;
import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
import static org.apache.ignite.internal.jdbc2.JdbcUtils.convertToSqlException;
/**
* JDBC statement implementation.
*/
public class JdbcStatement implements Statement {
/** Default fetch size. */
private static final int DFLT_FETCH_SIZE = 1024;
/** Connection. */
protected final JdbcConnection conn;
/** Closed flag. */
private boolean closed;
/** Rows limit. */
private int maxRows;
/** Query arguments. */
protected ArrayList<Object> args;
/** Fetch size. */
private int fetchSize = DFLT_FETCH_SIZE;
/** Batch of statements. */
private List<String> batch;
/** Results. */
protected List<JdbcResultSet> results;
/** Current result set index. */
protected int curRes = 0;
/**
* Creates new statement.
*
* @param conn Connection.
*/
JdbcStatement(JdbcConnection conn) {
assert conn != null;
this.conn = conn;
}
/** {@inheritDoc} */
@Override public ResultSet executeQuery(String sql) throws SQLException {
execute0(sql, true);
return getResultSet();
}
/**
* @param sql SQL query.
* @param isQuery Expected type of statements are contained in the query.
* @throws SQLException On error.
*/
private void executeMultipleStatement(String sql, Boolean isQuery) throws SQLException {
ensureNotClosed();
closeResults();
if (F.isEmpty(sql))
throw new SQLException("SQL query is empty");
Ignite ignite = conn.ignite();
UUID nodeId = conn.nodeId();
boolean loc = nodeId == null;
JdbcQueryMultipleStatementsTask qryTask;
if (conn.isMultipleStatementsTaskV3Supported()) {
qryTask = new JdbcQueryMultipleStatementsTaskV3(loc ? ignite : null, conn.schemaName(),
sql, isQuery, loc, getArgs(), fetchSize, conn.isLocalQuery(),
conn.isCollocatedQuery(), conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy(),
conn.isMultipleStatementsAllowed(), conn.clientDescriptor());
}
else if (!conn.isMultipleStatementsAllowed() && conn.isMultipleStatementsTaskV2Supported()) {
qryTask = new JdbcQueryMultipleStatementsNotAllowTask(loc ? ignite : null, conn.schemaName(),
sql, isQuery, loc, getArgs(), fetchSize, conn.isLocalQuery(), conn.isCollocatedQuery(),
conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy());
}
else {
qryTask = new JdbcQueryMultipleStatementsTask(loc ? ignite : null, conn.schemaName(),
sql, isQuery, loc, getArgs(), fetchSize, conn.isLocalQuery(), conn.isCollocatedQuery(),
conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy());
}
try {
List<JdbcStatementResultInfo> rsInfos =
loc ? qryTask.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(qryTask);
results = new ArrayList<>(rsInfos.size());
for (JdbcStatementResultInfo rsInfo : rsInfos) {
if (rsInfo.isQuery())
results.add(new JdbcResultSet(true, rsInfo.queryId(), this, null, null, null, null, false));
else
results.add(new JdbcResultSet(this, rsInfo.updateCount()));
}
}
catch (Exception e) {
throw convertToSqlException(e, "Failed to query Ignite.");
}
}
/**
* @param sql SQL query.
* @param isQuery Expected type of statements are contained in the query.
* @throws SQLException On error.
*/
private void executeSingle(String sql, Boolean isQuery) throws SQLException {
ensureNotClosed();
Ignite ignite = conn.ignite();
UUID nodeId = conn.nodeId();
UUID uuid = UUID.randomUUID();
boolean loc = nodeId == null;
if (!conn.isDmlSupported())
if (isQuery != null && !isQuery)
throw new SQLException("Failed to query Ignite: DML operations are supported in versions 1.8.0 and newer");
else
isQuery = true;
JdbcQueryTask qryTask = JdbcQueryTaskV3.createTask(loc ? ignite : null, conn.cacheName(), conn.schemaName(),
sql, isQuery, loc, getArgs(), fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(),
conn.isDistributedJoins(), conn.isEnforceJoinOrder(), conn.isLazy(), false, conn.skipReducerOnUpdate());
try {
JdbcQueryTaskResult qryRes =
loc ? qryTask.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(qryTask);
JdbcResultSet rs = new JdbcResultSet(qryRes.isQuery(), uuid, this, qryRes.getTbls(), qryRes.getCols(),
qryRes.getTypes(), qryRes.getRows(), qryRes.isFinished());
rs.setFetchSize(fetchSize);
results = Collections.singletonList(rs);
curRes = 0;
}
catch (Exception e) {
throw convertToSqlException(e, "Failed to query Ignite.");
}
}
/**
* Sets parameters to query object. Logic should be consistent with {@link JdbcQueryTask#call()} and {@link
* JdbcQueryMultipleStatementsTask#call()}. Parameters are determined from context: connection state and this
* statement state.
*
* @param qry query which parameters to set up
*/
protected void setupQuery(SqlFieldsQuery qry) {
qry.setPageSize(fetchSize);
qry.setLocal(conn.nodeId() == null);
qry.setCollocated(conn.isCollocatedQuery());
qry.setDistributedJoins(conn.isDistributedJoins());
qry.setEnforceJoinOrder(conn.isEnforceJoinOrder());
qry.setLazy(conn.isLazy());
qry.setSchema(conn.schemaName());
}
/**
* @param sql SQL query.
* @param isQuery Expected type of statements are contained in the query.
* @throws SQLException On error.
*/
protected void execute0(String sql, Boolean isQuery) throws SQLException {
if (conn.isMultipleStatementsSupported())
executeMultipleStatement(sql, isQuery);
else
executeSingle(sql, isQuery);
}
/** {@inheritDoc} */
@Override public int executeUpdate(String sql) throws SQLException {
execute0(sql, false);
return getUpdateCount();
}
/** {@inheritDoc} */
@Override public void close() throws SQLException {
conn.statements.remove(this);
closeInternal();
}
/**
* Marks statement as closed and closes all result sets.
* @throws SQLException On error.
*/
void closeInternal() throws SQLException {
closeResults();
closed = true;
}
/** {@inheritDoc} */
@Override public int getMaxFieldSize() throws SQLException {
ensureNotClosed();
return 0;
}
/** {@inheritDoc} */
@Override public void setMaxFieldSize(int max) throws SQLException {
ensureNotClosed();
throw new SQLFeatureNotSupportedException("Field size limitation is not supported.");
}
/** {@inheritDoc} */
@Override public int getMaxRows() throws SQLException {
ensureNotClosed();
return maxRows;
}
/** {@inheritDoc} */
@Override public void setMaxRows(int maxRows) throws SQLException {
ensureNotClosed();
this.maxRows = maxRows;
}
/** {@inheritDoc} */
@Override public void setEscapeProcessing(boolean enable) throws SQLException {
ensureNotClosed();
}
/** {@inheritDoc} */
@Override public int getQueryTimeout() throws SQLException {
ensureNotClosed();
throw new SQLFeatureNotSupportedException("Query timeout is not supported.");
}
/** {@inheritDoc} */
@Override public void setQueryTimeout(int timeout) throws SQLException {
ensureNotClosed();
throw new SQLFeatureNotSupportedException("Query timeout is not supported.");
}
/** {@inheritDoc} */
@Override public void cancel() throws SQLException {
ensureNotClosed();
throw new SQLFeatureNotSupportedException("Cancellation is not supported.");
}
/** {@inheritDoc} */
@Override public SQLWarning getWarnings() throws SQLException {
ensureNotClosed();
return null;
}
/** {@inheritDoc} */
@Override public void clearWarnings() throws SQLException {
ensureNotClosed();
}
/** {@inheritDoc} */
@Override public void setCursorName(String name) throws SQLException {
ensureNotClosed();
throw new SQLFeatureNotSupportedException("Updates are not supported.");
}
/** {@inheritDoc} */
@Override public boolean execute(String sql) throws SQLException {
execute0(sql, null);
return results.get(0).isQuery();
}
/** {@inheritDoc} */
@Override public ResultSet getResultSet() throws SQLException {
JdbcResultSet rs = nextResultSet();
if (rs == null)
return null;
if (!rs.isQuery()) {
curRes--;
return null;
}
return rs;
}
/** {@inheritDoc} */
@Override public int getUpdateCount() throws SQLException {
JdbcResultSet rs = nextResultSet();
if (rs == null)
return -1;
if (rs.isQuery()) {
curRes--;
return -1;
}
return (int)rs.updateCount();
}
/** {@inheritDoc} */
@Override public boolean getMoreResults() throws SQLException {
ensureNotClosed();
return getMoreResults(CLOSE_CURRENT_RESULT);
}
/** {@inheritDoc} */
@Override public void setFetchDirection(int direction) throws SQLException {
ensureNotClosed();
if (direction != FETCH_FORWARD)
throw new SQLFeatureNotSupportedException("Only forward direction is supported");
}
/** {@inheritDoc} */
@Override public int getFetchDirection() throws SQLException {
ensureNotClosed();
return FETCH_FORWARD;
}
/** {@inheritDoc} */
@Override public void setFetchSize(int fetchSize) throws SQLException {
ensureNotClosed();
if (fetchSize < 0)
throw new SQLException("Fetch size must be greater or equal zero.");
this.fetchSize = fetchSize;
}
/** {@inheritDoc} */
@Override public int getFetchSize() throws SQLException {
ensureNotClosed();
return fetchSize;
}
/** {@inheritDoc} */
@Override public int getResultSetConcurrency() throws SQLException {
ensureNotClosed();
return CONCUR_READ_ONLY;
}
/** {@inheritDoc} */
@Override public int getResultSetType() throws SQLException {
ensureNotClosed();
return TYPE_FORWARD_ONLY;
}
/** {@inheritDoc} */
@Override public void addBatch(String sql) throws SQLException {
ensureNotClosed();
if (F.isEmpty(sql))
throw new SQLException("SQL query is empty");
if (batch == null)
batch = new ArrayList<>();
batch.add(sql);
}
/** {@inheritDoc} */
@Override public void clearBatch() throws SQLException {
ensureNotClosed();
batch = null;
}
/** {@inheritDoc} */
@Override public int[] executeBatch() throws SQLException {
ensureNotClosed();
List<String> batch = this.batch;
this.batch = null;
return doBatchUpdate(null, batch, null);
}
/**
* Runs batch of update commands.
*
* @param command SQL command.
* @param batch Batch of SQL commands.
* @param batchArgs Batch of SQL parameters.
* @return Number of affected rows.
* @throws SQLException If failed.
*/
protected int[] doBatchUpdate(String command, List<String> batch, List<List<Object>> batchArgs)
throws SQLException {
closeResults();
if ((F.isEmpty(command) || F.isEmpty(batchArgs)) && F.isEmpty(batch))
throw new SQLException("Batch is empty.");
Ignite ignite = conn.ignite();
UUID nodeId = conn.nodeId();
boolean loc = nodeId == null;
if (!conn.isDmlSupported())
throw new SQLException("Failed to query Ignite: DML operations are supported in versions 1.8.0 and newer");
JdbcBatchUpdateTask task = new JdbcBatchUpdateTask(loc ? ignite : null, conn.cacheName(),
conn.schemaName(), command, batch, batchArgs, getFetchSize(), conn.isLocalQuery(),
conn.isCollocatedQuery(), conn.isDistributedJoins());
try {
int[] res = loc ? task.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(task);
long updateCnt = F.isEmpty(res) ? -1 : res[res.length - 1];
results = Collections.singletonList(new JdbcResultSet(this, updateCnt));
curRes = 0;
return res;
}
catch (Exception e) {
throw convertToSqlException(e, "Failed to query Ignite.");
}
}
/** {@inheritDoc} */
@Override public Connection getConnection() throws SQLException {
ensureNotClosed();
return conn;
}
/** {@inheritDoc} */
@Override public boolean getMoreResults(int curr) throws SQLException {
ensureNotClosed();
if (results != null) {
assert curRes <= results.size() : "Invalid results state: [resultsCount=" + results.size() +
", curRes=" + curRes + ']';
switch (curr) {
case CLOSE_CURRENT_RESULT:
if (curRes > 0)
results.get(curRes - 1).close();
break;
case CLOSE_ALL_RESULTS:
for (int i = 0; i < curRes; ++i)
results.get(i).close();
break;
case KEEP_CURRENT_RESULT:
break;
default:
throw new SQLException("Invalid 'current' parameter.");
}
}
return (results != null && curRes < results.size());
}
/** {@inheritDoc} */
@Override public ResultSet getGeneratedKeys() throws SQLException {
ensureNotClosed();
throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
}
/** {@inheritDoc} */
@Override public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
ensureNotClosed();
if (autoGeneratedKeys == RETURN_GENERATED_KEYS)
throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
return executeUpdate(sql);
}
/** {@inheritDoc} */
@Override public int executeUpdate(String sql, int[] colIndexes) throws SQLException {
ensureNotClosed();
if (!F.isEmpty(colIndexes))
throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
return executeUpdate(sql);
}
/** {@inheritDoc} */
@Override public int executeUpdate(String sql, String[] colNames) throws SQLException {
ensureNotClosed();
if (!F.isEmpty(colNames))
throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
return executeUpdate(sql);
}
/** {@inheritDoc} */
@Override public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
ensureNotClosed();
if (autoGeneratedKeys == RETURN_GENERATED_KEYS)
throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
return execute(sql);
}
/** {@inheritDoc} */
@Override public boolean execute(String sql, int[] colIndexes) throws SQLException {
ensureNotClosed();
if (!F.isEmpty(colIndexes))
throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
return execute(sql);
}
/** {@inheritDoc} */
@Override public boolean execute(String sql, String[] colNames) throws SQLException {
ensureNotClosed();
if (!F.isEmpty(colNames))
throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
return execute(sql);
}
/** {@inheritDoc} */
@Override public int getResultSetHoldability() throws SQLException {
ensureNotClosed();
return HOLD_CURSORS_OVER_COMMIT;
}
/** {@inheritDoc} */
@Override public boolean isClosed() throws SQLException {
return closed;
}
/** {@inheritDoc} */
@Override public void setPoolable(boolean poolable) throws SQLException {
ensureNotClosed();
if (poolable)
throw new SQLFeatureNotSupportedException("Pooling is not supported.");
}
/** {@inheritDoc} */
@Override public boolean isPoolable() throws SQLException {
ensureNotClosed();
return false;
}
/** {@inheritDoc} */
@Override public <T> T unwrap(Class<T> iface) throws SQLException {
if (!isWrapperFor(iface))
throw new SQLException("Statement is not a wrapper for " + iface.getName());
return (T)this;
}
/** {@inheritDoc} */
@Override public boolean isWrapperFor(Class<?> iface) throws SQLException {
return iface != null && iface == Statement.class;
}
/** {@inheritDoc} */
@Override public void closeOnCompletion() throws SQLException {
throw new SQLFeatureNotSupportedException("closeOnCompletion is not supported.");
}
/** {@inheritDoc} */
@Override public boolean isCloseOnCompletion() throws SQLException {
ensureNotClosed();
return false;
}
/**
* @return Args for current statement
*/
protected final Object[] getArgs() {
return args != null ? args.toArray() : null;
}
/**
* Ensures that statement is not closed.
*
* @throws SQLException If statement is closed.
*/
void ensureNotClosed() throws SQLException {
if (closed)
throw new SQLException("Connection is closed.", SqlStateCode.CONNECTION_CLOSED);
}
/**
* Get last result set if any.
*
* @return Result set or null.
* @throws SQLException If failed.
*/
private JdbcResultSet nextResultSet() throws SQLException {
ensureNotClosed();
if (results == null || curRes >= results.size())
return null;
else
return results.get(curRes++);
}
/**
* Close results.
*
* @throws SQLException On error.
*/
private void closeResults() throws SQLException {
if (results != null) {
for (JdbcResultSet rs : results)
rs.close();
results = null;
curRes = 0;
}
}
}