Merge branch '1.x-branch' of https://git-wip-us.apache.org/repos/asf/storm into 1.x-branch
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
index a223c41..f7cd046 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -19,6 +19,7 @@
 package org.apache.storm.hive.bolt;
 
 import org.apache.storm.Config;
+import org.apache.storm.hive.common.HiveWriter;
 import org.apache.storm.task.GeneralTopologyContext;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.topology.TopologyBuilder;
@@ -45,7 +46,6 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.*;
@@ -306,6 +306,23 @@
         bolt.cleanup();
     }
 
+    private static class FlushFailureHiveBolt extends HiveBolt {
+
+        public FlushFailureHiveBolt(HiveOptions options) {
+            super(options);
+        }
+
+        @Override
+        void flushAllWriters(boolean rollToNext) throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure,
+                InterruptedException {
+            if (rollToNext) {
+                throw new InterruptedException();
+            } else {
+                super.flushAllWriters(false);
+            }
+        }
+    }
+
     @Test
     public void testNoAcksIfFlushFails() throws Exception
     {
@@ -316,24 +333,20 @@
                 .withTxnsPerBatch(2)
                 .withBatchSize(2);
 
-        HiveBolt spyBolt = Mockito.spy(new HiveBolt(hiveOptions));
+        HiveBolt failingBolt = new FlushFailureHiveBolt(hiveOptions);
 
-        //This forces a failure of all the flush attempts
-        doThrow(new InterruptedException()).when(spyBolt).flushAllWriters(true);
-
-
-        spyBolt.prepare(config, null, new OutputCollector(collector));
+        failingBolt.prepare(config, null, new OutputCollector(collector));
 
         Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
         Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
 
-        spyBolt.execute(tuple1);
-        spyBolt.execute(tuple2);
+        failingBolt.execute(tuple1);
+        failingBolt.execute(tuple2);
 
         verify(collector, never()).ack(tuple1);
         verify(collector, never()).ack(tuple2);
 
-        spyBolt.cleanup();
+        failingBolt.cleanup();
     }
 
     @Test