PHOENIX-6821: Optimize batching in auto-commit mode (#1570)

* PHOENIX-6821: Optimize batching in auto-commit mode

This change also includes the following to make the batch functionality
more consistent and confirmant to the JDBC spec:
- Disallow the use of DQL in a batch by throwing BatchUpdateException.
- Replace the use of custom BatchUpdateExecution exception with the java.sql.BatchUpdateException and propagate update counts through it.
- Add unit test coverage for executeBatch()
- Add tests for delete and further enhance existing tests
- Review feedback changes

Squashes the following commits to avoid patch errors during PR validation builds:

* b9eb9bf48 Addressed additional review feedback
* 54973f799 Add tests for delete and further enhance existing tests
* 7548551c0 Restoring prior version for reload4j.version
* b29d8d252 Add unit test coverage for executeBatch()
* 6e0c825c8 Review feedback changes
* f129a1504 Reverted the code that prevents mixing of non-batch commit after a batch commit as it is broken
* af3975068 Fix checkstyle errors in the lines touched in this PR.
* 7236deb5c Merge branch 'master' into PHOENIX-6821
* 5c6ac9392 PHOENIX-6821: Optimize batching in auto-commit mode

* Fix checkstyle errors

* Fix checkstyle errors

* Remove extraneous whitespace

* Fix checkstyle errors
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
index 0fb46b1..52a7553 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
@@ -17,16 +17,19 @@
  */
 package org.apache.phoenix.end2end;
 
+import static org.apache.phoenix.util.PhoenixRuntime.REQUEST_METRIC_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.apache.phoenix.util.TestUtil.closeStatement;
 import static org.apache.phoenix.util.TestUtil.closeStmtAndConn;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.math.BigDecimal;
 import java.math.RoundingMode;
+import java.sql.BatchUpdateException;
 import java.sql.Connection;
 import java.sql.Date;
 import java.sql.DriverManager;
@@ -36,6 +39,7 @@
 import java.sql.Statement;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.client.Result;
@@ -45,6 +49,8 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.SequenceNotFoundException;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.DateUtil;
@@ -55,6 +61,7 @@
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.function.ThrowingRunnable;
 
 
 @Category(ParallelStatsDisabledTest.class)
@@ -454,23 +461,25 @@
              closeStmtAndConn(stmt, conn);
         }
     }
-        
-    
-    @Test
-    public void testBatchedUpsert() throws Exception {
+
+    private void testBatchedUpsert(boolean autocommit) throws Exception {
         String tableName = generateUniqueName();
         Properties props = new Properties();
+        props.setProperty(REQUEST_METRIC_ATTRIB, "true");
+        props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "true");
         Connection conn = null;
         PreparedStatement pstmt = null;
+        Statement stmt = null;
         try {
             conn = DriverManager.getConnection(getUrl(), props);
             conn.createStatement().execute("create table " + tableName + " (k varchar primary key, v integer)");
         } finally {
             closeStmtAndConn(pstmt, conn);
         }
-        
+
         try {
             conn = DriverManager.getConnection(getUrl(), props);
+            conn.setAutoCommit(autocommit);
             pstmt = conn.prepareStatement("upsert into " + tableName + " values (?, ?)");
             pstmt.setString(1, "a");
             pstmt.setInt(2, 1);
@@ -478,12 +487,22 @@
             pstmt.setString(1, "b");
             pstmt.setInt(2, 2);
             pstmt.addBatch();
+            pstmt.setString(1, "c");
+            pstmt.setInt(2, 3);
+            pstmt.addBatch();
             pstmt.executeBatch();
-            conn.commit();
+            if (!autocommit) {
+                conn.commit();
+            }
+            PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+            Map<String, Map<MetricType, Long>> mutationMetrics = pConn.getMutationMetrics();
+            Assert.assertEquals(3, (long) mutationMetrics.get(tableName).get(MetricType.MUTATION_BATCH_SIZE));
+            Assert.assertEquals(3, (long) mutationMetrics.get(tableName).get(MetricType.UPSERT_MUTATION_SQL_COUNTER));
+            Assert.assertEquals(autocommit, conn.getAutoCommit());
         } finally {
-             closeStmtAndConn(pstmt, conn);
+            closeStmtAndConn(pstmt, conn);
         }
-        
+
         try {
             conn = DriverManager.getConnection(getUrl(), props);
             pstmt = conn.prepareStatement("select * from " + tableName);
@@ -494,49 +513,161 @@
             assertTrue(rs.next());
             assertEquals("b", rs.getString(1));
             assertEquals(2, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("c", rs.getString(1));
+            assertEquals(3, rs.getInt(2));
             assertFalse(rs.next());
         } finally {
-             closeStmtAndConn(pstmt, conn);
+            closeStmtAndConn(pstmt, conn);
         }
-        
-        conn = DriverManager.getConnection(getUrl(), props);
-        Statement stmt = conn.createStatement();
-        try {
-            stmt.addBatch("upsert into " + tableName + " values ('c', 3)");
-            stmt.addBatch("select count(*) from " + tableName);
-            stmt.addBatch("upsert into " + tableName + " values ('a', 4)");
-            ResultSet rs = stmt.executeQuery("select count(*) from " + tableName);
-            assertTrue(rs.next());
-            assertEquals(2, rs.getInt(1));
-            int[] result = stmt.executeBatch();
-            assertEquals(3,result.length);
-            assertEquals(result[0], 1);
-            assertEquals(result[1], -2);
-            assertEquals(result[2], 1);
-            conn.commit();
-        } finally {
-             closeStmtAndConn(pstmt, conn);
-        }
-        
+
         try {
             conn = DriverManager.getConnection(getUrl(), props);
-            pstmt = conn.prepareStatement("select * from " + tableName);
-            ResultSet rs = pstmt.executeQuery();
+            conn.setAutoCommit(autocommit);
+            stmt = conn.createStatement();
+            stmt.addBatch("upsert into " + tableName + " values ('d', 4)");
+            stmt.addBatch("upsert into " + tableName + " values ('a', 5)");
+            ResultSet rs = stmt.executeQuery("select count(*) from " + tableName);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            int[] result = stmt.executeBatch();
+            assertEquals(2, result.length);
+            assertEquals(result[0], 1);
+            assertEquals(result[1], 1);
+            conn.commit();
+        } finally {
+            closeStmtAndConn(stmt, conn);
+        }
+
+        try {
+            conn = DriverManager.getConnection(getUrl(), props);
+            stmt = conn.createStatement();
+            ResultSet rs = stmt.executeQuery("select * from " + tableName);
             assertTrue(rs.next());
             assertEquals("a", rs.getString(1));
-            assertEquals(4, rs.getInt(2));
+            assertEquals(5, rs.getInt(2));
             assertTrue(rs.next());
             assertEquals("b", rs.getString(1));
             assertEquals(2, rs.getInt(2));
             assertTrue(rs.next());
             assertEquals("c", rs.getString(1));
             assertEquals(3, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("d", rs.getString(1));
+            assertEquals(4, rs.getInt(2));
             assertFalse(rs.next());
         } finally {
-             closeStmtAndConn(pstmt, conn);
+            closeStmtAndConn(stmt, conn);
+        }
+
+        try {
+            conn = DriverManager.getConnection(getUrl(), props);
+            conn.setAutoCommit(autocommit);
+            stmt = conn.createStatement();
+            stmt.addBatch("delete from " + tableName + " where v <= 4");
+            stmt.addBatch("delete from " + tableName + " where v = 5");
+            int[] result = stmt.executeBatch();
+            assertEquals(2, result.length);
+            assertEquals(result[0], 3);
+            assertEquals(result[1], 1);
+            conn.commit();
+        } finally {
+            closeStmtAndConn(stmt, conn);
+        }
+        try {
+            conn = DriverManager.getConnection(getUrl(), props);
+            pstmt = conn.prepareStatement("select count(*) from " + tableName);
+            ResultSet rs = pstmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(0, rs.getInt(1));
+        } finally {
+            closeStmtAndConn(stmt, conn);
+        }
+
+    }
+
+    @Test
+    public void testBatchedUpsert() throws Exception {
+        testBatchedUpsert(false);
+    }
+
+    @Test
+    public void testBatchedUpsertAutoCommit() throws Exception {
+        testBatchedUpsert(true);
+    }
+
+    @Test
+    public void testBatchedUpsertMultipleBatches() throws Exception {
+        String tableName = generateUniqueName();
+        Properties props = new Properties();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("create table " + tableName + " (k varchar primary key, v integer)");
+            PreparedStatement pstmt = conn.prepareStatement("upsert into " + tableName + " values (?, ?)");
+            pstmt.setString(1, "a");
+            pstmt.setInt(2, 1);
+            pstmt.addBatch();
+            pstmt.executeBatch();
+            pstmt.setString(1, "b");
+            pstmt.setInt(2, 2);
+            pstmt.addBatch();
+            pstmt.executeBatch();
+            conn.commit();
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            Statement stmt = conn.createStatement();
+            ResultSet rs = stmt.executeQuery("select count(*) from " + tableName);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
         }
     }
-    
+
+    private void testBatchRollback(boolean autocommit) throws Exception {
+        String tableName = generateUniqueName();
+        Properties props = new Properties();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("create table " + tableName + " (k varchar primary key, v integer)");
+            conn.setAutoCommit(autocommit);
+            PreparedStatement pstmt = conn.prepareStatement("upsert into " + tableName + " values (?, ?)");
+            pstmt.setString(1, "a");
+            pstmt.setInt(2, 1);
+            pstmt.addBatch();
+            pstmt.executeBatch();
+            conn.rollback();
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            Statement stmt = conn.createStatement();
+            ResultSet rs = stmt.executeQuery("select count(*) from " + tableName);
+            assertTrue(rs.next());
+            assertEquals(autocommit ? 1 : 0, rs.getInt(1));
+        }
+    }
+
+    @Test
+    public void testBatchRollback() throws Exception {
+        testBatchRollback(false);
+    }
+
+    @Test
+    public void testBatchNoRollbackWithAutoCommit() throws Exception {
+        testBatchRollback(true);
+    }
+
+    @Test
+    public void testDQLFailsInBatch() throws Exception {
+        String tableName = generateUniqueName();
+        Properties props = new Properties();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("create table " + tableName + " (k varchar primary key, v integer)");
+            Statement stmt = conn.createStatement();
+            stmt.addBatch("select * from " + tableName);
+            BatchUpdateException ex = assertThrows(BatchUpdateException.class, () -> stmt.executeBatch());
+            assertEquals("java.sql.BatchUpdateException: ERROR 1151 (XCL51): A batch operation can't include a statement that produces result sets.",
+                    ex.getMessage());
+        }
+    }
+
     private static Date toDate(String dateString) {
         return DateUtil.parseDate(dateString);
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java
deleted file mode 100644
index d3cd82b..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.exception;
-
-import java.sql.SQLException;
-
-public class BatchUpdateExecution extends SQLException {
-    private static final long serialVersionUID = 1L;
-    private static SQLExceptionCode code = SQLExceptionCode.BATCH_EXCEPTION;
-    private final int batchIndex;
-
-    public BatchUpdateExecution(Throwable cause, int batchIndex) {
-        super(new SQLExceptionInfo.Builder(code).build().toString(),
-                code.getSQLState(), code.getErrorCode(), cause);
-        this.batchIndex = batchIndex;
-    }
-
-    public int getBatchIndex() {
-        return batchIndex;
-    }
-}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index d39e67b..67dd263 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.exception;
 
+import java.sql.BatchUpdateException;
 import java.sql.SQLException;
 import java.sql.SQLTimeoutException;
 import java.util.Map;
@@ -454,6 +455,8 @@
             "Duplicate ENCODED_QUALIFIER."),
     MISSING_CQ(1150, "XCL49",
             "Missing ENCODED_QUALIFIER."),
+    EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET(1151, "XCL51", "A batch operation can't include a "
+            + "statement that produces result sets.", Factory.BATCH_UPDATE_ERROR),
 
 
     /**
@@ -634,15 +637,16 @@
     }
 
     public static interface Factory {
-        public static final Factory DEFAULT = new Factory() {
+        Factory DEFAULT = new Factory() {
 
             @Override
             public SQLException newException(SQLExceptionInfo info) {
-                return new SQLException(info.toString(), info.getCode().getSQLState(), info.getCode().getErrorCode(), info.getRootCause());
+                return new SQLException(info.toString(), info.getCode().getSQLState(),
+                        info.getCode().getErrorCode(), info.getRootCause());
             }
             
         };
-        public static final Factory SYNTAX_ERROR = new Factory() {
+        Factory SYNTAX_ERROR = new Factory() {
 
             @Override
             public SQLException newException(SQLExceptionInfo info) {
@@ -650,7 +654,16 @@
             }
             
         };
-        public SQLException newException(SQLExceptionInfo info);
+        Factory BATCH_UPDATE_ERROR = new Factory() {
+
+            @Override
+            public SQLException newException(SQLExceptionInfo info) {
+                return new BatchUpdateException(info.toString(), info.getCode().getSQLState(),
+                        info.getCode().getErrorCode(), (int[]) null, info.getRootCause());
+            }
+
+        };
+        SQLException newException(SQLExceptionInfo info);
     }
     
     private static final Map<Integer,SQLExceptionCode> errorCodeMap = Maps.newHashMapWithExpectedSize(SQLExceptionCode.values().length);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index f524999..eaf0b37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -956,7 +956,7 @@
                 }
             }
         } catch(Throwable e) {
-            if(table != null) {
+            if (table != null) {
                 TableMetricsManager.updateMetricsForSystemCatalogTableMethod(table.getTableName().toString(),
                         NUM_METADATA_LOOKUP_FAILURES, 1);
             }
@@ -1507,13 +1507,13 @@
 
                     if (allUpsertsMutations ^ allDeletesMutations) {
                         //success cases are updated for both cases autoCommit=true and conn.commit explicit
-                        if(areAllBatchesSuccessful){
+                        if (areAllBatchesSuccessful){
                             TableMetricsManager
                                     .updateMetricsMethod(htableNameStr, allUpsertsMutations ? UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER :
                                             DELETE_AGGREGATE_SUCCESS_SQL_COUNTER, 1);
                         }
                         //Failures cases are updated only for conn.commit explicit case.
-                        if(!areAllBatchesSuccessful && !connection.getAutoCommit()){
+                        if (!areAllBatchesSuccessful && !connection.getAutoCommit()){
                             TableMetricsManager.updateMetricsMethod(htableNameStr, allUpsertsMutations ? UPSERT_AGGREGATE_FAILURE_SQL_COUNTER :
                                     DELETE_AGGREGATE_FAILURE_SQL_COUNTER, 1);
                         }
@@ -2243,4 +2243,7 @@
         timeInExecuteMutationMap.clear();
     }
 
+    public boolean isEmpty() {
+        return mutationsMap != null ? mutationsMap.isEmpty() : true;
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index f72f271..9a00f9b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -159,11 +159,23 @@
         return compileMutation(statement, query);
     }
 
-    boolean execute(boolean batched) throws SQLException {
+    void executeForBatch() throws SQLException {
         throwIfUnboundParameters();
-        if (!batched && statement.getOperation().isMutation() && !batch.isEmpty()) {
-            throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
-            .build().buildException();
+        if (!statement.getOperation().isMutation()) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET)
+                    .build().buildException();
+        }
+        executeMutation(statement, createAuditQueryLogger(statement, query));
+    }
+
+    @Override
+    public boolean execute() throws SQLException {
+        throwIfUnboundParameters();
+        if (statement.getOperation().isMutation() && !batch.isEmpty()) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
+                    .build().buildException();
         }
         if (statement.getOperation().isMutation()) {
             executeMutation(statement, createAuditQueryLogger(statement,query));
@@ -171,12 +183,6 @@
         }
         executeQuery(statement, createQueryLogger(statement,query));
         return true;
-        
-    }
-
-    @Override
-    public boolean execute() throws SQLException {
-        return execute(false);
     }
 
     @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 3aaceca..44e2e98 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -46,6 +46,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.Reader;
+import java.sql.BatchUpdateException;
 import java.sql.ParameterMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -101,7 +102,6 @@
 import org.apache.phoenix.compile.TraceQueryPlan;
 import org.apache.phoenix.compile.UpsertCompiler;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.exception.BatchUpdateExecution;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.exception.UpgradeRequiredException;
@@ -543,7 +543,7 @@
                                 isDelete = stmt instanceof ExecutableDeleteStatement;
                                 isAtomicUpsert = isUpsert && ((ExecutableUpsertStatement)stmt).getOnDupKeyPairs() != null;
                                 if (plan.getTargetRef() != null && plan.getTargetRef().getTable() != null) {
-                                    if(!Strings.isNullOrEmpty(plan.getTargetRef().getTable().getPhysicalName().toString())) {
+                                    if (!Strings.isNullOrEmpty(plan.getTargetRef().getTable().getPhysicalName().toString())) {
                                         tableName = plan.getTargetRef().getTable().getPhysicalName().toString();
                                     }
                                     if (plan.getTargetRef().getTable().isTransactional()) {
@@ -567,7 +567,7 @@
                                 setLastUpdateCount(lastUpdateCount);
                                 setLastUpdateOperation(stmt.getOperation());
                                 connection.incrementStatementExecutionCounter();
-                                if(queryLogger.isAuditLoggingEnabled()) {
+                                if (queryLogger.isAuditLoggingEnabled()) {
                                     queryLogger.log(QueryLogInfo.TABLE_NAME_I, getTargetForAudit(stmt));
                                     queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.COMPLETED.toString());
                                     queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, lastUpdateCount);
@@ -649,7 +649,7 @@
                     }, PhoenixContextExecutor.inContext(),
                         Tracing.withTracing(connection, this.toString()));
         } catch (Exception e) {
-            if(queryLogger.isAuditLoggingEnabled()) {
+            if (queryLogger.isAuditLoggingEnabled()) {
                 queryLogger.log(QueryLogInfo.TABLE_NAME_I, getTargetForAudit(stmt));
                 queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e));
                 queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.FAILED.toString());
@@ -1021,7 +1021,7 @@
         @SuppressWarnings("unchecked")
         @Override
         public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
-            if(!getUdfParseNodes().isEmpty()) {
+            if (!getUdfParseNodes().isEmpty()) {
                 stmt.throwIfUnallowedUserDefinedFunctions(getUdfParseNodes());
             }
 		    DeleteCompiler compiler = new DeleteCompiler(stmt, this.getOperation());
@@ -1989,26 +1989,41 @@
 
     /**
      * Execute the current batch of statements. If any exception occurs
-     * during execution, a org.apache.phoenix.exception.BatchUpdateException
-     * is thrown which includes the index of the statement within the
-     * batch when the exception occurred.
+     * during execution, a {@link java.sql.BatchUpdateException}
+     * is thrown which compposes the update counts for statements executed so
+     * far.
      */
     @Override
     public int[] executeBatch() throws SQLException {
         int i = 0;
+        int[] returnCodes = new int [batch.size()];
+        Arrays.fill(returnCodes, -1);
+        boolean autoCommit = connection.getAutoCommit();
+        connection.setAutoCommit(false);
         try {
-            int[] returnCodes = new int [batch.size()];
             for (i = 0; i < returnCodes.length; i++) {
                 PhoenixPreparedStatement statement = batch.get(i);
-                returnCodes[i] = statement.execute(true) ? Statement.SUCCESS_NO_INFO : statement.getUpdateCount();
+                statement.executeForBatch();
+                returnCodes[i] = statement.getUpdateCount();
             }
             // Flush all changes in batch if auto flush is true
             flushIfNecessary();
             // If we make it all the way through, clear the batch
             clearBatch();
+            if (autoCommit) {
+                connection.commit();
+            }
             return returnCodes;
-        } catch (Throwable t) {
-            throw new BatchUpdateExecution(t,i);
+        } catch (SQLException t) {
+            if (i == returnCodes.length) {
+                // Exception after for loop, perhaps in commit(), discard returnCodes.
+                throw new BatchUpdateException(t);
+            } else {
+                returnCodes[i] = Statement.EXECUTE_FAILED;
+                throw new BatchUpdateException(returnCodes, t);
+            }
+        } finally {
+            connection.setAutoCommit(autoCommit);
         }
     }
 
@@ -2091,7 +2106,7 @@
         TableName tableName = null;
         if (stmt instanceof ExecutableSelectStatement) {
             TableNode from = ((ExecutableSelectStatement)stmt).getFrom();
-            if(from instanceof NamedTableNode) {
+            if (from instanceof NamedTableNode) {
                 tableName = ((NamedTableNode)from).getName();
             }
         } else if (stmt instanceof ExecutableUpsertStatement) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
index 56a524c..616e3a0 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
@@ -85,97 +85,4 @@
             assertEquals(SQLExceptionCode.EXECUTE_UPDATE_NOT_APPLICABLE.getErrorCode(), e.getErrorCode());
         }
     }
-    
-    @Test
-    /**
-     * Validates that if a user sets the query timeout via the
-     * stmt.setQueryTimeout() JDBC method, we correctly store the timeout
-     * in both milliseconds and seconds.
-     */
-    public void testSettingQueryTimeoutViaJdbc() throws Exception {
-        // Arrange
-        Connection connection = DriverManager.getConnection(getUrl());
-        PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
-        PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-        
-        // Act
-        stmt.setQueryTimeout(3);
-    
-        // Assert
-        assertEquals(3, stmt.getQueryTimeout());
-        assertEquals(3000, phoenixStmt.getQueryTimeoutInMillis());
-    }
-    
-    @Test
-    /**
-     * Validates if a user sets the timeout to zero that we store the timeout
-     * in millis as the Integer.MAX_VALUE. 
-     */
-    public void testSettingZeroQueryTimeoutViaJdbc() throws Exception {
-        // Arrange
-        Connection connection = DriverManager.getConnection(getUrl());
-        PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
-        PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-        
-        // Act
-        stmt.setQueryTimeout(0);
-    
-        // Assert
-        assertEquals(Integer.MAX_VALUE / 1000, stmt.getQueryTimeout());
-        assertEquals(Integer.MAX_VALUE, phoenixStmt.getQueryTimeoutInMillis());
-    }
-    
-    @Test
-    /**
-     * Validates that is negative value is supplied we set the timeout to the default.
-     */
-    public void testSettingNegativeQueryTimeoutViaJdbc() throws Exception {
-        // Arrange
-        Connection connection = DriverManager.getConnection(getUrl());
-        PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
-        PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-        PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
-        int defaultQueryTimeout = phoenixConnection.getQueryServices().getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, 
-            QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
-        
-        // Act
-        stmt.setQueryTimeout(-1);
-    
-        // Assert
-        assertEquals(defaultQueryTimeout / 1000, stmt.getQueryTimeout());
-        assertEquals(defaultQueryTimeout, phoenixStmt.getQueryTimeoutInMillis());
-    }
-    
-    @Test
-    /**
-     * Validates that setting custom phoenix query timeout using
-     * the phoenix.query.timeoutMs config property is honored.
-     */
-    public void testCustomQueryTimeout() throws Exception {
-        // Arrange
-        Properties connectionProperties = new Properties();
-        connectionProperties.setProperty("phoenix.query.timeoutMs", "2350");
-        Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
-        PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
-        PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-    
-        // Assert
-        assertEquals(3, stmt.getQueryTimeout());
-        assertEquals(2350, phoenixStmt.getQueryTimeoutInMillis());
-    }
-    
-    @Test
-    public void testZeroCustomQueryTimeout() throws Exception {
-        // Arrange
-        Properties connectionProperties = new Properties();
-        connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
-        Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
-        PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
-        PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-    
-        // Assert
-        assertEquals(0, stmt.getQueryTimeout());
-        assertEquals(0, phoenixStmt.getQueryTimeoutInMillis());
-    }
-
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixStatementTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixStatementTest.java
new file mode 100644
index 0000000..2af97b7
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixStatementTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.*;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+public class PhoenixStatementTest extends BaseConnectionlessQueryTest {
+
+    @Test
+    public void testMutationUsingExecuteQueryShouldFail() throws Exception {
+        Properties connectionProperties = new Properties();
+        Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
+        Statement stmt = connection.createStatement();
+        try {
+            stmt.executeQuery("DELETE FROM " + ATABLE);
+            fail();
+        } catch(SQLException e) {
+            assertEquals(SQLExceptionCode.EXECUTE_QUERY_NOT_APPLICABLE.getErrorCode(), e.getErrorCode());
+        }
+    }
+
+    @Test
+    public void testQueriesUsingExecuteUpdateShouldFail() throws Exception {
+        Properties connectionProperties = new Properties();
+        Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
+        Statement stmt = connection.createStatement();
+        try {
+            stmt.executeUpdate("SELECT * FROM " + ATABLE);
+            fail();
+        } catch(SQLException e) {
+            assertEquals(SQLExceptionCode.EXECUTE_UPDATE_NOT_APPLICABLE.getErrorCode(), e.getErrorCode());
+        }
+    }
+
+    @Test
+    /**
+     * Validates that if a user sets the query timeout via the
+     * stmt.setQueryTimeout() JDBC method, we correctly store the timeout
+     * in both milliseconds and seconds.
+     */
+    public void testSettingQueryTimeoutViaJdbc() throws Exception {
+        // Arrange
+        Connection connection = DriverManager.getConnection(getUrl());
+        Statement stmt = connection.createStatement();
+        PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
+
+        // Act
+        stmt.setQueryTimeout(3);
+
+        // Assert
+        assertEquals(3, stmt.getQueryTimeout());
+        assertEquals(3000, phoenixStmt.getQueryTimeoutInMillis());
+    }
+
+    @Test
+    /**
+     * Validates if a user sets the timeout to zero that we store the timeout
+     * in millis as the Integer.MAX_VALUE.
+     */
+    public void testSettingZeroQueryTimeoutViaJdbc() throws Exception {
+        // Arrange
+        Connection connection = DriverManager.getConnection(getUrl());
+        Statement stmt = connection.createStatement();
+        PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
+
+        // Act
+        stmt.setQueryTimeout(0);
+
+        // Assert
+        assertEquals(Integer.MAX_VALUE / 1000, stmt.getQueryTimeout());
+        assertEquals(Integer.MAX_VALUE, phoenixStmt.getQueryTimeoutInMillis());
+    }
+
+    @Test
+    /**
+     * Validates that is negative value is supplied we set the timeout to the default.
+     */
+    public void testSettingNegativeQueryTimeoutViaJdbc() throws Exception {
+        // Arrange
+        Connection connection = DriverManager.getConnection(getUrl());
+        Statement stmt = connection.createStatement();
+        PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
+        PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
+        int defaultQueryTimeout = phoenixConnection.getQueryServices().getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+            QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+
+        // Act
+        stmt.setQueryTimeout(-1);
+
+        // Assert
+        assertEquals(defaultQueryTimeout / 1000, stmt.getQueryTimeout());
+        assertEquals(defaultQueryTimeout, phoenixStmt.getQueryTimeoutInMillis());
+    }
+
+    @Test
+    /**
+     * Validates that setting custom phoenix query timeout using
+     * the phoenix.query.timeoutMs config property is honored.
+     */
+    public void testCustomQueryTimeout() throws Exception {
+        // Arrange
+        Properties connectionProperties = new Properties();
+        connectionProperties.setProperty("phoenix.query.timeoutMs", "2350");
+        Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
+        Statement stmt = connection.createStatement();
+        PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
+
+        // Assert
+        assertEquals(3, stmt.getQueryTimeout());
+        assertEquals(2350, phoenixStmt.getQueryTimeoutInMillis());
+    }
+
+    @Test
+    public void testZeroCustomQueryTimeout() throws Exception {
+        // Arrange
+        Properties connectionProperties = new Properties();
+        connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
+        Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
+        Statement stmt = connection.createStatement();
+        PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
+
+        // Assert
+        assertEquals(0, stmt.getQueryTimeout());
+        assertEquals(0, phoenixStmt.getQueryTimeoutInMillis());
+    }
+
+    @Test
+    public void testExecuteBatchWithFailedStatement() throws Exception {
+        // Arrange
+        Properties connectionProperties = new Properties();
+        connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
+        Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
+        Statement stmt = connection.createStatement();
+        PhoenixConnection connSpy = spy(connection.unwrap(PhoenixConnection.class));
+        Whitebox.setInternalState(stmt, "connection", connSpy);
+        List<PhoenixPreparedStatement> batch = Lists.newArrayList(
+                mock(PhoenixPreparedStatement.class),
+                mock(PhoenixPreparedStatement.class),
+                mock(PhoenixPreparedStatement.class));
+        Whitebox.setInternalState(stmt, "batch", batch);
+        final String exMsg = "TEST";
+        when(batch.get(0).getUpdateCount()).thenReturn(1);
+        doThrow(new SQLException(exMsg)).when(batch.get(1)).executeForBatch();
+        // However, we don't expect this to be called.
+        when(batch.get(1).getUpdateCount()).thenReturn(1);
+
+        // Act & Assert
+        BatchUpdateException ex = assertThrows(BatchUpdateException.class, () -> stmt.executeBatch());
+        assertEquals(exMsg, ex.getCause().getMessage());
+        int[] updateCounts = ex.getUpdateCounts();
+        assertEquals(3, updateCounts.length);
+        assertEquals(1, updateCounts[0]);
+        assertEquals(Statement.EXECUTE_FAILED, updateCounts[1]);
+        assertEquals(-1, updateCounts[2]);
+        verify(connSpy, never()).commit(); // Ensure commit was never called.
+    }
+
+    @Test
+    public void testExecuteBatchWithCommitFailure() throws Exception {
+        // Arrange
+        Properties connectionProperties = new Properties();
+        connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
+        Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
+        Statement stmt = connection.createStatement();
+        PhoenixConnection connSpy = spy(connection.unwrap(PhoenixConnection.class));
+        Whitebox.setInternalState(stmt, "connection", connSpy);
+        List<PhoenixPreparedStatement> batch = Lists.newArrayList(
+                mock(PhoenixPreparedStatement.class));
+        Whitebox.setInternalState(stmt, "batch", batch);
+        final String exMsg = "TEST";
+        doThrow(new SQLException(exMsg)).when(connSpy).commit();
+        when(connSpy.getAutoCommit()).thenReturn(true);
+
+        // Act & Assert
+        BatchUpdateException ex = assertThrows(BatchUpdateException.class, () -> stmt.executeBatch());
+        assertEquals(exMsg, ex.getCause().getMessage());
+        assertNull(ex.getUpdateCounts());
+    }
+
+}