blob: 9a2e1320203c6fa7bccfb10c375b97d81aa1d0d2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.driver.jdbc.core.statement;
import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.shardingsphere.driver.executor.DriverExecutor;
import org.apache.shardingsphere.driver.executor.batch.BatchExecutionUnit;
import org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
import org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
import org.apache.shardingsphere.driver.jdbc.adapter.AbstractPreparedStatementAdapter;
import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
import org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
import org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
import org.apache.shardingsphere.driver.jdbc.core.statement.metadata.ShardingSphereParameterMetaData;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.kernel.syntax.EmptySQLException;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.binder.context.aware.ParameterAware;
import org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext;
import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.context.statement.dml.InsertStatementContext;
import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
import org.apache.shardingsphere.infra.hint.HintManager;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.hint.SQLHintUtils;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import org.apache.shardingsphere.infra.route.context.RouteContext;
import org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
import org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
import org.apache.shardingsphere.infra.rule.attribute.resoure.StorageConnectorReusableRuleAttribute;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
import org.apache.shardingsphere.traffic.engine.TrafficEngine;
import org.apache.shardingsphere.traffic.exception.EmptyTrafficExecutionUnitException;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
import java.sql.Connection;
import java.sql.ParameterMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* ShardingSphere prepared statement.
*/
@HighFrequencyInvocation
public final class ShardingSpherePreparedStatement extends AbstractPreparedStatementAdapter {
@Getter
private final ShardingSphereConnection connection;
private final MetaDataContexts metaDataContexts;
private final String sql;
private final List<PreparedStatement> statements;
private final List<List<Object>> parameterSets;
private final SQLStatement sqlStatement;
private final SQLStatementContext sqlStatementContext;
private final String databaseName;
private final StatementOption statementOption;
@Getter
private final ParameterMetaData parameterMetaData;
@Getter(AccessLevel.PROTECTED)
private final DriverExecutor executor;
private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;
private final Collection<Comparable<?>> generatedValues = new LinkedList<>();
private final KernelProcessor kernelProcessor;
private final boolean statementsCacheable;
private final TrafficRule trafficRule;
@Getter(AccessLevel.PROTECTED)
private final StatementManager statementManager;
@Getter
private final boolean selectContainsEnhancedTable;
private ExecutionContext executionContext;
private Map<String, Integer> columnLabelAndIndexMap;
private ResultSet currentResultSet;
private String trafficInstanceId;
private boolean useFederation;
private final HintValueContext hintValueContext;
private ResultSet currentBatchGeneratedKeysResultSet;
public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql) throws SQLException {
this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false, null);
}
public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
this(connection, sql, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT, false, null);
}
public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException {
this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, Statement.RETURN_GENERATED_KEYS == autoGeneratedKeys, null);
}
public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql, final String[] columns) throws SQLException {
this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, true, columns);
}
public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency,
final int resultSetHoldability) throws SQLException {
this(connection, sql, resultSetType, resultSetConcurrency, resultSetHoldability, false, null);
}
private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql,
final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys,
final String[] columns) throws SQLException {
if (Strings.isNullOrEmpty(sql)) {
throw new EmptySQLException().toSQLException();
}
this.connection = connection;
metaDataContexts = connection.getContextManager().getMetaDataContexts();
hintValueContext = SQLHintUtils.extractHint(sql);
this.sql = SQLHintUtils.removeHint(sql);
statements = new ArrayList<>();
parameterSets = new ArrayList<>();
SQLParserRule sqlParserRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);
SQLParserEngine sqlParserEngine = sqlParserRule.getSQLParserEngine(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType());
sqlStatement = sqlParserEngine.parse(this.sql, true);
sqlStatementContext = new SQLBindEngine(metaDataContexts.getMetaData(), connection.getDatabaseName(), hintValueContext).bind(sqlStatement, Collections.emptyList());
databaseName = sqlStatementContext.getTablesContext().getDatabaseName().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
statementOption = returnGeneratedKeys ? new StatementOption(true, columns) : new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
executor = new DriverExecutor(connection);
JDBCExecutor jdbcExecutor = new JDBCExecutor(connection.getContextManager().getExecutorEngine(), connection.getDatabaseConnectionManager().getConnectionContext());
batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, databaseName, connection.getProcessId());
kernelProcessor = new KernelProcessor();
statementsCacheable = isStatementsCacheable(metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData());
trafficRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class);
selectContainsEnhancedTable = sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsEnhancedTable();
statementManager = new StatementManager();
}
private boolean isStatementsCacheable(final RuleMetaData databaseRuleMetaData) {
return databaseRuleMetaData.getAttributes(StorageConnectorReusableRuleAttribute.class).size() == databaseRuleMetaData.getRules().size() && !HintManager.isInstantiated();
}
@Override
public ResultSet executeQuery() throws SQLException {
ResultSet result;
try {
if (statementsCacheable && !statements.isEmpty()) {
resetParameters();
return statements.iterator().next().executeQuery();
}
clearPrevious();
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext);
trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeQuery());
}
useFederation = decide(queryContext,
metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData());
if (useFederation) {
return executeFederationQuery(queryContext);
}
executionContext = createExecutionContext(queryContext);
result = doExecuteQuery(executionContext);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
handleExceptionInTransaction(connection, metaDataContexts);
throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
} finally {
clearBatch();
}
currentResultSet = result;
return result;
}
private ShardingSphereResultSet doExecuteQuery(final ExecutionContext executionContext) throws SQLException {
List<QueryResult> queryResults = executeQuery0(executionContext);
MergedResult mergedResult = mergeQuery(queryResults, executionContext.getSqlStatementContext());
List<ResultSet> resultSets = getResultSets();
if (null == columnLabelAndIndexMap) {
columnLabelAndIndexMap = ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(sqlStatementContext, selectContainsEnhancedTable, resultSets.get(0).getMetaData());
}
return new ShardingSphereResultSet(resultSets, mergedResult, this, selectContainsEnhancedTable, executionContext, columnLabelAndIndexMap);
}
private boolean decide(final QueryContext queryContext, final ShardingSphereDatabase database, final RuleMetaData globalRuleMetaData) {
return executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, globalRuleMetaData);
}
private void handleAutoCommit(final QueryContext queryContext) throws SQLException {
if (AutoCommitUtils.needOpenTransaction(queryContext.getSqlStatementContext().getSqlStatement())) {
connection.handleAutoCommit();
}
}
private JDBCExecutionUnit createTrafficExecutionUnit(final String trafficInstanceId, final QueryContext queryContext) throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters()));
ExecutionGroupContext<JDBCExecutionUnit> context =
prepareEngine.prepare(new RouteContext(), Collections.singleton(executionUnit), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", "")));
ShardingSpherePreconditions.checkState(!context.getInputGroups().isEmpty() && !context.getInputGroups().iterator().next().getInputs().isEmpty(), EmptyTrafficExecutionUnitException::new);
return context.getInputGroups().iterator().next().getInputs().iterator().next();
}
private Optional<String> getInstanceIdAndSet(final QueryContext queryContext) {
Optional<String> result = connection.getDatabaseConnectionManager().getConnectionContext().getTrafficInstanceId();
if (!result.isPresent()) {
result = getInstanceId(queryContext);
}
if (connection.isHoldTransaction() && result.isPresent()) {
connection.getDatabaseConnectionManager().getConnectionContext().setTrafficInstanceId(result.get());
}
return result;
}
private Optional<String> getInstanceId(final QueryContext queryContext) {
InstanceContext instanceContext = connection.getContextManager().getInstanceContext();
return null != trafficRule && !trafficRule.getStrategyRules().isEmpty()
? new TrafficEngine(trafficRule, instanceContext).dispatch(queryContext, connection.isHoldTransaction())
: Optional.empty();
}
private void resetParameters() throws SQLException {
parameterSets.clear();
parameterSets.add(getParameters());
replaySetParameter();
}
private List<QueryResult> executeQuery0(final ExecutionContext executionContext) throws SQLException {
if (hasRawExecutionRule()) {
return executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext),
executionContext.getQueryContext(), new RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
}
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(executionContext);
cacheStatements(executionGroupContext.getInputGroups());
return executor.getRegularExecutor().executeQuery(executionGroupContext, executionContext.getQueryContext(),
new PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), sqlStatement,
SQLExecutorExceptionHandler.isExceptionThrown()));
}
private ResultSet executeFederationQuery(final QueryContext queryContext) {
PreparedStatementExecuteQueryCallback callback = new PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown());
SQLFederationContext context = new SQLFederationContext(false, queryContext, metaDataContexts.getMetaData(), connection.getProcessId());
return executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(), callback, context);
}
private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> createDriverExecutionPrepareEngine() {
int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(), statementManager,
statementOption, metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules(),
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData().getStorageUnits());
}
@Override
public int executeUpdate() throws SQLException {
try {
if (statementsCacheable && !statements.isEmpty()) {
resetParameters();
return statements.iterator().next().executeUpdate();
}
clearPrevious();
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext);
trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeUpdate());
}
executionContext = createExecutionContext(queryContext);
if (hasRawExecutionRule()) {
Collection<ExecuteResult> results =
executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext), executionContext.getQueryContext(), new RawSQLExecutorCallback());
return accumulate(results);
}
return executeUpdateWithExecutionContext(executionContext);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
handleExceptionInTransaction(connection, metaDataContexts);
throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
} finally {
clearBatch();
}
}
private int useDriverToExecuteUpdate(final ExecutionContext executionContext) throws SQLException {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(executionContext);
cacheStatements(executionGroupContext.getInputGroups());
return executor.getRegularExecutor().executeUpdate(executionGroupContext,
executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback());
}
private int accumulate(final Collection<ExecuteResult> results) {
int result = 0;
for (ExecuteResult each : results) {
result += ((UpdateResult) each).getUpdateCount();
}
return result;
}
private JDBCExecutorCallback<Integer> createExecuteUpdateCallback() {
boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
return new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), sqlStatement, isExceptionThrown) {
@Override
protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
return ((PreparedStatement) statement).executeUpdate();
}
@Override
protected Optional<Integer> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
return Optional.empty();
}
};
}
@Override
public boolean execute() throws SQLException {
try {
if (statementsCacheable && !statements.isEmpty()) {
resetParameters();
return statements.iterator().next().execute();
}
clearPrevious();
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext);
trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).execute());
}
useFederation = decide(queryContext,
metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData());
if (useFederation) {
ResultSet resultSet = executeFederationQuery(queryContext);
return null != resultSet;
}
executionContext = createExecutionContext(queryContext);
if (hasRawExecutionRule()) {
Collection<ExecuteResult> results =
executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext), executionContext.getQueryContext(), new RawSQLExecutorCallback());
return results.iterator().next() instanceof QueryResult;
}
return executeWithExecutionContext(executionContext);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
handleExceptionInTransaction(connection, metaDataContexts);
throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
} finally {
clearBatch();
}
}
private boolean hasRawExecutionRule() {
return !metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
}
private ExecutionGroupContext<RawSQLExecutionUnit> createRawExecutionGroupContext(final ExecutionContext executionContext) throws SQLException {
int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", "")));
}
private boolean executeWithExecutionContext(final ExecutionContext executionContext) throws SQLException {
return isNeedImplicitCommitTransaction(connection, executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getExecutionUnits().size() > 1)
? executeWithImplicitCommitTransaction(() -> useDriverToExecute(executionContext))
: useDriverToExecute(executionContext);
}
private boolean executeWithImplicitCommitTransaction(final ImplicitTransactionCallback<Boolean> callback) throws SQLException {
boolean result;
try {
connection.setAutoCommit(false);
result = callback.execute();
connection.commit();
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
connection.rollback();
throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
} finally {
connection.setAutoCommit(true);
}
return result;
}
private int executeUpdateWithExecutionContext(final ExecutionContext executionContext) throws SQLException {
return isNeedImplicitCommitTransaction(connection, executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getExecutionUnits().size() > 1)
? executeUpdateWithImplicitCommitTransaction(() -> useDriverToExecuteUpdate(executionContext))
: useDriverToExecuteUpdate(executionContext);
}
private int executeUpdateWithImplicitCommitTransaction(final ImplicitTransactionCallback<Integer> callback) throws SQLException {
int result;
try {
connection.setAutoCommit(false);
result = callback.execute();
connection.commit();
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
connection.rollback();
throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
} finally {
connection.setAutoCommit(true);
}
return result;
}
private boolean useDriverToExecute(final ExecutionContext executionContext) throws SQLException {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(executionContext);
cacheStatements(executionGroupContext.getInputGroups());
return executor.getRegularExecutor().execute(executionGroupContext,
executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteCallback());
}
private JDBCExecutorCallback<Boolean> createExecuteCallback() {
boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
return new JDBCExecutorCallback<Boolean>(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), sqlStatement, isExceptionThrown) {
@Override
protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
return ((PreparedStatement) statement).execute();
}
@Override
protected Optional<Boolean> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
return Optional.empty();
}
};
}
private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext(final ExecutionContext executionContext) throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(),
new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", "")));
}
@Override
public ResultSet getResultSet() throws SQLException {
if (null != currentResultSet) {
return currentResultSet;
}
if (null != trafficInstanceId) {
return executor.getTrafficExecutor().getResultSet();
}
if (useFederation) {
return executor.getSqlFederationEngine().getResultSet();
}
if (executionContext.getSqlStatementContext() instanceof SelectStatementContext
|| executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
List<ResultSet> resultSets = getResultSets();
if (resultSets.isEmpty()) {
return currentResultSet;
}
SQLStatementContext sqlStatementContext = executionContext.getSqlStatementContext();
MergedResult mergedResult = mergeQuery(getQueryResults(resultSets), sqlStatementContext);
if (null == columnLabelAndIndexMap) {
columnLabelAndIndexMap = ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(sqlStatementContext, selectContainsEnhancedTable, resultSets.get(0).getMetaData());
}
currentResultSet = new ShardingSphereResultSet(resultSets, mergedResult, this, selectContainsEnhancedTable, executionContext, columnLabelAndIndexMap);
}
return currentResultSet;
}
private List<ResultSet> getResultSets() throws SQLException {
List<ResultSet> result = new ArrayList<>(statements.size());
for (Statement each : statements) {
if (null != each.getResultSet()) {
result.add(each.getResultSet());
}
}
return result;
}
private List<QueryResult> getQueryResults(final List<ResultSet> resultSets) throws SQLException {
List<QueryResult> result = new ArrayList<>(resultSets.size());
for (ResultSet each : resultSets) {
if (null != each) {
result.add(new JDBCStreamQueryResult(each));
}
}
return result;
}
private ExecutionContext createExecutionContext(final QueryContext queryContext) {
RuleMetaData globalRuleMetaData = metaDataContexts.getMetaData().getGlobalRuleMetaData();
ShardingSphereDatabase currentDatabase = metaDataContexts.getMetaData().getDatabase(databaseName);
SQLAuditEngine.audit(queryContext.getSqlStatementContext(), queryContext.getParameters(), globalRuleMetaData, currentDatabase, null, queryContext.getHintValueContext());
ExecutionContext result = kernelProcessor.generateExecutionContext(
queryContext, currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(), connection.getDatabaseConnectionManager().getConnectionContext());
findGeneratedKey(result).ifPresent(optional -> generatedValues.addAll(optional.getGeneratedValues()));
return result;
}
private ExecutionContext createExecutionContext(final QueryContext queryContext, final String trafficInstanceId) {
ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters()));
return new ExecutionContext(queryContext, Collections.singletonList(executionUnit), new RouteContext());
}
private QueryContext createQueryContext() {
List<Object> params = new ArrayList<>(getParameters());
if (sqlStatementContext instanceof ParameterAware) {
((ParameterAware) sqlStatementContext).setUpParameters(params);
}
return new QueryContext(sqlStatementContext, sql, params, hintValueContext, true);
}
private MergedResult mergeQuery(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
MergeEngine mergeEngine = new MergeEngine(metaDataContexts.getMetaData().getGlobalRuleMetaData(), metaDataContexts.getMetaData().getDatabase(databaseName),
metaDataContexts.getMetaData().getProps(), connection.getDatabaseConnectionManager().getConnectionContext());
return mergeEngine.merge(queryResults, sqlStatementContext);
}
private void cacheStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) throws SQLException {
for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {
each.getInputs().forEach(eachInput -> {
statements.add((PreparedStatement) eachInput.getStorageResource());
parameterSets.add(eachInput.getExecutionUnit().getSqlUnit().getParameters());
});
}
replay();
}
private void replay() throws SQLException {
replaySetParameter();
for (Statement each : statements) {
getMethodInvocationRecorder().replay(each);
}
}
private void replaySetParameter() throws SQLException {
for (int i = 0; i < statements.size(); i++) {
replaySetParameter(statements.get(i), parameterSets.get(i));
}
}
private void clearPrevious() {
statements.clear();
parameterSets.clear();
generatedValues.clear();
}
private Optional<GeneratedKeyContext> findGeneratedKey(final ExecutionContext executionContext) {
return executionContext.getSqlStatementContext() instanceof InsertStatementContext
? ((InsertStatementContext) executionContext.getSqlStatementContext()).getGeneratedKeyContext()
: Optional.empty();
}
@Override
public ResultSet getGeneratedKeys() throws SQLException {
if (null != currentBatchGeneratedKeysResultSet) {
return currentBatchGeneratedKeysResultSet;
}
Optional<GeneratedKeyContext> generatedKey = findGeneratedKey(executionContext);
if (generatedKey.isPresent() && statementOption.isReturnGeneratedKeys() && !generatedValues.isEmpty()) {
return new GeneratedKeysResultSet(getGeneratedKeysColumnName(generatedKey.get().getColumnName()), generatedValues.iterator(), this);
}
for (PreparedStatement each : statements) {
ResultSet resultSet = each.getGeneratedKeys();
while (resultSet.next()) {
generatedValues.add((Comparable<?>) resultSet.getObject(1));
}
}
String columnName = generatedKey.map(GeneratedKeyContext::getColumnName).orElse(null);
return new GeneratedKeysResultSet(getGeneratedKeysColumnName(columnName), generatedValues.iterator(), this);
}
private String getGeneratedKeysColumnName(final String columnName) {
return metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType() instanceof MySQLDatabaseType ? "GENERATED_KEY" : columnName;
}
@Override
public void addBatch() {
try {
QueryContext queryContext = createQueryContext();
trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
executionContext = null == trafficInstanceId ? createExecutionContext(queryContext) : createExecutionContext(queryContext, trafficInstanceId);
batchPreparedStatementExecutor.addBatchForExecutionUnits(executionContext.getExecutionUnits());
} finally {
currentResultSet = null;
clearParameters();
}
}
@Override
public int[] executeBatch() throws SQLException {
if (null == executionContext) {
return new int[0];
}
try {
// TODO add raw SQL executor
return doExecuteBatch(batchPreparedStatementExecutor);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
handleExceptionInTransaction(connection, metaDataContexts);
throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
} finally {
clearBatch();
}
}
private int[] doExecuteBatch(final BatchPreparedStatementExecutor batchExecutor) throws SQLException {
initBatchPreparedStatementExecutor(batchExecutor);
int[] result = batchExecutor.executeBatch(executionContext.getSqlStatementContext());
if (statementOption.isReturnGeneratedKeys() && generatedValues.isEmpty()) {
List<Statement> batchPreparedStatementExecutorStatements = batchExecutor.getStatements();
for (Statement statement : batchPreparedStatementExecutorStatements) {
statements.add((PreparedStatement) statement);
}
currentBatchGeneratedKeysResultSet = getGeneratedKeys();
statements.clear();
}
return result;
}
private void initBatchPreparedStatementExecutor(final BatchPreparedStatementExecutor batchExecutor) throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, metaDataContexts.getMetaData().getProps()
.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY), connection.getDatabaseConnectionManager(), statementManager, statementOption,
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules(),
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData().getStorageUnits());
List<ExecutionUnit> executionUnits = new ArrayList<>(batchExecutor.getBatchExecutionUnits().size());
for (BatchExecutionUnit each : batchExecutor.getBatchExecutionUnits()) {
ExecutionUnit executionUnit = each.getExecutionUnit();
executionUnits.add(executionUnit);
}
batchExecutor.init(prepareEngine.prepare(executionContext.getRouteContext(), executionUnits, new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", ""))));
setBatchParametersForStatements(batchExecutor);
}
private void setBatchParametersForStatements(final BatchPreparedStatementExecutor batchExecutor) throws SQLException {
for (Statement each : batchExecutor.getStatements()) {
List<List<Object>> paramSet = batchExecutor.getParameterSet(each);
for (List<Object> eachParams : paramSet) {
replaySetParameter((PreparedStatement) each, eachParams);
((PreparedStatement) each).addBatch();
}
}
}
@Override
public void clearBatch() {
currentResultSet = null;
batchPreparedStatementExecutor.clear();
clearParameters();
}
@SuppressWarnings("MagicConstant")
@Override
public int getResultSetType() {
return statementOption.getResultSetType();
}
@SuppressWarnings("MagicConstant")
@Override
public int getResultSetConcurrency() {
return statementOption.getResultSetConcurrency();
}
@Override
public int getResultSetHoldability() {
return statementOption.getResultSetHoldability();
}
@Override
public boolean isAccumulate() {
for (DataNodeRuleAttribute each : metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(DataNodeRuleAttribute.class)) {
if (each.isNeedAccumulate(executionContext.getSqlStatementContext().getTablesContext().getTableNames())) {
return true;
}
}
return false;
}
@Override
public Collection<PreparedStatement> getRoutedStatements() {
return statements;
}
}