PHOENIX-5195 PHERF:- Handle batch failure in connection.commit() in WriteWorkload#upsertData
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
index 4023383..c482b3f 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
@@ -24,7 +24,6 @@
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.sql.Timestamp;
import java.sql.Types;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -35,7 +34,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.pherf.PherfConstants;
import org.apache.phoenix.pherf.PherfConstants.GeneratePhoenixStats;
import org.apache.phoenix.pherf.configuration.Column;
@@ -294,26 +292,32 @@
rowsCreated += result;
}
}
- connection.commit();
- duration = System.currentTimeMillis() - last;
- logger.info("Writer (" + Thread.currentThread().getName()
- + ") committed Batch. Total " + getBatchSize()
- + " rows for this thread (" + this.hashCode() + ") in ("
- + duration + ") Ms");
+ try {
+ connection.commit();
+ duration = System.currentTimeMillis() - last;
+ logger.info("Writer (" + Thread.currentThread().getName()
+ + ") committed Batch. Total " + getBatchSize()
+ + " rows for this thread (" + this.hashCode() + ") in ("
+ + duration + ") Ms");
- if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) {
- dataLoadThreadTime
- .add(tableName, Thread.currentThread().getName(), i,
- System.currentTimeMillis() - logStartTime);
- logStartTime = System.currentTimeMillis();
+ if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) {
+ dataLoadThreadTime.add(tableName,
+ Thread.currentThread().getName(), i,
+ System.currentTimeMillis() - logStartTime);
+ }
+ } catch (SQLException e) {
+ logger.warn("SQLException in commit operation", e);
}
+ logStartTime = System.currentTimeMillis();
// Pause for throttling if configured to do so
Thread.sleep(threadSleepDuration);
// Re-compute the start time for the next batch
last = System.currentTimeMillis();
}
}
+ } catch (SQLException e) {
+ throw e;
} finally {
// Need to keep the statement open to send the remaining batch of updates
if (!useBatchApi && stmt != null) {