PHOENIX-6821: Optimize batching in auto-commit mode (#1570)
* PHOENIX-6821: Optimize batching in auto-commit mode
This change also includes the following to make the batch functionality
more consistent and conformant with the JDBC spec (see the usage sketch after this list):
- Disallow the use of DQL in a batch by throwing BatchUpdateException.
- Replace the custom BatchUpdateExecution exception with java.sql.BatchUpdateException and propagate update counts through it.
- Add unit test coverage for executeBatch()
- Add tests for delete and further enhance existing tests
- Review feedback changes
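For illustration only (not part of the patch): a minimal client-side sketch of the behavior described above. The connection URL, class name, and table T (k VARCHAR PRIMARY KEY, v INTEGER) are placeholders. With auto-commit on, executeBatch() flushes and commits the batched mutations itself, and a SELECT in a batch is rejected at executeBatch() time with java.sql.BatchUpdateException, whose getUpdateCounts() reports the statements executed before the failure.

    import java.sql.BatchUpdateException;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.Statement;

    public class BatchUsageSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder URL and table; assumes table T already exists.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.setAutoCommit(true);

                // Batched upserts: with auto-commit on, executeBatch() commits on its own.
                try (PreparedStatement ps = conn.prepareStatement("UPSERT INTO T VALUES (?, ?)")) {
                    ps.setString(1, "a"); ps.setInt(2, 1); ps.addBatch();
                    ps.setString(1, "b"); ps.setInt(2, 2); ps.addBatch();
                    int[] counts = ps.executeBatch(); // no explicit conn.commit() needed
                }

                // A SELECT mixed into a batch now fails with java.sql.BatchUpdateException.
                try (Statement stmt = conn.createStatement()) {
                    stmt.addBatch("UPSERT INTO T VALUES ('c', 3)");
                    stmt.addBatch("SELECT * FROM T");
                    stmt.executeBatch(); // throws on the SELECT
                } catch (BatchUpdateException e) {
                    int[] partial = e.getUpdateCounts(); // counts for statements executed so far
                }
            }
        }
    }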
Squashes the following commits to avoid patch errors during PR validation builds:
* b9eb9bf48 Addressed additional review feedback
* 54973f799 Add tests for delete and further enhance existing tests
* 7548551c0 Restoring prior version for reload4j.version
* b29d8d252 Add unit test coverage for executeBatch()
* 6e0c825c8 Review feedback changes
* f129a1504 Reverted the code that prevents mixing of non-batch commit after a batch commit as it is broken
* af3975068 Fix checkstyle errors in the lines touched in this PR.
* 7236deb5c Merge branch 'master' into PHOENIX-6821
* 5c6ac9392 PHOENIX-6821: Optimize batching in auto-commit mode
* Fix checkstyle errors
* Fix checkstyle errors
* Remove extraneous whitespace
* Fix checkstyle errors
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
index 0fb46b1..52a7553 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
@@ -17,16 +17,19 @@
*/
package org.apache.phoenix.end2end;
+import static org.apache.phoenix.util.PhoenixRuntime.REQUEST_METRIC_ATTRIB;
import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.apache.phoenix.util.TestUtil.closeStatement;
import static org.apache.phoenix.util.TestUtil.closeStmtAndConn;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.math.BigDecimal;
import java.math.RoundingMode;
+import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
@@ -36,6 +39,7 @@
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
+import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.client.Result;
@@ -45,6 +49,8 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.SequenceNotFoundException;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.DateUtil;
@@ -55,6 +61,7 @@
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.function.ThrowingRunnable;
@Category(ParallelStatsDisabledTest.class)
@@ -454,23 +461,25 @@
closeStmtAndConn(stmt, conn);
}
}
-
-
- @Test
- public void testBatchedUpsert() throws Exception {
+
+ private void testBatchedUpsert(boolean autocommit) throws Exception {
String tableName = generateUniqueName();
Properties props = new Properties();
+ props.setProperty(REQUEST_METRIC_ATTRIB, "true");
+ props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "true");
Connection conn = null;
PreparedStatement pstmt = null;
+ Statement stmt = null;
try {
conn = DriverManager.getConnection(getUrl(), props);
conn.createStatement().execute("create table " + tableName + " (k varchar primary key, v integer)");
} finally {
closeStmtAndConn(pstmt, conn);
}
-
+
try {
conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(autocommit);
pstmt = conn.prepareStatement("upsert into " + tableName + " values (?, ?)");
pstmt.setString(1, "a");
pstmt.setInt(2, 1);
@@ -478,12 +487,22 @@
pstmt.setString(1, "b");
pstmt.setInt(2, 2);
pstmt.addBatch();
+ pstmt.setString(1, "c");
+ pstmt.setInt(2, 3);
+ pstmt.addBatch();
pstmt.executeBatch();
- conn.commit();
+ if (!autocommit) {
+ conn.commit();
+ }
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ Map<String, Map<MetricType, Long>> mutationMetrics = pConn.getMutationMetrics();
+ Assert.assertEquals(3, (long) mutationMetrics.get(tableName).get(MetricType.MUTATION_BATCH_SIZE));
+ Assert.assertEquals(3, (long) mutationMetrics.get(tableName).get(MetricType.UPSERT_MUTATION_SQL_COUNTER));
+ Assert.assertEquals(autocommit, conn.getAutoCommit());
} finally {
- closeStmtAndConn(pstmt, conn);
+ closeStmtAndConn(pstmt, conn);
}
-
+
try {
conn = DriverManager.getConnection(getUrl(), props);
pstmt = conn.prepareStatement("select * from " + tableName);
@@ -494,49 +513,161 @@
assertTrue(rs.next());
assertEquals("b", rs.getString(1));
assertEquals(2, rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("c", rs.getString(1));
+ assertEquals(3, rs.getInt(2));
assertFalse(rs.next());
} finally {
- closeStmtAndConn(pstmt, conn);
+ closeStmtAndConn(pstmt, conn);
}
-
- conn = DriverManager.getConnection(getUrl(), props);
- Statement stmt = conn.createStatement();
- try {
- stmt.addBatch("upsert into " + tableName + " values ('c', 3)");
- stmt.addBatch("select count(*) from " + tableName);
- stmt.addBatch("upsert into " + tableName + " values ('a', 4)");
- ResultSet rs = stmt.executeQuery("select count(*) from " + tableName);
- assertTrue(rs.next());
- assertEquals(2, rs.getInt(1));
- int[] result = stmt.executeBatch();
- assertEquals(3,result.length);
- assertEquals(result[0], 1);
- assertEquals(result[1], -2);
- assertEquals(result[2], 1);
- conn.commit();
- } finally {
- closeStmtAndConn(pstmt, conn);
- }
-
+
try {
conn = DriverManager.getConnection(getUrl(), props);
- pstmt = conn.prepareStatement("select * from " + tableName);
- ResultSet rs = pstmt.executeQuery();
+ conn.setAutoCommit(autocommit);
+ stmt = conn.createStatement();
+ stmt.addBatch("upsert into " + tableName + " values ('d', 4)");
+ stmt.addBatch("upsert into " + tableName + " values ('a', 5)");
+ ResultSet rs = stmt.executeQuery("select count(*) from " + tableName);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+ int[] result = stmt.executeBatch();
+ assertEquals(2, result.length);
+ assertEquals(result[0], 1);
+ assertEquals(result[1], 1);
+ conn.commit();
+ } finally {
+ closeStmtAndConn(stmt, conn);
+ }
+
+ try {
+ conn = DriverManager.getConnection(getUrl(), props);
+ stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("select * from " + tableName);
assertTrue(rs.next());
assertEquals("a", rs.getString(1));
- assertEquals(4, rs.getInt(2));
+ assertEquals(5, rs.getInt(2));
assertTrue(rs.next());
assertEquals("b", rs.getString(1));
assertEquals(2, rs.getInt(2));
assertTrue(rs.next());
assertEquals("c", rs.getString(1));
assertEquals(3, rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("d", rs.getString(1));
+ assertEquals(4, rs.getInt(2));
assertFalse(rs.next());
} finally {
- closeStmtAndConn(pstmt, conn);
+ closeStmtAndConn(stmt, conn);
+ }
+
+ try {
+ conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(autocommit);
+ stmt = conn.createStatement();
+ stmt.addBatch("delete from " + tableName + " where v <= 4");
+ stmt.addBatch("delete from " + tableName + " where v = 5");
+ int[] result = stmt.executeBatch();
+ assertEquals(2, result.length);
+ assertEquals(result[0], 3);
+ assertEquals(result[1], 1);
+ conn.commit();
+ } finally {
+ closeStmtAndConn(stmt, conn);
+ }
+ try {
+ conn = DriverManager.getConnection(getUrl(), props);
+ pstmt = conn.prepareStatement("select count(*) from " + tableName);
+ ResultSet rs = pstmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(0, rs.getInt(1));
+ } finally {
+ closeStmtAndConn(stmt, conn);
+ }
+
+ }
+
+ @Test
+ public void testBatchedUpsert() throws Exception {
+ testBatchedUpsert(false);
+ }
+
+ @Test
+ public void testBatchedUpsertAutoCommit() throws Exception {
+ testBatchedUpsert(true);
+ }
+
+ @Test
+ public void testBatchedUpsertMultipleBatches() throws Exception {
+ String tableName = generateUniqueName();
+ Properties props = new Properties();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("create table " + tableName + " (k varchar primary key, v integer)");
+ PreparedStatement pstmt = conn.prepareStatement("upsert into " + tableName + " values (?, ?)");
+ pstmt.setString(1, "a");
+ pstmt.setInt(2, 1);
+ pstmt.addBatch();
+ pstmt.executeBatch();
+ pstmt.setString(1, "b");
+ pstmt.setInt(2, 2);
+ pstmt.addBatch();
+ pstmt.executeBatch();
+ conn.commit();
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("select count(*) from " + tableName);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
}
}
-
+
+ private void testBatchRollback(boolean autocommit) throws Exception {
+ String tableName = generateUniqueName();
+ Properties props = new Properties();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("create table " + tableName + " (k varchar primary key, v integer)");
+ conn.setAutoCommit(autocommit);
+ PreparedStatement pstmt = conn.prepareStatement("upsert into " + tableName + " values (?, ?)");
+ pstmt.setString(1, "a");
+ pstmt.setInt(2, 1);
+ pstmt.addBatch();
+ pstmt.executeBatch();
+ conn.rollback();
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("select count(*) from " + tableName);
+ assertTrue(rs.next());
+ assertEquals(autocommit ? 1 : 0, rs.getInt(1));
+ }
+ }
+
+ @Test
+ public void testBatchRollback() throws Exception {
+ testBatchRollback(false);
+ }
+
+ @Test
+ public void testBatchNoRollbackWithAutoCommit() throws Exception {
+ testBatchRollback(true);
+ }
+
+ @Test
+ public void testDQLFailsInBatch() throws Exception {
+ String tableName = generateUniqueName();
+ Properties props = new Properties();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("create table " + tableName + " (k varchar primary key, v integer)");
+ Statement stmt = conn.createStatement();
+ stmt.addBatch("select * from " + tableName);
+ BatchUpdateException ex = assertThrows(BatchUpdateException.class, () -> stmt.executeBatch());
+ assertEquals("java.sql.BatchUpdateException: ERROR 1151 (XCL51): A batch operation can't include a statement that produces result sets.",
+ ex.getMessage());
+ }
+ }
+
private static Date toDate(String dateString) {
return DateUtil.parseDate(dateString);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java
deleted file mode 100644
index d3cd82b..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.exception;
-
-import java.sql.SQLException;
-
-public class BatchUpdateExecution extends SQLException {
- private static final long serialVersionUID = 1L;
- private static SQLExceptionCode code = SQLExceptionCode.BATCH_EXCEPTION;
- private final int batchIndex;
-
- public BatchUpdateExecution(Throwable cause, int batchIndex) {
- super(new SQLExceptionInfo.Builder(code).build().toString(),
- code.getSQLState(), code.getErrorCode(), cause);
- this.batchIndex = batchIndex;
- }
-
- public int getBatchIndex() {
- return batchIndex;
- }
-}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index d39e67b..67dd263 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.exception;
+import java.sql.BatchUpdateException;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import java.util.Map;
@@ -454,6 +455,8 @@
"Duplicate ENCODED_QUALIFIER."),
MISSING_CQ(1150, "XCL49",
"Missing ENCODED_QUALIFIER."),
+ EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET(1151, "XCL51", "A batch operation can't include a "
+ + "statement that produces result sets.", Factory.BATCH_UPDATE_ERROR),
/**
@@ -634,15 +637,16 @@
}
public static interface Factory {
- public static final Factory DEFAULT = new Factory() {
+ Factory DEFAULT = new Factory() {
@Override
public SQLException newException(SQLExceptionInfo info) {
- return new SQLException(info.toString(), info.getCode().getSQLState(), info.getCode().getErrorCode(), info.getRootCause());
+ return new SQLException(info.toString(), info.getCode().getSQLState(),
+ info.getCode().getErrorCode(), info.getRootCause());
}
};
- public static final Factory SYNTAX_ERROR = new Factory() {
+ Factory SYNTAX_ERROR = new Factory() {
@Override
public SQLException newException(SQLExceptionInfo info) {
@@ -650,7 +654,16 @@
}
};
- public SQLException newException(SQLExceptionInfo info);
+ Factory BATCH_UPDATE_ERROR = new Factory() {
+
+ @Override
+ public SQLException newException(SQLExceptionInfo info) {
+ return new BatchUpdateException(info.toString(), info.getCode().getSQLState(),
+ info.getCode().getErrorCode(), (int[]) null, info.getRootCause());
+ }
+
+ };
+ SQLException newException(SQLExceptionInfo info);
}
private static final Map<Integer,SQLExceptionCode> errorCodeMap = Maps.newHashMapWithExpectedSize(SQLExceptionCode.values().length);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index f524999..eaf0b37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -956,7 +956,7 @@
}
}
} catch(Throwable e) {
- if(table != null) {
+ if (table != null) {
TableMetricsManager.updateMetricsForSystemCatalogTableMethod(table.getTableName().toString(),
NUM_METADATA_LOOKUP_FAILURES, 1);
}
@@ -1507,13 +1507,13 @@
if (allUpsertsMutations ^ allDeletesMutations) {
//success cases are updated for both cases autoCommit=true and conn.commit explicit
- if(areAllBatchesSuccessful){
+ if (areAllBatchesSuccessful){
TableMetricsManager
.updateMetricsMethod(htableNameStr, allUpsertsMutations ? UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER :
DELETE_AGGREGATE_SUCCESS_SQL_COUNTER, 1);
}
//Failures cases are updated only for conn.commit explicit case.
- if(!areAllBatchesSuccessful && !connection.getAutoCommit()){
+ if (!areAllBatchesSuccessful && !connection.getAutoCommit()){
TableMetricsManager.updateMetricsMethod(htableNameStr, allUpsertsMutations ? UPSERT_AGGREGATE_FAILURE_SQL_COUNTER :
DELETE_AGGREGATE_FAILURE_SQL_COUNTER, 1);
}
@@ -2243,4 +2243,7 @@
timeInExecuteMutationMap.clear();
}
+ public boolean isEmpty() {
+ return mutationsMap == null || mutationsMap.isEmpty();
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index f72f271..9a00f9b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -159,11 +159,23 @@
return compileMutation(statement, query);
}
- boolean execute(boolean batched) throws SQLException {
+ void executeForBatch() throws SQLException {
throwIfUnboundParameters();
- if (!batched && statement.getOperation().isMutation() && !batch.isEmpty()) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
- .build().buildException();
+ if (!statement.getOperation().isMutation()) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET)
+ .build().buildException();
+ }
+ executeMutation(statement, createAuditQueryLogger(statement, query));
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ throwIfUnboundParameters();
+ if (statement.getOperation().isMutation() && !batch.isEmpty()) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
+ .build().buildException();
}
if (statement.getOperation().isMutation()) {
executeMutation(statement, createAuditQueryLogger(statement,query));
@@ -171,12 +183,6 @@
}
executeQuery(statement, createQueryLogger(statement,query));
return true;
-
- }
-
- @Override
- public boolean execute() throws SQLException {
- return execute(false);
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 3aaceca..44e2e98 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -46,6 +46,7 @@
import java.io.File;
import java.io.IOException;
import java.io.Reader;
+import java.sql.BatchUpdateException;
import java.sql.ParameterMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -101,7 +102,6 @@
import org.apache.phoenix.compile.TraceQueryPlan;
import org.apache.phoenix.compile.UpsertCompiler;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.exception.BatchUpdateExecution;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.exception.UpgradeRequiredException;
@@ -543,7 +543,7 @@
isDelete = stmt instanceof ExecutableDeleteStatement;
isAtomicUpsert = isUpsert && ((ExecutableUpsertStatement)stmt).getOnDupKeyPairs() != null;
if (plan.getTargetRef() != null && plan.getTargetRef().getTable() != null) {
- if(!Strings.isNullOrEmpty(plan.getTargetRef().getTable().getPhysicalName().toString())) {
+ if (!Strings.isNullOrEmpty(plan.getTargetRef().getTable().getPhysicalName().toString())) {
tableName = plan.getTargetRef().getTable().getPhysicalName().toString();
}
if (plan.getTargetRef().getTable().isTransactional()) {
@@ -567,7 +567,7 @@
setLastUpdateCount(lastUpdateCount);
setLastUpdateOperation(stmt.getOperation());
connection.incrementStatementExecutionCounter();
- if(queryLogger.isAuditLoggingEnabled()) {
+ if (queryLogger.isAuditLoggingEnabled()) {
queryLogger.log(QueryLogInfo.TABLE_NAME_I, getTargetForAudit(stmt));
queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.COMPLETED.toString());
queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, lastUpdateCount);
@@ -649,7 +649,7 @@
}, PhoenixContextExecutor.inContext(),
Tracing.withTracing(connection, this.toString()));
} catch (Exception e) {
- if(queryLogger.isAuditLoggingEnabled()) {
+ if (queryLogger.isAuditLoggingEnabled()) {
queryLogger.log(QueryLogInfo.TABLE_NAME_I, getTargetForAudit(stmt));
queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e));
queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.FAILED.toString());
@@ -1021,7 +1021,7 @@
@SuppressWarnings("unchecked")
@Override
public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
- if(!getUdfParseNodes().isEmpty()) {
+ if (!getUdfParseNodes().isEmpty()) {
stmt.throwIfUnallowedUserDefinedFunctions(getUdfParseNodes());
}
DeleteCompiler compiler = new DeleteCompiler(stmt, this.getOperation());
@@ -1989,26 +1989,41 @@
/**
* Execute the current batch of statements. If any exception occurs
- * during execution, a org.apache.phoenix.exception.BatchUpdateException
- * is thrown which includes the index of the statement within the
- * batch when the exception occurred.
+ * during execution, a {@link java.sql.BatchUpdateException}
+ * is thrown which includes the update counts for the statements executed so
+ * far.
*/
@Override
public int[] executeBatch() throws SQLException {
int i = 0;
+ int[] returnCodes = new int [batch.size()];
+ Arrays.fill(returnCodes, -1);
+ boolean autoCommit = connection.getAutoCommit();
+ connection.setAutoCommit(false);
try {
- int[] returnCodes = new int [batch.size()];
for (i = 0; i < returnCodes.length; i++) {
PhoenixPreparedStatement statement = batch.get(i);
- returnCodes[i] = statement.execute(true) ? Statement.SUCCESS_NO_INFO : statement.getUpdateCount();
+ statement.executeForBatch();
+ returnCodes[i] = statement.getUpdateCount();
}
// Flush all changes in batch if auto flush is true
flushIfNecessary();
// If we make it all the way through, clear the batch
clearBatch();
+ if (autoCommit) {
+ connection.commit();
+ }
return returnCodes;
- } catch (Throwable t) {
- throw new BatchUpdateExecution(t,i);
+ } catch (SQLException t) {
+ if (i == returnCodes.length) {
+ // Exception after for loop, perhaps in commit(), discard returnCodes.
+ throw new BatchUpdateException(t);
+ } else {
+ returnCodes[i] = Statement.EXECUTE_FAILED;
+ throw new BatchUpdateException(returnCodes, t);
+ }
+ } finally {
+ connection.setAutoCommit(autoCommit);
}
}
@@ -2091,7 +2106,7 @@
TableName tableName = null;
if (stmt instanceof ExecutableSelectStatement) {
TableNode from = ((ExecutableSelectStatement)stmt).getFrom();
- if(from instanceof NamedTableNode) {
+ if (from instanceof NamedTableNode) {
tableName = ((NamedTableNode)from).getName();
}
} else if (stmt instanceof ExecutableUpsertStatement) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
index 56a524c..616e3a0 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
@@ -85,97 +85,4 @@
assertEquals(SQLExceptionCode.EXECUTE_UPDATE_NOT_APPLICABLE.getErrorCode(), e.getErrorCode());
}
}
-
- @Test
- /**
- * Validates that if a user sets the query timeout via the
- * stmt.setQueryTimeout() JDBC method, we correctly store the timeout
- * in both milliseconds and seconds.
- */
- public void testSettingQueryTimeoutViaJdbc() throws Exception {
- // Arrange
- Connection connection = DriverManager.getConnection(getUrl());
- PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
- PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-
- // Act
- stmt.setQueryTimeout(3);
-
- // Assert
- assertEquals(3, stmt.getQueryTimeout());
- assertEquals(3000, phoenixStmt.getQueryTimeoutInMillis());
- }
-
- @Test
- /**
- * Validates if a user sets the timeout to zero that we store the timeout
- * in millis as the Integer.MAX_VALUE.
- */
- public void testSettingZeroQueryTimeoutViaJdbc() throws Exception {
- // Arrange
- Connection connection = DriverManager.getConnection(getUrl());
- PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
- PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-
- // Act
- stmt.setQueryTimeout(0);
-
- // Assert
- assertEquals(Integer.MAX_VALUE / 1000, stmt.getQueryTimeout());
- assertEquals(Integer.MAX_VALUE, phoenixStmt.getQueryTimeoutInMillis());
- }
-
- @Test
- /**
- * Validates that is negative value is supplied we set the timeout to the default.
- */
- public void testSettingNegativeQueryTimeoutViaJdbc() throws Exception {
- // Arrange
- Connection connection = DriverManager.getConnection(getUrl());
- PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
- PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
- PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
- int defaultQueryTimeout = phoenixConnection.getQueryServices().getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
- QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
-
- // Act
- stmt.setQueryTimeout(-1);
-
- // Assert
- assertEquals(defaultQueryTimeout / 1000, stmt.getQueryTimeout());
- assertEquals(defaultQueryTimeout, phoenixStmt.getQueryTimeoutInMillis());
- }
-
- @Test
- /**
- * Validates that setting custom phoenix query timeout using
- * the phoenix.query.timeoutMs config property is honored.
- */
- public void testCustomQueryTimeout() throws Exception {
- // Arrange
- Properties connectionProperties = new Properties();
- connectionProperties.setProperty("phoenix.query.timeoutMs", "2350");
- Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
- PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
- PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-
- // Assert
- assertEquals(3, stmt.getQueryTimeout());
- assertEquals(2350, phoenixStmt.getQueryTimeoutInMillis());
- }
-
- @Test
- public void testZeroCustomQueryTimeout() throws Exception {
- // Arrange
- Properties connectionProperties = new Properties();
- connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
- Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
- PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
- PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-
- // Assert
- assertEquals(0, stmt.getQueryTimeout());
- assertEquals(0, phoenixStmt.getQueryTimeoutInMillis());
- }
-
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixStatementTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixStatementTest.java
new file mode 100644
index 0000000..2af97b7
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixStatementTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.*;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+public class PhoenixStatementTest extends BaseConnectionlessQueryTest {
+
+ @Test
+ public void testMutationUsingExecuteQueryShouldFail() throws Exception {
+ Properties connectionProperties = new Properties();
+ Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
+ Statement stmt = connection.createStatement();
+ try {
+ stmt.executeQuery("DELETE FROM " + ATABLE);
+ fail();
+ } catch(SQLException e) {
+ assertEquals(SQLExceptionCode.EXECUTE_QUERY_NOT_APPLICABLE.getErrorCode(), e.getErrorCode());
+ }
+ }
+
+ @Test
+ public void testQueriesUsingExecuteUpdateShouldFail() throws Exception {
+ Properties connectionProperties = new Properties();
+ Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
+ Statement stmt = connection.createStatement();
+ try {
+ stmt.executeUpdate("SELECT * FROM " + ATABLE);
+ fail();
+ } catch(SQLException e) {
+ assertEquals(SQLExceptionCode.EXECUTE_UPDATE_NOT_APPLICABLE.getErrorCode(), e.getErrorCode());
+ }
+ }
+
+ @Test
+ /**
+ * Validates that if a user sets the query timeout via the
+ * stmt.setQueryTimeout() JDBC method, we correctly store the timeout
+ * in both milliseconds and seconds.
+ */
+ public void testSettingQueryTimeoutViaJdbc() throws Exception {
+ // Arrange
+ Connection connection = DriverManager.getConnection(getUrl());
+ Statement stmt = connection.createStatement();
+ PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
+
+ // Act
+ stmt.setQueryTimeout(3);
+
+ // Assert
+ assertEquals(3, stmt.getQueryTimeout());
+ assertEquals(3000, phoenixStmt.getQueryTimeoutInMillis());
+ }
+
+ @Test
+ /**
+ * Validates if a user sets the timeout to zero that we store the timeout
+ * in millis as the Integer.MAX_VALUE.
+ */
+ public void testSettingZeroQueryTimeoutViaJdbc() throws Exception {
+ // Arrange
+ Connection connection = DriverManager.getConnection(getUrl());
+ Statement stmt = connection.createStatement();
+ PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
+
+ // Act
+ stmt.setQueryTimeout(0);
+
+ // Assert
+ assertEquals(Integer.MAX_VALUE / 1000, stmt.getQueryTimeout());
+ assertEquals(Integer.MAX_VALUE, phoenixStmt.getQueryTimeoutInMillis());
+ }
+
+ @Test
+ /**
+ * Validates that if a negative value is supplied we set the timeout to the default.
+ */
+ public void testSettingNegativeQueryTimeoutViaJdbc() throws Exception {
+ // Arrange
+ Connection connection = DriverManager.getConnection(getUrl());
+ Statement stmt = connection.createStatement();
+ PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
+ PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
+ int defaultQueryTimeout = phoenixConnection.getQueryServices().getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+
+ // Act
+ stmt.setQueryTimeout(-1);
+
+ // Assert
+ assertEquals(defaultQueryTimeout / 1000, stmt.getQueryTimeout());
+ assertEquals(defaultQueryTimeout, phoenixStmt.getQueryTimeoutInMillis());
+ }
+
+ @Test
+ /**
+ * Validates that setting custom phoenix query timeout using
+ * the phoenix.query.timeoutMs config property is honored.
+ */
+ public void testCustomQueryTimeout() throws Exception {
+ // Arrange
+ Properties connectionProperties = new Properties();
+ connectionProperties.setProperty("phoenix.query.timeoutMs", "2350");
+ Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
+ Statement stmt = connection.createStatement();
+ PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
+
+ // Assert
+ assertEquals(3, stmt.getQueryTimeout());
+ assertEquals(2350, phoenixStmt.getQueryTimeoutInMillis());
+ }
+
+ @Test
+ public void testZeroCustomQueryTimeout() throws Exception {
+ // Arrange
+ Properties connectionProperties = new Properties();
+ connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
+ Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
+ Statement stmt = connection.createStatement();
+ PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
+
+ // Assert
+ assertEquals(0, stmt.getQueryTimeout());
+ assertEquals(0, phoenixStmt.getQueryTimeoutInMillis());
+ }
+
+ @Test
+ public void testExecuteBatchWithFailedStatement() throws Exception {
+ // Arrange
+ Properties connectionProperties = new Properties();
+ connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
+ Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
+ Statement stmt = connection.createStatement();
+ PhoenixConnection connSpy = spy(connection.unwrap(PhoenixConnection.class));
+ Whitebox.setInternalState(stmt, "connection", connSpy);
+ List<PhoenixPreparedStatement> batch = Lists.newArrayList(
+ mock(PhoenixPreparedStatement.class),
+ mock(PhoenixPreparedStatement.class),
+ mock(PhoenixPreparedStatement.class));
+ Whitebox.setInternalState(stmt, "batch", batch);
+ final String exMsg = "TEST";
+ when(batch.get(0).getUpdateCount()).thenReturn(1);
+ doThrow(new SQLException(exMsg)).when(batch.get(1)).executeForBatch();
+ // However, we don't expect this to be called.
+ when(batch.get(1).getUpdateCount()).thenReturn(1);
+
+ // Act & Assert
+ BatchUpdateException ex = assertThrows(BatchUpdateException.class, () -> stmt.executeBatch());
+ assertEquals(exMsg, ex.getCause().getMessage());
+ int[] updateCounts = ex.getUpdateCounts();
+ assertEquals(3, updateCounts.length);
+ assertEquals(1, updateCounts[0]);
+ assertEquals(Statement.EXECUTE_FAILED, updateCounts[1]);
+ assertEquals(-1, updateCounts[2]);
+ verify(connSpy, never()).commit(); // Ensure commit was never called.
+ }
+
+ @Test
+ public void testExecuteBatchWithCommitFailure() throws Exception {
+ // Arrange
+ Properties connectionProperties = new Properties();
+ connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
+ Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
+ Statement stmt = connection.createStatement();
+ PhoenixConnection connSpy = spy(connection.unwrap(PhoenixConnection.class));
+ Whitebox.setInternalState(stmt, "connection", connSpy);
+ List<PhoenixPreparedStatement> batch = Lists.newArrayList(
+ mock(PhoenixPreparedStatement.class));
+ Whitebox.setInternalState(stmt, "batch", batch);
+ final String exMsg = "TEST";
+ doThrow(new SQLException(exMsg)).when(connSpy).commit();
+ when(connSpy.getAutoCommit()).thenReturn(true);
+
+ // Act & Assert
+ BatchUpdateException ex = assertThrows(BatchUpdateException.class, () -> stmt.executeBatch());
+ assertEquals(exMsg, ex.getCause().getMessage());
+ assertNull(ex.getUpdateCounts());
+ }
+
+}