Merge branch '1.x-branch' of https://git-wip-us.apache.org/repos/asf/storm into 1.x-branch
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
index a223c41..f7cd046 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -19,6 +19,7 @@
package org.apache.storm.hive.bolt;
import org.apache.storm.Config;
+import org.apache.storm.hive.common.HiveWriter;
import org.apache.storm.task.GeneralTopologyContext;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.topology.TopologyBuilder;
@@ -45,7 +46,6 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
@@ -306,6 +306,23 @@
bolt.cleanup();
}
+ private static class FlushFailureHiveBolt extends HiveBolt { // Test double: a HiveBolt whose flushAllWriters(true) always fails.
+ // Hand-written subclass; testNoAcksIfFlushFails constructs it directly instead of wrapping a HiveBolt in a Mockito spy.
+ public FlushFailureHiveBolt(HiveOptions options) {
+ super(options);
+ }
+
+ @Override
+ void flushAllWriters(boolean rollToNext) throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure,
+ InterruptedException {
+ if (rollToNext) {
+ throw new InterruptedException(); // simulate a failed flush whenever rollToNext is true
+ } else {
+ super.flushAllWriters(false); // rollToNext is false on this branch; defer to the real HiveBolt implementation
+ }
+ }
+ }
+
@Test
public void testNoAcksIfFlushFails() throws Exception
{
@@ -316,24 +333,20 @@
.withTxnsPerBatch(2)
.withBatchSize(2);
- HiveBolt spyBolt = Mockito.spy(new HiveBolt(hiveOptions));
+ HiveBolt failingBolt = new FlushFailureHiveBolt(hiveOptions);
- //This forces a failure of all the flush attempts
- doThrow(new InterruptedException()).when(spyBolt).flushAllWriters(true);
-
-
- spyBolt.prepare(config, null, new OutputCollector(collector));
+ failingBolt.prepare(config, null, new OutputCollector(collector));
Tuple tuple1 = generateTestTuple(1,"SJC","Sunnyvale","CA");
Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
- spyBolt.execute(tuple1);
- spyBolt.execute(tuple2);
+ failingBolt.execute(tuple1);
+ failingBolt.execute(tuple2);
verify(collector, never()).ack(tuple1);
verify(collector, never()).ack(tuple2);
- spyBolt.cleanup();
+ failingBolt.cleanup();
}
@Test